diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index bca180b64b..111c6b5962 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -307,12 +307,6 @@ size_t tsdbCacheGetCapacity(SVnode *pVnode); int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema); -struct SLDataIter; -int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iLast, int8_t backward, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pRange); -void tLDataIterClose(struct SLDataIter *pIter); -bool tLDataIterNextRow(struct SLDataIter *pIter); - // structs ======================= struct STsdbFS { SDelFile *pDelFile; @@ -640,16 +634,16 @@ typedef struct { typedef struct SMergeTree { int8_t backward; - SRBTreeNode *pNode; SRBTree rbt; + SArray *pIterList; struct SLDataIter *pIter; - SDataFReader* pLFileReader; } SMergeTree; void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange); void tMergeTreeAddIter(SMergeTree *pMTree, struct SLDataIter *pIter); bool tMergeTreeNext(SMergeTree* pMTree); TSDBROW tMergeTreeGetRow(SMergeTree* pMTree); +void tMergeTreeClose(SMergeTree* pMTree); // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 130a89af5c..47c09b51e9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -93,6 +93,7 @@ _exit: void tLDataIterClose(SLDataIter *pIter) { tBlockDataDestroy(&pIter->bData, 1); taosArrayDestroy(pIter->aBlockL); + taosMemoryFree(pIter); } extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData); @@ -272,8 +273,9 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, uint64_t uid, STimeWindow* pTimeWindow, SVersionRange* pVerRange) { pMTree->backward = backward; - pMTree->pNode = NULL; pMTree->pIter = NULL; + pMTree->pIterList = taosArrayInit(4, POINTER_BYTES); + tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); struct SLDataIter* pIterList[TSDB_DEFAULT_LAST_FILE] = {0}; @@ -281,7 +283,10 @@ void tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader* pFReader, /*int32_t code = */tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange); bool hasVal = tLDataIterNextRow(pIterList[i]); if (hasVal) { + taosArrayPush(pMTree->pIterList, &pIterList[i]); tMergeTreeAddIter(pMTree, pIterList[i]); + } else { + tLDataIterClose(pIterList[i]); } } } @@ -326,5 +331,12 @@ TSDBROW tMergeTreeGetRow(SMergeTree* pMTree) { } void tMergeTreeClose(SMergeTree* pMTree) { + size_t size = taosArrayGetSize(pMTree->pIterList); + for(int32_t i = 0; i < size; ++i) { + SLDataIter* pIter = taosArrayGetP(pMTree->pIterList, i); + tLDataIterClose(pIter); + } + pMTree->pIterList = taosArrayDestroy(pMTree->pIterList); + pMTree->pIter = NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 7c720382d6..bde9aff908 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -48,7 +48,6 @@ typedef struct STableBlockScanInfo { int32_t fileDelIndex; // file block delete index int32_t lastBlockDelIndex; // delete index for last block bool iterInit; // whether to initialize the in-memory skip list iterator or not - int16_t indexInBlockL; // row position in last block } STableBlockScanInfo; typedef struct SBlockOrderWrapper { @@ -228,7 +227,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK } for (int32_t j = 0; j < numOfTables; ++j) { - STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL}; + STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) { info.lastKey = pTsdbReader->window.skey; @@ -355,6 +354,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { pIter->index += step; pIter->pLastBlockReader->uid = 0; + tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) { return false; } @@ -567,7 +567,6 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) { } // reset the index in last block when handing a new file - px->indexInBlockL = INITIAL_ROW_INDEX_VAL; tMapDataClear(&px->mapData); taosArrayClear(px->pBlockList); } @@ -1764,17 +1763,20 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } -static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos, SDataFReader* pFReader) { +static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, SDataFReader* pFReader) { // the last block reader has been initialized for this table. if (pLastBlockReader->uid == uid) { return true; } + if (pLastBlockReader->uid != 0) { + tMergeTreeClose(&pLastBlockReader->mergeTree); + } + pLastBlockReader->uid = uid; /*int32_t code = */ tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pFReader, uid, &pLastBlockReader->window, &pLastBlockReader->verRange); - bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); - return hasVal; + return tMergeTreeNext(&pLastBlockReader->mergeTree); } static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { @@ -2274,8 +2276,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { while (1) { // load the last data block of current table STableBlockScanInfo* pScanInfo = pStatus->pTableIter; - bool hasVal = - initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL, pReader->pFileReader); + bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo->uid, pReader->pFileReader); if (!hasVal) { bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); if (!hasNexTable) { @@ -2284,26 +2285,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { continue; } - // int32_t index = pScanInfo->indexInBlockL; - - // if (index == INITIAL_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { - // bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); - // if (!hasData) { // current table does not have rows in last block, try next table - // bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); - // if (!hasNexTable) { - // return TSDB_CODE_SUCCESS; - // } - // continue; - // } - // } - // } else { // no data in last block, try next table - // bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); - // if (!hasNexTable) { - // return TSDB_CODE_SUCCESS; - // } - // continue; - // } - code = doBuildDataBlock(pReader); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3144,7 +3125,7 @@ int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { ASSERT(pReader != NULL); taosHashClear(pReader->status.pTableMap); - STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL}; + STableBlockScanInfo info = {.lastKey = 0, .uid = uid}; taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); return TDB_CODE_SUCCESS; } @@ -3287,7 +3268,6 @@ void tsdbReaderClose(STsdbReader* pReader) { } SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; - tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap); taosMemoryFreeClear(pSupInfo->plist); taosMemoryFree(pSupInfo->colIds); @@ -3312,10 +3292,13 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbDataFReaderClose(&pReader->pFileReader); } + tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap); + taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pLastBlockReader != NULL) { + tMergeTreeClose(&pFilesetIter->pLastBlockReader->mergeTree); taosMemoryFree(pFilesetIter->pLastBlockReader); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b740ec21d3..47c04991c0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -695,6 +695,7 @@ static void destroyTableScanOperatorInfo(void* param) { cleanupQueryTableDataCond(&pTableScanInfo->cond); tsdbReaderClose(pTableScanInfo->dataReader); + pTableScanInfo->dataReader = NULL; if (pTableScanInfo->pColMatchInfo != NULL) { taosArrayDestroy(pTableScanInfo->pColMatchInfo);