fix(stream): avoid race condition in starting threads.

This commit is contained in:
Haojun Liao 2024-01-02 10:33:25 +08:00
parent 8ac02adc0b
commit d217412691
2 changed files with 15 additions and 2 deletions

View File

@ -545,7 +545,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId);
if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = /*(HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);

View File

@ -571,7 +571,20 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
} else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
tqStreamTaskResetStatus(pVnode->pTq->pStreamMeta);
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
{
streamMetaWLock(pMeta);
if (pMeta->startInfo.taskStarting == 1) {
pMeta->startInfo.restartCount += 1;
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
pMeta->startInfo.restartCount);
streamMetaWUnLock(pMeta);
} else {
pMeta->startInfo.taskStarting = 1;
streamMetaWUnLock(pMeta);
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
}
}
}
} else {
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);