refactor: do some internal refactor.
This commit is contained in:
parent
77ed2bc1e1
commit
64db9afeb7
|
@ -1205,6 +1205,26 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key, SFileBlockDumpInfo* pDumpInfo) {
|
||||||
|
|
||||||
|
// opt version
|
||||||
|
// 1. it is not a border point
|
||||||
|
// 2. the direct next point is not an duplicated timestamp
|
||||||
|
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) ||
|
||||||
|
(pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) {
|
||||||
|
int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1;
|
||||||
|
|
||||||
|
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
|
||||||
|
if (nextKey != key) { // merge is not needed
|
||||||
|
doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
||||||
|
pDumpInfo->rowIndex += step;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
|
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
|
||||||
SIterInfo* pIter, int64_t key) {
|
SIterInfo* pIter, int64_t key) {
|
||||||
SRowMerger merge = {0};
|
SRowMerger merge = {0};
|
||||||
|
@ -1219,10 +1239,14 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
// ascending order traverse
|
// ascending order traverse
|
||||||
if (ASCENDING_TRAVERSE(pReader->order)) {
|
if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||||
if (key < k.ts) {
|
if (key < k.ts) {
|
||||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
// imem & mem are all empty, only file exist
|
||||||
|
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
return TSDB_CODE_SUCCESS;
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
} else {
|
||||||
|
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||||
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
|
}
|
||||||
} else if (k.ts < key) { // k.ts < key
|
} else if (k.ts < key) { // k.ts < key
|
||||||
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
|
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
|
||||||
} else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
|
} else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
|
||||||
|
@ -1238,10 +1262,13 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
if (key < k.ts) {
|
if (key < k.ts) {
|
||||||
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
|
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
|
||||||
} else if (k.ts < key) {
|
} else if (k.ts < key) {
|
||||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
} else {
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||||
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
|
}
|
||||||
} else { // descending order: mem rows -----> imem rows ------> file block
|
} else { // descending order: mem rows -----> imem rows ------> file block
|
||||||
updateSchema(pRow, pBlockScanInfo->uid, pReader);
|
updateSchema(pRow, pBlockScanInfo->uid, pReader);
|
||||||
|
|
||||||
|
@ -1438,34 +1465,23 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
}
|
}
|
||||||
|
|
||||||
// imem & mem are all empty, only file exist
|
// imem & mem are all empty, only file exist
|
||||||
|
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
|
|
||||||
// opt version
|
STSRow* pTSRow = NULL;
|
||||||
// 1. it is not a border point
|
SRowMerger merge = {0};
|
||||||
// 2. the direct next point is not an duplicated timestamp
|
|
||||||
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) ||
|
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||||
(pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) {
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1;
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
||||||
if (nextKey != key) { // merge is not needed
|
|
||||||
doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
taosMemoryFree(pTSRow);
|
||||||
pDumpInfo->rowIndex += step;
|
tRowMergerClear(&merge);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
|
||||||
|
|
||||||
STSRow* pTSRow = NULL;
|
|
||||||
SRowMerger merge = {0};
|
|
||||||
|
|
||||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
|
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
|
||||||
tRowMergerClear(&merge);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue