From c4b409c31e61aa89153d12348e36b93b6299c7f5 Mon Sep 17 00:00:00 2001 From: Shengliang Date: Tue, 10 May 2022 22:46:43 +0800 Subject: [PATCH] refactror: node mgmt --- include/util/tdef.h | 7 +- source/dnode/mgmt/mgmt_bnode/src/bmHandle.c | 2 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 164 +++++++++--------- source/dnode/mgmt/mgmt_qnode/src/qmHandle.c | 20 +-- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 6 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 98 +++++------ source/dnode/mgmt/node_common/inc/dmDef.h | 7 +- source/dnode/mgmt/node_common/inc/dmInt.h | 2 +- source/dnode/mgmt/node_common/src/dmFile.c | 2 +- source/dnode/mgmt/node_common/src/dmUtil.c | 4 +- source/dnode/mgmt/node_mgmt/src/dmHandle.c | 22 +-- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 54 +++--- 12 files changed, 191 insertions(+), 197 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 022fd8ba8e..f95d96be56 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -425,9 +425,12 @@ enum { SND_WORKER_TYPE__UNIQUE, }; -#define MNODE_HANDLE -1 -#define QNODE_HANDLE 1 #define DEFAULT_HANDLE 0 +#define MNODE_HANDLE -1 +#define QNODE_HANDLE -2 +#define SNODE_HANDLE -3 +#define VNODE_HANDLE -4 +#define BNODE_HANDLE -5 #define TSDB_CONFIG_OPTION_LEN 16 #define TSDB_CONIIG_VALUE_LEN 48 diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c index 3b314b1d2b..e9f1617e85 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c @@ -94,5 +94,5 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } void bmInitMsgHandle(SMgmtWrapper *pWrapper) { - dmSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, bmProcessMonitorMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, bmProcessMonitorMsg, 0); } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index ed9384a869..bb6e3aad83 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -144,94 +144,94 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { } void mmInitMsgHandle(SMgmtWrapper *pWrapper) { - dmSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MON_MM_LOAD, mmProcessMonitorMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MON_MM_LOAD, mmProcessMonitorMsg, 0); // Requests handled by DNODE - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, 0); // Requests handled by MNODE - dmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_QNODE_LIST, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_MQ_ASK_EP, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_QNODE_LIST, mmProcessReadMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_MQ_ASK_EP, mmProcessReadMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, 0); // Requests handled by VNODE - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, MNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, MNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MNODE_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, 1); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_VNODE_RSP, mmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE_RSP, mmProcessWriteMsg, 0); } diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 11b91f0568..e641dbf6ed 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -94,17 +94,17 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } void qmInitMsgHandle(SMgmtWrapper *pWrapper) { - dmSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, qmProcessMonitorMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, qmProcessMonitorMsg, 0); // Requests handled by VNODE - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, QNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, QNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, qmProcessFetchMsg, QNODE_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, qmProcessFetchMsg, 1); - dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, QNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, QNODE_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, QNODE_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, 1); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, 1); } diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index defc5ab136..34aff001bc 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -94,9 +94,9 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } void smInitMsgHandle(SMgmtWrapper *pWrapper) { - dmSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, smProcessMonitorMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, smProcessMonitorMsg, 0); // Requests handled by SNODE - dmSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, 0); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index c5919e06b6..a8aa336ddb 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -281,56 +281,56 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { } void vmInitMsgHandle(SMgmtWrapper *pWrapper) { - dmSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, 0); // Requests handled by VNODE - dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, DEFAULT_HANDLE); - // dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, 0); + // dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE, vmProcessWriteMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, 0); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, vmProcessSyncMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, vmProcessSyncMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, vmProcessSyncMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, vmProcessSyncMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmProcessSyncMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, vmProcessSyncMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmProcessSyncMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, vmProcessSyncMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmProcessSyncMsg, 0); } diff --git a/source/dnode/mgmt/node_common/inc/dmDef.h b/source/dnode/mgmt/node_common/inc/dmDef.h index d36754e862..5a0da2cf09 100644 --- a/source/dnode/mgmt/node_common/inc/dmDef.h +++ b/source/dnode/mgmt/node_common/inc/dmDef.h @@ -89,9 +89,8 @@ typedef int32_t (*DropNodeFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*RequireNodeFp)(struct SMgmtWrapper *pWrapper, bool *required); typedef struct { - SMgmtWrapper *pQndWrapper; - SMgmtWrapper *pMndWrapper; - SMgmtWrapper *pNdWrapper; + EDndNodeType defaultNtype; + bool needCheckVgId; } SMsgHandle; typedef struct { @@ -124,7 +123,7 @@ typedef struct SMgmtWrapper { SShm procShm; }; struct { - int8_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode + bool needCheckVgIds[TDMT_MAX]; NodeMsgFp msgFps[TDMT_MAX]; }; } SMgmtWrapper; diff --git a/source/dnode/mgmt/node_common/inc/dmInt.h b/source/dnode/mgmt/node_common/inc/dmInt.h index 1e7f757226..692249e311 100644 --- a/source/dnode/mgmt/node_common/inc/dmInt.h +++ b/source/dnode/mgmt/node_common/inc/dmInt.h @@ -34,7 +34,7 @@ const char *dmProcStr(EDndProcType ptype); void dmSetStatus(SDnode *pDnode, EDndRunStatus stype); void dmSetEvent(SDnode *pDnode, EDndEvent event); -void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); +void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgIds); void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc); void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/node_common/src/dmFile.c b/source/dnode/mgmt/node_common/src/dmFile.c index 76f8cbf191..a5ed36a177 100644 --- a/source/dnode/mgmt/node_common/src/dmFile.c +++ b/source/dnode/mgmt/node_common/src/dmFile.c @@ -173,7 +173,7 @@ int32_t dmReadShmFile(SMgmtWrapper *pWrapper) { } } - if (!tsMultiProcess || pWrapper->nodeType == DNODE || pWrapper->nodeType == NODE_END) { + if (!tsMultiProcess || pWrapper->nodeType == DNODE) { if (pWrapper->procShm.id >= 0) { dDebug("node:%s, shmid:%d, is closed, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size); taosDropShm(&pWrapper->procShm); diff --git a/source/dnode/mgmt/node_common/src/dmUtil.c b/source/dnode/mgmt/node_common/src/dmUtil.c index 015a33d6a8..c1d15f62de 100644 --- a/source/dnode/mgmt/node_common/src/dmUtil.c +++ b/source/dnode/mgmt/node_common/src/dmUtil.c @@ -29,9 +29,9 @@ void dmSetEvent(SDnode *pDnode, EDndEvent event) { } } -void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) { +void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgId) { pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; - pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; + pWrapper->needCheckVgIds[TMSG_INDEX(msgType)] = needCheckVgId; } SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmHandle.c b/source/dnode/mgmt/node_mgmt/src/dmHandle.c index fc413edd2d..7e28b1b038 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmHandle.c +++ b/source/dnode/mgmt/node_mgmt/src/dmHandle.c @@ -187,19 +187,19 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) static void dmSetMgmtMsgHandle(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, 0); // Requests handled by MNODE - dmSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, 0); + dmSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, 0); } static int32_t dmStartMgmt(SMgmtWrapper *pWrapper) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index a58999bf2d..62e2d8821f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -133,7 +133,7 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { tmsg_t msgType = pMsg->msgType; bool isReq = msgType & 1u; SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; - SMgmtWrapper *pWrapper = pHandle->pNdWrapper; + SMgmtWrapper *pWrapper = NULL; switch (msgType) { case TDMT_DND_SERVER_STATUS: @@ -171,7 +171,7 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } - if (pWrapper == NULL) { + if (pHandle->defaultNtype == NODE_END) { dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle); if (isReq) { SRpcMsg rspMsg = { @@ -182,13 +182,14 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } - if (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL) { + pWrapper = &pDnode->wrappers[pHandle->defaultNtype]; + if (pHandle->needCheckVgId) { SMsgHead *pHead = pMsg->pCont; int32_t vgId = ntohl(pHead->vgId); if (vgId == QNODE_HANDLE) { - pWrapper = pHandle->pQndWrapper; + pWrapper = &pDnode->wrappers[QNODE]; } else if (vgId == MNODE_HANDLE) { - pWrapper = pHandle->pMndWrapper; + pWrapper = &pDnode->wrappers[MNODE]; } else { } } @@ -202,34 +203,25 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { int32_t dmInitMsgHandle(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; - - for (EDndNodeType n = DNODE; n < NODE_END; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - + for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { - NodeMsgFp msgFp = pWrapper->msgFps[msgIndex]; - int8_t vgId = pWrapper->msgVgIds[msgIndex]; - if (msgFp == NULL) continue; - SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex]; - if (vgId == QNODE_HANDLE) { - if (pHandle->pQndWrapper != NULL) { - dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); - return -1; - } - pHandle->pQndWrapper = pWrapper; - } else if (vgId == MNODE_HANDLE) { - if (pHandle->pMndWrapper != NULL) { - dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); - return -1; - } - pHandle->pMndWrapper = pWrapper; - } else { - if (pHandle->pNdWrapper != NULL) { - dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); - return -1; - } - pHandle->pNdWrapper = pWrapper; + pHandle->defaultNtype = NODE_END; + } + } + + for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; + for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { + SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex]; + NodeMsgFp msgFp = pWrapper->msgFps[msgIndex]; + bool needCheckVgId = pWrapper->needCheckVgIds[msgIndex]; + + if (msgFp == NULL) continue; + if (needCheckVgId) pHandle->needCheckVgId = needCheckVgId; + if (!needCheckVgId) { + pHandle->defaultNtype = ntype; } } }