refactor: do some internal refactor.
This commit is contained in:
parent
2ec24c1e04
commit
018e6f2a71
|
@ -1693,9 +1693,13 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
|
||||
// copy the last key before the time of stt reader loading the next stt block, in which the underlying data block may
|
||||
// be changed, resulting in the corresponding changing of the value of sttRowKey
|
||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey);
|
||||
|
||||
// ASC: file block ---> last block -----> imem -----> mem
|
||||
// DESC: mem -----> imem -----> last block -----> file block
|
||||
if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||
// if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||
if (pkCompEx(compFn, &minKey, pfKey) == 0) {
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1725,7 +1729,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
/*} else {
|
||||
if (pkCompEx(compFn, &minKey, &k) == 0) {
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1754,7 +1758,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader);
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
int32_t code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1951,6 +1955,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
|
||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &minKey);
|
||||
|
||||
// ASC: file block -----> stt block -----> imem -----> mem
|
||||
// DESC: mem -----> imem -----> stt block -----> file block
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
|
@ -2303,8 +2309,9 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
|||
}
|
||||
}
|
||||
|
||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey);
|
||||
|
||||
if (copied) {
|
||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
@ -2325,8 +2332,6 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
|||
|
||||
taosMemoryFree(pTSRow);
|
||||
tsdbRowMergerClear(pMerger);
|
||||
|
||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -2352,8 +2357,9 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
|
|||
return code;
|
||||
}
|
||||
|
||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey);
|
||||
|
||||
if (copied) {
|
||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||
|
@ -2374,8 +2380,6 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn
|
|||
|
||||
taosMemoryFree(pTSRow);
|
||||
tsdbRowMergerClear(pMerger);
|
||||
|
||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, &sttKey);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -3802,7 +3806,8 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIt
|
|||
|
||||
int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRowKey* piRowKey,
|
||||
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow) {
|
||||
SRowMerger* pMerger = pMerger;
|
||||
SRowMerger* pMerger = &pReader->status.merger;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
STSchema* pSchema = NULL;
|
||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||
|
@ -3821,7 +3826,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow
|
|||
}
|
||||
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -3831,15 +3836,18 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow
|
|||
return code;
|
||||
}
|
||||
|
||||
tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader);
|
||||
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, pRowKey, pBlockScanInfo->delSkyline, pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) {
|
||||
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -3848,14 +3856,20 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRow, SRow
|
|||
return code;
|
||||
}
|
||||
|
||||
tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, piRowKey, pBlockScanInfo->delSkyline, pReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = tsdbRowMergerGetRow(pMerger, pTSRow);
|
||||
tRowKeyAssign(&pBlockScanInfo->lastProcKey, pRowKey);
|
||||
|
||||
code = tsdbRowMergerGetRow(pMerger, pTSRow);
|
||||
tsdbRowMergerClear(pMerger);
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue