From cb8dfae0cccdabad378c29fad9460f90d05097a2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 5 Dec 2023 10:47:19 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 131 ++++++++++++++++++------ 1 file changed, 99 insertions(+), 32 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index bb80478d73..0753c2454d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -47,6 +47,8 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan STsdbReader* pReader, SRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); +static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, + STsdbReader* pReader); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost); static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, @@ -1741,6 +1743,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SRowMerger* pMerger = &pReader->status.merger; + int32_t code = TSDB_CODE_SUCCESS; // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized if (pMerger->pArray == NULL) { @@ -1751,45 +1754,68 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } } - if (hasDataInFileBlock(pBlockData, pDumpInfo)) { - // no last block available, only data block exists - if (!hasDataInSttBlock(pSttBlockReader)) { - return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); - } - - // row in last file block + bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo); + bool dataInSttFile = hasDataInSttBlock(pSttBlockReader); + if (dataInDataFile && (!dataInSttFile)) { + // no stt file block available, only data block exists + return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); + } else if ((!dataInDataFile) && dataInSttFile) { + // no data ile block exists + return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); + } else { + // row in both stt file blocks and data file blocks TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); int64_t tsLast = getCurrentKeyInSttBlock(pSttBlockReader); if (ASCENDING_TRAVERSE(pReader->info.order)) { - if (key < tsLast) { + if (key < tsLast) { // asc return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if (key > tsLast) { - return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false); + return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); } - } else { + } else { // desc if (key > tsLast) { return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if (key < tsLast) { - return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false); + return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader); } } + // the following for key == tsLast + // ASC: file block ------> stt block + // DESC: stt block ------> file block SRow* pTSRow = NULL; - int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; + if (ASCENDING_TRAVERSE(pReader->info.order)) { + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); + + TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + } else { + TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); - - TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - code = tsdbRowMergerAdd(pMerger, pRow1, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); - code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1800,9 +1826,6 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); return code; - - } else { // only last block exists - return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false); } } @@ -1889,8 +1912,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - // ASC: file block -----> last block -----> imem -----> mem - // DESC: mem -----> imem -----> last block -----> file block + // ASC: file block -----> stt block -----> imem -----> mem + // DESC: mem -----> imem -----> stt block -----> file block if (ASCENDING_TRAVERSE(pReader->info.order)) { if (minKey == key) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); @@ -2200,6 +2223,50 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } } +int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { + bool copied = false; + SRow* pTSRow = NULL; + int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader); + + SRowMerger* pMerger = &pReader->status.merger; + TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); + TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData}; + + tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid, + fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr); + + int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied); + if (code) { + return code; + } + + if (copied) { + pBlockScanInfo->lastProcKey = tsLastBlock; + return TSDB_CODE_SUCCESS; + } else { + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); + tsdbRowMergerAdd(pMerger, pRow1, NULL); + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, + pReader->idStr); + + code = tsdbRowMergerGetRow(pMerger, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo); + + taosMemoryFree(pTSRow); + tsdbRowMergerClear(pMerger); + return code; + } +} + static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SSttBlockReader* pSttBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2221,17 +2288,17 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pSttBlockReader); } - // imem + file + last block + // imem + file + stt block if (pBlockScanInfo->iiter.hasVal) { return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pSttBlockReader); } - // mem + file + last block + // mem + file + stt block if (pBlockScanInfo->iter.hasVal) { return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pSttBlockReader); } - // files data blocks + last block + // files data blocks + stt block return mergeFileBlockAndSttBlock(pReader, pSttBlockReader, key, pBlockScanInfo, pBlockData); }