From 2acc8423c5263fe57fc205a9e2f26447926e56e0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 21 Dec 2023 16:00:50 +0800 Subject: [PATCH] fix(stream): send msg to mnode before closing all tasks. --- include/libs/stream/tstream.h | 3 + source/dnode/vnode/src/tq/tqStreamTask.c | 13 ++- source/dnode/vnode/src/tq/tqUtil.c | 6 ++ source/libs/stream/src/streamMeta.c | 128 +++++++++++++++-------- 4 files changed, 101 insertions(+), 49 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e38a677c8b..9b96a41516 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -494,6 +494,7 @@ typedef struct SStreamMeta { int32_t vgId; int64_t stage; int32_t role; + bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower. STaskStartInfo startInfo; SRWLatch lock; int32_t walScanCounter; @@ -782,6 +783,8 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ int64_t* oldStage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); +SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); + bool streamTaskAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 1b0a76e81c..99cc3a36ea 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -126,17 +126,16 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t tqStopStreamTasks(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = TD_VID(pTq->pVnode); - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + int32_t num = taosArrayGetSize(pMeta->pTaskList); - tqDebug("vgId:%d stop all %d stream task(s)", vgId, numOfTasks); - if (numOfTasks == 0) { + 
tqDebug("vgId:%d stop all %d stream task(s)", vgId, num); + if (num == 0) { return TSDB_CODE_SUCCESS; } - SArray* pTaskList = NULL; - streamMetaWLock(pMeta); - pTaskList = taosArrayDup(pMeta->pTaskList, NULL); - streamMetaWUnLock(pMeta); + // send hb msg to mnode before closing all tasks. + SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta); + int32_t numOfTasks = taosArrayGetSize(pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 110cf79b4e..d3ea1f19e5 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -40,6 +40,12 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) { int64_t stage = pMeta->stage; pMeta->stage = state.term; + + // mark the sign to send msg before close all tasks + if ((!isLeader) && (pMeta->role == NODE_ROLE_LEADER)) { + pMeta->sendMsgBeforeClosing = true; + } + pMeta->role = (isLeader)? 
NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER; if (isLeader) { tqInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId, diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d6e1286093..355e17db9a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1051,43 +1051,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { taosThreadMutexUnlock(&pTask->lock); } -void metaHbToMnode(void* param, void* tmrId) { - int64_t rid = *(int64_t*)param; - - SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); - if (pMeta == NULL) { - return; - } - - // need to stop, stop now - if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { - pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; - stDebug("vgId:%d jump out of meta timer", pMeta->vgId); - taosReleaseRef(streamMetaId, rid); - return; - } - - // not leader not send msg - if (pMeta->role != NODE_ROLE_LEADER) { - stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); - taosReleaseRef(streamMetaId, rid); - pMeta->pHbInfo->hbStart = 0; - return; - } - - // set the hb start time - if (pMeta->pHbInfo->hbStart == 0) { - pMeta->pHbInfo->hbStart = taosGetTimestampMs(); - } - - if (!waitForEnoughDuration(pMeta->pHbInfo)) { - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); - taosReleaseRef(streamMetaId, rid); - return; - } - - stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); - +static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { SStreamHbMsg hbMsg = {0}; SEpSet epset = {0}; bool hasMnodeEpset = false; @@ -1181,23 +1145,62 @@ void metaHbToMnode(void* param, void* tmrId) { } tEncoderClear(&encoder); - SRpcMsg msg = { - .info.noResp = 1, - }; + SRpcMsg msg = {.info.noResp = 1}; initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); pMeta->pHbInfo->hbCount 
+= 1; - stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, pMeta->pHbInfo->hbCount); + tmsgSendReq(&epset, &msg); } else { stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); } -_end: + _end: streamMetaClearHbMsg(&hbMsg); taosArrayDestroy(pIdList); + return TSDB_CODE_SUCCESS; +} + +void metaHbToMnode(void* param, void* tmrId) { + int64_t rid = *(int64_t*)param; + + SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); + if (pMeta == NULL) { + return; + } + + // need to stop, stop now + if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { + pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; + stDebug("vgId:%d jump out of meta timer", pMeta->vgId); + taosReleaseRef(streamMetaId, rid); + return; + } + + // not leader not send msg; reset hbStart before the ref is released, pMeta must not be touched afterwards + if (pMeta->role != NODE_ROLE_LEADER) { + stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); + pMeta->pHbInfo->hbStart = 0; + taosReleaseRef(streamMetaId, rid); + return; + } + + // set the hb start time + if (pMeta->pHbInfo->hbStart == 0) { + pMeta->pHbInfo->hbStart = taosGetTimestampMs(); + } + + if (!waitForEnoughDuration(pMeta->pHbInfo)) { + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); + taosReleaseRef(streamMetaId, rid); + return; + } + + stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); + metaHeartbeatToMnodeImpl(pMeta); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); } @@ -1320,3 +1323,44 @@ int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, voi schedMsg.msg = code; return taosScheduleTask(pMeta->qHandle, &schedMsg); } + +SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { + SArray* pTaskList = NULL; + bool sendMsg = false; + + streamMetaWLock(pMeta); + pTaskList = 
taosArrayDup(pMeta->pTaskList, NULL); + sendMsg = pMeta->sendMsgBeforeClosing; + pMeta->sendMsgBeforeClosing = false; // consume the flag while the meta lock is still held + streamMetaWUnLock(pMeta); + + if (!sendMsg) { + stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId); + return pTaskList; + } + + stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId); + + // mark the ongoing checkpoint of every task in checkpoint status as failed, then send hb msg to mnode. + int32_t numOfTasks = taosArrayGetSize(pTaskList); + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); + if (pTask == NULL) { + continue; + } + + taosThreadMutexLock(&pTask->lock); + ETaskStatus s = streamTaskGetStatus(pTask, NULL); + if (s == TASK_STATUS__CK) { + streamTaskSetCheckpointFailedId(pTask); + stDebug("s-task:%s mark the checkpoint:%"PRId64" failed", pTask->id.idStr, pTask->chkInfo.checkpointingId); + } + + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + } + + metaHeartbeatToMnodeImpl(pMeta); + return pTaskList; +}