refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-12-05 10:47:19 +08:00
parent 4daaeb3265
commit cb8dfae0cc
1 changed files with 99 additions and 32 deletions

View File

@ -47,6 +47,8 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan
STsdbReader* pReader, SRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader);
static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
@ -1741,6 +1743,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SRowMerger* pMerger = &pReader->status.merger;
int32_t code = TSDB_CODE_SUCCESS;
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) {
@ -1751,45 +1754,68 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
}
}
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
// no last block available, only data block exists
if (!hasDataInSttBlock(pSttBlockReader)) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
}
// row in last file block
bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo);
bool dataInSttFile = hasDataInSttBlock(pSttBlockReader);
if (dataInDataFile && (!dataInSttFile)) {
// no stt file block available, only data block exists
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if ((!dataInDataFile) && dataInSttFile) {
// no data ile block exists
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
} else {
// row in both stt file blocks and data file blocks
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int64_t tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (key < tsLast) {
if (key < tsLast) { // asc
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key > tsLast) {
return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false);
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
}
} else {
} else { // desc
if (key > tsLast) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key < tsLast) {
return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false);
return mergeRowsInSttBlocks(pSttBlockReader, pBlockScanInfo, pReader);
}
}
// the following for key == tsLast
// ASC: file block ------> stt block
// DESC: stt block ------> file block
SRow* pTSRow = NULL;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
if (ASCENDING_TRAVERSE(pReader->info.order)) {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
} else {
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -1800,9 +1826,6 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
return code;
} else { // only last block exists
return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false);
}
}
@ -1889,8 +1912,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
// ASC: file block -----> last block -----> imem -----> mem
// DESC: mem -----> imem -----> last block -----> file block
// ASC: file block -----> stt block -----> imem -----> mem
// DESC: mem -----> imem -----> stt block -----> file block
if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (minKey == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
@ -2200,6 +2223,50 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
}
}
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
bool copied = false;
SRow* pTSRow = NULL;
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
SRowMerger* pMerger = &pReader->status.merger;
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
int32_t code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
if (code) {
return code;
}
if (copied) {
pBlockScanInfo->lastProcKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
return code;
}
}
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
SBlockData* pBlockData, SSttBlockReader* pSttBlockReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
@ -2221,17 +2288,17 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pSttBlockReader);
}
// imem + file + last block
// imem + file + stt block
if (pBlockScanInfo->iiter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pSttBlockReader);
}
// mem + file + last block
// mem + file + stt block
if (pBlockScanInfo->iter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pSttBlockReader);
}
// files data blocks + last block
// files data blocks + stt block
return mergeFileBlockAndSttBlock(pReader, pSttBlockReader, key, pBlockScanInfo, pBlockData);
}