From 018e6f2a7188c86db71b0146b41996b97e4d84c6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Mar 2024 17:18:31 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 48 ++++++++++++++++--------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 9368208377..4332a571a8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1693,9 +1693,13 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } + // copy the last key before the time of stt reader loading the next stt block, in which the underlying data block may + // be changed, resulting in the corresponding changing of the value of sttRowKey + tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey); + // ASC: file block ---> last block -----> imem -----> mem // DESC: mem -----> imem -----> last block -----> file block - if (pReader->info.order == TSDB_ORDER_ASC) { +// if (pReader->info.order == TSDB_ORDER_ASC) { if (pkCompEx(compFn, &minKey, pfKey) == 0) { int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { @@ -1725,7 +1729,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - } else { + /*} else { if (pkCompEx(compFn, &minKey, &k) == 0) { int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { @@ -1754,7 +1758,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); } - } + }*/ int32_t code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -1951,6 +1955,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } + tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey); + // ASC: file block -----> stt block -----> imem -----> mem // DESC: mem -----> imem -----> stt block -----> file block if (ASCENDING_TRAVERSE(pReader->info.order)) { @@ -2303,8 +2309,9 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } } + tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); + if (copied) { - tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); return TSDB_CODE_SUCCESS; } else { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); @@ -2325,8 +2332,6 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); - - tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); return code; } } @@ -2352,8 +2357,9 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn return code; } + tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey); + if (copied) { - tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey); return TSDB_CODE_SUCCESS; } else { code = tsdbRowMergerAdd(pMerger, &fRow, NULL); @@ -2374,8 +2380,6 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn taosMemoryFree(pTSRow); tsdbRowMergerClear(pMerger); - - tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey); return code; } } @@ -3802,7 +3806,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRowKey* piRowKey, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow) { - SRowMerger* pMerger = pMerger; + SRowMerger* pMerger = &pReader->status.merger; + int32_t code = TSDB_CODE_SUCCESS; STSchema* pSchema = NULL; if (pRow->type == TSDBROW_ROW_FMT) { @@ -3821,7 +3826,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow } if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem - int32_t code = tsdbRowMergerAdd(pMerger, piRow, piSchema); + code = tsdbRowMergerAdd(pMerger, piRow, piSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3831,15 +3836,18 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow return code; } - tsdbRowMergerAdd(pMerger, pRow, pSchema); - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader); + code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } else { - int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); - if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) { + code = tsdbRowMergerAdd(pMerger, pRow, pSchema); + if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3848,14 +3856,20 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow return code; } - tsdbRowMergerAdd(pMerger, piRow, piSchema); + code = tsdbRowMergerAdd(pMerger, piRow, piSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, piRowKey, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } } - int32_t code = tsdbRowMergerGetRow(pMerger, pTSRow); + tRowKeyAssign(&pBlockScanInfo->lastProcKey, pRowKey); + + code = tsdbRowMergerGetRow(pMerger, pTSRow); tsdbRowMergerClear(pMerger); return code; }