From a3b04c3c582ee8c2cfd5686e44237b96ed7e084d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Feb 2024 14:50:09 +0800 Subject: [PATCH] fix(stream): set the processVer for streamTask when initializing the task object. --- source/dnode/snode/src/snode.c | 5 +++-- source/dnode/vnode/src/tq/tq.c | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index f173c327c7..9a017e7074 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -85,8 +85,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer SCheckpointInfo *pChkInfo = &pTask->chkInfo; // checkpoint ver is the kept version, handled data should be the next version. - if (pTask->chkInfo.checkpointId != 0) { - pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; + if (pChkInfo->checkpointId != 0) { + pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; + pChkInfo->processedVer = pChkInfo->checkpointVer; sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bde6889ecd..e9d559481d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -835,6 +835,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { // checkpoint ver is the kept version, handled data should be the next version. if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; + pChkInfo->processedVer = pChkInfo->checkpointVer; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); }