From f9a5485d52a265b0afa81b3b1d2b28807c16c605 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 16 Dec 2023 01:46:29 +0800 Subject: [PATCH 1/2] fix(stream): refactor the stream trans conflict detection strategy. --- source/dnode/mnode/impl/inc/mndStream.h | 15 ++++- source/dnode/mnode/impl/src/mndStream.c | 60 ++++++++------------ source/dnode/mnode/impl/src/mndStreamTrans.c | 42 +++++++------- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 +- 4 files changed, 62 insertions(+), 60 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index e3ac4fd6fc..b0f6273acc 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -24,8 +24,9 @@ extern "C" { typedef struct SStreamTransInfo { int64_t startTime; - int32_t transId; + int64_t streamUid; const char *name; + int32_t transId; } SStreamTransInfo; // time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard @@ -52,6 +53,14 @@ typedef struct SStreamExecInfo { TdThreadMutex lock; } SStreamExecInfo; +#define MND_STREAM_CREATE_NAME "stream-create" +#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" +#define MND_STREAM_PAUSE_NAME "stream-pause" +#define MND_STREAM_RESUME_NAME "stream-resume" +#define MND_STREAM_DROP_NAME "stream-drop" +#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" +#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" + extern SStreamExecInfo execInfo; int32_t mndInitStream(SMnode *pMnode); @@ -61,9 +70,9 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pName, const char *pSrcDb, const char *pDstDb); +int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid); int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId); -bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb, bool lock); +bool streamTransConflictOtherTrans(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock); // for sma // TODO refactor diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a2f9c40c58..5bea917e38 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -16,12 +16,9 @@ #include "mndStream.h" #include "audit.h" #include "mndDb.h" -#include "mndDnode.h" -#include "mndMnode.h" #include "mndPrivilege.h" #include "mndScheduler.h" #include "mndShow.h" -#include "mndSnode.h" #include "mndStb.h" #include "mndTrans.h" #include "mndVgroup.h" @@ -34,13 +31,6 @@ #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_MAX_NUM 60 -#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" -#define MND_STREAM_PAUSE_NAME "stream-pause" -#define MND_STREAM_RESUME_NAME "stream-resume" -#define MND_STREAM_DROP_NAME "stream-drop" -#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" -#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" - typedef struct SNodeEntry { int32_t nodeId; bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot. @@ -839,7 +829,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_CREATE_NAME); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); goto _OVER; @@ -1030,7 +1020,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre return -1; } - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, true); if (conflict) { mndAddtoCheckpointWaitingList(pStream, checkpointId); mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb, @@ -1038,14 +1028,14 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, MND_STREAM_CHECKPOINT_NAME); if (pTrans == NULL) { return -1; } - mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->sourceDb, pStream->targetDb); + mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); if (mndTrancCheckConflict(pMnode, pTrans) != 0) { mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); @@ -1382,14 +1372,14 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. -// bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true); -// if (conflict) { -// sdbRelease(pMnode->pSdb, pStream); -// tFreeMDropStreamReq(&dropReq); -// return -1; -// } + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true); + if (conflict) { + sdbRelease(pMnode->pSdb, pStream); + tFreeMDropStreamReq(&dropReq); + return -1; + } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_DROP_NAME); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME); if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1399,7 +1389,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); if (mndTransCheckConflict(pMnode, pTrans) != 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1407,7 +1397,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->sourceDb, pStream->targetDb); + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid); // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { @@ -1866,7 +1856,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true); if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; @@ -1878,7 +1868,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream"); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_PAUSE_NAME); if (pTrans == NULL) { mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1887,14 +1877,14 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); if (mndTransCheckConflict(pMnode, pTrans) != 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return -1; } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb); + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid); // if nodeUpdate happened, not send pause trans if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) { @@ -2001,13 +1991,13 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true); if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_RESUME_NAME); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_RESUME_NAME); if (pTrans == NULL) { mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -2016,14 +2006,14 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); if (mndTransCheckConflict(pMnode, pTrans) != 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return -1; } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->sourceDb, pStream->targetDb); + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); // resume all tasks if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { @@ -2660,7 +2650,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, name); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, name); if (pTrans == NULL) { mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -2669,7 +2659,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); if (mndTransCheckConflict(pMnode, pTrans) != 0) { terrno = TSDB_CODE_MND_TRANS_CONFLICT; mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno)); @@ -2788,7 +2778,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { break; } - bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, false); + bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false); if (conflict) { mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", pStream->name, pStream->sourceDb, pStream->targetDb); diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 43e85a9405..0db3cb0a8a 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -23,21 +23,17 @@ typedef struct SKeyInfo { static int32_t clearFinishedTrans(SMnode* pMnode); -int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb) { - SStreamTransInfo info = {.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pName}; - taosHashPut(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb), &info, sizeof(SStreamTransInfo)); - - if (strcmp(pSrcDb, pDstDb) != 0) { - taosHashPut(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb), &info, sizeof(SStreamTransInfo)); - } - +int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid) { + SStreamTransInfo info = { + .transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamUid = streamUid}; + taosHashPut(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid), &info, sizeof(SStreamTransInfo)); return 0; } int32_t clearFinishedTrans(SMnode* pMnode) { size_t keyLen = 0; - SArray* pList = taosArrayInit(4, sizeof(SKeyInfo)); void* pIter = NULL; + SArray* pList = taosArrayInit(4, sizeof(SKeyInfo)); while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { SStreamTransInfo* pEntry = (SStreamTransInfo*)pIter; @@ -69,7 +65,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) { return 0; } -bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb, bool lock) { +bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char* pTransName, bool lock) { if (lock) { taosThreadMutexLock(&execInfo.lock); } @@ -84,22 +80,26 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const cha clearFinishedTrans(pMnode); - SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb)); + SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid)); if (pEntry != NULL) { - if (lock) { - taosThreadMutexUnlock(&execInfo.lock); - } - mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); - return true; - } + SStreamTransInfo tInfo = *pEntry; - pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb)); - if (pEntry != NULL) { if (lock) { taosThreadMutexUnlock(&execInfo.lock); } - mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); - return true; + + if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { + if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) { + mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, + tInfo.name); + return true; + } + } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || + (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0)) { + mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, + tInfo.name); + return true; + } } if (lock) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a106fe148b..9421b6b1af 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -617,6 +617,9 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen streamMetaReleaseTask(pMeta, pTask); } + // TODO: send the checkpoint complete msg if it is in checkpoint procedure. + + // drop the stream task now streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); @@ -628,8 +631,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen if (streamMetaCommit(pMeta) < 0) { // persist to disk } - streamMetaWUnLock(pMeta); + streamMetaWUnLock(pMeta); return 0; } From 4cc0f5be5961db9dfc5459128bf026224a12bcde Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 16 Dec 2023 15:37:17 +0800 Subject: [PATCH 2/2] fix(tsdb): add desc check for clean file block. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 67 ++++++++++++------------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 4cca39f220..3f5edc6288 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2347,53 +2347,50 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf } static int32_t buildComposedDataBlock(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - int64_t st = taosGetTimestampUs(); - int32_t step = asc ? 1 : -1; - double el = 0; - + int32_t code = TSDB_CODE_SUCCESS; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + int64_t st = taosGetTimestampUs(); + int32_t step = asc ? 1 : -1; + double el = 0; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; - SBrinRecord* pRecord = &pBlockInfo->record; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + SBrinRecord* pRecord = NULL; STableBlockScanInfo* pBlockScanInfo = NULL; - if (pBlockInfo != NULL) { - if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) { - setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order); - return code; - } + if (pBlockInfo == NULL) { + return 0; + } - pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); - if (pBlockScanInfo == NULL) { - goto _end; - } + pRecord = &pBlockInfo->record; - pRecord = &pBlockInfo->record; - TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); + if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) { + setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order); + return code; + } - // it is a clean block, load it directly - int64_t cap = pReader->resBlockInfo.capacity; - if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pRecord->numRow <= cap)) { - if (asc || (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) { - code = copyBlockDataToSDataBlock(pReader); - if (code) { - goto _end; - } + pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); + if (pBlockScanInfo == NULL) { + goto _end; + } - // record the last key value - pBlockScanInfo->lastProcKey = asc ? pRecord->lastKey : pRecord->firstKey; + pRecord = &pBlockInfo->record; + TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); + + // it is a clean block, load it directly + int64_t cap = pReader->resBlockInfo.capacity; + if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pRecord->numRow <= cap)) { + if (((asc && (pRecord->firstKey < keyInBuf.ts)) || (!asc && (pRecord->lastKey > keyInBuf.ts))) && + (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) { + code = copyBlockDataToSDataBlock(pReader); + if (code) { goto _end; } - } - } else { // file blocks not exist - ASSERT(0); - pBlockScanInfo = *pReader->status.pTableIter; - if (pReader->pIgnoreTables && - taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) { - return code; + + // record the last key value + pBlockScanInfo->lastProcKey = asc ? pRecord->lastKey : pRecord->firstKey; + goto _end; } }