fix(query): record the correct last accessed key in the stt file.
This commit is contained in:
parent
22e52ebb38
commit
3327beba1e
|
@ -404,7 +404,7 @@ void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
|
|||
tsdbDebug("try next last file block:%d from %d, trigger by uid:%" PRIu64 ", file index:%d, %s", pIter->iSttBlk,
|
||||
oldIndex, pIter->uid, pIter->iStt, idStr);
|
||||
} else {
|
||||
tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index::%d, %s", pIter->uid, oldIndex, idStr);
|
||||
tsdbDebug("no more last block qualified, uid:%" PRIu64 ", file index:%d, %s", pIter->uid, oldIndex, idStr);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ typedef struct SBlockIndex {
|
|||
typedef struct STableBlockScanInfo {
|
||||
uint64_t uid;
|
||||
TSKEY lastKey;
|
||||
TSKEY lastKeyInStt; // last accessed key in stt
|
||||
SMapData mapData; // block info (compressed)
|
||||
SArray* pBlockList; // block data index list, SArray<SBlockIndex>
|
||||
SIterInfo iter; // mem buffer skip list iterator
|
||||
|
@ -192,7 +193,7 @@ static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbRe
|
|||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
|
||||
SRowMerger* pMerger);
|
||||
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
||||
SRowMerger* pMerger, SVersionRange* pVerRange);
|
||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
|
||||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
|
||||
STsdbReader* pReader);
|
||||
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
|
||||
|
@ -407,6 +408,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
|
|||
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
}
|
||||
|
||||
pScanInfo->lastKeyInStt = pScanInfo->lastKey;
|
||||
taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
|
||||
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
|
||||
pScanInfo->lastKey, pTsdbReader->idStr);
|
||||
|
@ -1795,21 +1797,21 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
|||
|
||||
while (1) {
|
||||
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
||||
if (!hasVal) {
|
||||
if (!hasVal) { // the next value will be the accessed key in stt
|
||||
pScanInfo->lastKeyInStt += step;
|
||||
return false;
|
||||
}
|
||||
|
||||
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
TSDBKEY k = TSDBROW_KEY(&row);
|
||||
if (hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
|
||||
pScanInfo->lastKey = k.ts;
|
||||
} else {
|
||||
pScanInfo->lastKeyInStt = k.ts;
|
||||
|
||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
|
||||
// the qualifed ts may equal to k.ts, only a greater version one.
|
||||
// here we need to fallback one step.
|
||||
if (pScanInfo->lastKey == k.ts) {
|
||||
pScanInfo->lastKey -= step;
|
||||
}
|
||||
|
||||
// if (pScanInfo->lastKey == k.ts) {
|
||||
// pScanInfo->lastKey -= step;
|
||||
// }
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -1949,7 +1951,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
}
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == k.ts) {
|
||||
|
@ -1997,7 +1999,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
}
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == key) {
|
||||
|
@ -2050,7 +2052,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
|||
|
||||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
tsdbRowMerge(&merge, &fRow1);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
|
||||
|
||||
code = tsdbRowMergerGetRow(&merge, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2068,7 +2070,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
|||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
|
||||
|
||||
// merge with block data if ts == key
|
||||
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
|
||||
|
@ -2121,7 +2123,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
|||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
tsdbRowMerge(&merge, &fRow1);
|
||||
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);
|
||||
|
||||
code = tsdbRowMergerGetRow(&merge, &pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2230,7 +2232,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
}
|
||||
}
|
||||
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == ik.ts) {
|
||||
|
@ -2321,7 +2323,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
}
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange);
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
|
||||
}
|
||||
|
||||
if (minKey == key) {
|
||||
|
@ -2472,9 +2474,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
|||
int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1;
|
||||
STimeWindow w = pLBlockReader->window;
|
||||
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
|
||||
w.skey = pScanInfo->lastKey + step;
|
||||
w.skey = pScanInfo->lastKeyInStt;
|
||||
} else {
|
||||
w.ekey = pScanInfo->lastKey + step;
|
||||
w.ekey = pScanInfo->lastKeyInStt;
|
||||
}
|
||||
|
||||
tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey,
|
||||
|
@ -3567,13 +3569,16 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
|||
}
|
||||
|
||||
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
||||
SRowMerger* pMerger, SVersionRange* pVerRange) {
|
||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
|
||||
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
|
||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
if (next1 == ts) {
|
||||
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
tsdbRowMerge(pMerger, &fRow1);
|
||||
} else {
|
||||
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
||||
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
|
||||
idStr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue