refactor(stream): scan wal is driven by time, instead of insert events.

This commit is contained in:
Haojun Liao 2025-02-22 12:33:32 +08:00
parent 2228a9fc33
commit 09b6642e45
8 changed files with 156 additions and 144 deletions

View File

@ -239,7 +239,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, tmsg_t msgType); int tqPushMsg(STQ*, tmsg_t msgType);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
void tqUnregisterPushHandle(STQ* pTq, void* pHandle); void tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqScanWalAsync(STQ* pTq, bool ckPause); int tqScanWalAsync(STQ* pTq);
int32_t tqStopStreamTasksAsync(STQ* pTq); int32_t tqStopStreamTasksAsync(STQ* pTq);
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);

View File

@ -920,12 +920,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
// now the fill-history task starts to scan data from wal files. // now the fill-history task starts to scan data from wal files.
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
if (code == TSDB_CODE_SUCCESS) { // if (code == TSDB_CODE_SUCCESS) {
code = tqScanWalAsync(pTq, false); // code = tqScanWalAsync(pTq, false);
if (code) { // if (code) {
tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code)); // tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
} // }
} // }
} }
} }
@ -1122,12 +1122,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
} }
// let's continue scan data in the wal files // let's continue scan data in the wal files
if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) { // if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) {
code = tqScanWalAsync(pTq, false); // it's ok to failed // code = tqScanWalAsync(pTq, false); // it's ok to failed
if (code) { // if (code) {
tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); // tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
} // }
} // }
return code; return code;
} }

View File

@ -49,20 +49,6 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
} }
} }
streamMetaRLock(pTq->pStreamMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
streamMetaRUnLock(pTq->pStreamMeta);
// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
// push data for stream processing:
// 1. the vnode has already been restored.
// 2. the vnode should be the leader.
// 3. the stream is not suspended yet.
if ((!tsDisableStream) && (numOfTasks > 0)) {
code = tqScanWalAsync(pTq, true);
}
return code; return code;
} }

View File

@ -21,18 +21,16 @@
typedef struct SBuildScanWalMsgParam { typedef struct SBuildScanWalMsgParam {
int64_t metaId; int64_t metaId;
int32_t numOfTasks; // int8_t restored;
int8_t restored;
SMsgCb msgCb; SMsgCb msgCb;
} SBuildScanWalMsgParam; } SBuildScanWalMsgParam;
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta); static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask); static bool taskReadyForDataFromWal(SStreamTask* pTask);
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc); static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); static int32_t tqScanWalInFuture(STQ* pTq);
static int32_t doScanWalAsync(STQ* pTq, bool ckPause);
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWal(STQ* pTq) { int32_t tqScanWal(STQ* pTq) {
@ -40,44 +38,38 @@ int32_t tqScanWal(STQ* pTq) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t numOfTasks = 0; int32_t numOfTasks = 0;
int64_t el = 0;
int32_t code = 0;
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter); int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanCounter, 0, 1);
if (old == 0) {
// check all tasks tqDebug("vgId:%d try to scan wal to extract data", vgId);
int32_t code = doScanWalForAllTasks(pMeta); } else {
if (code) { tqDebug("vgId:%d already in wal scan, abort", vgId);
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
return code; return code;
} }
streamMetaWLock(pMeta); // check all tasks
int32_t times = (--pMeta->scanInfo.scanCounter); code = doScanWalForAllTasks(pMeta, &numOfTasks);
if (times < 0) {
tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId, times);
times = 0;
}
numOfTasks = taosArrayGetSize(pMeta->pTaskList); el = (taosGetTimestampMs() - st);
streamMetaWUnLock(pMeta);
int64_t el = (taosGetTimestampMs() - st);
tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 " ms", vgId, el);
if (times > 0) {
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION);
if (code) { if (code) {
tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION); tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el,
} tstrerror(code));
} else {
tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms, next scan start in %dms", vgId,
el, SCAN_WAL_IDLE_DURATION);
} }
atomic_store_32(&pMeta->scanInfo.scanCounter, 0);
return code; return code;
} }
static void doStartScanWal(void* param, void* tmrId) { static void doStartScanWal(void* param, void* tmrId) {
int32_t vgId = 0; int32_t vgId = 0;
int32_t code = 0; int32_t code = 0;
SVnode* pVnode = NULL;
int32_t numOfTasks = 0;
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId); SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
@ -102,71 +94,93 @@ static void doStartScanWal(void* param, void* tmrId) {
vgId = pMeta->vgId; vgId = pMeta->vgId;
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, if (pMeta->role == NODE_ROLE_FOLLOWER) {
pParam->restored); tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role);
#if 0
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code == TSDB_CODE_SUCCESS) {
tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId);
} else {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code));
}
taosMemFree(pParam);
return;
}
if (pMeta->startInfo.startAllTasks) {
tqTrace("vgId:%d in restart procedure, not ready to scan wal", vgId);
goto _end;
}
streamMetaRLock(pMeta);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
streamMetaRUnLock(pMeta);
if (numOfTasks == 0) {
goto _end;
}
tqTrace("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
// #if 0
// wait for the vnode is freed, and invalid read may occur. // wait for the vnode is freed, and invalid read may occur.
taosMsleep(10000); taosMsleep(10000);
#endif // #endif
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
if (code) { if (code) {
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
} }
_end:
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, &pMeta->scanInfo.scanTimer, &pMeta->scanInfo.scanTimer,
vgId, "scan-wal");
code = taosReleaseRef(streamMetaRefPool, pParam->metaId); code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code) { if (code) {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code)); tstrerror(code));
} }
taosMemoryFree(pParam);
} }
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { int32_t tqScanWalAsync(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = 0; int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
tmr_h pTimer = NULL; tmr_h pTimer = NULL;
SBuildScanWalMsgParam* pParam = NULL; SBuildScanWalMsgParam* pParam = NULL;
// 1. the vnode should be the leader.
// 2. the stream isn't disabled
if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) {
tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
return TSDB_CODE_SUCCESS;
}
pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam)); pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
if (pParam == NULL) { if (pParam == NULL) {
tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code));
return terrno; return terrno;
} }
pParam->metaId = pMeta->rid; pParam->metaId = pMeta->rid;
pParam->numOfTasks = numOfTasks;
pParam->restored = pTq->pVnode->restored;
pParam->msgCb = pTq->pVnode->msgCb; pParam->msgCb = pTq->pVnode->msgCb;
code = streamTimerGetInstance(&pTimer); code = streamTimerGetInstance(&pTimer);
if (code) { if (code) {
tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId); tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
taosMemoryFree(pParam); taosMemoryFree(pParam);
} else { } else {
streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut"); // todo: start in 1sec for the first time
streamTmrStart(doStartScanWal, 1000, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
"scan-wal");
} }
return code; return code;
} }
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
SStreamMeta* pMeta = pTq->pStreamMeta;
bool alreadyRestored = pTq->pVnode->restored;
int32_t code = 0;
// do not launch the stream tasks, if it is a follower or not restored vnode.
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
return TSDB_CODE_SUCCESS;
}
streamMetaWLock(pMeta);
code = doScanWalAsync(pTq, ckPause);
streamMetaWUnLock(pMeta);
return code;
}
int32_t tqStopStreamTasksAsync(STQ* pTq) { int32_t tqStopStreamTasksAsync(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
@ -347,13 +361,10 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt
return code; return code;
} }
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) {
int32_t vgId = pStreamMeta->vgId; int32_t vgId = pStreamMeta->vgId;
SArray* pTaskList = NULL; SArray* pTaskList = NULL;
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); int32_t numOfTasks = 0;
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
// clone the task list, to avoid the task update during scan wal files // clone the task list, to avoid the task update during scan wal files
streamMetaWLock(pStreamMeta); streamMetaWLock(pStreamMeta);
@ -364,10 +375,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
return terrno; return terrno;
} }
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
// update the new task number // update the new task number
numOfTasks = taosArrayGetSize(pTaskList); numOfTasks = taosArrayGetSize(pTaskList);
if (pNumOfTasks != NULL) {
*pNumOfTasks = numOfTasks;
}
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pTaskId = taosArrayGet(pTaskList, i); STaskId* pTaskId = taosArrayGet(pTaskList, i);
@ -426,51 +440,59 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doScanWalAsync(STQ* pTq, bool ckPause) { // int32_t doScanWalAsync(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta; // SStreamMeta* pMeta = pTq->pStreamMeta;
bool alreadyRestored = pTq->pVnode->restored; // bool alreadyRestored = pTq->pVnode->restored;
int32_t vgId = pMeta->vgId; // int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); // int32_t code = 0;
// int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) { //
tqDebug("vgId:%d no stream tasks existed to run", vgId); // // if (numOfTasks == 0) {
return 0; //// tqDebug("vgId:%d no stream tasks existed to run", vgId);
} //// return 0;
//// }
if (pMeta->startInfo.startAllTasks) { //
tqTrace("vgId:%d in restart procedure, not scan wal", vgId); //// if (pMeta->startInfo.startAllTasks) {
return 0; //// tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
} //// return 0;
//// }
pMeta->scanInfo.scanCounter += 1; //
if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { //// pMeta->scanInfo.scanCounter += 1;
pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; //// if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
} //// pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
//// }
if (pMeta->scanInfo.scanCounter > 1) { //
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); //// if (pMeta->scanInfo.scanCounter > 1) {
return 0; //// tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
} //// return 0;
//// }
int32_t numOfPauseTasks = pMeta->numOfPausedTasks; //
if (ckPause && numOfTasks == numOfPauseTasks) { //// int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); //// if (ckPause && numOfTasks == numOfPauseTasks) {
//// tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
// reset the counter value, since we do not launch the scan wal operation. ////
pMeta->scanInfo.scanCounter = 0; //// reset the counter value, since we do not launch the scan wal operation.
return 0; //// pMeta->scanInfo.scanCounter = 0;
} //// return 0;
//// }
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, //
numOfTasks, alreadyRestored); //// tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
//// numOfTasks, alreadyRestored);
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); ////
} //// code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
//// if (code) {
//// tqError("vgId:%d failed create msg to scan data in wal, retry in %dms", vgId, SCAN_WAL_IDLE_DURATION);
//// }
//
// code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION);
// if (code) {
// tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION);
// }
//}
void streamMetaFreeTQDuringScanWalError(STQ* pTq) { void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam)); SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
p->metaId = pTq->pStreamMeta->rid; p->metaId = pTq->pStreamMeta->rid;
p->numOfTasks = 0;
doStartScanWal(p, 0); doStartScanWal(p, 0);
} }

View File

@ -47,6 +47,10 @@ END:
void tqUpdateNodeStage(STQ* pTq, bool isLeader) { void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
SSyncState state = syncGetState(pTq->pVnode->sync); SSyncState state = syncGetState(pTq->pVnode->sync);
streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader); streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader);
if (isLeader) {
tqScanWalAsync(pTq);
}
} }
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {

View File

@ -938,10 +938,10 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
if (scanWal && (vgId != SNODE_HANDLE)) { // if (scanWal && (vgId != SNODE_HANDLE)) {
tqDebug("vgId:%d start scan wal for executing tasks", vgId); // tqDebug("vgId:%d start scan wal for executing tasks", vgId);
code = tqScanWalAsync(pMeta->ahandle, true); // code = tqScanWalAsync(pMeta->ahandle, true);
} // }
return code; return code;
} }
@ -1170,7 +1170,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
pTask->hTaskInfo.operatorOpen = false; pTask->hTaskInfo.operatorOpen = false;
code = streamStartScanHistoryAsync(pTask, igUntreated); code = streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) { } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
code = tqScanWalAsync((STQ*)handle, false); // code = tqScanWalAsync((STQ*)handle, false);
} else { } else {
code = streamTrySchedExec(pTask); code = streamTrySchedExec(pTask);
} }

View File

@ -341,7 +341,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role); stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role);
} else { } else {
stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid); stError("vgId:%d role:%d not leader not send hb to mnode, failed to release meta rid:%" PRId64, vgId, role, rid);
} }
// taosMemoryFree(param); // taosMemoryFree(param);
return; return;

View File

@ -1320,7 +1320,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
// mark the sign to send msg before close all tasks // mark the sign to send msg before close all tasks
// 1. for leader vnode, always send msg before closing // 1. for leader vnode, always send msg before closing
// 2. for follower vnode, if it's is changed from leader, also sending msg before closing. // 2. for follower vnode, if it's changed from leader, also sending msg before closing.
if (pMeta->role == NODE_ROLE_LEADER) { if (pMeta->role == NODE_ROLE_LEADER) {
pMeta->sendMsgBeforeClosing = true; pMeta->sendMsgBeforeClosing = true;
} }