From a71f414d97b76cfa9920353497959427ba8f1c4e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 2 Mar 2025 02:06:14 +0800 Subject: [PATCH 1/3] refactor(stream): stop all tasks before drop vnodes. --- include/common/streamMsg.h | 9 ++- include/common/tmsgdef.h | 1 + include/dnode/vnode/tqCommon.h | 1 + source/common/src/msg/streamMsg.c | 25 +++++++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mnode/impl/inc/mndStream.h | 2 + source/dnode/mnode/impl/src/mndDb.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 1 + .../dnode/mnode/impl/src/mndStreamTransAct.c | 72 +++++++++++++++++++ source/dnode/snode/src/snode.c | 2 + source/dnode/vnode/src/inc/vnodeInt.h | 3 +- source/dnode/vnode/src/tq/tq.c | 4 ++ source/dnode/vnode/src/tq/tqStreamTask.c | 6 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 27 +++++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 6 ++ source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- 18 files changed, 158 insertions(+), 7 deletions(-) diff --git a/include/common/streamMsg.h b/include/common/streamMsg.h index d410bd17e0..6fc24ccf2e 100644 --- a/include/common/streamMsg.h +++ b/include/common/streamMsg.h @@ -17,7 +17,6 @@ #define TDENGINE_STREAMMSG_H #include "tmsg.h" -//#include "trpc.h" #ifdef __cplusplus extern "C" { @@ -256,6 +255,14 @@ typedef struct { int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq); int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq); +typedef struct { + SMsgHead head; + int64_t streamId; +} SStreamTaskStopReq; + +int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq); +int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq); + #ifdef __cplusplus } #endif diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 29a7e52482..13a26910c1 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -401,6 +401,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_UNUSED, "vnd-stream-unused", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_ALL_STOP, "vnd-stream-allstop", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_VND_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8 diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 4d5e18520c..7aedcc720d 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -39,6 +39,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* msg); +int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); diff --git a/source/common/src/msg/streamMsg.c b/source/common/src/msg/streamMsg.c index 7e7952eb60..44c0703337 100644 --- a/source/common/src/msg/streamMsg.c +++ b/source/common/src/msg/streamMsg.c @@ -866,3 +866,28 @@ int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) { _exit: return code; } + +int32_t tEncodeStreamTaskStopReq(SEncoder* pEncoder, const SStreamTaskStopReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeStreamTaskStopReq(SDecoder* pDecoder, SStreamTaskStopReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + tEndDecode(pDecoder); + +_exit: + return code; + +} diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 46b0877476..300b89d00e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -258,6 +258,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_ALL_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index c2d146462a..024e2e4e99 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -96,6 +96,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_ALL_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 234d4f41e1..7862373b1c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1022,6 +1022,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_ALL_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index d694dc67eb..509dac7a53 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -151,6 +151,8 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask *p int32_t mndStreamSetRestartAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, int8_t mndTrigger); +int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid); + int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList); int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq); int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, int32_t taskId, int64_t checkpointId, diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index ebf7f86ea6..f4226d78a3 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1714,6 +1714,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { #endif TAOS_CHECK_GOTO(mndDropSmasByDb(pMnode, pTrans, pDb), NULL, _OVER); TAOS_CHECK_GOTO(mndDropIdxsByDb(pMnode, pTrans, pDb), NULL, _OVER); + TAOS_CHECK_GOTO(mndStreamSetStopStreamTasksActions(pMnode, pTrans, pDb->uid), NULL, _OVER); TAOS_CHECK_GOTO(mndSetDropDbRedoActions(pMnode, pTrans, pDb), NULL, _OVER); TAOS_CHECK_GOTO(mndUserRemoveDb(pMnode, pTrans, pDb->name), NULL, _OVER); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e14b0d390e..07ed823ae4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -116,6 +116,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_STREAM_ALL_STOP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_DROP_ORPHANTASKS, mndProcessDropOrphanTaskReq); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_TASK_RESET, mndProcessResetStatusReq); diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c index 5ccb626609..f0e4c7559d 100644 --- a/source/dnode/mnode/impl/src/mndStreamTransAct.c +++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c @@ -666,3 +666,75 @@ int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pS return 0; } + +static int32_t doSetStopAllTasksAction(SMnode* pMnode, STrans* pTrans, SVgObj* pVgObj) { + void *pBuf = NULL; + int32_t len = 0; + int32_t code = 0; + SEncoder encoder; + + SStreamTaskStopReq req = {.streamId = -1}; + tEncodeSize(tEncodeStreamTaskStopReq, &req, len, code); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + int32_t tlen = sizeof(SMsgHead) + len; + + pBuf = taosMemoryMalloc(tlen); + if (pBuf == NULL) { + return terrno; + } + + void *abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); + tEncoderInit(&encoder, abuf, tlen); + code = tEncodeStreamTaskStopReq(&encoder, &req); + if (code == -1) { + tEncoderClear(&encoder); + taosMemoryFree(pBuf); + return code; + } + + SMsgHead *pMsgHead = (SMsgHead *)pBuf; + pMsgHead->contLen = htonl(tlen); + pMsgHead->vgId = htonl(pVgObj->vgId); + + tEncoderClear(&encoder); + + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + + code = setTransAction(pTrans, pBuf, tlen, TDMT_VND_STREAM_ALL_STOP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); + if (code != TSDB_CODE_SUCCESS) { + mError("failed to create stop all streams trans, code:%s", tstrerror(code)); + taosMemoryFree(pBuf); + } + + return 0; +} + +int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid) { + int32_t code = 0; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == dbUid) { + if ((code = doSetStopAllTasksAction(pMnode, pTrans, pVgroup)) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + TAOS_RETURN(code); + } + } + + sdbRelease(pSdb, pVgroup); + } + + TAOS_RETURN(code); + return 0; +} \ No newline at end of file diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6eee8c510b..88e52a5813 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -160,6 +160,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); case TDMT_VND_STREAM_TASK_RESET: return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont); + case TDMT_VND_STREAM_ALL_STOP: + return tqStreamTaskProcessAllTaskStopReq(pSnode->pMeta, &pSnode->msgCb, pMsg); case TDMT_STREAM_TASK_PAUSE: return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont); case TDMT_STREAM_TASK_RESUME: diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b13a66da99..fc5e657f38 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -243,13 +243,14 @@ int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); void tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqScanWalAsync(STQ* pTq, bool ckPause); -int32_t tqStopStreamTasksAsync(STQ* pTq); +int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb); int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessAllTaskStopReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b9de5dce52..ad05346bb3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1388,6 +1388,10 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg->pCont); } +int32_t tqProcessAllTaskStopReq(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamTaskProcessAllTaskStopReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg); +} + int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 9ea84830f1..166eef1860 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -167,10 +167,8 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return code; } -int32_t tqStopStreamTasksAsync(STQ* pTq) { - SStreamMeta* pMeta = pTq->pStreamMeta; - int32_t vgId = pMeta->vgId; - return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS); +int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb) { + return streamTaskSchedTask(pMsgCb, pMeta->vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS); } int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 0c4a3932b7..2b86ec8264 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -991,6 +991,33 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { return TSDB_CODE_SUCCESS; } +int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SRpcMsg* pMsg) { + int32_t code = 0; + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder; + + SStreamTaskStopReq req = {0}; + tDecoderInit(&decoder, (uint8_t*)msg, len); + if ((code = tDecodeStreamTaskStopReq(&decoder, &req)) < 0) { + tqError("vgId:%d failed to decode stop all streams, code:%s", pMeta->vgId, tstrerror(code)); + tDecoderClear(&decoder); + return TSDB_CODE_SUCCESS; + } + + tDecoderClear(&decoder); + + // stop all stream tasks, only invoked when trying to drop db + if (req.streamId <= 0) { + tqDebug("vgId:%d recv msg to stop all tasks in sync before dropping vnode", vgId); + streamMetaStopAllTasks(pMeta); + } else { // stop only one stream tasks + + } + return TSDB_CODE_SUCCESS; +} + int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { SRetrieveChkptTriggerReq req = {0}; SStreamTask* pTask = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index abaa61744d..d80842b949 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -750,6 +750,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } + } break; + case TDMT_VND_STREAM_ALL_STOP: { + if (pVnode->restored && vnodeIsLeader(pVnode) && (code = tqProcessAllTaskStopReq(pVnode->pTq, pMsg)) < 0) { + goto _err; + } + } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 068f4dec3d..a7e8a43fae 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -624,7 +624,7 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) { if (pVnode->pTq) { tqUpdateNodeStage(pVnode->pTq, false); - if (tqStopStreamTasksAsync(pVnode->pTq) != 0) { + if (tqStopStreamAllTasksAsync(pVnode->pTq->pStreamMeta, &pVnode->msgCb) != 0) { vError("vgId:%d, failed to stop stream tasks", pVnode->config.vgId); } } From 06ba3027b10cc84a359140b1803bb499d483e40a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 2 Mar 2025 12:06:19 +0800 Subject: [PATCH 2/3] fix(stream): check return value. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 +++++++- tests/ci/func.txt | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2b86ec8264..efd847a1f3 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1011,10 +1011,16 @@ int32_t tqStreamTaskProcessAllTaskStopReq(SStreamMeta* pMeta, SMsgCb* pMsgCb, SR // stop all stream tasks, only invoked when trying to drop db if (req.streamId <= 0) { tqDebug("vgId:%d recv msg to stop all tasks in sync before dropping vnode", vgId); - streamMetaStopAllTasks(pMeta); + code = streamMetaStopAllTasks(pMeta); + if (code) { + tqError("vgId:%d failed to stop all tasks, code:%s", vgId, tstrerror(code)); + } + } else { // stop only one stream tasks } + + // always return success return TSDB_CODE_SUCCESS; } diff --git a/tests/ci/func.txt b/tests/ci/func.txt index 45d4fb1c11..52df0aad96 100644 --- a/tests/ci/func.txt +++ b/tests/ci/func.txt @@ -409,7 +409,7 @@ (void)tqProcessTaskConsenChkptIdReq (void)tqProcessTaskResetReq (void)tqScanWalAsync -(void)tqStopStreamTasksAsync +(void)tqStopStreamAllTasksAsync (void)tqUpdateTbUidList (void)transAcquireExHandle (void)transAsyncSend From 8abed822c54207d42c85790f34e00feb04d6ce29 Mon Sep 17 00:00:00 2001 From: qevolg <2227465945@qq.com> Date: Mon, 3 Mar 2025 20:26:29 +0800 Subject: [PATCH 3/3] chore: modify taos-connector-rust git tag --- cmake/taosws_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/taosws_CMakeLists.txt.in b/cmake/taosws_CMakeLists.txt.in index 17446d184d..b013d45911 100644 --- a/cmake/taosws_CMakeLists.txt.in +++ b/cmake/taosws_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosws-rs ExternalProject_Add(taosws-rs GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git - GIT_TAG 3.0 + GIT_TAG main SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" BINARY_DIR "" #BUILD_IN_SOURCE TRUE