diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 255d5d4105..68ba00eefc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1490,7 +1490,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t tsLast = INT64_MIN; - if (hasDataInSttBlock(pBlockScanInfo)) { + if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { tsLast = getCurrentKeyInSttBlock(pSttBlockReader); } @@ -1509,7 +1509,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* int64_t minKey = 0; if (pReader->info.order == TSDB_ORDER_ASC) { minKey = INT64_MAX; // chosen the minimum value - if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) { + if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { minKey = tsLast; } @@ -1522,7 +1522,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } else { minKey = INT64_MIN; - if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) { + if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { minKey = tsLast; } @@ -1626,91 +1626,6 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } -static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, STsdbReader* pReader, - STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, - bool mergeBlockData) { - SRowMerger* pMerger = &pReader->status.merger; - SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - - int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader); - bool copied = false; - int32_t code = TSDB_CODE_SUCCESS; - SRow* pTSRow = NULL; - TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - - // create local variable to hold the row value - TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData}; - - tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid, - fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr); - - // only stt block exists - if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) { - code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied); - if (code) { - return code; - } - - if (copied) { - pBlockScanInfo->lastProcKey = tsLastBlock; - return TSDB_CODE_SUCCESS; - } else { - code = tsdbRowMergerAdd(pMerger, &fRow, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, - pReader->idStr); - - code = tsdbRowMergerGetRow(pMerger, &pTSRow); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); - - taosMemoryFree(pTSRow); - tsdbRowMergerClear(pMerger); - - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - } else { // not merge block data - code = tsdbRowMergerAdd(pMerger, &fRow, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, - pReader->idStr); - - // merge with block data if ts == key - if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); - } - - code = tsdbRowMergerGetRow(pMerger, &pTSRow); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); - - taosMemoryFree(pTSRow); - tsdbRowMergerClear(pMerger); - - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - - return TSDB_CODE_SUCCESS; -} - static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, int64_t key, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -1816,7 +1731,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); int64_t tsLast = INT64_MIN; - if (hasDataInSttBlock(pBlockScanInfo)) { + if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { tsLast = getCurrentKeyInSttBlock(pSttBlockReader); } @@ -1865,7 +1780,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = key; } - if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) { + if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { minKey = tsLast; } } else { @@ -1882,7 +1797,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = key; } - if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) { + if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { minKey = tsLast; } } @@ -2169,10 +2084,15 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan pScanInfo->sttKeyInfo.nextProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey; hasData = true; - } else { + } else { // not clean stt blocks + INIT_TIMEWINDOW(pScanInfo->sttWindow); //reset the time window + pScanInfo->sttBlockReturned = false; hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); } } else { + pScanInfo->cleanSttBlocks = false; + INIT_TIMEWINDOW(pScanInfo->sttWindow); //reset the time window + pScanInfo->sttBlockReturned = false; hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index b4b92c0008..33604d21de 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -22,12 +22,6 @@ #include "tsdbUtil2.h" #include "tsimplehash.h" -#define INIT_TIMEWINDOW(_w) \ - do { \ - (_w)->skey = INT64_MAX; \ - (_w)->ekey = INT64_MIN; \ - } while (0); - static bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order); @@ -165,6 +159,9 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf INIT_TIMEWINDOW(&pScanInfo->sttWindow); INIT_TIMEWINDOW(&pScanInfo->filesetWindow); + pScanInfo->cleanSttBlocks = false; + pScanInfo->sttBlockReturned = false; + pUidList->tableUidList[j] = idList[j].uid; if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index cd9942a0a6..7a93e73a07 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -26,6 +26,12 @@ extern "C" { #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +#define INIT_TIMEWINDOW(_w) \ + do { \ + (_w)->skey = INT64_MAX; \ + (_w)->ekey = INT64_MIN; \ + } while (0); + typedef enum { READER_STATUS_SUSPEND = 0x1, READER_STATUS_NORMAL = 0x2,