From 686108050ba6bcde7946423ff1d6f9246dd9715f Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 10 Jan 2024 18:44:04 +0800 Subject: [PATCH] fix: table merge scan return disordered rows --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 15 ++++++++++++++- source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 1 + 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index b57d2dfb45..256ef10c5e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -73,6 +73,7 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order static void resetTableListIndex(SReaderStatus* pStatus); static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t* pMinKey); static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo); +static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader); static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } @@ -3095,6 +3096,17 @@ static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) { return TSDB_READ_RETURN; } + if (pReader->status.bProcMemPreFileset) { + code = buildFromPreFilesetBuffer(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pResBlock->info.rows > 0) { + pReader->status.processingMemPreFileSet = true; + return TSDB_READ_RETURN; + } + } + if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed. return TSDB_READ_CONTINUE; } else { // all blocks in data file are checked, let's check the data in last files @@ -4342,6 +4354,7 @@ static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) { } else { tsdbDebug("finished pre-fileset %d buffer processing. %s", fid, pReader->idStr); pStatus->bProcMemPreFileset = false; + pStatus->processingMemPreFileSet = false; if (pReader->notifyFn) { STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; @@ -4374,7 +4387,7 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { pStatus->bProcMemFirstFileset, pReader->idStr); if (pStatus->bProcMemPreFileset) { if (pBlock->info.rows > 0) { - if (pReader->notifyFn) { + if (pReader->notifyFn && !pReader->status.processingMemPreFileSet) { int32_t fid = pReader->status.pCurrentFileset->fid; STsdReaderNotifyInfo info = {0}; info.duration.filesetId = fid; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 39e65f22b1..2157c7a423 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -219,6 +219,7 @@ typedef struct SReaderStatus { int64_t prevFilesetStartKey; int64_t prevFilesetEndKey; bool bProcMemFirstFileset; + bool processingMemPreFileSet; STableUidList procMemUidList; STableBlockScanInfo** pProcMemTableIter; } SReaderStatus;