From 1153eac4dc281da4f7b6ff52545dcabdcf4a7dbe Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 24 Mar 2022 16:37:37 +0800 Subject: [PATCH] mnd worker --- include/util/tdef.h | 4 + source/dnode/mgmt/container/inc/dnd.h | 6 +- .../dnode/mgmt/container/src/dndTransport.c | 52 +++--- source/dnode/mgmt/dnode/src/dmMsg.c | 26 +-- source/dnode/mgmt/mnode/src/mmMsg.c | 158 +++++++++--------- source/dnode/mgmt/qnode/src/qmMsg.c | 18 +- source/dnode/mgmt/snode/src/smMsg.c | 4 +- source/dnode/mgmt/vnode/src/vmMsg.c | 84 +++++----- 8 files changed, 184 insertions(+), 168 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 7a3e26aa15..fd8194e63e 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -453,6 +453,10 @@ enum { SND_WORKER_TYPE__UNIQUE, }; +#define MND_VGID -1 +#define QND_VGID 1 +#define VND_VGID 0 + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index af552b5357..6805c4a232 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -75,10 +75,8 @@ typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required); typedef struct SMsgHandle { - int32_t vgId; - NodeMsgFp vgIdMsgFp; - SMgmtWrapper *pVgIdWrapper; // Handle the case where the same message type is distributed to qnode or vnode - NodeMsgFp msgFp; + SMgmtWrapper *pQndWrapper; + SMgmtWrapper *pMndWrapper; SMgmtWrapper *pWrapper; } SMsgHandle; diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 20069d89d6..25edb82baa 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -20,13 +20,15 @@ #define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" -static inline void dndProcessQVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) { +static inline void dndProcessQMVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) { SMsgHead *pHead = pMsg->pCont; int32_t vgId = htonl(pHead->vgId); SMgmtWrapper *pWrapper = pHandle->pWrapper; - if (vgId == pHandle->vgId && pHandle->pVgIdWrapper != NULL) { - pWrapper = pHandle->pVgIdWrapper; + if (vgId == QND_VGID) { + pWrapper = pHandle->pQndWrapper; + } else if (vgId == MND_VGID) { + pWrapper = pHandle->pMndWrapper; } dTrace("msg:%s will be processed by %s, handle:%p app:%p vgId:%d", TMSG_INFO(pMsg->msgType), pWrapper->name, @@ -46,13 +48,13 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { } SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; - if (pHandle->msgFp != NULL) { - if (pHandle->vgId == 0) { + if (pHandle->pWrapper != NULL) { + if (pHandle->pMndWrapper == NULL && pHandle->pQndWrapper == NULL) { dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType), pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code)); dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet); } else { - dndProcessQVnodeRpcMsg(pHandle, pRsp, pEpSet); + dndProcessQMVnodeRpcMsg(pHandle, pRsp, pEpSet); } } else { dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle); @@ -126,13 +128,13 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { } SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; - if (pHandle->msgFp != NULL) { - if (pHandle->vgId == 0) { + if (pHandle->pWrapper != NULL) { + if (pHandle->pMndWrapper == NULL && pHandle->pQndWrapper == NULL) { dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name, pReq->handle, pReq->ahandle); dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet); } else { - dndProcessQVnodeRpcMsg(pHandle, pReq, pEpSet); + dndProcessQMVnodeRpcMsg(pHandle, pReq, pEpSet); } } else { dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle); @@ -269,21 +271,27 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { int32_t vgId = pWrapper->msgVgIds[msgIndex]; if (msgFp == NULL) continue; + dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId); + SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; - if (pHandle->msgFp != NULL && pHandle->vgId == vgId) { - dError("msg:%s has multiple process nodes, prev node:%s:%d, curr node:%s:%d", tMsgInfo[msgIndex], - pHandle->pWrapper->name, pHandle->pWrapper->msgVgIds[msgIndex], pWrapper->name, vgId); - return -1; - } else { - dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId); - if (vgId == 0) { - pHandle->msgFp = msgFp; - pHandle->pWrapper = pWrapper; - } else { - pHandle->vgId = vgId; - pHandle->vgIdMsgFp = msgFp; - pHandle->pVgIdWrapper = pWrapper; + if (vgId == QND_VGID) { + if (pHandle->pQndWrapper != NULL) { + dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); + return -1; } + pHandle->pQndWrapper = pWrapper; + } else if (vgId == MND_VGID) { + if (pHandle->pMndWrapper != NULL) { + dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); + return -1; + } + pHandle->pMndWrapper = pWrapper; + } else { + if (pHandle->pWrapper != NULL) { + dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); + return -1; + } + pHandle->pWrapper = pWrapper; } } } diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index 7876d854ea..1df0798310 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -114,19 +114,19 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { void dmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); } diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index 6f719f3d8d..1cae2220ad 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -75,84 +75,90 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { void mmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessReadMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessReadMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessReadMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)mmProcessReadMsg, MND_VGID); + } diff --git a/source/dnode/mgmt/qnode/src/qmMsg.c b/source/dnode/mgmt/qnode/src/qmMsg.c index ddcd791b7b..ebe6477e81 100644 --- a/source/dnode/mgmt/qnode/src/qmMsg.c +++ b/source/dnode/mgmt/qnode/src/qmMsg.c @@ -56,14 +56,14 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { void qmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)qmProcessQueryMsg, 1); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)qmProcessQueryMsg, 1); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)qmProcessFetchMsg, 1); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)qmProcessFetchMsg, 1); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)qmProcessQueryMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)qmProcessQueryMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)qmProcessFetchMsg, 1); - dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)qmProcessFetchMsg, 1); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)qmProcessFetchMsg, 1); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)qmProcessFetchMsg, 1); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)qmProcessFetchMsg, 1); + dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); } diff --git a/source/dnode/mgmt/snode/src/smMsg.c b/source/dnode/mgmt/snode/src/smMsg.c index 0986556207..aea1dded56 100644 --- a/source/dnode/mgmt/snode/src/smMsg.c +++ b/source/dnode/mgmt/snode/src/smMsg.c @@ -56,6 +56,6 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { void smInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by SNODE - dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg, VND_VGID); } diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index a98dccbca3..29696b4079 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -244,47 +244,47 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { void vmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); }