From b7e7de0354709de9f422d071464085e935b3227a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 13 May 2024 16:45:38 +0800 Subject: [PATCH] fix(stream): set the stream task load flag. --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/vnd/vnodeSync.c | 25 ++++++++++++++++++++----- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bddc76cfb5..9e376b9792 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -491,6 +491,7 @@ typedef struct SStreamMeta { int32_t vgId; int64_t stage; int32_t role; + bool taskLoadFlag; bool closeFlag; bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower. STaskStartInfo startInfo; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d2c20500be..00203c7bb1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -576,22 +576,37 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) if (tsDisableStream) { vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); } else { - vInfo("vgId:%d sync restore finished, start to load and launch stream task(s)", pVnode->config.vgId); + vInfo("vgId:%d sync restore finished, start to load and launch stream task(s)", vgId); if (pMeta->startInfo.startAllTasks == 1) { pMeta->startInfo.restartCount += 1; - tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, + vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, pMeta->startInfo.restartCount); } else { pMeta->startInfo.startAllTasks = 1; + + bool loadTaskInfo = pMeta->taskLoadFlag; + pMeta->taskLoadFlag = true; streamMetaWUnLock(pMeta); - streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS); + if (loadTaskInfo) { + tqInfo("vgId:%d stream task already loaded, start them", vgId); + streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS); + } else { + tqInfo("vgId:%d start load and launch stream task(s)", vgId); + streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS); + } + return; } } } else { - vInfo("vgId:%d, sync restore finished, load stream tasks, not start tasks since not leader", vgId); - streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_ALL_TASKS); + if (!pMeta->taskLoadFlag) { + pMeta->taskLoadFlag = true; + vInfo("vgId:%d, sync restore finished, load stream tasks, not start tasks since not leader", vgId); + streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_ALL_TASKS); + } else { + vInfo("vgId:%d, sync restore finished, not load stream tasks since already loaded for follower"); + } } streamMetaWUnLock(pMeta);