diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 3677fc5616..cd67823ee4 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1016,15 +1016,15 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 43b3d8676d..391826f618 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -947,20 +947,12 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY: return tqProcessTaskScanHistory(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_CHECKPOINT_READY: - return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: - return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER: return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg); - case TDMT_MND_STREAM_REQ_CHKPT_RSP: - return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); case TDMT_VND_GET_STREAM_PROGRESS: return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); - case TDMT_MND_STREAM_CHKPT_REPORT_RSP: - return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; @@ -987,6 +979,14 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn return tqProcessTaskCheckReq(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: return tqProcessTaskCheckRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY: + return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: + return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_REQ_CHKPT_RSP: + return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_CHKPT_REPORT_RSP: + return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType); return TSDB_CODE_APP_ERROR;