From e00f74ccbc3f7f1b33e91c5a0ace429120d7b1b4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 2 Apr 2022 20:05:42 +0800 Subject: [PATCH] refact dm --- source/dnode/mgmt/bm/src/bmInt.c | 2 +- source/dnode/mgmt/dm/inc/dmInt.h | 9 +++- source/dnode/mgmt/dm/src/dmHandle.c | 22 +++++----- source/dnode/mgmt/dm/src/dmInt.c | 8 ++-- source/dnode/mgmt/dm/src/dmWorker.c | 14 +----- source/dnode/mgmt/main/inc/dnd.h | 67 ++++++++++++++++++++--------- source/dnode/mgmt/main/src/dndInt.c | 12 +++--- source/dnode/mgmt/mm/src/mmInt.c | 2 +- source/dnode/mgmt/qm/src/qmInt.c | 2 +- source/dnode/mgmt/sm/src/smInt.c | 2 +- source/dnode/mgmt/vm/src/vmInt.c | 2 +- 11 files changed, 83 insertions(+), 59 deletions(-) diff --git a/source/dnode/mgmt/bm/src/bmInt.c b/source/dnode/mgmt/bm/src/bmInt.c index 4ca7afbb6d..2cb0f50dfe 100644 --- a/source/dnode/mgmt/bm/src/bmInt.c +++ b/source/dnode/mgmt/bm/src/bmInt.c @@ -109,7 +109,7 @@ int32_t bmOpen(SMgmtWrapper *pWrapper) { return code; } -void bmGetMgmtFp(SMgmtWrapper *pWrapper) { +void bmSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = bmOpen; mgmtFp.closeFp = bmClose; diff --git a/source/dnode/mgmt/dm/inc/dmInt.h b/source/dnode/mgmt/dm/inc/dmInt.h index 98abb8ef54..e68f7ddb65 100644 --- a/source/dnode/mgmt/dm/inc/dmInt.h +++ b/source/dnode/mgmt/dm/inc/dmInt.h @@ -39,18 +39,25 @@ typedef struct SDnodeMgmt { SMgmtWrapper *pWrapper; } SDnodeMgmt; +// dmInt.c +void dmSetMgmtFp(SMgmtWrapper *pWrapper); +void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); +void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); +void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg); + // dmFile.c int32_t dmReadFile(SDnodeMgmt *pMgmt); int32_t dmWriteFile(SDnodeMgmt *pMgmt); void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps); -// dmMsg.c +// dmHandle.c void dmInitMsgHandle(SMgmtWrapper *pWrapper); void dmSendStatusReq(SDnodeMgmt *pMgmt); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessCDnodeMsg(SDnode *pDnode, SNodeMsg *pMsg); // dmWorker.c int32_t dmStartThread(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/dm/src/dmHandle.c b/source/dnode/mgmt/dm/src/dmHandle.c index 07eca3f230..6492995c5c 100644 --- a/source/dnode/mgmt/dm/src/dmHandle.c +++ b/source/dnode/mgmt/dm/src/dmHandle.c @@ -118,7 +118,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { } -static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) { +static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); if (pWrapper != NULL) { dndReleaseWrapper(pWrapper); @@ -146,7 +146,7 @@ static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg return code; } -static int32_t dndProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) { +static int32_t dmProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); if (pWrapper == NULL) { terrno = TSDB_CODE_NODE_NOT_DEPLOYED; @@ -171,24 +171,24 @@ static int32_t dndProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg * return code; } -int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { +int32_t dmProcessCDnodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { switch (pMsg->rpcMsg.msgType) { case TDMT_DND_CREATE_MNODE: - return dndProcessCreateNodeMsg(pDnode, MNODE, pMsg); + return dmProcessCreateNodeMsg(pDnode, MNODE, pMsg); case TDMT_DND_DROP_MNODE: - return dndProcessDropNodeMsg(pDnode, MNODE, pMsg); + return dmProcessDropNodeMsg(pDnode, MNODE, pMsg); case TDMT_DND_CREATE_QNODE: - return dndProcessCreateNodeMsg(pDnode, QNODE, pMsg); + return dmProcessCreateNodeMsg(pDnode, QNODE, pMsg); case TDMT_DND_DROP_QNODE: - return dndProcessDropNodeMsg(pDnode, QNODE, pMsg); + return dmProcessDropNodeMsg(pDnode, QNODE, pMsg); case TDMT_DND_CREATE_SNODE: - return dndProcessCreateNodeMsg(pDnode, SNODE, pMsg); + return dmProcessCreateNodeMsg(pDnode, SNODE, pMsg); case TDMT_DND_DROP_SNODE: - return dndProcessDropNodeMsg(pDnode, SNODE, pMsg); + return dmProcessDropNodeMsg(pDnode, SNODE, pMsg); case TDMT_DND_CREATE_BNODE: - return dndProcessCreateNodeMsg(pDnode, BNODE, pMsg); + return dmProcessCreateNodeMsg(pDnode, BNODE, pMsg); case TDMT_DND_DROP_BNODE: - return dndProcessDropNodeMsg(pDnode, BNODE, pMsg); + return dmProcessDropNodeMsg(pDnode, BNODE, pMsg); default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; return -1; diff --git a/source/dnode/mgmt/dm/src/dmInt.c b/source/dnode/mgmt/dm/src/dmInt.c index 1e38b8304d..c710af9006 100644 --- a/source/dnode/mgmt/dm/src/dmInt.c +++ b/source/dnode/mgmt/dm/src/dmInt.c @@ -78,7 +78,7 @@ static int32_t dmStart(SMgmtWrapper *pWrapper) { return dmStartThread(pWrapper->pMgmt); } -int32_t dmInit(SMgmtWrapper *pWrapper) { +static int32_t dmInit(SMgmtWrapper *pWrapper) { SDnode *pDnode = pWrapper->pDnode; SDnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeMgmt)); dInfo("dnode-mgmt start to init"); @@ -124,7 +124,7 @@ int32_t dmInit(SMgmtWrapper *pWrapper) { return 0; } -void dmCleanup(SMgmtWrapper *pWrapper) { +static void dmCleanup(SMgmtWrapper *pWrapper) { SDnodeMgmt *pMgmt = pWrapper->pMgmt; if (pMgmt == NULL) return; @@ -153,12 +153,12 @@ void dmCleanup(SMgmtWrapper *pWrapper) { dInfo("dnode-mgmt is cleaned up"); } -int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) { +static int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) { *required = true; return 0; } -void dmGetMgmtFp(SMgmtWrapper *pWrapper) { +void dmSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = dmInit; mgmtFp.closeFp = dmCleanup; diff --git a/source/dnode/mgmt/dm/src/dmWorker.c b/source/dnode/mgmt/dm/src/dmWorker.c index 6061189ca1..40ca858183 100644 --- a/source/dnode/mgmt/dm/src/dmWorker.c +++ b/source/dnode/mgmt/dm/src/dmWorker.c @@ -66,16 +66,6 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { dTrace("msg:%p, will be processed in dnode queue", pMsg); switch (pRpc->msgType) { - case TDMT_DND_CREATE_MNODE: - case TDMT_DND_CREATE_QNODE: - case TDMT_DND_CREATE_SNODE: - case TDMT_DND_CREATE_BNODE: - case TDMT_DND_DROP_MNODE: - case TDMT_DND_DROP_QNODE: - case TDMT_DND_DROP_SNODE: - case TDMT_DND_DROP_BNODE: - code = dndProcessNodeMsg(pMgmt->pDnode, pMsg); - break; case TDMT_DND_CONFIG_DNODE: code = dmProcessConfigReq(pMgmt, pMsg); break; @@ -89,8 +79,8 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { code = dmProcessGrantRsp(pMgmt, pMsg); break; default: - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dError("msg:%p, type:%s not processed in dnode queue", pRpc->handle, TMSG_INFO(pRpc->msgType)); + code = dmProcessCDnodeMsg(pMgmt->pDnode, pMsg); + break; } if (pRpc->msgType & 1u) { diff --git a/source/dnode/mgmt/main/inc/dnd.h b/source/dnode/mgmt/main/inc/dnd.h index b324a279bc..449c4399db 100644 --- a/source/dnode/mgmt/main/inc/dnd.h +++ b/source/dnode/mgmt/main/inc/dnd.h @@ -27,13 +27,13 @@ #include "tlockfree.h" #include "tlog.h" #include "tmsg.h" +#include "tmsgcb.h" #include "tprocess.h" #include "tqueue.h" #include "trpc.h" #include "tthread.h" #include "ttime.h" #include "tworker.h" -#include "tmsgcb.h" #include "dnode.h" #include "monitor.h" @@ -42,12 +42,42 @@ extern "C" { #endif -#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} -#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} -#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} -#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} -#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} -#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} +#define dFatal(...) \ + { \ + if (dDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ + } \ + } +#define dError(...) \ + { \ + if (dDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ + } \ + } +#define dWarn(...) \ + { \ + if (dDebugFlag & DEBUG_WARN) { \ + taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ + } \ + } +#define dInfo(...) \ + { \ + if (dDebugFlag & DEBUG_INFO) { \ + taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); \ + } \ + } +#define dDebug(...) \ + { \ + if (dDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); \ + } \ + } +#define dTrace(...) \ + { \ + if (dDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); \ + } \ + } typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; @@ -73,7 +103,7 @@ typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required); typedef struct SMsgHandle { - SMgmtWrapper *pQndWrapper; + SMgmtWrapper *pQndWrapper; SMgmtWrapper *pMndWrapper; SMgmtWrapper *pWrapper; } SMsgHandle; @@ -146,14 +176,11 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); void dndReleaseWrapper(SMgmtWrapper *pWrapper); +void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); // dndMonitor.h void dndSendMonitorReport(SDnode *pDnode); -// dndMsg.h -void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); -int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); - // dndStr.h const char *dndStatStr(EDndStatus stat); const char *dndNodeLogStr(ENodeType ntype); @@ -168,12 +195,12 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper); int32_t dndInitMsgHandle(SDnode *pDnode); // mgmt -void dmGetMgmtFp(SMgmtWrapper *pWrapper); -void bmGetMgmtFp(SMgmtWrapper *pWrapper); -void qmGetMgmtFp(SMgmtWrapper *pMgmt); -void smGetMgmtFp(SMgmtWrapper *pWrapper); -void vmGetMgmtFp(SMgmtWrapper *pWrapper); -void mmGetMgmtFp(SMgmtWrapper *pMgmt); +void dmSetMgmtFp(SMgmtWrapper *pWrapper); +void bmSetMgmtFp(SMgmtWrapper *pWrapper); +void qmSetMgmtFp(SMgmtWrapper *pMgmt); +void smSetMgmtFp(SMgmtWrapper *pWrapper); +void vmSetMgmtFp(SMgmtWrapper *pWrapper); +void mmSetMgmtFp(SMgmtWrapper *pMgmt); void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); @@ -194,8 +221,8 @@ void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads); int32_t vmMonitorTfsInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo); void vmMonitorVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo); int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo); - + SMonGrantInfo *pGrantInfo); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/main/src/dndInt.c b/source/dnode/mgmt/main/src/dndInt.c index 72feed7c62..80fefdb3ef 100644 --- a/source/dnode/mgmt/main/src/dndInt.c +++ b/source/dnode/mgmt/main/src/dndInt.c @@ -82,12 +82,12 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { } dndSetStatus(pDnode, DND_STAT_INIT); - dmGetMgmtFp(&pDnode->wrappers[DNODE]); - mmGetMgmtFp(&pDnode->wrappers[MNODE]); - vmGetMgmtFp(&pDnode->wrappers[VNODES]); - qmGetMgmtFp(&pDnode->wrappers[QNODE]); - smGetMgmtFp(&pDnode->wrappers[SNODE]); - bmGetMgmtFp(&pDnode->wrappers[BNODE]); + dmSetMgmtFp(&pDnode->wrappers[DNODE]); + mmSetMgmtFp(&pDnode->wrappers[MNODE]); + vmSetMgmtFp(&pDnode->wrappers[VNODES]); + qmSetMgmtFp(&pDnode->wrappers[QNODE]); + smSetMgmtFp(&pDnode->wrappers[SNODE]); + bmSetMgmtFp(&pDnode->wrappers[BNODE]); for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; diff --git a/source/dnode/mgmt/mm/src/mmInt.c b/source/dnode/mgmt/mm/src/mmInt.c index bd7446780d..97fdafe139 100644 --- a/source/dnode/mgmt/mm/src/mmInt.c +++ b/source/dnode/mgmt/mm/src/mmInt.c @@ -227,7 +227,7 @@ static int32_t mmStart(SMgmtWrapper *pWrapper) { return mndStart(pMgmt->pMnode); } -void mmGetMgmtFp(SMgmtWrapper *pWrapper) { +void mmSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = mmOpen; mgmtFp.closeFp = mmClose; diff --git a/source/dnode/mgmt/qm/src/qmInt.c b/source/dnode/mgmt/qm/src/qmInt.c index 8079859b96..53c43bf841 100644 --- a/source/dnode/mgmt/qm/src/qmInt.c +++ b/source/dnode/mgmt/qm/src/qmInt.c @@ -112,7 +112,7 @@ int32_t qmOpen(SMgmtWrapper *pWrapper) { return code; } -void qmGetMgmtFp(SMgmtWrapper *pWrapper) { +void qmSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = qmOpen; mgmtFp.closeFp = qmClose; diff --git a/source/dnode/mgmt/sm/src/smInt.c b/source/dnode/mgmt/sm/src/smInt.c index 6d2e2aaefd..a279655609 100644 --- a/source/dnode/mgmt/sm/src/smInt.c +++ b/source/dnode/mgmt/sm/src/smInt.c @@ -109,7 +109,7 @@ int32_t smOpen(SMgmtWrapper *pWrapper) { return code; } -void smGetMgmtFp(SMgmtWrapper *pWrapper) { +void smSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = smOpen; mgmtFp.closeFp = smClose; diff --git a/source/dnode/mgmt/vm/src/vmInt.c b/source/dnode/mgmt/vm/src/vmInt.c index d9ef7b5ae6..d78a567b69 100644 --- a/source/dnode/mgmt/vm/src/vmInt.c +++ b/source/dnode/mgmt/vm/src/vmInt.c @@ -333,7 +333,7 @@ static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) { return 0; } -void vmGetMgmtFp(SMgmtWrapper *pWrapper) { +void vmSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = vmInit; mgmtFp.closeFp = vmCleanup;