fix(stream): set the stream task load flag.

This commit is contained in:
Haojun Liao 2024-05-13 16:45:38 +08:00
parent 3dfffe9113
commit b7e7de0354
2 changed files with 21 additions and 5 deletions

View File

@ -491,6 +491,7 @@ typedef struct SStreamMeta {
int32_t vgId;
int64_t stage;
int32_t role;
bool taskLoadFlag;
bool closeFlag;
bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower.
STaskStartInfo startInfo;

View File

@ -576,22 +576,37 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
if (tsDisableStream) {
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else {
vInfo("vgId:%d sync restore finished, start to load and launch stream task(s)", pVnode->config.vgId);
vInfo("vgId:%d sync restore finished, start to load and launch stream task(s)", vgId);
if (pMeta->startInfo.startAllTasks == 1) {
pMeta->startInfo.restartCount += 1;
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
pMeta->startInfo.restartCount);
} else {
pMeta->startInfo.startAllTasks = 1;
bool loadTaskInfo = pMeta->taskLoadFlag;
pMeta->taskLoadFlag = true;
streamMetaWUnLock(pMeta);
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS);
if (loadTaskInfo) {
tqInfo("vgId:%d stream task already loaded, start them", vgId);
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
} else {
tqInfo("vgId:%d start load and launch stream task(s)", vgId);
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS);
}
return;
}
}
} else {
vInfo("vgId:%d, sync restore finished, load stream tasks, not start tasks since not leader", vgId);
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_ALL_TASKS);
if (!pMeta->taskLoadFlag) {
pMeta->taskLoadFlag = true;
vInfo("vgId:%d, sync restore finished, load stream tasks, not start tasks since not leader", vgId);
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_ALL_TASKS);
} else {
vInfo("vgId:%d, sync restore finished, not load stream tasks since already loaded for follower");
}
}
streamMetaWUnLock(pMeta);