From fc692411e96a7e14716efd7ce7685d17af38d0ec Mon Sep 17 00:00:00 2001 From: jtao1735 Date: Mon, 4 May 2020 23:59:30 +0000 Subject: [PATCH] messages received are handled to different modules --- src/dnode/src/dnodeClient.c | 77 ----------------- src/dnode/src/dnodeDnode.c | 165 ++++++++++++++++++++++++++++++++++++ src/dnode/src/dnodeServer.c | 95 --------------------- src/dnode/src/dnodeShell.c | 39 ++++++++- src/inc/dnode.h | 2 +- src/mnode/inc/mgmtServer.h | 1 - src/mnode/src/mgmtDnode.c | 2 +- src/mnode/src/mgmtServer.c | 2 +- src/mnode/src/mgmtShell.c | 3 - src/mnode/src/mgmtTable.c | 2 +- src/mnode/src/mgmtVgroup.c | 2 +- 11 files changed, 207 insertions(+), 183 deletions(-) delete mode 100644 src/dnode/src/dnodeClient.c create mode 100644 src/dnode/src/dnodeDnode.c delete mode 100644 src/dnode/src/dnodeServer.c diff --git a/src/dnode/src/dnodeClient.c b/src/dnode/src/dnodeClient.c deleted file mode 100644 index aa3ec0595f..0000000000 --- a/src/dnode/src/dnodeClient.c +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "taosmsg.h" -#include "trpc.h" -#include "tutil.h" -#include "tglobal.h" -#include "dnode.h" -#include "dnodeLog.h" -#include "dnodeMgmt.h" - -static void *tsDnodeClientRpc; -static void (*dnodeProcessDnodeRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg); -extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); - -int32_t dnodeInitClient() { - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.label = "DND-C"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = dnodeProcessRspFromDnode; - rpcInit.ufp = dnodeUpdateIpSet; - rpcInit.sessions = 100; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.idleTime = tsShellActivityTimer * 2000; - rpcInit.user = "t"; - rpcInit.ckey = "key"; - rpcInit.secret = "secret"; - - tsDnodeClientRpc = rpcOpen(&rpcInit); - if (tsDnodeClientRpc == NULL) { - dError("failed to init mnode rpc client"); - return -1; - } - - dPrint("inter-dndoes rpc client is opened"); - return 0; -} - -void dnodeCleanupClient() { - if (tsDnodeClientRpc) { - rpcClose(tsDnodeClientRpc); - tsDnodeClientRpc = NULL; - dPrint("inter-dnodes rpc client is closed"); - } -} - -static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) { - if (dnodeProcessDnodeRspFp[pMsg->msgType]) { - (*dnodeProcessDnodeRspFp[pMsg->msgType])(pMsg); - } else { - dError("%s is not processed", taosMsg[pMsg->msgType]); - } - rpcFreeCont(pMsg->pCont); -} - -void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { - dnodeProcessDnodeRspFp[msgType] = fp; -} - -void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) { - rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg); -} diff --git a/src/dnode/src/dnodeDnode.c b/src/dnode/src/dnodeDnode.c new file mode 100644 index 0000000000..dc48262009 --- /dev/null +++ b/src/dnode/src/dnodeDnode.c @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +/* this file is mainly responsible for the communication between DNODEs. Each + * dnode works as both server and client. Dnode may send status, grant, config + * messages to mnode, mnode may send create/alter/drop table/vnode messages + * to dnode. All theses messages are handled from here + */ + +#include "os.h" +#include "taosmsg.h" +#include "tglobal.h" +#include "trpc.h" +#include "dnode.h" +#include "dnodeLog.h" +#include "dnodeMgmt.h" +#include "dnodeWrite.h" +#include "mnode.h" + +extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); +static void (*dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg); +static void (*dnodeProcessRspMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg); +static void *tsDnodeServerRpc = NULL; +static void *tsDnodeClientRpc = NULL; + +int32_t dnodeInitServer() { + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeWrite; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeWrite; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeWrite; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeWrite; + + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeMgmt; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeMgmt; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeMgmt; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeMgmt; + + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mgmtProcessReqMsgFromDnode; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mgmtProcessReqMsgFromDnode; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mgmtProcessReqMsgFromDnode; + dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mgmtProcessReqMsgFromDnode; + + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = tsDnodeDnodePort; + rpcInit.label = "DND-S"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = dnodeProcessReqMsgFromDnode; + rpcInit.sessions = 100; + rpcInit.connType = TAOS_CONN_SERVER; + rpcInit.idleTime = tsShellActivityTimer * 2000; + + tsDnodeServerRpc = rpcOpen(&rpcInit); + if (tsDnodeServerRpc == NULL) { + dError("failed to init inter-dnodes RPC server"); + return -1; + } + + dPrint("inter-dnodes RPC server is opened"); + return 0; +} + +void dnodeCleanupServer() { + if (tsDnodeServerRpc) { + rpcClose(tsDnodeServerRpc); + tsDnodeServerRpc = NULL; + dPrint("inter-dnodes RPC server is closed"); + } +} + +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) { + SRpcMsg rspMsg; + rspMsg.handle = pMsg->handle; + rspMsg.pCont = NULL; + rspMsg.contLen = 0; + + if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { + rspMsg.code = TSDB_CODE_NOT_READY; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); + dTrace("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); + return; + } + + if (pMsg->pCont == NULL) { + rspMsg.code = TSDB_CODE_INVALID_MSG_LEN; + rpcSendResponse(&rspMsg); + return; + } + + if (dnodeProcessReqMsgFp[pMsg->msgType]) { + (*dnodeProcessReqMsgFp[pMsg->msgType])(pMsg); + } else { + rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); + dTrace("RPC %p, message:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); + return; + + } +} + +int32_t dnodeInitClient() { + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.label = "DND-C"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = dnodeProcessRspFromDnode; + rpcInit.ufp = dnodeUpdateIpSet; + rpcInit.sessions = 100; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.idleTime = tsShellActivityTimer * 2000; + rpcInit.user = "t"; + rpcInit.ckey = "key"; + rpcInit.secret = "secret"; + + tsDnodeClientRpc = rpcOpen(&rpcInit); + if (tsDnodeClientRpc == NULL) { + dError("failed to init mnode rpc client"); + return -1; + } + + dPrint("inter-dndoes rpc client is opened"); + return 0; +} + +void dnodeCleanupClient() { + if (tsDnodeClientRpc) { + rpcClose(tsDnodeClientRpc); + tsDnodeClientRpc = NULL; + dPrint("inter-dnodes rpc client is closed"); + } +} + +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) { + + if (dnodeProcessRspMsgFp[pMsg->msgType]) { + (*dnodeProcessRspMsgFp[pMsg->msgType])(pMsg); + } else { + dError("RPC %p, msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); + } + + rpcFreeCont(pMsg->pCont); +} + +void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { + dnodeProcessRspMsgFp[msgType] = fp; +} + +void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) { + rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg); +} diff --git a/src/dnode/src/dnodeServer.c b/src/dnode/src/dnodeServer.c deleted file mode 100644 index 169cd6cffa..0000000000 --- a/src/dnode/src/dnodeServer.c +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "os.h" -#include "taosmsg.h" -#include "tglobal.h" -#include "trpc.h" -#include "dnode.h" -#include "dnodeLog.h" -#include "dnodeMgmt.h" -#include "dnodeWrite.h" - -static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); -static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg); -static void *tsDnodeServerRpc = NULL; - -int32_t dnodeInitServer() { - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeWrite; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeWrite; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeWrite; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeWrite; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeMgmt; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeMgmt; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeMgmt; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeMgmt; - - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = tsDnodeDnodePort; - rpcInit.label = "DND-S"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = dnodeProcessReqMsgFromDnode; - rpcInit.sessions = 100; - rpcInit.connType = TAOS_CONN_SERVER; - rpcInit.idleTime = tsShellActivityTimer * 2000; - - tsDnodeServerRpc = rpcOpen(&rpcInit); - if (tsDnodeServerRpc == NULL) { - dError("failed to init inter-dnodes RPC server"); - return -1; - } - - dPrint("inter-dnodes RPC server is opened"); - return 0; -} - -void dnodeCleanupServer() { - if (tsDnodeServerRpc) { - rpcClose(tsDnodeServerRpc); - tsDnodeServerRpc = NULL; - dPrint("inter-dnodes RPC server is closed"); - } -} - -void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg); - -static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) { - SRpcMsg rspMsg; - rspMsg.handle = pMsg->handle; - rspMsg.pCont = NULL; - rspMsg.contLen = 0; - - if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { - rspMsg.code = TSDB_CODE_NOT_READY; - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); - dTrace("thandle:%p, query msg is ignored since dnode not running", pMsg->handle); - return; - } - - if (pMsg->pCont == NULL) { - rspMsg.code = TSDB_CODE_INVALID_MSG_LEN; - rpcSendResponse(&rspMsg); - return; - } - - if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { - (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); - } else { - mgmtProcessReqMsgFromDnode(pMsg); - } -} - diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index e6a392a341..b16227cf39 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -41,6 +41,37 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeRead; + // the following message shall be treated as mnode write + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mgmtProcessMsgFromShell; + + // the following message shall be treated as mnode query + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mgmtProcessMsgFromShell; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= mgmtProcessMsgFromShell; + int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); if (numOfThreads < 1) { @@ -82,7 +113,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { rpcMsg.contLen = 0; if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { - dError("RPC %p, shell msg is ignored since dnode not running", pMsg->handle); + dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); rpcMsg.code = TSDB_CODE_NOT_READY; rpcSendResponse(&rpcMsg); rpcFreeCont(pMsg->pCont); @@ -98,7 +129,11 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg) { if ( dnodeProcessShellMsgFp[pMsg->msgType] ) { (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg); } else { - mgmtProcessMsgFromShell(pMsg); + dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); + rpcMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; + rpcSendResponse(&rpcMsg); + rpcFreeCont(pMsg->pCont); + return; } } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 99b9046aac..5145a46831 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -50,7 +50,7 @@ void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); -void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); +void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtServer.h b/src/mnode/inc/mgmtServer.h index 180e893cb0..08e4463ad8 100644 --- a/src/mnode/inc/mgmtServer.h +++ b/src/mnode/inc/mgmtServer.h @@ -22,7 +22,6 @@ extern "C" { int32_t mgmtInitServer(); void mgmtCleanupServer(); -void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 33c012f079..30835dd201 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -151,7 +151,7 @@ int32_t mgmtInitDnodes() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DNODE, mgmtProcessDropDnodeMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp); - mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg); + dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta); diff --git a/src/mnode/src/mgmtServer.c b/src/mnode/src/mgmtServer.c index c2b07a3f4e..2fa6e68f65 100644 --- a/src/mnode/src/mgmtServer.c +++ b/src/mnode/src/mgmtServer.c @@ -52,7 +52,7 @@ void mgmtCleanupServer() { } } -void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { +void dnodeAddServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { mgmtProcessDnodeMsgFp[msgType] = fp; } diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index a5659fee54..c04f9bb2d1 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -43,7 +43,6 @@ typedef int32_t (*SShowRetrieveFp)(SShowObj *pShow, char *data, int32_t rows, vo //static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); static bool mgmtCheckMsgReadOnly(SQueuedMsg *pMsg); -//static void mgmtProcessMsgFromShell(SRpcMsg *pMsg); static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg); static void mgmtProcessShowMsg(SQueuedMsg *queuedMsg); static void mgmtProcessRetrieveMsg(SQueuedMsg *queuedMsg); @@ -52,7 +51,6 @@ static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg); static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg); void *tsMgmtTmr; -//static void *tsMgmtShellRpc = NULL; static void *tsMgmtTranQhandle = NULL; static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0}; static void *tsQhandleCache = NULL; @@ -121,7 +119,6 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { } void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { - assert(rpcMsg); if (rpcMsg->pCont == NULL) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index c6d973fd81..a73a6abb97 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -542,7 +542,7 @@ int32_t mgmtInitTables() { dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); - mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); + dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 4b455de41a..9efbdfeea2 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -221,7 +221,7 @@ int32_t mgmtInitVgroups() { mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); - mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg); + dnodeAddServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg); mTrace("table:vgroups is created");