refactor(stream): dispatch req handled in ctrl queue.

This commit is contained in:
Haojun Liao 2025-01-25 19:03:31 +08:00
parent fa2229c453
commit 0f04f47337
2 changed files with 3 additions and 3 deletions

View File

@ -1006,7 +1006,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;

View File

@ -941,8 +941,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK:
return tqProcessTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK_RSP:
@ -985,6 +983,8 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn
switch (pMsg->msgType) {
case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
default: