fix(query): set the correct sma data
This commit is contained in:
parent
9a0e15f1c1
commit
d9e5172357
|
@ -750,7 +750,7 @@ TEST(testCase, projection_query_stables) {
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
TEST(testCase, agg_query_tables) {
|
TEST(testCase, agg_query_tables) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
@ -763,7 +763,7 @@ TEST(testCase, agg_query_tables) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "show table distributed st1");
|
pRes = taos_query(pConn, "show table distributed tup");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
@ -775,6 +775,7 @@ TEST(testCase, agg_query_tables) {
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
/*
|
/*
|
||||||
--- copy the following script in the shell to setup the environment ---
|
--- copy the following script in the shell to setup the environment ---
|
||||||
|
|
||||||
|
@ -820,7 +821,7 @@ TEST(testCase, async_api_test) {
|
||||||
getchar();
|
getchar();
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
TEST(testCase, update_test) {
|
TEST(testCase, update_test) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
@ -857,5 +858,5 @@ TEST(testCase, update_test) {
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
|
@ -55,7 +55,7 @@ typedef struct SIOCostSummary {
|
||||||
} SIOCostSummary;
|
} SIOCostSummary;
|
||||||
|
|
||||||
typedef struct SBlockLoadSuppInfo {
|
typedef struct SBlockLoadSuppInfo {
|
||||||
SColumnDataAgg* pstatis;
|
SColumnDataAgg tsColAgg;
|
||||||
SColumnDataAgg** plist;
|
SColumnDataAgg** plist;
|
||||||
int16_t* colIds; // column ids for loading file block data
|
int16_t* colIds; // column ids for loading file block data
|
||||||
int32_t* slotIds; // colId to slotId
|
int32_t* slotIds; // colId to slotId
|
||||||
|
@ -364,13 +364,14 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
|
||||||
pSup->pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
|
|
||||||
pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
|
pSup->plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
|
||||||
if (pSup->pstatis == NULL || pSup->plist == NULL) {
|
if (pSup->plist == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
|
|
||||||
pReader->pResBlock = createResBlock(pCond, pReader->capacity);
|
pReader->pResBlock = createResBlock(pCond, pReader->capacity);
|
||||||
if (pReader->pResBlock == NULL) {
|
if (pReader->pResBlock == NULL) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
|
@ -2647,8 +2648,6 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(pReader->pResBlock);
|
blockDataDestroy(pReader->pResBlock);
|
||||||
|
|
||||||
taosMemoryFreeClear(pReader->suppInfo.pstatis);
|
|
||||||
taosMemoryFreeClear(pReader->suppInfo.plist);
|
taosMemoryFreeClear(pReader->suppInfo.plist);
|
||||||
taosMemoryFree(pReader->suppInfo.slotIds);
|
taosMemoryFree(pReader->suppInfo.slotIds);
|
||||||
|
|
||||||
|
@ -2744,56 +2743,48 @@ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg***
|
||||||
|
|
||||||
int64_t stime = taosGetTimestampUs();
|
int64_t stime = taosGetTimestampUs();
|
||||||
|
|
||||||
|
SArray* pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
|
||||||
if (tBlockHasSma(pBlock)) {
|
if (tBlockHasSma(pBlock)) {
|
||||||
SArray* pColAgg = taosArrayInit(4, sizeof(SColumnDataAgg));
|
|
||||||
code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pColAgg, NULL);
|
code = tsdbReadBlockSma(pReader->pFileReader, pBlock, pColAgg, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64", code:%s, %s", 0, pFBlock->uid,
|
tsdbDebug("vgId:%d, failed to load block SMA for uid %" PRIu64", code:%s, %s", 0, pFBlock->uid,
|
||||||
tstrerror(code), pReader->idStr);
|
tstrerror(code), pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
*pBlockStatis = NULL;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t el = taosGetTimestampUs() - stime;
|
*allHave = true;
|
||||||
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64", elapsed time:%"PRId64"us, %s", 0, pFBlock->uid,
|
|
||||||
el, pReader->idStr);
|
|
||||||
|
|
||||||
// int16_t* colIds = pReader->suppInfo.defaultLoadColumn->pData;
|
|
||||||
|
|
||||||
// size_t numOfCols = QH_GET_NUM_OF_COLS(pReader);
|
|
||||||
// memset(pReader->suppInfo.plist, 0, numOfCols * POINTER_BYTES);
|
|
||||||
// memset(pReader->suppInfo.pstatis, 0, numOfCols * sizeof(SColumnDataAgg));
|
|
||||||
|
|
||||||
// for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
// pReader->suppInfo.pstatis[i].colId = colIds[i];
|
|
||||||
// }
|
|
||||||
|
|
||||||
// *allHave = true;
|
|
||||||
// tsdbGetBlockStatis(&pReader->rhelper, pReader->suppInfo.pstatis, (int)numOfCols, pBlockInfo->compBlock);
|
|
||||||
|
|
||||||
// always load the first primary timestamp column data
|
// always load the first primary timestamp column data
|
||||||
SColumnDataAgg* pTsAgg = &pReader->suppInfo.pstatis[0];
|
SColumnDataAgg* pTsAgg = &pReader->suppInfo.tsColAgg;
|
||||||
assert(pTsAgg->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
|
|
||||||
|
|
||||||
pTsAgg->numOfNull = 0;
|
pTsAgg->numOfNull = 0;
|
||||||
|
pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||||
pTsAgg->min = pReader->pResBlock->info.window.skey;
|
pTsAgg->min = pReader->pResBlock->info.window.skey;
|
||||||
pTsAgg->max = pReader->pResBlock->info.window.ekey;
|
pTsAgg->max = pReader->pResBlock->info.window.ekey;
|
||||||
pReader->suppInfo.plist[0] = &pReader->suppInfo.pstatis[0];
|
pReader->suppInfo.plist[0] = pTsAgg;
|
||||||
|
|
||||||
// update the number of NULL data rows
|
// update the number of NULL data rows
|
||||||
size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
|
size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock);
|
||||||
int32_t* slotIds = pReader->suppInfo.slotIds;
|
|
||||||
|
|
||||||
for (int32_t i = 1; i < numOfCols; ++i) {
|
int32_t i = 0, j = 0;
|
||||||
// ASSERT(colIds[i] == pReader->pSchema->columns[slotIds[i]].colId);
|
while(j < numOfCols && i < taosArrayGetSize(pColAgg)) {
|
||||||
if (IS_BSMA_ON(&(pReader->pSchema->columns[slotIds[i]]))) {
|
SColumnDataAgg* pAgg = taosArrayGet(pColAgg, i);
|
||||||
if (pReader->suppInfo.pstatis[i].numOfNull == -1) { // set the column data are all NULL
|
if (pAgg->colId == pReader->suppInfo.colIds[j]) {
|
||||||
// pReader->suppInfo.pstatis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
|
if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) {
|
||||||
|
pReader->suppInfo.plist[j] = pAgg;
|
||||||
|
i += 1;
|
||||||
|
j += 1;
|
||||||
|
} else {
|
||||||
|
*allHave = false;
|
||||||
}
|
}
|
||||||
|
} else if (pAgg->colId < pReader->suppInfo.colIds[j]) {
|
||||||
pReader->suppInfo.plist[i] = &pReader->suppInfo.pstatis[i];
|
i += 1;
|
||||||
} else {
|
} else if (pReader->suppInfo.colIds[j] < pAgg->colId) {
|
||||||
*allHave = false;
|
j += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2801,6 +2792,10 @@ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg***
|
||||||
pReader->cost.smaLoadTime += elapsed;
|
pReader->cost.smaLoadTime += elapsed;
|
||||||
|
|
||||||
*pBlockStatis = pReader->suppInfo.plist;
|
*pBlockStatis = pReader->suppInfo.plist;
|
||||||
|
|
||||||
|
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64", elapsed time:%"PRId64"us, %s", 0, pFBlock->uid,
|
||||||
|
elapsed, pReader->idStr);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2843,7 +2838,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_
|
||||||
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows[tWinIdx]);
|
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows[tWinIdx]);
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
|
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
|
||||||
memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
|
memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
|
||||||
|
|
||||||
// todo set the correct numOfTables
|
// todo set the correct numOfTables
|
||||||
|
@ -2899,9 +2894,9 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
||||||
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
|
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
|
||||||
|
|
||||||
pTableBlockInfo->numOfTables = numOfTables;
|
pTableBlockInfo->numOfTables = numOfTables;
|
||||||
|
bool hasNext = true;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
bool hasNext = blockIteratorNext(&pStatus->blockIter);
|
|
||||||
if (hasNext) {
|
if (hasNext) {
|
||||||
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
|
||||||
STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
|
||||||
|
@ -2924,6 +2919,9 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
||||||
|
|
||||||
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
|
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows);
|
||||||
pTableBlockInfo->blockRowsHisto[bucketIndex]++;
|
pTableBlockInfo->blockRowsHisto[bucketIndex]++;
|
||||||
|
|
||||||
|
hasNext = blockIteratorNext(&pStatus->blockIter);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||||
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
|
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
|
||||||
|
@ -2933,6 +2931,11 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
||||||
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
|
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
hasNext = blockIteratorNext(&pStatus->blockIter);
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
|
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
|
||||||
// pReader->pFileGroup->fid, pReader->idStr);
|
// pReader->pFileGroup->fid, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
|
@ -5547,30 +5547,18 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t delta = maxVal - minVal;
|
// maximum number of step is 80
|
||||||
int32_t step = delta / 50;
|
double factor = pData->numOfBlocks / 80.0;
|
||||||
if (step == 0) {
|
|
||||||
step = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]);
|
int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]);
|
||||||
int32_t bucketRange = (pData->maxRows - pData->minRows) / numOfBuckets;
|
int32_t bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets;
|
||||||
|
|
||||||
bool singleModel = false;
|
|
||||||
if (bucketRange == 0) {
|
|
||||||
singleModel = true;
|
|
||||||
step = 20;
|
|
||||||
bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
|
for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) {
|
||||||
len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1));
|
len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * i);
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
if (singleModel && pData->blockRowsHisto[i] > 0) {
|
if (pData->blockRowsHisto[i] > 0) {
|
||||||
num = 20;
|
num = (pData->blockRowsHisto[i]) / factor;
|
||||||
} else {
|
|
||||||
num = (pData->blockRowsHisto[i] + step - 1) / step;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < num; ++j) {
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
|
@ -5578,9 +5566,10 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
len += x;
|
len += x;
|
||||||
}
|
}
|
||||||
|
|
||||||
double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
|
if (num > 0) {
|
||||||
len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
|
double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks;
|
||||||
printf("%s\n", st);
|
len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%');
|
||||||
|
}
|
||||||
|
|
||||||
varDataSetLen(st, len);
|
varDataSetLen(st, len);
|
||||||
colDataAppend(pColInfo, row++, st, false);
|
colDataAppend(pColInfo, row++, st, false);
|
||||||
|
|
Loading…
Reference in New Issue