Merge pull request #16851 from taosdata/feature/3_liaohj
refactor(query): do some internal refactor.
This commit is contained in:
commit
dec379586b
|
@ -1362,15 +1362,10 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
|||
pInfo->hasDupTs = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
|
||||
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
|
||||
|
||||
// todo here we need to each key in the last files to identify if it is really overlapped with last block
|
||||
// todo
|
||||
bool overlapWithlastBlock = false;
|
||||
#if 0
|
||||
if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) {
|
||||
SSttBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex);
|
||||
overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey);
|
||||
if (hasDataInLastBlock(pLastBlockReader)) {
|
||||
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
pInfo->overlapWithLastBlock = !(pBlock->maxKey.ts < tsLast || pBlock->minKey.ts > tsLast);
|
||||
}
|
||||
#endif
|
||||
|
||||
pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
|
||||
pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
|
||||
|
@ -1896,151 +1891,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
||||
SRowMerger merge = {0};
|
||||
STSRow* pTSRow = NULL;
|
||||
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
SArray* pDelList = pBlockScanInfo->delSkyline;
|
||||
|
||||
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
|
||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
||||
ASSERT(pRow != NULL && piRow != NULL);
|
||||
|
||||
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
||||
bool freeTSRow = false;
|
||||
|
||||
uint64_t uid = pBlockScanInfo->uid;
|
||||
|
||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||
TSDBKEY ik = TSDBROW_KEY(piRow);
|
||||
if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||
// [1&2] key <= [k.ts && ik.ts]
|
||||
if (key <= k.ts && key <= ik.ts) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
|
||||
if (ik.ts == key) {
|
||||
tRowMerge(&merge, piRow);
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
||||
if (k.ts == key) {
|
||||
tRowMerge(&merge, pRow);
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else { // key > ik.ts || key > k.ts
|
||||
ASSERT(key != ik.ts);
|
||||
|
||||
// [3] ik.ts < key <= k.ts
|
||||
// [4] ik.ts < k.ts <= key
|
||||
if (ik.ts < k.ts) {
|
||||
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
if (freeTSRow) {
|
||||
taosMemoryFree(pTSRow);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// [5] k.ts < key <= ik.ts
|
||||
// [6] k.ts < ik.ts <= key
|
||||
if (k.ts < ik.ts) {
|
||||
doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
if (freeTSRow) {
|
||||
taosMemoryFree(pTSRow);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// [7] k.ts == ik.ts < key
|
||||
if (k.ts == ik.ts) {
|
||||
ASSERT(key > ik.ts && key > k.ts);
|
||||
|
||||
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
taosMemoryFree(pTSRow);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
} else { // descending order scan
|
||||
// [1/2] k.ts >= ik.ts && k.ts >= key
|
||||
if (k.ts >= ik.ts && k.ts >= key) {
|
||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
|
||||
tRowMergerInit(&merge, pRow, pSchema);
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
|
||||
if (ik.ts == k.ts) {
|
||||
tRowMerge(&merge, piRow);
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
||||
if (k.ts == key) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
tRowMerge(&merge, &fRow);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
}
|
||||
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch
|
||||
|
||||
// [3] ik.ts > k.ts >= Key
|
||||
// [4] ik.ts > key >= k.ts
|
||||
if (ik.ts > key) {
|
||||
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
if (freeTSRow) {
|
||||
taosMemoryFree(pTSRow);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// [5] key > ik.ts > k.ts
|
||||
// [6] key > k.ts > ik.ts
|
||||
if (key > ik.ts) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
taosMemoryFree(pTSRow);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
//[7] key = ik.ts > k.ts
|
||||
if (key == ik.ts) {
|
||||
doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow);
|
||||
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
tRowMerge(&merge, &fRow);
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
tRowMergerGetRow(&merge, &pTSRow);
|
||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid);
|
||||
|
||||
taosMemoryFree(pTSRow);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||
if (pBlockScanInfo->iterInit) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2257,9 +2107,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
|
||||
|
||||
|
||||
while (1) {
|
||||
// todo check the validate of row in file block
|
||||
bool hasBlockData = false;
|
||||
{
|
||||
while (pBlockData->nRow > 0) { // find the first qualified row in data block
|
||||
|
@ -2620,7 +2468,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
code = buildComposedDataBlock(pReader);
|
||||
} else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
|
||||
// data in memory that are earlier than current file block
|
||||
// todo rows in buffer should be less than the file block in asc, greater than file block in desc
|
||||
// rows in buffer should be less than the file block in asc, greater than file block in desc
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
|
||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue