refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-03-22 17:31:44 +08:00
parent cbf22ed72a
commit 5808c1b86f
1 changed files with 54 additions and 114 deletions

View File

@ -1697,7 +1697,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
// be changed, resulting in the corresponding changing of the value of sttRowKey
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey);
// ASC: file block ---> stt block -----> mem
// file block ---> stt block -----> mem
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
@ -1791,11 +1791,11 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
}
}
tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey);
// the following for key == sttKey->key.ts
// ASC: file block ------> stt block
// DESC: stt block ------> file block
// file block ------> stt block
SRow* pTSRow = NULL;
if (ASCENDING_TRAVERSE(pReader->info.order)) {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -1809,23 +1809,8 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
} else {
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader);
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange,
pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
@ -1926,9 +1911,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey);
// ASC: file block -----> stt block -----> imem -----> mem
// DESC: mem -----> imem -----> stt block -----> file block
if (ASCENDING_TRAVERSE(pReader->info.order)) {
// file block -----> stt block -----> imem -----> mem
// if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
@ -1946,7 +1930,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange,
pReader->idStr);
}
if (pkCompEx(compFn, &minKey, &ik) == 0) {
@ -1972,51 +1957,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
}
} else {
if (pkCompEx(compFn, &minKey, &k) == 0) {
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, &k, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (pkCompEx(compFn, &minKey, &ik) == 0) {
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, &ik, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (pkCompEx(compFn, &minKey, pSttKey) == 0) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pSttKey, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr);
}
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
}
}
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {