fix(stream): not restart tasks when in restoring procedure.

This commit is contained in:
Haojun Liao 2023-08-25 11:28:28 +08:00
parent 38bb123701
commit 4c7ad54c9c
1 changed files with 23 additions and 20 deletions

View File

@ -1884,30 +1884,33 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
_end:
tDecoderClear(&decoder);
// tmsgSendRsp(&rsp);
if (restartTasks) {
tqDebug("vgId:%d all tasks are stopped, restart them", vgId);
taosWLockLatch(&pMeta->lock);
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
} else {
tqDebug("vgId:%d all tasks are stopped, restart them", vgId);
taosWLockLatch(&pMeta->lock);
terrno = 0;
int32_t code = streamMetaReopen(pMeta, 0);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
taosWUnLockLatch(&pMeta->lock);
return -1;
}
if (streamLoadTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
taosWUnLockLatch(&pMeta->lock);
return -1;
}
terrno = 0;
int32_t code = streamMetaReopen(pMeta, 0);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
taosWUnLockLatch(&pMeta->lock);
return -1;
}
if (streamLoadTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
taosWUnLockLatch(&pMeta->lock);
return -1;
}
taosWUnLockLatch(&pMeta->lock);
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d, restart all stream tasks", vgId);
tqCheckStreamStatus(pTq);
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d, restart all stream tasks", vgId);
tqCheckStreamStatus(pTq);
}
}
}