From f3e747889681e3cac9b906e26bdadaa69edede45 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Mar 2022 10:47:45 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/bnode/inc/bmHandle.h | 5 +- source/dnode/mgmt/bnode/inc/bmInt.h | 3 - source/dnode/mgmt/bnode/src/bmHandle.c | 5 - source/dnode/mgmt/bnode/src/bmInt.c | 3 +- source/dnode/mgmt/container/inc/dndInt.h | 4 +- .../dnode/mgmt/container/inc/dndTransport.h | 2 +- source/dnode/mgmt/container/src/dndInt.c | 8 + source/dnode/mgmt/container/src/dndNode.c | 2 +- .../dnode/mgmt/container/src/dndTransport.c | 8 +- source/dnode/mgmt/dnode/inc/dmHandle.h | 4 +- source/dnode/mgmt/dnode/inc/dmInt.h | 1 - source/dnode/mgmt/dnode/src/dmHandle.c | 14 -- source/dnode/mgmt/dnode/src/dmInt.c | 1 - source/dnode/mgmt/mnode/inc/mmHandle.h | 4 +- source/dnode/mgmt/mnode/inc/mmInt.h | 1 - source/dnode/mgmt/mnode/src/mmHandle.c | 150 ++++++++---------- source/dnode/mgmt/mnode/src/mmInt.c | 1 - source/dnode/mgmt/qnode/inc/qmHandle.h | 4 +- source/dnode/mgmt/qnode/inc/qmInt.h | 1 - source/dnode/mgmt/qnode/src/qmHandle.c | 4 - source/dnode/mgmt/qnode/src/qmInt.c | 1 - source/dnode/mgmt/snode/inc/smHandle.h | 3 +- source/dnode/mgmt/snode/inc/smInt.h | 8 +- source/dnode/mgmt/snode/src/smHandle.c | 5 - source/dnode/mgmt/snode/src/smInt.c | 1 - source/dnode/mgmt/vnode/inc/vmHandle.h | 4 +- source/dnode/mgmt/vnode/inc/vmInt.h | 1 - source/dnode/mgmt/vnode/src/vmHandle.c | 76 ++++----- source/dnode/mgmt/vnode/src/vmInt.c | 1 - 29 files changed, 123 insertions(+), 202 deletions(-) diff --git a/source/dnode/mgmt/bnode/inc/bmHandle.h b/source/dnode/mgmt/bnode/inc/bmHandle.h index b4fecd36c4..ee7021579f 100644 --- a/source/dnode/mgmt/bnode/inc/bmHandle.h +++ b/source/dnode/mgmt/bnode/inc/bmHandle.h @@ -22,13 +22,10 @@ extern "C" { #endif -void bmInitMsgHandles(SMgmtWrapper *pWrapper); -SMsgHandle bmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); - +void bmInitMsgHandles(SMgmtWrapper *pWrapper); int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index 55fde8d4f8..abf8ee2e8a 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -30,9 +30,6 @@ typedef struct SBnodeMgmt { SBnode *pBnode; SRWLatch latch; SDnodeWorker writeWorker; - - // - SMsgHandle msgHandles[TDMT_MAX]; SProcObj *pProcess; bool singleProc; } SBnodeMgmt; diff --git a/source/dnode/mgmt/bnode/src/bmHandle.c b/source/dnode/mgmt/bnode/src/bmHandle.c index 100921833a..0c656398a1 100644 --- a/source/dnode/mgmt/bnode/src/bmHandle.c +++ b/source/dnode/mgmt/bnode/src/bmHandle.c @@ -23,8 +23,3 @@ int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} void bmInitMsgHandles(SMgmtWrapper *pWrapper) { } - -SMsgHandle bmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) { - SBnodeMgmt *pMgmt = pWrapper->pMgmt; - return pMgmt->msgHandles[msgIndex]; -} diff --git a/source/dnode/mgmt/bnode/src/bmInt.c b/source/dnode/mgmt/bnode/src/bmInt.c index 7ac360adc5..a9beff5756 100644 --- a/source/dnode/mgmt/bnode/src/bmInt.c +++ b/source/dnode/mgmt/bnode/src/bmInt.c @@ -24,9 +24,8 @@ void bmGetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.openFp = NULL; mgmtFp.closeFp = NULL; mgmtFp.requiredFp = bmRequireNode; - mgmtFp.getMsgHandleFp = bmGetMsgHandle; - // bmInitMsgHandles(pWrapper); + bmInitMsgHandles(pWrapper); pWrapper->name = "snode"; pWrapper->fp = mgmtFp; } diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 1af2e0f24a..5b42ccffe6 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -77,7 +77,6 @@ typedef void (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper); typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper); typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper); -typedef SMsgHandle (*GetMsgHandleFp)(SMgmtWrapper *pWrapper, int32_t msgIndex); typedef struct SMsgHandle { RpcMsgFp rpcMsgFp; @@ -89,7 +88,6 @@ typedef struct SMgmtFp { OpenNodeFp openFp; CloseNodeFp closeFp; RequireNodeFp requiredFp; - GetMsgHandleFp getMsgHandleFp; } SMgmtFp; typedef struct SMgmtWrapper { @@ -100,6 +98,7 @@ typedef struct SMgmtWrapper { SProcObj *pProc; void *pMgmt; SDnode *pDnode; + SMsgHandle msgHandles[TDMT_MAX]; SMgmtFp fp; } SMgmtWrapper; @@ -145,6 +144,7 @@ void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); TdFilePtr dndCheckRunning(char *dataDir); SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType); +void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp); // dndMonitor.h void dndSendMonitorReport(SDnode *pDnode); diff --git a/source/dnode/mgmt/container/inc/dndTransport.h b/source/dnode/mgmt/container/inc/dndTransport.h index 892dea6eb2..49297abd6c 100644 --- a/source/dnode/mgmt/container/inc/dndTransport.h +++ b/source/dnode/mgmt/container/inc/dndTransport.h @@ -26,7 +26,7 @@ int32_t dndInitServer(SDnode *pDnode); void dndCleanupServer(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); -int32_t dndSetMsgHandle(SDnode *pDnode); +int32_t dndInitMsgHandle(SDnode *pDnode); int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/container/src/dndInt.c b/source/dnode/mgmt/container/src/dndInt.c index 17015ebee0..ca3f1e14ea 100644 --- a/source/dnode/mgmt/container/src/dndInt.c +++ b/source/dnode/mgmt/container/src/dndInt.c @@ -64,6 +64,14 @@ void dndCleanup() { SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) { return &pDnode->wrappers[nodeType]; } +void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { + SMsgHandle *pHandle = &pWrapper->msgHandles[TMSG_INDEX(msgType)]; + + pHandle->pWrapper = pWrapper; + pHandle->nodeMsgFp = nodeMsgFp; + pHandle->rpcMsgFp = dndProcessRpcMsg; +} + EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } void dndSetStatus(SDnode *pDnode, EDndStatus status) { diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index cb82e88d14..2e2d9d4bad 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -98,7 +98,7 @@ SDnode *dndCreate(SDndCfg *pCfg) { bmGetMgmtFp(&pDnode->wrappers[BNODE]); memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg)); - if (dndSetMsgHandle(pDnode) != 0) { + if (dndInitMsgHandle(pDnode) != 0) { goto _OVER; } diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index f4aa43f39c..29f1e42ffc 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -246,16 +246,14 @@ void dndCleanupServer(SDnode *pDnode) { } } -int32_t dndSetMsgHandle(SDnode *pDnode) { +int32_t dndInitMsgHandle(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->trans; for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType]; - GetMsgHandleFp getMsgHandleFp = pWrapper->fp.getMsgHandleFp; - if (getMsgHandleFp == NULL) continue; + SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType]; for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { - SMsgHandle msgHandle = (*getMsgHandleFp)(pWrapper, msgIndex); + SMsgHandle msgHandle = pWrapper->msgHandles[msgIndex]; if (msgHandle.rpcMsgFp == NULL) continue; SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; diff --git a/source/dnode/mgmt/dnode/inc/dmHandle.h b/source/dnode/mgmt/dnode/inc/dmHandle.h index fde1d8b2d7..cba18b322e 100644 --- a/source/dnode/mgmt/dnode/inc/dmHandle.h +++ b/source/dnode/mgmt/dnode/inc/dmHandle.h @@ -22,9 +22,7 @@ extern "C" { #endif -void dmInitMsgHandles(SMgmtWrapper *pWrapper); -SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); - +void dmInitMsgHandles(SMgmtWrapper *pWrapper); void dmSendStatusReq(SDnodeMgmt *pMgmt); void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq); diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index 1e5fcd33ed..6a3e205f7c 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -38,7 +38,6 @@ typedef struct SDnodeMgmt { SRWLatch latch; SDnodeWorker mgmtWorker; SDnodeWorker statusWorker; - SMsgHandle msgHandles[TDMT_MAX]; const char *path; SDnode *pDnode; } SDnodeMgmt; diff --git a/source/dnode/mgmt/dnode/src/dmHandle.c b/source/dnode/mgmt/dnode/src/dmHandle.c index 2fa3c92967..c04f4ef7fe 100644 --- a/source/dnode/mgmt/dnode/src/dmHandle.c +++ b/source/dnode/mgmt/dnode/src/dmHandle.c @@ -114,15 +114,6 @@ void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { rpcSendResponse(&rpcRsp); } -static void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { - SDnodeMgmt *pMgmt = pWrapper->pMgmt; - SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; - - pHandle->pWrapper = pWrapper; - pHandle->nodeMsgFp = nodeMsgFp; - pHandle->rpcMsgFp = dndProcessRpcMsg; -} - void dmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg); @@ -147,8 +138,3 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg); } - -SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) { - SDnodeMgmt *pMgmt = pWrapper->pMgmt; - return pMgmt->msgHandles[msgIndex]; -} diff --git a/source/dnode/mgmt/dnode/src/dmInt.c b/source/dnode/mgmt/dnode/src/dmInt.c index 916a7f8740..e895e2c8e0 100644 --- a/source/dnode/mgmt/dnode/src/dmInt.c +++ b/source/dnode/mgmt/dnode/src/dmInt.c @@ -177,7 +177,6 @@ void dmGetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.openFp = dmInit; mgmtFp.closeFp = dmCleanup; mgmtFp.requiredFp = dmRequire; - mgmtFp.getMsgHandleFp = dmGetMsgHandle; dmInitMsgHandles(pWrapper); pWrapper->name = "dnode"; diff --git a/source/dnode/mgmt/mnode/inc/mmHandle.h b/source/dnode/mgmt/mnode/inc/mmHandle.h index df66b1b80c..2206e382ce 100644 --- a/source/dnode/mgmt/mnode/inc/mmHandle.h +++ b/source/dnode/mgmt/mnode/inc/mmHandle.h @@ -22,9 +22,7 @@ extern "C" { #endif -void mmInitMsgHandles(SMgmtWrapper *pWrapper); -SMsgHandle mmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); - +void mmInitMsgHandles(SMgmtWrapper *pWrapper); int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 60e3f2211a..fee2b65126 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -38,7 +38,6 @@ typedef struct SMnodeMgmt { SReplica replicas[TSDB_MAX_REPLICA]; // - SMsgHandle msgHandles[TDMT_MAX]; SProcObj *pProcess; bool singleProc; } SMnodeMgmt; diff --git a/source/dnode/mgmt/mnode/src/mmHandle.c b/source/dnode/mgmt/mnode/src/mmHandle.c index 5ea3480027..9ad4a7937b 100644 --- a/source/dnode/mgmt/mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mnode/src/mmHandle.c @@ -146,92 +146,78 @@ int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} -static void mmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; - - pHandle->pWrapper = pWrapper; - pHandle->nodeMsgFp = nodeMsgFp; - pHandle->rpcMsgFp = dndProcessRpcMsg; -} - void mmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg); // Requests handled by MNODE - mmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg); // Requests handled by VNODE - mmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg); - mmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg); -} - -SMsgHandle mmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - return pMgmt->msgHandles[msgIndex]; + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg); } diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index b08c11cca6..a2118c4d44 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -24,7 +24,6 @@ void mmGetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.openFp = NULL; mgmtFp.closeFp = NULL; mgmtFp.requiredFp = mmRequireNode; - mgmtFp.getMsgHandleFp = mmGetMsgHandle; mmInitMsgHandles(pWrapper); pWrapper->name = "mnode"; diff --git a/source/dnode/mgmt/qnode/inc/qmHandle.h b/source/dnode/mgmt/qnode/inc/qmHandle.h index 3bd434772f..6b86f3eee8 100644 --- a/source/dnode/mgmt/qnode/inc/qmHandle.h +++ b/source/dnode/mgmt/qnode/inc/qmHandle.h @@ -22,9 +22,7 @@ extern "C" { #endif -void qmInitMsgHandles(SMgmtWrapper *pWrapper); -SMsgHandle qmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); - +void qmInitMsgHandles(SMgmtWrapper *pWrapper); int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index 43a3cae11b..aa31eade55 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -32,7 +32,6 @@ typedef struct SQnodeMgmt { SDnodeWorker fetchWorker; // - SMsgHandle msgHandles[TDMT_MAX]; SProcObj *pProcess; bool singleProc; } SQnodeMgmt; diff --git a/source/dnode/mgmt/qnode/src/qmHandle.c b/source/dnode/mgmt/qnode/src/qmHandle.c index 0471d3602e..d04650913a 100644 --- a/source/dnode/mgmt/qnode/src/qmHandle.c +++ b/source/dnode/mgmt/qnode/src/qmHandle.c @@ -23,7 +23,3 @@ int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg){return 0;} void qmInitMsgHandles(SMgmtWrapper *pWrapper) { } -SMsgHandle qmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) { - SQnodeMgmt *pMgmt = pWrapper->pMgmt; - return pMgmt->msgHandles[msgIndex]; -} diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index df0f0388a5..9059942a3a 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -24,7 +24,6 @@ void qmGetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.openFp = NULL; mgmtFp.closeFp = NULL; mgmtFp.requiredFp = qmRequireNode; - mgmtFp.getMsgHandleFp = qmGetMsgHandle; // qmInitMsgHandles(pWrapper); pWrapper->name = "qnode"; diff --git a/source/dnode/mgmt/snode/inc/smHandle.h b/source/dnode/mgmt/snode/inc/smHandle.h index 5380b14cb7..3caaee7107 100644 --- a/source/dnode/mgmt/snode/inc/smHandle.h +++ b/source/dnode/mgmt/snode/inc/smHandle.h @@ -22,8 +22,7 @@ extern "C" { #endif -void smInitMsgHandles(SMgmtWrapper *pWrapper); -SMsgHandle smGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); +void smInitMsgHandles(SMgmtWrapper *pWrapper); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index 95177b7edf..26d5792a17 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -29,14 +29,10 @@ typedef struct SSnodeMgmt { SSnode *pSnode; SRWLatch latch; SDnodeWorker writeWorker; - - // - SMsgHandle msgHandles[TDMT_MAX]; - SProcObj *pProcess; - bool singleProc; + SProcObj *pProcess; + bool singleProc; } SSnodeMgmt; - void smGetMgmtFp(SMgmtWrapper *pMgmt); int32_t dndInitSnode(SDnode *pDnode); diff --git a/source/dnode/mgmt/snode/src/smHandle.c b/source/dnode/mgmt/snode/src/smHandle.c index e785d207c4..a9a73199fb 100644 --- a/source/dnode/mgmt/snode/src/smHandle.c +++ b/source/dnode/mgmt/snode/src/smHandle.c @@ -22,8 +22,3 @@ int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} void smInitMsgHandles(SMgmtWrapper *pWrapper) { } - -SMsgHandle smGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) { - SSnodeMgmt *pMgmt = pWrapper->pMgmt; - return pMgmt->msgHandles[msgIndex]; -} diff --git a/source/dnode/mgmt/snode/src/smInt.c b/source/dnode/mgmt/snode/src/smInt.c index 1bb3572163..eb0db21f03 100644 --- a/source/dnode/mgmt/snode/src/smInt.c +++ b/source/dnode/mgmt/snode/src/smInt.c @@ -25,7 +25,6 @@ void smGetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.openFp = NULL; mgmtFp.closeFp = NULL; mgmtFp.requiredFp = smRequireNode; - mgmtFp.getMsgHandleFp = smGetMsgHandle; // smInitMsgHandles(pWrapper); pWrapper->name = "snode"; diff --git a/source/dnode/mgmt/vnode/inc/vmHandle.h b/source/dnode/mgmt/vnode/inc/vmHandle.h index 7401c5d8f1..7f519e14fb 100644 --- a/source/dnode/mgmt/vnode/inc/vmHandle.h +++ b/source/dnode/mgmt/vnode/inc/vmHandle.h @@ -22,9 +22,7 @@ extern "C" { #endif -void vmInitMsgHandles(SMgmtWrapper *pWrapper); -SMsgHandle vmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); - +void vmInitMsgHandles(SMgmtWrapper *pWrapper); int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq); diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index 42fa1ccb80..f0010072ef 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -43,7 +43,6 @@ typedef struct SVnodesMgmt { SWWorkerPool syncPool; SWWorkerPool writePool; STfs *pTfs; - SMsgHandle msgHandles[TDMT_MAX]; SProcObj *pProcess; bool singleProc; } SVnodesMgmt; diff --git a/source/dnode/mgmt/vnode/src/vmHandle.c b/source/dnode/mgmt/vnode/src/vmHandle.c index 15f1aa4729..629b2c086e 100644 --- a/source/dnode/mgmt/vnode/src/vmHandle.c +++ b/source/dnode/mgmt/vnode/src/vmHandle.c @@ -25,51 +25,37 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;} int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;} int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;} -static void vmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; - - pHandle->pWrapper = pWrapper; - pHandle->nodeMsgFp = nodeMsgFp; - pHandle->rpcMsgFp = dndProcessRpcMsg; -} - void vmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by VNODE - vmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg); - vmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg); -} - -SMsgHandle vmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - return pMgmt->msgHandles[msgIndex]; + dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg); } diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 2e747b86b6..6be4e87744 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -67,7 +67,6 @@ void vmGetMgmtFp(SMgmtWrapper *pWrapper) { mgmtFp.openFp = vmInit; mgmtFp.closeFp = vmCleanup; mgmtFp.requiredFp = vmRequire; - mgmtFp.getMsgHandleFp = vmGetMsgHandle; vmInitMsgHandles(pWrapper); pWrapper->name = "vnodes";