From 5808c1b86fd5d6374702b8021460e989bdd232e6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Mar 2024 17:31:44 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 168 ++++++++---------------- 1 file changed, 54 insertions(+), 114 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index b0b8550462..fb893c845e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1697,7 +1697,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* // be changed, resulting in the corresponding changing of the value of sttRowKey tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey); - // ASC: file block ---> stt block -----> mem + // file block ---> stt block -----> mem if (pkCompEx(compFn, &minKey, pfKey) == 0) { int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); if (code != TSDB_CODE_SUCCESS) { @@ -1772,7 +1772,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); } else { // row in both stt file blocks and data file blocks - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); SRowKey* pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); int32_t ret = pkCompEx(pSttBlockReader->pkComparFn, pKey, pSttKey); @@ -1791,42 +1791,27 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } } + tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); + // the following for key == sttKey->key.ts - // ASC: file block ------> stt block - // DESC: stt block ------> file block + // file block ------> stt block SRow* pTSRow = NULL; - if (ASCENDING_TRAVERSE(pReader->info.order)) { - code = tsdbRowMergerAdd(pMerger, &fRow, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); - - TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - code = tsdbRowMergerAdd(pMerger, pRow1, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); - } else { - TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - code = tsdbRowMergerAdd(pMerger, pRow1, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); - - code = tsdbRowMergerAdd(pMerger, &fRow, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + + TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, + pReader->idStr); + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1896,9 +1881,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* SRowKey minKey; if (ASCENDING_TRAVERSE(pReader->info.order)) { - minKey = k; // let's find the minimum + minKey = k; // let's find the minimum - if (pkCompEx(compFn, &ik, &minKey) < 0) {//minKey > ik.key.ts) { + if (pkCompEx(compFn, &ik, &minKey) < 0) { // minKey > ik.key.ts) { minKey = ik; } @@ -1910,7 +1895,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = *pSttKey; } } else { - minKey = k; // let find the maximum ts value + minKey = k; // let find the maximum ts value if (pkCompEx(compFn, &ik, &minKey) > 0) { minKey = ik; } @@ -1926,95 +1911,50 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey); - // ASC: file block -----> stt block -----> imem -----> mem - // DESC: mem -----> imem -----> stt block -----> file block - if (ASCENDING_TRAVERSE(pReader->info.order)) { - if (pkCompEx(compFn, &minKey, pfKey) == 0) { - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - code = tsdbRowMergerAdd(pMerger, &fRow, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + // file block -----> stt block -----> imem -----> mem + // if (ASCENDING_TRAVERSE(pReader->info.order)) { + if (pkCompEx(compFn, &minKey, pfKey) == 0) { + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + code = tsdbRowMergerAdd(pMerger, &fRow, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } - if (pkCompEx(compFn, &minKey, pSttKey) == 0) { - TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - code = tsdbRowMergerAdd(pMerger, pRow1, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + if (pkCompEx(compFn, &minKey, pSttKey) == 0) { + TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } - if (pkCompEx(compFn, &minKey, &ik) == 0) { - code = tsdbRowMergerAdd(pMerger, piRow, piSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, + pReader->idStr); + } - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + if (pkCompEx(compFn, &minKey, &ik) == 0) { + code = tsdbRowMergerAdd(pMerger, piRow, piSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; } - if (pkCompEx(compFn, &minKey, &k) == 0) { - code = tsdbRowMergerAdd(pMerger, pRow, pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; } - } else { - if (pkCompEx(compFn, &minKey, &k) == 0) { - code = tsdbRowMergerAdd(pMerger, pRow, pSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + } - code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + if (pkCompEx(compFn, &minKey, &k) == 0) { + code = tsdbRowMergerAdd(pMerger, pRow, pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; } - if (pkCompEx(compFn, &minKey, &ik) == 0) { - code = tsdbRowMergerAdd(pMerger, piRow, piSchema); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - - if (pkCompEx(compFn, &minKey, pSttKey) == 0) { - TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - code = tsdbRowMergerAdd(pMerger, pRow1, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); - } - - if (pkCompEx(compFn, &minKey, pfKey) == 0) { - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - code = tsdbRowMergerAdd(pMerger, &fRow, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; } }