fix(stream): add start/stop message. (#30400)

This commit is contained in:
Haojun Liao 2025-03-24 19:38:00 +08:00 committed by GitHub
parent b6c1de4d6d
commit af06bea439
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 24 additions and 11 deletions

View File

@ -361,6 +361,7 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_CONSEN_CHKPT, "stream-consen-chkpt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_CONSEN_CHKPT, "stream-consen-chkpt", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_CHKPT_EXEC, "stream-exec-chkpt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_CHKPT_EXEC, "stream-exec-chkpt", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_START, "stream-task-start", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG) TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8 TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8
@ -429,6 +430,8 @@
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_ASSIGN_LEADER, "mnd-arb-assign-leader", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_ASSIGN_LEADER, "mnd-arb-assign-leader", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_START_STREAM, "start-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STOP_STREAM, "stop-stream", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_MND_ARB_MSG) TD_CLOSE_MSG_SEG(TDMT_MND_ARB_MSG)
TD_NEW_MSG_SEG(TDMT_MAX_MSG) // msg end mark TD_NEW_MSG_SEG(TDMT_MAX_MSG) // msg end mark

View File

@ -251,6 +251,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_START_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CONSEN_CHKPT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -89,6 +89,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_START, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -1033,6 +1033,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_START, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -36,7 +36,7 @@ extern "C" {
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" #define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
#define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update" #define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update"
#define MND_STREAM_CHKPT_CONSEN_NAME "stream-chkpt-consen" #define MND_STREAM_CHKPT_CONSEN_NAME "stream-chkpt-consen"
#define MND_STREAM_RESTART_NAME "stream-restart" #define MND_STREAM_STOP_NAME "stream-stop"
typedef struct SStreamTransInfo { typedef struct SStreamTransInfo {
int64_t startTime; int64_t startTime;
@ -148,7 +148,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId); int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId);
int32_t mndStreamSetChkptIdAction(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t checkpointId, SArray *pList); int32_t mndStreamSetChkptIdAction(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t checkpointId, SArray *pList);
int32_t mndStreamSetRestartAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetStopAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
int8_t mndTrigger); int8_t mndTrigger);
int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid); int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid);

View File

@ -107,6 +107,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_START_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_UPDATE_CHKPT_RSP, mndTransProcessRsp);
@ -133,6 +134,8 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessPauseStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessPauseStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_RESET_STREAM, mndProcessResetStreamReq);
@ -1092,7 +1095,7 @@ _OVER:
return code; return code;
} }
static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) { static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
int32_t code = 0; int32_t code = 0;
@ -1120,7 +1123,7 @@ static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_RESTART_NAME, true); code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_STOP_NAME, true);
if (code) { if (code) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return code; return code;
@ -1134,15 +1137,15 @@ static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
} }
STrans *pTrans = NULL; STrans *pTrans = NULL;
code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RESTART_NAME, "restart the stream", code = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, "stop the stream",
&pTrans); &pTrans);
if (pTrans == NULL || code) { if (pTrans == NULL || code) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, tstrerror(code)); mError("stream:%s failed to stop stream since %s", pauseReq.name, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return code; return code;
} }
code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESTART_NAME, pStream->uid); code = mndStreamRegisterTrans(pTrans, MND_STREAM_STOP_NAME, pStream->uid);
if (code) { if (code) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -1150,7 +1153,7 @@ static int32_t mndProcessRestartStreamReq(SRpcMsg *pReq) {
} }
// if nodeUpdate happened, not send pause trans // if nodeUpdate happened, not send pause trans
code = mndStreamSetRestartAction(pMnode, pTrans, pStream); code = mndStreamSetStopAction(pMnode, pTrans, pStream);
if (code) { if (code) {
mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code)); mError("stream:%s, failed to restart task since %s", pauseReq.name, tstrerror(code));
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);

View File

@ -127,7 +127,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) && if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) &&
(strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) { (strcmp(pTransName, MND_STREAM_STOP_NAME) != 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name); tInfo.name);
return TSDB_CODE_MND_TRANS_CONFLICT; return TSDB_CODE_MND_TRANS_CONFLICT;
@ -138,7 +138,7 @@ static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, cons
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_CHKPT_CONSEN_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_CHKPT_CONSEN_NAME) == 0) ||
strcmp(tInfo.name, MND_STREAM_RESTART_NAME) == 0) { strcmp(tInfo.name, MND_STREAM_STOP_NAME) == 0) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name); tInfo.name);
return TSDB_CODE_MND_TRANS_CONFLICT; return TSDB_CODE_MND_TRANS_CONFLICT;

View File

@ -740,7 +740,7 @@ int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
return code; return code;
} }
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream) { int32_t mndStreamSetStopAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream) {
return 0; return 0;
} }

View File

@ -168,6 +168,10 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont); return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
case TDMT_STREAM_TASK_RESUME: case TDMT_STREAM_TASK_RESUME:
return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false); return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
case TDMT_STREAM_TASK_STOP:
return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
case TDMT_STREAM_TASK_START:
return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
case TDMT_STREAM_TASK_UPDATE_CHKPT: case TDMT_STREAM_TASK_UPDATE_CHKPT:
return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont); return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont);
case TDMT_STREAM_CONSEN_CHKPT: case TDMT_STREAM_CONSEN_CHKPT: