From 68b003c8043d652e261e09e4261b979834afabd9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 25 Sep 2023 01:37:27 +0800 Subject: [PATCH] fix(stream): fix race condition. --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 12 ++++++------ source/dnode/vnode/src/vnd/vnodeSync.c | 7 +------ source/libs/stream/src/streamRecover.c | 22 ++++++++++++++-------- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 20cf7bb110..59bf050677 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -405,7 +405,7 @@ typedef struct SStreamMeta { int32_t vgId; int64_t stage; bool leader; - int8_t taskWillbeLaunched; + int8_t taskStartedByNodeUpdate; SRWLatch lock; int32_t walScanCounter; void* streamBackend; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9f678a5563..543289d621 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1830,15 +1830,16 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet); if (updateTasks < numOfTasks) { - pMeta->taskWillbeLaunched = 1; - - tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, updateTasks, (numOfTasks - updateTasks)); + pMeta->taskStartedByNodeUpdate = 1; + tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, + updateTasks, (numOfTasks - updateTasks)); taosWUnLockLatch(&pMeta->lock); } else { taosHashClear(pMeta->pUpdateTaskSet); if (!pTq->pVnode->restored) { - tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId); + tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); + pMeta->taskStartedByNodeUpdate = 0; taosWUnLockLatch(&pMeta->lock); } else { tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); @@ -1860,14 +1861,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { - vInfo("vgId:%d, restart all stream tasks", vgId); + vInfo("vgId:%d restart all stream tasks", vgId); tqStartStreamTasks(pTq); tqCheckAndRunStreamTaskAsync(pTq); } else { vInfo("vgId:%d, follower node not start stream tasks", vgId); } - pMeta->taskWillbeLaunched = 0; taosWUnLockLatch(&pMeta->lock); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index cf299dc79c..2d96bcfffd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -551,13 +551,8 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true; - if (pVnode->pTq->pStreamMeta->taskWillbeLaunched) { - vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); - return; - } - taosWLockLatch(&pVnode->pTq->pStreamMeta->lock); - if (pVnode->pTq->pStreamMeta->taskWillbeLaunched) { + if (pVnode->pTq->pStreamMeta->taskStartedByNodeUpdate) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); return; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index ff1728a6eb..3ae50b5b2b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -118,8 +118,8 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { // check status static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { - SDataRange* pRange = &pTask->dataRange; - STimeWindow* pWindow = &pRange->window; + SDataRange* pRange = &pTask->dataRange; + STimeWindow* pWindow = &pRange->window; SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -136,10 +136,10 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { req.downstreamTaskId = pTask->fixedDispatcher.taskId; pTask->checkReqId = req.reqId; - stDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 - "-%" PRId64 ", stage:%"PRId64" req:0x%" PRIx64, - pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, - pWindow->skey, pWindow->ekey, req.stage, req.reqId); + stDebug("s-task:%s stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 + " window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64, + pTask->id.idStr, req.reqId, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, + pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); streamSendCheckMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -158,8 +158,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, stage:%" PRId64, - pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i, req.stage); + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 "check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { @@ -907,6 +907,12 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) { stDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str); return; } +// +// if (pTask->status.downstreamReady == 0) { +// ASSERT(pTask->execInfo.start == 0); +// stDebug("s-task:%s in check downstream procedure, abort and paused", pTask->id.idStr); +// break; +// } const char* pStatus = streamGetTaskStatusStr(status); stDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId);