fix(stream): fix deadlock.

This commit is contained in:
Haojun Liao 2024-01-07 14:53:17 +08:00
parent 29219ba9f4
commit a024fe960c
1 changed files with 4 additions and 5 deletions

View File

@ -567,31 +567,30 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
if (vnodeIsRoleLeader(pVnode)) { if (vnodeIsRoleLeader(pVnode)) {
// start to restore all stream tasks // start to restore all stream tasks
if (tsDisableStream) { if (tsDisableStream) {
streamMetaWUnLock(pMeta);
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId);
int32_t numOfTasks = 0; int32_t numOfTasks = 0;
tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta, &numOfTasks); tqStreamTaskResetStatus(pMeta, &numOfTasks);
if (numOfTasks > 0) { if (numOfTasks > 0) {
if (pMeta->startInfo.taskStarting == 1) { if (pMeta->startInfo.taskStarting == 1) {
pMeta->startInfo.restartCount += 1; pMeta->startInfo.restartCount += 1;
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
pMeta->startInfo.restartCount); pMeta->startInfo.restartCount);
streamMetaWUnLock(pMeta);
} else { } else {
pMeta->startInfo.taskStarting = 1; pMeta->startInfo.taskStarting = 1;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
return;
} }
} }
} }
} else { } else {
streamMetaWUnLock(pMeta);
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
} }
streamMetaWUnLock(pMeta);
} }
static void vnodeBecomeFollower(const SSyncFSM *pFsm) { static void vnodeBecomeFollower(const SSyncFSM *pFsm) {