Merge pull request #16286 from taosdata/feature/3_liaohj
fix(query): fix bug in descending order scan in lastblock.
This commit is contained in:
commit
f8310d4203
|
@ -730,7 +730,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray*
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s",
|
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s",
|
||||||
numOfTables, total, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk
|
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk
|
||||||
/ 1000.0, el, pReader->idStr);
|
/ 1000.0, el, pReader->idStr);
|
||||||
|
|
||||||
pReader->cost.numOfBlocks += total;
|
pReader->cost.numOfBlocks += total;
|
||||||
|
@ -1315,7 +1315,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
|
||||||
// log the reason why load the datablock for profile
|
// log the reason why load the datablock for profile
|
||||||
if (loadDataBlock) {
|
if (loadDataBlock) {
|
||||||
tsdbDebug("%p uid:%" PRIu64
|
tsdbDebug("%p uid:%" PRIu64
|
||||||
" need to load the datablock, reason overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
|
" need to load the datablock, overlapwithneighborblock:%d, hasDup:%d, partiallyRequired:%d, "
|
||||||
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
|
"overlapWithKey:%d, greaterThanBuf:%d, overlapWithDel:%d, overlapWithlastBlock:%d, %s",
|
||||||
pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey,
|
pReader, pFBlock->uid, overlapWithNeighbor, hasDup, partiallyRequired, overlapWithKey,
|
||||||
moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr);
|
moreThanOutputCapacity, overlapWithDel, overlapWithlastBlock, pReader->idStr);
|
||||||
|
@ -2007,7 +2007,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
if (pBlockData->nRow > 0) {
|
if (pBlockData->nRow > 0) {
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
|
|
||||||
// no last block
|
// no last block available, only data block exists
|
||||||
if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) {
|
if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) {
|
||||||
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2028,38 +2028,10 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
|
|
||||||
// row in last file block
|
// row in last file block
|
||||||
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
if (ts < key) { // save rows in last block
|
ASSERT(ts >= key);
|
||||||
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
|
|
||||||
|
|
||||||
STSRow* pTSRow = NULL;
|
if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||||
SRowMerger merge = {0};
|
if (key < ts) {
|
||||||
|
|
||||||
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
|
|
||||||
|
|
||||||
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
|
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
|
||||||
|
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
|
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
|
||||||
tRowMergerClear(&merge);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else if (ts == key) {
|
|
||||||
STSRow* pTSRow = NULL;
|
|
||||||
SRowMerger merge = {0};
|
|
||||||
|
|
||||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
|
|
||||||
|
|
||||||
tRowMergerGetRow(&merge, &pTSRow);
|
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
|
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
|
||||||
tRowMergerClear(&merge);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else { // ts > key, asc; todo handle desc
|
|
||||||
// imem & mem are all empty, only file exist
|
// imem & mem are all empty, only file exist
|
||||||
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2076,6 +2048,43 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
tRowMergerClear(&merge);
|
tRowMergerClear(&merge);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
} else if (key == ts) {
|
||||||
|
STSRow* pTSRow = NULL;
|
||||||
|
SRowMerger merge = {0};
|
||||||
|
|
||||||
|
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||||
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
|
||||||
|
|
||||||
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
|
||||||
|
|
||||||
|
taosMemoryFree(pTSRow);
|
||||||
|
tRowMergerClear(&merge);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
} else { // desc order
|
||||||
|
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
|
||||||
|
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
|
||||||
|
|
||||||
|
STSRow* pTSRow = NULL;
|
||||||
|
SRowMerger merge = {0};
|
||||||
|
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
||||||
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
|
||||||
|
|
||||||
|
if (ts == key) {
|
||||||
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
}
|
||||||
|
|
||||||
|
tRowMergerGetRow(&merge, &pTSRow);
|
||||||
|
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
|
||||||
|
|
||||||
|
taosMemoryFree(pTSRow);
|
||||||
|
tRowMergerClear(&merge);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else { // only last block exists
|
} else { // only last block exists
|
||||||
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
|
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
|
||||||
|
@ -2445,10 +2454,10 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable
|
||||||
pLastBlockReader->currentBlockIndex, totalLastBlocks, tstrerror(code), pReader->idStr);
|
pLastBlockReader->currentBlockIndex, totalLastBlocks, tstrerror(code), pReader->idStr);
|
||||||
} else {
|
} else {
|
||||||
tsdbDebug("%p load last block completed, uid:%" PRIu64
|
tsdbDebug("%p load last block completed, uid:%" PRIu64
|
||||||
" last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 " - %" PRId64
|
" last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 "-%" PRId64
|
||||||
" elapsed time:%.2f ms, %s",
|
" elapsed time:%.2f ms, %s",
|
||||||
pReader, uid, pLastBlockReader->currentBlockIndex, totalLastBlocks, pBlock->nRow, pBlock->minVer,
|
pReader, uid, index, totalLastBlocks, pBlock->nRow, pBlock->minVer, pBlock->maxVer, pBlock->minKey,
|
||||||
pBlock->maxVer, pBlock->minKey, pBlock->maxKey, el, pReader->idStr);
|
pBlock->maxKey, el, pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
pLastBlockReader->currentBlockIndex = index;
|
pLastBlockReader->currentBlockIndex = index;
|
||||||
|
@ -2575,6 +2584,14 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
// todo rows in buffer should be less than the file block in asc, greater than file block in desc
|
// todo rows in buffer should be less than the file block in asc, greater than file block in desc
|
||||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
|
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
|
||||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||||
|
} else {
|
||||||
|
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
|
||||||
|
// only return the rows in last block
|
||||||
|
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
|
ASSERT (tsLast >= pBlock->maxKey.ts);
|
||||||
|
tBlockDataReset(&pReader->status.fileBlockData);
|
||||||
|
|
||||||
|
code = buildComposedDataBlock(pReader);
|
||||||
} else { // whole block is required, return it directly
|
} else { // whole block is required, return it directly
|
||||||
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
|
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
|
||||||
pInfo->rows = pBlock->nRow;
|
pInfo->rows = pBlock->nRow;
|
||||||
|
@ -2583,6 +2600,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
setComposedBlockFlag(pReader, false);
|
setComposedBlockFlag(pReader, false);
|
||||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
|
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue