From af06bea439ade363a7ad863c42849e39dc23e2d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Mar 2025 19:38:00 +0800 Subject: [PATCH] fix(stream): add start/stop message. (#30400) --- include/common/tmsgdef.h | 3 +++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mnode/impl/inc/mndStream.h | 4 ++-- source/dnode/mnode/impl/src/mndStream.c | 15 +++++++++------ source/dnode/mnode/impl/src/mndStreamTrans.c | 4 ++-- source/dnode/mnode/impl/src/mndStreamTransAct.c | 2 +- source/dnode/snode/src/snode.c | 4 ++++ 9 files changed, 24 insertions(+), 11 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index c304f27800..07647ee2a0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -361,6 +361,7 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_CONSEN_CHKPT, "stream-consen-chkpt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_CHKPT_EXEC, "stream-exec-chkpt", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_START, "stream-task-start", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8 @@ -429,6 +430,8 @@ TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_ASSIGN_LEADER, "mnd-arb-assign-leader", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_START_STREAM, "start-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STOP_STREAM, "stop-stream", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_MND_ARB_MSG) TD_NEW_MSG_SEG(TDMT_MAX_MSG) // msg end mark diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 83afffddf8..35b125b19c 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -251,6 +251,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_START_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 11710d7b39..a422ad5e8e 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -89,6 +89,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_START, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 8ee7e6c2b2..e43f2a34c1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1033,6 +1033,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_START, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index be123a3e6c..e4b8006370 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -36,7 +36,7 @@ extern "C" { #define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" #define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update" #define MND_STREAM_CHKPT_CONSEN_NAME "stream-chkpt-consen" -#define MND_STREAM_RESTART_NAME "stream-restart" +#define MND_STREAM_STOP_NAME "stream-stop" typedef struct SStreamTransInfo { int64_t startTime; @@ -148,7 +148,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj * int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId); int32_t mndStreamSetChkptIdAction(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t checkpointId, SArray *pList); -int32_t mndStreamSetRestartAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetStopAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, int8_t mndTrigger); int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 324f401cf6..df1f45a9db 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -107,6 +107,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_START_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp); @@ -133,6 +134,8 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); + mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessPauseStreamReq); + mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessPauseStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq); @@ -1092,7 +1095,7 @@ _OVER: return code; } -static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) { +static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; int32_t code = 0; @@ -1120,7 +1123,7 @@ static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) { } // check if it is conflict with other trans in both sourceDb and targetDb. - code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true); + code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_STOP_NAME, true); if (code) { sdbRelease(pMnode->pSdb, pStream); return code; @@ -1134,15 +1137,15 @@ static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) { } STrans *pTrans = NULL; - code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream", + code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, "stop the stream", &pTrans); if (pTrans == NULL || code) { - mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code)); + mError("stream:%s failed to stop stream since %s", pauseReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } - code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid); + code = mndStreamRegisterTrans(pTrans, MND_STREAM_STOP_NAME, pStream->uid); if (code) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1150,7 +1153,7 @@ static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) { } // if nodeUpdate happened, not send pause trans - code = mndStreamSetRestartAction(pMnode, pTrans, pStream); + code = mndStreamSetStopAction(pMnode, pTrans, pStream); if (code) { mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 7cee4971b7..e81d988303 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -127,7 +127,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) && - (strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) { + (strcmp(pTransName, MND_STREAM_STOP_NAME) != 0)) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); return TSDB_CODE_MND_TRANS_CONFLICT; @@ -138,7 +138,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_CHKPT_CONSEN_NAME) == 0) || - strcmp(tInfo.name, MND_STREAM_RESTART_NAME) == 0) { + strcmp(tInfo.name, MND_STREAM_STOP_NAME) == 0) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); return TSDB_CODE_MND_TRANS_CONFLICT; diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c index c210b12a67..c777f754df 100644 --- a/source/dnode/mnode/impl/src/mndStreamTransAct.c +++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c @@ -740,7 +740,7 @@ int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask return code; } -int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream) { +int32_t mndStreamSetStopAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream) { return 0; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 415eb202cb..4abd1a9bc8 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -168,6 +168,10 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont); case TDMT_STREAM_TASK_RESUME: return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false); + case TDMT_STREAM_TASK_STOP: + return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont); + case TDMT_STREAM_TASK_START: + return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont); case TDMT_STREAM_TASK_UPDATE_CHKPT: return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont); case TDMT_STREAM_CONSEN_CHKPT: