From a5cdf3320c84cd8c95375c19f993687ddc3e4e2f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 26 Jan 2025 01:00:22 +0800 Subject: [PATCH] refactor(stream): handle stream check in ctrl queue. --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9d0e553b40..3677fc5616 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1010,8 +1010,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f5a028f8c3..43b3d8676d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -941,10 +941,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) switch (pMsg->msgType) { case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_TASK_CHECK: - return tqProcessTaskCheckReq(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_TASK_CHECK_RSP: - return tqProcessTaskCheckRsp(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE: return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_RSP: @@ -987,6 +983,10 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn return tqProcessTaskDispatchReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_TASK_CHECK: + return tqProcessTaskCheckReq(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_TASK_CHECK_RSP: + return tqProcessTaskCheckRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType); return TSDB_CODE_APP_ERROR;