fix(query): handle the reverse case.

This commit is contained in:
Haojun Liao 2022-08-18 22:57:26 +08:00
parent 04a06d4353
commit 5eae03fcab
1 changed files with 30 additions and 10 deletions

View File

@ -1376,24 +1376,43 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLast = INT64_MIN;
if (pLastBlockReader->lastBlockData.nRow > 0) {
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
}
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t minKey = INT64_MAX; int64_t minKey = 0;
if (minKey > tsLast) { if (pReader->order == TSDB_ORDER_ASC) {
minKey = tsLast; minKey = INT64_MAX; // chosen the minimum value
} if (minKey > tsLast && pLastBlockReader->lastBlockData.nRow > 0) {
minKey = tsLast;
}
if (minKey > k.ts) { if (minKey > k.ts) {
minKey = k.ts; minKey = k.ts;
} }
if (minKey > key && pBlockData->nRow > 0) { if (minKey > key && pBlockData->nRow > 0) {
minKey = key; minKey = key;
}
} else {
minKey = INT64_MIN;
if (minKey < tsLast && pLastBlockReader->lastBlockData.nRow > 0) {
minKey = tsLast;
}
if (minKey < k.ts) {
minKey = k.ts;
}
if (minKey < key && pBlockData->nRow > 0) {
minKey = key;
}
} }
bool init = false; bool init = false;
@ -1530,6 +1549,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo handle the desc order check
static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0}; SRowMerger merge = {0};
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;