fix:mem table has data in the fileset duration
This commit is contained in:
parent
4fb0f83f07
commit
5a75d74421
|
@ -59,6 +59,7 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
|
|||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
||||
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
||||
static void resetTableListIndex(SReaderStatus* pStatus);
|
||||
static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey);
|
||||
|
||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||
|
||||
|
@ -2549,8 +2550,19 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
|||
}
|
||||
|
||||
if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) {
|
||||
pReader->status.bProcMemPreFileset = true;
|
||||
resetTableListIndex(&pReader->status);
|
||||
if (pReader->bDurationOrder) {
|
||||
int32_t fid = pReader->status.pCurrentFileset->fid;
|
||||
STimeWindow win = {0};
|
||||
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
|
||||
|
||||
if ((ASCENDING_TRAVERSE(pReader->info.order) &&
|
||||
win.skey >= pReader->status.memTableMinKey && win.skey <= pReader->status.memTableMaxKey) ||
|
||||
(!ASCENDING_TRAVERSE(pReader->info.order) &&
|
||||
win.ekey >= pReader->status.memTableMinKey && win.ekey <= pReader->status.memTableMaxKey)) {
|
||||
pReader->status.bProcMemPreFileset = true;
|
||||
resetTableListIndex(&pReader->status);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -4243,6 +4255,10 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
|
||||
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
|
||||
pReader->pReadSnap = NULL;
|
||||
if (pReader->bDurationOrder) {
|
||||
pReader->status.memTableMinKey = INT64_MAX;
|
||||
pReader->status.memTableMaxKey = INT64_MIN;
|
||||
}
|
||||
pReader->flag = READER_STATUS_SUSPEND;
|
||||
|
||||
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
|
||||
|
@ -4314,6 +4330,10 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pReader->bDurationOrder) {
|
||||
getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey);
|
||||
}
|
||||
|
||||
pReader->flag = READER_STATUS_NORMAL;
|
||||
tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
|
||||
pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
|
||||
|
@ -4899,6 +4919,54 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
|
|||
return code;
|
||||
}
|
||||
|
||||
static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int64_t rows = 0;
|
||||
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
int32_t iter = 0;
|
||||
int64_t maxKey = INT64_MIN;
|
||||
int64_t minKey = INT64_MAX;
|
||||
|
||||
pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter);
|
||||
while (pStatus->pTableIter != NULL) {
|
||||
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
||||
|
||||
STbData* d = NULL;
|
||||
if (pReader->pReadSnap->pMem != NULL) {
|
||||
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->info.suid, pBlockScanInfo->uid);
|
||||
if (d != NULL) {
|
||||
if (d->maxKey > maxKey) {
|
||||
maxKey = d->maxKey;
|
||||
}
|
||||
if (d->minKey < minKey) {
|
||||
minKey = d->minKey;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
STbData* di = NULL;
|
||||
if (pReader->pReadSnap->pIMem != NULL) {
|
||||
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->info.suid, pBlockScanInfo->uid);
|
||||
if (di != NULL) {
|
||||
if (d->maxKey > maxKey) {
|
||||
maxKey = d->maxKey;
|
||||
}
|
||||
if (d->minKey < minKey) {
|
||||
minKey = d->minKey;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// current table is exhausted, let's try the next table
|
||||
pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, pStatus->pTableIter, &iter);
|
||||
}
|
||||
|
||||
*pMaxKey = maxKey;
|
||||
*pMinKey = minKey;
|
||||
}
|
||||
|
||||
int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int64_t rows = 0;
|
||||
|
|
|
@ -207,6 +207,8 @@ typedef struct SReaderStatus {
|
|||
SRowMerger merger;
|
||||
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
|
||||
bool bProcMemPreFileset;
|
||||
int64_t memTableMaxKey;
|
||||
int64_t memTableMinKey;
|
||||
} SReaderStatus;
|
||||
|
||||
struct STsdbReader {
|
||||
|
|
Loading…
Reference in New Issue