From bf2d1a79892547061f3614443100f2f904fb0861 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 12 May 2022 15:53:54 +0800 Subject: [PATCH] refactor: node mgmt --- include/common/tmsgcb.h | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 +- source/dnode/mgmt/node_mgmt/inc/dmImp.h | 2 -- source/dnode/mgmt/node_mgmt/src/dmExec.c | 4 +++- source/dnode/mgmt/node_mgmt/src/dmObj.c | 12 ++++++++++-- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 17 ++++++++++------- 6 files changed, 25 insertions(+), 14 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 68f70a30af..f13815d320 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -42,7 +42,7 @@ typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp); -typedef void (*SendMnodeRecvFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq, const SRpcMsg* pRsp); +typedef void (*SendMnodeRecvFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq, SRpcMsg* pRsp); typedef void (*SendRedirectRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp, const SEpSet* pNewEpSet); typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7772ddb6a3..784fc6fb1f 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -274,7 +274,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { SArray *vmGetMsgHandles() { int32_t code = -1; - SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); + SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmImp.h b/source/dnode/mgmt/node_mgmt/inc/dmImp.h index 5dbc6d14fc..e35b308d35 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmImp.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmImp.h @@ -116,8 +116,6 @@ void dmCleanupClient(SDnode *pDnode); SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper); SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper); int32_t dmInitMsgHandle(SDnode *pDnode); -void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp); // mgmt nodes SMgmtFunc dmGetMgmtFunc(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmExec.c b/source/dnode/mgmt/node_mgmt/src/dmExec.c index d026177f79..8837853e12 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmExec.c +++ b/source/dnode/mgmt/node_mgmt/src/dmExec.c @@ -114,8 +114,10 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { return -1; } - SMgmtInputOpt *pInput = &pWrapper->pDnode->input; SMgmtOutputOpt output = {0}; + SMgmtInputOpt *pInput = &pWrapper->pDnode->input; + pInput->name = pWrapper->name; + pInput->path = pWrapper->path; pInput->msgCb = dmGetMsgcb(pWrapper); if (pWrapper->nodeType == DNODE) { tmsgSetDefaultMsgCb(&pInput->msgCb); diff --git a/source/dnode/mgmt/node_mgmt/src/dmObj.c b/source/dnode/mgmt/node_mgmt/src/dmObj.c index 4341ee9733..d34f0b0bcc 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmObj.c +++ b/source/dnode/mgmt/node_mgmt/src/dmObj.c @@ -71,8 +71,12 @@ static void dmClearVars(SDnode *pDnode) { } static bool dmRequireNode(SMgmtWrapper *pWrapper) { + SMgmtInputOpt *pInput = &pWrapper->pDnode->input; + pInput->name = pWrapper->name; + pInput->path = pWrapper->path; + bool required = false; - int32_t code = (*pWrapper->func.requiredFp)(&pWrapper->pDnode->input, &required); + int32_t code = (*pWrapper->func.requiredFp)(pInput, &required); if (!required) { dDebug("node:%s, does not require startup", pWrapper->name); } @@ -80,7 +84,7 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) { } SDnode *dmCreate(const SDnodeOpt *pOption) { - dDebug("start to create dnode"); + dInfo("start to create dnode"); int32_t code = -1; char path[PATH_MAX + 100] = {0}; SDnode *pDnode = NULL; @@ -91,6 +95,10 @@ SDnode *dmCreate(const SDnodeOpt *pOption) { goto _OVER; } + if (dmInitVars(pDnode, pOption) != 0) { + goto _OVER; + } + dmSetStatus(pDnode, DND_STAT_INIT); pDnode->wrappers[DNODE].func = dmGetMgmtFunc(); pDnode->wrappers[MNODE].func = mmGetMgmtFunc(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 8c22bd577f..57d9136467 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -256,14 +256,14 @@ static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) { } } -void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { +static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp); } -void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) { +static inline void dmSendToMnodeRecv(SMgmtWrapper *pWrapper, SRpcMsg *pReq, SRpcMsg *pRsp) { SEpSet epSet = {0}; - dmGetMnodeEpSet(pDnode, &epSet); - rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp); + dmGetMnodeEpSet(pWrapper->pDnode, &epSet); + dmSendRecv(pWrapper->pDnode, &epSet, pReq, pRsp); } static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { @@ -485,8 +485,10 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; SRpcMsg rpcRsp = {0}; + SEpSet epSet = {0}; dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt); - dmSendToMnodeRecv(pDnode, &rpcMsg, &rpcRsp); + dmGetMnodeEpSet(pDnode, &epSet); + dmSendRecv(pDnode, &epSet, &rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { terrno = rpcRsp.code; @@ -543,14 +545,15 @@ void dmCleanupServer(SDnode *pDnode) { SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { SMsgCb msgCb = { + .pWrapper = pWrapper, + .clientRpc = pWrapper->pDnode->trans.clientRpc, .sendReqFp = dmSendReq, .sendRspFp = dmSendRsp, + .sendMnodeRecvFp = dmSendToMnodeRecv, .sendRedirectRspFp = dmSendRedirectRsp, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .releaseHandleFp = dmReleaseHandle, .reportStartupFp = dmReportStartupByWrapper, - .clientRpc = pWrapper->pDnode->trans.clientRpc, - .pWrapper = pWrapper, }; return msgCb; }