refactror: node mgmt
This commit is contained in:
parent
de17518577
commit
c4b409c31e
|
@ -425,9 +425,12 @@ enum {
|
|||
SND_WORKER_TYPE__UNIQUE,
|
||||
};
|
||||
|
||||
#define MNODE_HANDLE -1
|
||||
#define QNODE_HANDLE 1
|
||||
#define DEFAULT_HANDLE 0
|
||||
#define MNODE_HANDLE -1
|
||||
#define QNODE_HANDLE -2
|
||||
#define SNODE_HANDLE -3
|
||||
#define VNODE_HANDLE -4
|
||||
#define BNODE_HANDLE -5
|
||||
|
||||
#define TSDB_CONFIG_OPTION_LEN 16
|
||||
#define TSDB_CONIIG_VALUE_LEN 48
|
||||
|
|
|
@ -94,5 +94,5 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
void bmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, bmProcessMonitorMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, bmProcessMonitorMsg, 0);
|
||||
}
|
||||
|
|
|
@ -144,94 +144,94 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_MM_LOAD, mmProcessMonitorMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_MM_LOAD, mmProcessMonitorMsg, 0);
|
||||
|
||||
// Requests handled by DNODE
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, 0);
|
||||
|
||||
// Requests handled by MNODE
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_QNODE_LIST, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_ASK_EP, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_QNODE_LIST, mmProcessReadMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_ASK_EP, mmProcessReadMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, 0);
|
||||
|
||||
// Requests handled by VNODE
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, 0);
|
||||
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, MNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, MNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, 1);
|
||||
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_VNODE_RSP, mmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE_RSP, mmProcessWriteMsg, 0);
|
||||
}
|
||||
|
|
|
@ -94,17 +94,17 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
void qmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, qmProcessMonitorMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, qmProcessMonitorMsg, 0);
|
||||
|
||||
// Requests handled by VNODE
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, QNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, QNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, qmProcessFetchMsg, QNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, qmProcessFetchMsg, 1);
|
||||
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, QNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, QNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, QNODE_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, 1);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, 1);
|
||||
}
|
||||
|
|
|
@ -94,9 +94,9 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
void smInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, smProcessMonitorMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, smProcessMonitorMsg, 0);
|
||||
|
||||
// Requests handled by SNODE
|
||||
dmSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, 0);
|
||||
}
|
||||
|
|
|
@ -281,56 +281,56 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, 0);
|
||||
|
||||
// Requests handled by VNODE
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, 0);
|
||||
// dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE, vmProcessWriteMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, 0);
|
||||
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, vmProcessSyncMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, vmProcessSyncMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, vmProcessSyncMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, vmProcessSyncMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmProcessSyncMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, vmProcessSyncMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmProcessSyncMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, vmProcessSyncMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmProcessSyncMsg, 0);
|
||||
}
|
||||
|
|
|
@ -89,9 +89,8 @@ typedef int32_t (*DropNodeFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
|||
typedef int32_t (*RequireNodeFp)(struct SMgmtWrapper *pWrapper, bool *required);
|
||||
|
||||
typedef struct {
|
||||
SMgmtWrapper *pQndWrapper;
|
||||
SMgmtWrapper *pMndWrapper;
|
||||
SMgmtWrapper *pNdWrapper;
|
||||
EDndNodeType defaultNtype;
|
||||
bool needCheckVgId;
|
||||
} SMsgHandle;
|
||||
|
||||
typedef struct {
|
||||
|
@ -124,7 +123,7 @@ typedef struct SMgmtWrapper {
|
|||
SShm procShm;
|
||||
};
|
||||
struct {
|
||||
int8_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode
|
||||
bool needCheckVgIds[TDMT_MAX];
|
||||
NodeMsgFp msgFps[TDMT_MAX];
|
||||
};
|
||||
} SMgmtWrapper;
|
||||
|
|
|
@ -34,7 +34,7 @@ const char *dmProcStr(EDndProcType ptype);
|
|||
|
||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
|
||||
void dmSetEvent(SDnode *pDnode, EDndEvent event);
|
||||
void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
|
||||
void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgIds);
|
||||
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
|
||||
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc);
|
||||
void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
|
|
|
@ -173,7 +173,7 @@ int32_t dmReadShmFile(SMgmtWrapper *pWrapper) {
|
|||
}
|
||||
}
|
||||
|
||||
if (!tsMultiProcess || pWrapper->nodeType == DNODE || pWrapper->nodeType == NODE_END) {
|
||||
if (!tsMultiProcess || pWrapper->nodeType == DNODE) {
|
||||
if (pWrapper->procShm.id >= 0) {
|
||||
dDebug("node:%s, shmid:%d, is closed, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size);
|
||||
taosDropShm(&pWrapper->procShm);
|
||||
|
|
|
@ -29,9 +29,9 @@ void dmSetEvent(SDnode *pDnode, EDndEvent event) {
|
|||
}
|
||||
}
|
||||
|
||||
void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
|
||||
void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgId) {
|
||||
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
|
||||
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
|
||||
pWrapper->needCheckVgIds[TMSG_INDEX(msgType)] = needCheckVgId;
|
||||
}
|
||||
|
||||
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
|
||||
|
|
|
@ -187,19 +187,19 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg)
|
|||
|
||||
static void dmSetMgmtMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by DNODE
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, 0);
|
||||
|
||||
// Requests handled by MNODE
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, 0);
|
||||
dmSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, 0);
|
||||
}
|
||||
|
||||
static int32_t dmStartMgmt(SMgmtWrapper *pWrapper) {
|
||||
|
|
|
@ -133,7 +133,7 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
tmsg_t msgType = pMsg->msgType;
|
||||
bool isReq = msgType & 1u;
|
||||
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
|
||||
SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
|
||||
SMgmtWrapper *pWrapper = NULL;
|
||||
|
||||
switch (msgType) {
|
||||
case TDMT_DND_SERVER_STATUS:
|
||||
|
@ -171,7 +171,7 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (pWrapper == NULL) {
|
||||
if (pHandle->defaultNtype == NODE_END) {
|
||||
dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
|
||||
if (isReq) {
|
||||
SRpcMsg rspMsg = {
|
||||
|
@ -182,13 +182,14 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL) {
|
||||
pWrapper = &pDnode->wrappers[pHandle->defaultNtype];
|
||||
if (pHandle->needCheckVgId) {
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
int32_t vgId = ntohl(pHead->vgId);
|
||||
if (vgId == QNODE_HANDLE) {
|
||||
pWrapper = pHandle->pQndWrapper;
|
||||
pWrapper = &pDnode->wrappers[QNODE];
|
||||
} else if (vgId == MNODE_HANDLE) {
|
||||
pWrapper = pHandle->pMndWrapper;
|
||||
pWrapper = &pDnode->wrappers[MNODE];
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
@ -202,34 +203,25 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
|
||||
int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||
SDnodeTrans *pTrans = &pDnode->trans;
|
||||
|
||||
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||
|
||||
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
|
||||
NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
|
||||
int8_t vgId = pWrapper->msgVgIds[msgIndex];
|
||||
if (msgFp == NULL) continue;
|
||||
|
||||
SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex];
|
||||
if (vgId == QNODE_HANDLE) {
|
||||
if (pHandle->pQndWrapper != NULL) {
|
||||
dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
|
||||
return -1;
|
||||
}
|
||||
pHandle->pQndWrapper = pWrapper;
|
||||
} else if (vgId == MNODE_HANDLE) {
|
||||
if (pHandle->pMndWrapper != NULL) {
|
||||
dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
|
||||
return -1;
|
||||
}
|
||||
pHandle->pMndWrapper = pWrapper;
|
||||
} else {
|
||||
if (pHandle->pNdWrapper != NULL) {
|
||||
dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
|
||||
return -1;
|
||||
}
|
||||
pHandle->pNdWrapper = pWrapper;
|
||||
pHandle->defaultNtype = NODE_END;
|
||||
}
|
||||
}
|
||||
|
||||
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
|
||||
SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex];
|
||||
NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
|
||||
bool needCheckVgId = pWrapper->needCheckVgIds[msgIndex];
|
||||
|
||||
if (msgFp == NULL) continue;
|
||||
if (needCheckVgId) pHandle->needCheckVgId = needCheckVgId;
|
||||
if (!needCheckVgId) {
|
||||
pHandle->defaultNtype = ntype;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue