From 4c7ad54c9c90aa84b26b2fe54c709604c954f4c1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Aug 2023 11:28:28 +0800 Subject: [PATCH] fix(stream): not restart tasks when in restoring procedure. --- source/dnode/vnode/src/tq/tq.c | 43 ++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 98bc87ecee..044b353ddd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1884,30 +1884,33 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { _end: tDecoderClear(&decoder); - // tmsgSendRsp(&rsp); if (restartTasks) { - tqDebug("vgId:%d all tasks are stopped, restart them", vgId); - taosWLockLatch(&pMeta->lock); + if (!pTq->pVnode->restored) { + tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); + } else { + tqDebug("vgId:%d all tasks are stopped, restart them", vgId); + taosWLockLatch(&pMeta->lock); + + terrno = 0; + int32_t code = streamMetaReopen(pMeta, 0); + if (code != 0) { + tqError("vgId:%d failed to reopen stream meta", vgId); + taosWUnLockLatch(&pMeta->lock); + return -1; + } + + if (streamLoadTasks(pTq->pStreamMeta) < 0) { + tqError("vgId:%d failed to load stream tasks", vgId); + taosWUnLockLatch(&pMeta->lock); + return -1; + } - terrno = 0; - int32_t code = streamMetaReopen(pMeta, 0); - if (code != 0) { - tqError("vgId:%d failed to reopen stream meta", vgId); taosWUnLockLatch(&pMeta->lock); - return -1; - } - - if (streamLoadTasks(pTq->pStreamMeta) < 0) { - tqError("vgId:%d failed to load stream tasks", vgId); - taosWUnLockLatch(&pMeta->lock); - return -1; - } - - taosWUnLockLatch(&pMeta->lock); - if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - vInfo("vgId:%d, restart all stream tasks", vgId); - tqCheckStreamStatus(pTq); + if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { + vInfo("vgId:%d, restart all stream tasks", vgId); + tqCheckStreamStatus(pTq); + } } }