diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2991e3cef5..f576345f64 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -545,7 +545,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId); if (p != NULL && restored && p->info.fillHistory == 0) { - EStreamTaskEvent event = /*(HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT; + EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; streamTaskHandleEvent(p->status.pSM, event); } else if (!restored) { tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 048092131d..c41aea255d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -571,7 +571,20 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } else { vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta); - tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); + + { + streamMetaWLock(pMeta); + if (pMeta->startInfo.taskStarting == 1) { + pMeta->startInfo.restartCount += 1; + tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, + pMeta->startInfo.restartCount); + streamMetaWUnLock(pMeta); + } else { + pMeta->startInfo.taskStarting = 1; + streamMetaWUnLock(pMeta); + tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); + } + } } } else { vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);