diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 93e0064192..cb616f7afc 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -41,5 +41,6 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode); +void tqSetRestoreVersionInfo(SStreamTask* pTask); #endif // TDENGINE_TQ_COMMON_H diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index f17716eda0..b717504e1e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -58,17 +58,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer streamSetupScheduleTrigger(pTask); SCheckpointInfo *pChkInfo = &pTask->chkInfo; - - // checkpoint ver is the kept version, handled data should be the next version. - if (pChkInfo->checkpointId != 0) { - pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; - pChkInfo->processedVer = pChkInfo->checkpointVer; - pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; - pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; - - sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, - pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); - } + tqSetRestoreVersionInfo(pTask); char* p = streamTaskGetStatus(pTask)->name; if (pTask->info.fillHistory) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8edc0fed4d..21ca3290a0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -760,16 +760,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; - - // checkpoint ver is the kept version, handled data should be the next version. - if (pChkInfo->checkpointId != 0) { - pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; - pChkInfo->processedVer = pChkInfo->checkpointVer; - pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; - pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; - tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, - pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); - } + tqSetRestoreVersionInfo(pTask); char* p = streamTaskGetStatus(pTask)->name; const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index d2c7924cf5..4ce8579ea0 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -86,6 +86,22 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) return TSDB_CODE_SUCCESS; } +void tqSetRestoreVersionInfo(SStreamTask* pTask) { + SCheckpointInfo* pChkInfo = &pTask->chkInfo; + + // checkpoint ver is the kept version, handled data should be the next version. + if (pChkInfo->checkpointId != 0) { + pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; + pChkInfo->processedVer = pChkInfo->checkpointVer; + pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; + + tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, + pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); + } + + pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; +} + int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);