diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 30720da8a7..87a6a90a7e 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -258,8 +258,6 @@ typedef struct SQueryTableDataCond { int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock); -int32_t tEncodeDataBlocks(void** buf, const SArray* blocks); -void* tDecodeDataBlocks(const void* buf, SArray** blocks); void colDataDestroy(SColumnInfoData* pColData); //====================================================================================================================== @@ -294,7 +292,7 @@ typedef struct STableBlockDistInfo { int32_t defMaxRows; int32_t firstSeekTimeUs; uint32_t numOfInmemRows; - uint32_t numOfSmallBlocks; + uint32_t numOfSttRows; uint32_t numOfVgroups; int32_t blockRowsHisto[20]; } STableBlockDistInfo; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 9307a620ad..74e4b66098 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -153,26 +153,6 @@ typedef struct STsdbReader STsdbReader; #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 -int32_t tsdbReaderOpen(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, - SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly, - SHashObj **pIgnoreTables); -int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); -void tsdbReaderSetId(STsdbReader *pReader, const char *idstr); -void tsdbReaderClose(STsdbReader *pReader); -int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); -int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA); -void tsdbReleaseDataBlock(STsdbReader *pReader); -SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); -int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); -int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); -int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); -void *tsdbGetIdx(SMeta *pMeta); -void *tsdbGetIvtIdx(SMeta *pMeta); -uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader); -void tsdbReaderSetCloseFlag(STsdbReader *pReader); -int64_t tsdbGetLastTimestamp(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); - -//====================================================================================================================== int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly, SHashObj **pIgnoreTables); @@ -190,7 +170,6 @@ void *tsdbGetIdx2(SMeta *pMeta); void *tsdbGetIvtIdx2(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); -int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); //====================================================================================================================== int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 197952d190..79f9caab33 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -886,10 +886,9 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); -SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); -void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); +SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost); -void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); +void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost); // tsdbCache ============================================================================================== diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 36675a771c..8017f1f4d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -22,7 +22,7 @@ static void tLDataIterClose2(SLDataIter *pIter); // SLDataIter ================================================= -SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) { +SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) { SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo)); if (pLoadInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -61,7 +61,7 @@ void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pL } } -void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { +void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { if (pLoadInfo == NULL) { return NULL; } @@ -78,14 +78,19 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { pInfo->sttBlockIndex = -1; pInfo->pin = false; + if (pLoadInfo->statisBlock != NULL) { + tStatisBlockDestroy(pLoadInfo->statisBlock); + taosMemoryFreeClear(pLoadInfo->statisBlock); + } + taosArrayDestroy(pLoadInfo->aSttBlk); taosMemoryFree(pLoadInfo); return NULL; } -static void destroyLDataIter(SLDataIter *pIter) { +void destroyLDataIter(SLDataIter *pIter) { tLDataIterClose2(pIter); - destroyLastBlockLoadInfo(pIter->pBlockLoadInfo); + destroySttBlockLoadInfo(pIter->pBlockLoadInfo); taosMemoryFree(pIter); } @@ -732,25 +737,6 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR return -1 * tLDataIterCmprFn(p1, p2); } -static void adjustValidLDataIters(SArray *pLDIterList, int32_t numOfFileObj) { - int32_t size = taosArrayGetSize(pLDIterList); - - if (size < numOfFileObj) { - int32_t inc = numOfFileObj - size; - for (int32_t k = 0; k < inc; ++k) { - SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); - taosArrayPush(pLDIterList, &pIter); - } - } else if (size > numOfFileObj) { // remove unused LDataIter - int32_t inc = size - numOfFileObj; - - for (int i = 0; i < inc; ++i) { - SLDataIter *pIter = taosArrayPop(pLDIterList); - destroyLDataIter(pIter); - } - } -} - int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { int32_t code = TSDB_CODE_SUCCESS; @@ -773,19 +759,13 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { } // add the list/iter placeholder - while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < numOfLevels) { - SArray *pList = taosArrayInit(4, POINTER_BYTES); - taosArrayPush(pConf->pSttFileBlockIterArray, &pList); - } + adjustLDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset); for (int32_t j = 0; j < numOfLevels; ++j) { SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j]; SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); - int32_t numOfFileObj = TARRAY2_SIZE(pSttLevel->fobjArr); - adjustValidLDataIters(pList, numOfFileObj); - - for (int32_t i = 0; i < numOfFileObj; ++i) { // open all last file + for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file SLDataIter *pIter = taosArrayGetP(pList, i); SSttFileReader *pSttFileReader = pIter->pReader; @@ -805,7 +785,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { } if (pLoadInfo == NULL) { - pLoadInfo = tCreateOneLastBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); + pLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); } memset(pIter, 0, sizeof(SLDataIter)); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index cb899f9ee8..bb80478d73 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -23,14 +23,14 @@ #include "tsimplehash.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) -#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey) +#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey) static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); -static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, +static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, @@ -52,10 +52,10 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32 static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); -static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static int32_t doBuildDataBlock(STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); +static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo); @@ -138,16 +138,16 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA pIter->pFilesetList = pFileSetArray; pIter->numOfFiles = numOfFileset; - if (pIter->pLastBlockReader == NULL) { - pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader)); - if (pIter->pLastBlockReader == NULL) { + if (pIter->pSttBlockReader == NULL) { + pIter->pSttBlockReader = taosMemoryCalloc(1, sizeof(struct SSttBlockReader)); + if (pIter->pSttBlockReader == NULL) { int32_t code = TSDB_CODE_OUT_OF_MEMORY; tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr); return code; } } - SLastBlockReader* pLReader = pIter->pLastBlockReader; + SSttBlockReader* pLReader = pIter->pSttBlockReader; pLReader->order = pReader->info.order; pLReader->window = pReader->info.window; pLReader->verRange = pReader->info.verRange; @@ -171,8 +171,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo SReadCostSummary* pCost = &pReader->cost; - pIter->pLastBlockReader->uid = 0; - tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); + pIter->pSttBlockReader->uid = 0; + tMergeTreeClose(&pIter->pSttBlockReader->mergeTree); pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); @@ -1404,26 +1404,26 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB return code; } -static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, +static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SVersionRange* pVerRange) { - int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1; + int32_t step = ASCENDING_TRAVERSE(pSttBlockReader->order) ? 1 : -1; while (1) { - bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); + bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree); if (!hasVal) { // the next value will be the accessed key in stt pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; pScanInfo->sttKeyInfo.nextProcKey += step; return false; } - TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow]; int64_t ver = pRow->pBlockData->aVersion[pRow->iRow]; - pLastBlockReader->currentKey = key; + pSttBlockReader->currentKey = key; pScanInfo->sttKeyInfo.nextProcKey = key; - if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order, + if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pSttBlockReader->order, pVerRange)) { pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; return true; @@ -1431,26 +1431,26 @@ static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlock } } -static void doPinSttBlock(SLastBlockReader* pLastBlockReader) { - tMergeTreePinSttBlock(&pLastBlockReader->mergeTree); +static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { + tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); } -static void doUnpinSttBlock(SLastBlockReader* pLastBlockReader) { - tMergeTreeUnpinSttBlock(&pLastBlockReader->mergeTree); +static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { + tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); } -static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader, +static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, bool* copied) { int32_t code = TSDB_CODE_SUCCESS; *copied = false; // avoid the fetch next row replace the referenced stt block in buffer - doPinSttBlock(pLastBlockReader); - bool hasVal = nextRowFromSttBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange); - doUnpinSttBlock(pLastBlockReader); + doPinSttBlock(pSttBlockReader); + bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); + doUnpinSttBlock(pSttBlockReader); if (hasVal) { - int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t next1 = getCurrentKeyInSttBlock(pSttBlockReader); if (next1 != ts) { code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); if (code) { @@ -1507,15 +1507,15 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* } static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, - SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { + SIterInfo* pIter, int64_t key, SSttBlockReader* pSttBlockReader) { SRowMerger* pMerger = &pReader->status.merger; SRow* pTSRow = NULL; SBlockData* pBlockData = &pReader->status.fileBlockData; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t tsLast = INT64_MIN; - if (hasDataInLastBlock(pLastBlockReader)) { - tsLast = getCurrentKeyInLastBlock(pLastBlockReader); + if (hasDataInSttBlock(pSttBlockReader)) { + tsLast = getCurrentKeyInSttBlock(pSttBlockReader); } TSDBKEY k = TSDBROW_KEY(pRow); @@ -1533,7 +1533,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* int64_t minKey = 0; if (pReader->info.order == TSDB_ORDER_ASC) { minKey = INT64_MAX; // chosen the minimum value - if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) { + if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) { minKey = tsLast; } @@ -1546,7 +1546,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } else { minKey = INT64_MIN; - if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) { + if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) { minKey = tsLast; } @@ -1571,12 +1571,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == tsLast) { - TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } @@ -1621,12 +1621,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == tsLast) { - TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } @@ -1652,27 +1652,27 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } -static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, +static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, bool mergeBlockData) { SRowMerger* pMerger = &pReader->status.merger; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader); bool copied = false; int32_t code = TSDB_CODE_SUCCESS; SRow* pTSRow = NULL; - TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); // create local variable to hold the row value TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData}; - tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pLastBlockReader->uid, + tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid, fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr); // only stt block exists if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) { - code = tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied); + code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied); if (code) { return code; } @@ -1686,9 +1686,9 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return code; } - TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); @@ -1711,7 +1711,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, pReader->idStr); // merge with block data if ts == key @@ -1737,8 +1737,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return TSDB_CODE_SUCCESS; } -static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key, - STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { +static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, int64_t key, + STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SRowMerger* pMerger = &pReader->status.merger; @@ -1753,24 +1753,24 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader if (hasDataInFileBlock(pBlockData, pDumpInfo)) { // no last block available, only data block exists - if (!hasDataInLastBlock(pLastBlockReader)) { + if (!hasDataInSttBlock(pSttBlockReader)) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } // row in last file block TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t tsLast = getCurrentKeyInSttBlock(pSttBlockReader); if (ASCENDING_TRAVERSE(pReader->info.order)) { if (key < tsLast) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if (key > tsLast) { - return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); + return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false); } } else { if (key > tsLast) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if (key < tsLast) { - return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); + return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false); } } // the following for key == tsLast @@ -1782,13 +1782,13 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); - TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -1802,12 +1802,12 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader return code; } else { // only last block exists - return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); + return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false); } } static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, - SLastBlockReader* pLastBlockReader) { + SSttBlockReader* pSttBlockReader) { SRowMerger* pMerger = &pReader->status.merger; SRow* pTSRow = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -1818,8 +1818,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); int64_t tsLast = INT64_MIN; - if (hasDataInLastBlock(pLastBlockReader)) { - tsLast = getCurrentKeyInLastBlock(pLastBlockReader); + if (hasDataInSttBlock(pSttBlockReader)) { + tsLast = getCurrentKeyInSttBlock(pSttBlockReader); } int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; @@ -1867,7 +1867,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = key; } - if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) { + if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) { minKey = tsLast; } } else { @@ -1884,7 +1884,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = key; } - if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) { + if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) { minKey = tsLast; } } @@ -1903,13 +1903,13 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == tsLast) { - TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } @@ -1962,13 +1962,13 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == tsLast) { - TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); } @@ -2087,10 +2087,10 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum return true; } -static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { +static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { // the last block reader has been initialized for this table. if (pLBlockReader->uid == pScanInfo->uid) { - return hasDataInLastBlock(pLBlockReader); + return hasDataInSttBlock(pLBlockReader); } if (pLBlockReader->uid != 0) { @@ -2139,13 +2139,13 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange); int64_t el = taosGetTimestampUs() - st; - pReader->cost.initLastBlockReader += (el / 1000.0); + pReader->cost.initSttBlockReader += (el / 1000.0); tsdbDebug("init last block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr); return code; } -static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } +static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; } bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) { @@ -2201,7 +2201,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, - SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { + SBlockData* pBlockData, SSttBlockReader* pSttBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; TSDBROW *pRow = NULL, *piRow = NULL; @@ -2218,21 +2218,21 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI // two levels of mem-table does contain the valid rows if (pRow != NULL && piRow != NULL) { - return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); + return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pSttBlockReader); } // imem + file + last block if (pBlockScanInfo->iiter.hasVal) { - return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader); + return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pSttBlockReader); } // mem + file + last block if (pBlockScanInfo->iter.hasVal) { - return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader); + return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pSttBlockReader); } // files data blocks + last block - return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData); + return mergeFileBlockAndSttBlock(pReader, pSttBlockReader, key, pBlockScanInfo, pBlockData); } static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo, @@ -2293,7 +2293,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); - SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; bool asc = ASCENDING_TRAVERSE(pReader->info.order); int64_t st = taosGetTimestampUs(); @@ -2342,7 +2342,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } SBlockData* pBlockData = &pReader->status.fileBlockData; - initLastBlockReader(pLastBlockReader, pBlockScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); while (1) { bool hasBlockData = false; @@ -2376,7 +2376,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { break; } - code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); + code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pSttBlockReader); if (code) { goto _end; } @@ -2561,9 +2561,9 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt return (pStatus->pTableIter != NULL); } -static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { +static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; - SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader; + SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; STableUidList* pUidList = &pStatus->uidList; int32_t code = TSDB_CODE_SUCCESS; @@ -2601,7 +2601,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { continue; } - bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); + bool hasDataInLastFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader); if (!hasDataInLastFile) { bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { @@ -2613,14 +2613,14 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { int64_t st = taosGetTimestampUs(); while (1) { - bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); + bool hasBlockLData = hasDataInSttBlock(pSttBlockReader); // no data in last block and block, no need to proceed. if (hasBlockLData == false) { break; } - code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); + code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader); if (code) { return code; } @@ -2667,7 +2667,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SDataBlockIter* pBlockIter = &pStatus->blockIter; STableBlockScanInfo* pScanInfo = NULL; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); - SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; bool asc = ASCENDING_TRAVERSE(pReader->info.order); if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) { @@ -2685,7 +2685,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) { - initLastBlockReader(pLastBlockReader, pScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pScanInfo, pReader); } TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); @@ -2730,13 +2730,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { int64_t st = taosGetTimestampUs(); // let's load data from stt files - initLastBlockReader(pLastBlockReader, pScanInfo, pReader); + initSttBlockReader(pSttBlockReader, pScanInfo, pReader); // no data in last block, no need to proceed. - while (hasDataInLastBlock(pLastBlockReader)) { + while (hasDataInSttBlock(pSttBlockReader)) { ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); - code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); + code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2746,7 +2746,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } // data in stt now overlaps with current active file data block, need to composed with file data block. - int64_t lastKeyInStt = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader); if ((lastKeyInStt >= pBlockInfo->record.firstKey && asc) || (lastKeyInStt <= pBlockInfo->record.lastKey && (!asc))) { tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader, @@ -2816,11 +2816,11 @@ _end: static int32_t doSumSttBlockRows(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; - SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; + SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; SSttBlockLoadInfo* pBlockLoadInfo = NULL; #if 0 for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file - pBlockLoadInfo = &pLastBlockReader->pInfo[i]; + pBlockLoadInfo = &pSttBlockReader->pInfo[i]; code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk); if (code) { @@ -3020,7 +3020,7 @@ typedef enum { TSDB_READ_CONTINUE = 0x2, } ERetrieveType; -static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { +static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SDataBlockIter* pBlockIter = &pReader->status.blockIter; @@ -3030,7 +3030,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { while (1) { terrno = 0; - code = doLoadLastBlockSequentially(pReader); + code = doLoadSttBlockSequentially(pReader); if (code != TSDB_CODE_SUCCESS) { terrno = code; return TSDB_READ_RETURN; @@ -3067,7 +3067,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { if (pBlockIter->numOfBlocks == 0) { // let's try to extract data from stt files. - ERetrieveType type = doReadDataFromLastFiles(pReader); + ERetrieveType type = doReadDataFromSttFiles(pReader); if (type == TSDB_READ_RETURN) { return terrno; } @@ -3102,7 +3102,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { resetDataBlockIterator(pBlockIter, pReader->info.order); resetTableListIndex(&pReader->status); - ERetrieveType type = doReadDataFromLastFiles(pReader); + ERetrieveType type = doReadDataFromSttFiles(pReader); if (type == TSDB_READ_RETURN) { return terrno; } @@ -3434,12 +3434,12 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc return TSDB_CODE_SUCCESS; } -int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, - SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) { - while (nextRowFromSttBlocks(pLastBlockReader, pScanInfo, pVerRange)) { - int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); +int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, + SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) { + while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) { + int64_t next1 = getCurrentKeyInSttBlock(pSttBlockReader); if (next1 == ts) { - TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); } else { tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, @@ -4097,8 +4097,8 @@ void tsdbReaderClose2(STsdbReader* pReader) { SReadCostSummary* pCost = &pReader->cost; SFilesetIter* pFilesetIter = &pReader->status.fileIter; - if (pFilesetIter->pLastBlockReader != NULL) { - SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; + if (pFilesetIter->pSttBlockReader != NULL) { + SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader; tMergeTreeClose(&pLReader->mergeTree); taosMemoryFree(pLReader); } @@ -4121,12 +4121,12 @@ void tsdbReaderClose2(STsdbReader* pReader) { "build in-memory-block-time:%.2f ms, sttBlocks:%" PRId64 ", sttBlocks-time:%.2f ms, sttStatisBlock:%" PRId64 ", stt-statis-Block-time:%.2f ms, composed-blocks:%" PRId64 ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,createSkylineIterTime:%.2f " - "ms, initLastBlockReader:%.2fms, %s", + "ms, initSttBlockReader:%.2fms, %s", pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->sttCost.loadBlocks, pCost->sttCost.blockElapsedTime, pCost->sttCost.loadStatisBlocks, pCost->sttCost.statisElapsedTime, pCost->composedBlocks, pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, - pCost->createSkylineIterTime, pCost->initLastBlockReader, pReader->idStr); + pCost->createSkylineIterTime, pCost->initSttBlockReader, pReader->idStr); taosMemoryFree(pReader->idStr); @@ -4730,7 +4730,20 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT return code; } } + + SMergeTreeConf conf = { + .pReader = pReader, + .pSchema = pReader->info.pSchema, + .pCols = pReader->suppInfo.colId, + .numOfCols = pReader->suppInfo.numOfCols, + .suid = pReader->info.suid, + }; + SReaderStatus* pStatus = &pReader->status; + if (pStatus->pCurrentFileset != NULL) { + pTableBlockInfo->numOfSttRows += tsdbGetRowsInSttFiles(pStatus->pCurrentFileset, pStatus->pLDataIterArray, + pReader->pTsdb, &conf, pReader->idStr); + } STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg; pTableBlockInfo->defMinRows = pc->minRows; @@ -4767,10 +4780,6 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT pTableBlockInfo->minRows = numOfRows; } - if (numOfRows < defaultRows) { - pTableBlockInfo->numOfSmallBlocks += 1; - } - pTableBlockInfo->totalSize += pBlockInfo->record.blockSize; int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows, numOfBuckets); @@ -4783,13 +4792,18 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT break; } + // add the data in stt files of new fileset + if (pStatus->pCurrentFileset != NULL) { + pTableBlockInfo->numOfSttRows += tsdbGetRowsInSttFiles(pStatus->pCurrentFileset, pStatus->pLDataIterArray, + pReader->pTsdb, &conf, pReader->idStr); + } + pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks; hasNext = (pBlockIter->numOfBlocks > 0); } - - // tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, - // pReader->pFileGroup->fid, pReader->idStr); } + + // record the data in stt files tsdbReleaseReader(pReader); return code; } @@ -4976,7 +4990,7 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { taosMemoryFreeClear(pReader->idStr); pReader->idStr = taosStrdup(idstr); - pReader->status.fileIter.pLastBlockReader->mergeTree.idStr = pReader->idStr; + pReader->status.fileIter.pSttBlockReader->mergeTree.idStr = pReader->idStr; } void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 24c526a906..a058c0173d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -488,6 +488,8 @@ typedef enum { BLK_CHECK_QUIT = 0x2, } ETombBlkCheckEnum; +static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo, + const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j); static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j, ETombBlkCheckEnum* pRet) { int32_t code = 0; @@ -659,3 +661,177 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT } } } + +int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, + const uint64_t* pUidList, int32_t numOfTables) { + int32_t num = 0; + + const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray; + if (TARRAY2_SIZE(pStatisBlkArray) <= 0) { + return 0; + } + + int32_t i = 0; + while((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].minTbid.suid < suid)) { + ++i; + } + + if (i >= TARRAY2_SIZE(pStatisBlkArray)) { + return 0; + } + + SStatisBlk *p = &pStatisBlkArray->data[i]; + if (pBlockLoadInfo->statisBlock == NULL) { + pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock)); + tStatisBlockInit(pBlockLoadInfo->statisBlock); + } + + int64_t st = taosGetTimestampMs(); + tsdbSttFileReadStatisBlock(pSttFileReader, p, pBlockLoadInfo->statisBlock); + pBlockLoadInfo->statisBlockIndex = i; + + double el = (taosGetTimestampMs() - st) / 1000.0; + pBlockLoadInfo->cost.loadStatisBlocks += 1; + pBlockLoadInfo->cost.statisElapsedTime += el; + + STbStatisBlock *pBlock = pBlockLoadInfo->statisBlock; + + int32_t index = 0; + while (index < TARRAY2_SIZE(pBlock->suid) && pBlock->suid->data[index] < suid) { + ++index; + } + + if (index >= TARRAY2_SIZE(pBlock->suid)) { + return num; + } + + int32_t j = index; + int32_t uidIndex = 0; + while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex <= numOfTables) { + p = &pStatisBlkArray->data[i]; + if (p->minTbid.suid > suid) { + return num; + } + + uint64_t uid = pUidList[uidIndex]; + + if (pBlock->uid->data[j] == uid) { + num += pBlock->count->data[j]; + uidIndex += 1; + j += 1; + loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j); + } else if (pBlock->uid->data[j] < uid) { + j += 1; + loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j); + } else { + uidIndex += 1; + } + } + + return num; +} + +// load next stt statistics block +static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo, + const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) { + if ((*j) >= numOfRows) { + (*i) += 1; + (*j) = 0; + if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) { + tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pBlockLoadInfo->statisBlock); + } + } +} + +void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) { + int32_t size = taosArrayGetSize(pLDIterList); + + if (size < numOfFileObj) { + int32_t inc = numOfFileObj - size; + for (int32_t k = 0; k < inc; ++k) { + SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); + taosArrayPush(pLDIterList, &pIter); + } + } else if (size > numOfFileObj) { // remove unused LDataIter + int32_t inc = size - numOfFileObj; + + for (int i = 0; i < inc; ++i) { + SLDataIter *pIter = taosArrayPop(pLDIterList); + destroyLDataIter(pIter); + } + } +} + +int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) { + int32_t numOfLevels = pFileSet->lvlArr->size; + + // add the list/iter placeholder + while (taosArrayGetSize(pSttFileBlockIterArray) < numOfLevels) { + SArray* pList = taosArrayInit(4, POINTER_BYTES); + taosArrayPush(pSttFileBlockIterArray, &pList); + } + + for(int32_t j = 0; j < numOfLevels; ++j) { + SSttLvl* pSttLevel = pFileSet->lvlArr->data[j]; + SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j); + doAdjustValidDataIters(pList, TARRAY2_SIZE(pSttLevel->fobjArr)); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf, + const char* pstr) { + int32_t numOfRows = 0; + + // no data exists, go to end + int32_t numOfLevels = pFileSet->lvlArr->size; + if (numOfLevels == 0) { + return numOfRows; + } + + // add the list/iter placeholder + adjustLDataIters(pSttFileBlockIterArray, pFileSet); + + for (int32_t j = 0; j < numOfLevels; ++j) { + SSttLvl* pSttLevel = pFileSet->lvlArr->data[j]; + SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j); + + for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { // open all last file + SLDataIter* pIter = taosArrayGetP(pList, i); + + // open stt file reader if not opened yet + // if failed to open this stt file, ignore the error and try next one + if (pIter->pReader == NULL) { + SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize}; + conf.file[0] = *pSttLevel->fobjArr->data[i]->f; + + const char* pName = pSttLevel->fobjArr->data[i]->fname; + int32_t code = tsdbSttFileReaderOpen(pName, &conf, &pIter->pReader); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("open stt file reader error. file:%s, code %s, %s", pName, tstrerror(code), pstr); + continue; + } + } + + if (pIter->pBlockLoadInfo == NULL) { + pIter->pBlockLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); + } + + // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file + int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pIter->pBlockLoadInfo->pSttStatisBlkArray); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr); + continue; + } + + // extract rows from each stt file one-by-one + STsdbReader* pReader = pConf->pReader; + int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); + uint64_t* pUidList = pReader->status.uidList.tableUidList; + numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pConf->suid, pUidList, numOfTables); + } + } + + return numOfRows; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 709e311ff0..e9c7449082 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -109,7 +109,7 @@ typedef struct SReadCostSummary { double buildComposedBlockTime; double createScanInfoList; double createSkylineIterTime; - double initLastBlockReader; + double initSttBlockReader; } SReadCostSummary; typedef struct STableUidList { @@ -145,21 +145,21 @@ typedef struct SBlockLoadSuppInfo { bool smaValid; // the sma on all queried columns are activated } SBlockLoadSuppInfo; -typedef struct SLastBlockReader { +typedef struct SSttBlockReader { STimeWindow window; SVersionRange verRange; int32_t order; uint64_t uid; SMergeTree mergeTree; int64_t currentKey; -} SLastBlockReader; +} SSttBlockReader; typedef struct SFilesetIter { int32_t numOfFiles; // number of total files int32_t index; // current accessed index in the list TFileSetArray* pFilesetList; // data file set list int32_t order; - SLastBlockReader* pLastBlockReader; // last file block reader + SSttBlockReader* pSttBlockReader; // last file block reader } SFilesetIter; typedef struct SFileDataBlockInfo { @@ -262,7 +262,12 @@ bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr); void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); - +int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid, + const uint64_t* pUidList, int32_t numOfTables); +void destroyLDataIter(SLDataIter* pIter); +int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet); +int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf, + const char* pstr); typedef struct { SArray* pTombData; } STableLoadInfo; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 071afe0159..25a3c509a7 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -5472,7 +5472,6 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { SColumnInfoData* pInputCol = pInput->pData[0]; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo); STableBlockDistInfo p1 = {0}; @@ -5481,6 +5480,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->numOfBlocks += p1.numOfBlocks; pDistInfo->numOfTables += p1.numOfTables; pDistInfo->numOfInmemRows += p1.numOfInmemRows; + pDistInfo->numOfSttRows += p1.numOfSttRows; pDistInfo->totalSize += p1.totalSize; pDistInfo->totalRows += p1.totalRows; pDistInfo->numOfFiles += p1.numOfFiles; @@ -5488,7 +5488,6 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->defMinRows = p1.defMinRows; pDistInfo->defMaxRows = p1.defMaxRows; pDistInfo->rowSize = p1.rowSize; - pDistInfo->numOfSmallBlocks = p1.numOfSmallBlocks; if (pDistInfo->minRows > p1.minRows) { pDistInfo->minRows = p1.minRows; @@ -5523,7 +5522,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist if (tEncodeI32(&encoder, pInfo->defMaxRows) < 0) return -1; if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1; + if (tEncodeU32(&encoder, pInfo->numOfSttRows) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1; for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) { @@ -5555,7 +5554,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo if (tDecodeI32(&decoder, &pInfo->defMaxRows) < 0) return -1; if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1; + if (tDecodeU32(&decoder, &pInfo->numOfSttRows) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1; for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) { @@ -5589,7 +5588,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } int32_t len = sprintf(st + VARSTR_HEADER_SIZE, - "Total_Blocks=[%d] Total_Size=[%.2f KB] Average_size=[%.2f KB] Compression_Ratio=[%.2f %c]", + "Total_Blocks=[%d] Total_Size=[%.2f KiB] Average_size=[%.2f KiB] Compression_Ratio=[%.2f %c]", pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%'); varDataSetLen(st, len); @@ -5600,14 +5599,16 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { avgRows = pData->totalRows / pData->numOfBlocks; } - len = sprintf(st + VARSTR_HEADER_SIZE, - "Total_Rows=[%" PRId64 "] Inmem_Rows=[%d] MinRows=[%d] MaxRows=[%d] Average_Rows=[%" PRId64 "]", - pData->totalRows, pData->numOfInmemRows, pData->minRows, pData->maxRows, avgRows); - + len = sprintf(st + VARSTR_HEADER_SIZE, "Block_Rows=[%" PRId64 "] MinRows=[%d] MaxRows=[%d] AvgRows=[%" PRId64 "]", + pData->totalRows, pData->minRows, pData->maxRows, avgRows); varDataSetLen(st, len); colDataSetVal(pColInfo, row++, st, false); - len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables, + len = sprintf(st + VARSTR_HEADER_SIZE, "Inmem_Rows=[%d] Stt_Rows=[%d] ", pData->numOfInmemRows, pData->numOfSttRows); + varDataSetLen(st, len); + colDataSetVal(pColInfo, row++, st, false); + + len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Filesets=[%d] Total_Vgroups=[%d]", pData->numOfTables, pData->numOfFiles, pData->numOfVgroups); varDataSetLen(st, len);