From f7cf8229c8f7b2582cf2288783415b206c3a6c8b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 12 Apr 2022 18:04:40 +0800 Subject: [PATCH] refact(cluster): node mgmt --- include/dnode/mgmt/dnode.h | 2 +- source/dnode/mgmt/exe/dndMain.c | 2 +- source/dnode/mgmt/implement/inc/dndNode.h | 8 -- source/dnode/mgmt/implement/src/dndObj.c | 68 ------------- source/dnode/mgmt/interface/inc/dndInt.h | 62 +++--------- .../{implement => interface}/src/dndEnv.c | 2 +- source/dnode/mgmt/interface/src/dndFile.c | 3 +- source/dnode/mgmt/interface/src/dndInt.c | 96 +++++++++++++++++-- source/dnode/mgmt/test/sut/src/server.cpp | 2 +- 9 files changed, 106 insertions(+), 139 deletions(-) rename source/dnode/mgmt/{implement => interface}/src/dndEnv.c (98%) diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index e4f4bdf8f9..576b44e14b 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -81,7 +81,7 @@ int32_t dndRun(SDnode *pDnode); * @param pDnode The dnode object to close. * @param event The event to handle. */ -void dndHandleEvent(SDnode *pDnode, EDndEvent event); +void dndSetEvent(SDnode *pDnode, EDndEvent event); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/exe/dndMain.c b/source/dnode/mgmt/exe/dndMain.c index 20a73fd893..93892996c7 100644 --- a/source/dnode/mgmt/exe/dndMain.c +++ b/source/dnode/mgmt/exe/dndMain.c @@ -32,7 +32,7 @@ static struct { static void dndStopDnode(int signum, void *info, void *ctx) { SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode); if (pDnode != NULL) { - dndHandleEvent(pDnode, DND_EVENT_STOP); + dndSetEvent(pDnode, DND_EVENT_STOP); } } diff --git a/source/dnode/mgmt/implement/inc/dndNode.h b/source/dnode/mgmt/implement/inc/dndNode.h index f5d93d2f61..237b65a962 100644 --- a/source/dnode/mgmt/implement/inc/dndNode.h +++ b/source/dnode/mgmt/implement/inc/dndNode.h @@ -25,14 +25,6 @@ extern "C" { int32_t dndOpenNode(SMgmtWrapper *pWrapper); void dndCloseNode(SMgmtWrapper *pWrapper); -void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); -SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType nType); -int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); -void dndReleaseWrapper(SMgmtWrapper *pWrapper); -void dndHandleEvent(SDnode *pDnode, EDndEvent event); -void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); -void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); - // dndTransport.c int32_t dndInitTrans(SDnode *pDnode); void dndCleanupTrans(SDnode *pDnode); diff --git a/source/dnode/mgmt/implement/src/dndObj.c b/source/dnode/mgmt/implement/src/dndObj.c index 4300ee0853..b008b5ce8a 100644 --- a/source/dnode/mgmt/implement/src/dndObj.c +++ b/source/dnode/mgmt/implement/src/dndObj.c @@ -149,72 +149,4 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) { } } -SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SMgmtWrapper *pRetWrapper = pWrapper; - taosRLockLatch(&pWrapper->latch); - if (pWrapper->deployed) { - int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); - dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount); - } else { - terrno = TSDB_CODE_NODE_NOT_DEPLOYED; - pRetWrapper = NULL; - } - taosRUnLockLatch(&pWrapper->latch); - - return pRetWrapper; -} - -int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) { - int32_t code = 0; - - taosRLockLatch(&pWrapper->latch); - if (pWrapper->deployed || (pWrapper->procType == DND_PROC_PARENT && pWrapper->required)) { - int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); - dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount); - } else { - terrno = TSDB_CODE_NODE_NOT_DEPLOYED; - code = -1; - } - taosRUnLockLatch(&pWrapper->latch); - - return code; -} - -void dndReleaseWrapper(SMgmtWrapper *pWrapper) { - if (pWrapper == NULL) return; - - taosRLockLatch(&pWrapper->latch); - int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); - taosRUnLockLatch(&pWrapper->latch); - dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount); -} - -void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) { - pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; - pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; -} - -void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { - SStartupReq *pStartup = &pDnode->startup; - tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); - tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); - pStartup->finished = 0; -} - -static void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { - memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq)); - pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); -} - -void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { - dDebug("startup req is received"); - SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); - dndGetStartup(pDnode, pStartup); - - dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); - SRpcMsg rpcRsp = { - .handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle}; - rpcSendResponse(&rpcRsp); -} \ No newline at end of file diff --git a/source/dnode/mgmt/interface/inc/dndInt.h b/source/dnode/mgmt/interface/inc/dndInt.h index 1f9d5cd8f4..542df7c784 100644 --- a/source/dnode/mgmt/interface/inc/dndInt.h +++ b/source/dnode/mgmt/interface/inc/dndInt.h @@ -16,21 +16,26 @@ #ifndef _TD_DND_INT_H_ #define _TD_DND_INT_H_ -#include "dndLog.h" #include "dndDef.h" #ifdef __cplusplus extern "C" { #endif -const char *dndStatName(EDndRunStatus stat); -const char *dndLogName(EDndNodeType ntype); -const char *dndProcName(EDndNodeType ntype); -const char *dndEventName(EDndEvent ev); - -// dndExec.c -int32_t dndOpenNode(SMgmtWrapper *pWrapper); -void dndCloseNode(SMgmtWrapper *pWrapper); +// dndInt.c +const char *dndStatName(EDndRunStatus stat); +const char *dndLogName(EDndNodeType ntype); +const char *dndProcName(EDndNodeType ntype); +const char *dndEventName(EDndEvent ev); +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType nType); +int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); +void dndReleaseWrapper(SMgmtWrapper *pWrapper); +EDndRunStatus dndGetStatus(SDnode *pDnode); +void dndSetStatus(SDnode *pDnode, EDndRunStatus stat); +void dndSetEvent(SDnode *pDnode, EDndEvent event); +void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); +void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); // dndFile.c int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); @@ -39,45 +44,6 @@ TdFilePtr dndCheckRunning(const char *dataDir); int32_t dndReadShmFile(SDnode *pDnode); int32_t dndWriteShmFile(SDnode *pDnode); -// dndInt.c -EDndRunStatus dndGetStatus(SDnode *pDnode); -void dndSetStatus(SDnode *pDnode, EDndRunStatus stat); -void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); -SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType nType); -int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); -void dndReleaseWrapper(SMgmtWrapper *pWrapper); -void dndHandleEvent(SDnode *pDnode, EDndEvent event); -void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); -void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); - -// dndTransport.c -int32_t dndInitTrans(SDnode *pDnode); -void dndCleanupTrans(SDnode *pDnode); -SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); -SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper); -int32_t dndInitMsgHandle(SDnode *pDnode); -void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); - -// mgmt -void dmSetMgmtFp(SMgmtWrapper *pWrapper); -void bmSetMgmtFp(SMgmtWrapper *pWrapper); -void qmSetMgmtFp(SMgmtWrapper *pMgmt); -void smSetMgmtFp(SMgmtWrapper *pWrapper); -void vmSetMgmtFp(SMgmtWrapper *pWrapper); -void mmSetMgmtFp(SMgmtWrapper *pMgmt); - -void dmGetMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet); -void dmUpdateMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet); -void dmSendRedirectRsp(SDnodeData *pMgmt, const SRpcMsg *pMsg); - -void dmGetMonitorSysInfo(SMonSysInfo *pInfo); -void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); -void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo); -void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo); -void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo); -void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo); -void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/implement/src/dndEnv.c b/source/dnode/mgmt/interface/src/dndEnv.c similarity index 98% rename from source/dnode/mgmt/implement/src/dndEnv.c rename to source/dnode/mgmt/interface/src/dndEnv.c index a7a3ee8c91..9f75594335 100644 --- a/source/dnode/mgmt/implement/src/dndEnv.c +++ b/source/dnode/mgmt/interface/src/dndEnv.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "dndNode.h" +#include "dndInt.h" #include "wal.h" static int8_t once = DND_ENV_INIT; diff --git a/source/dnode/mgmt/interface/src/dndFile.c b/source/dnode/mgmt/interface/src/dndFile.c index c35d7fbb4a..0358e90fb3 100644 --- a/source/dnode/mgmt/interface/src/dndFile.c +++ b/source/dnode/mgmt/interface/src/dndFile.c @@ -232,7 +232,8 @@ int32_t dndWriteShmFile(SDnode *pDnode) { if (ntype == NODE_END - 1) { len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d\n", dndProcName(ntype), pWrapper->procShm.size); } else { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d,\n", dndProcName(ntype), pWrapper->procShm.size); + len += + snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d,\n", dndProcName(ntype), pWrapper->procShm.size); } } len += snprintf(content + len, MAXLEN - len, "}\n"); diff --git a/source/dnode/mgmt/interface/src/dndInt.c b/source/dnode/mgmt/interface/src/dndInt.c index 5e4cd47824..fb2338e698 100644 --- a/source/dnode/mgmt/interface/src/dndInt.c +++ b/source/dnode/mgmt/interface/src/dndInt.c @@ -16,15 +16,6 @@ #define _DEFAULT_SOURCE #include "dndInt.h" -EDndRunStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } - -void dndSetStatus(SDnode *pDnode, EDndRunStatus status) { - if (pDnode->status != status) { - dDebug("dnode status set from %s to %s", dndStatName(pDnode->status), dndStatName(status)); - pDnode->status = status; - } -} - const char *dndStatName(EDndRunStatus status) { switch (status) { case DND_STAT_INIT: @@ -83,4 +74,89 @@ const char *dndEventName(EDndEvent ev) { default: return "UNKNOWN"; } -} \ No newline at end of file +} + +EDndRunStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } + +void dndSetStatus(SDnode *pDnode, EDndRunStatus status) { + if (pDnode->status != status) { + dDebug("dnode status set from %s to %s", dndStatName(pDnode->status), dndStatName(status)); + pDnode->status = status; + } +} + +void dndSetEvent(SDnode *pDnode, EDndEvent event) { + if (event == DND_EVENT_STOP) { + pDnode->event = event; + } +} + +void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) { + pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; + pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; +} + +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; + SMgmtWrapper *pRetWrapper = pWrapper; + + taosRLockLatch(&pWrapper->latch); + if (pWrapper->deployed) { + int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); + dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount); + } else { + terrno = TSDB_CODE_NODE_NOT_DEPLOYED; + pRetWrapper = NULL; + } + taosRUnLockLatch(&pWrapper->latch); + + return pRetWrapper; +} + +int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) { + int32_t code = 0; + + taosRLockLatch(&pWrapper->latch); + if (pWrapper->deployed || (pWrapper->procType == DND_PROC_PARENT && pWrapper->required)) { + int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); + dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount); + } else { + terrno = TSDB_CODE_NODE_NOT_DEPLOYED; + code = -1; + } + taosRUnLockLatch(&pWrapper->latch); + + return code; +} + +void dndReleaseWrapper(SMgmtWrapper *pWrapper) { + if (pWrapper == NULL) return; + + taosRLockLatch(&pWrapper->latch); + int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); + taosRUnLockLatch(&pWrapper->latch); + dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount); +} + +void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { + SStartupReq *pStartup = &pDnode->startup; + tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); + tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); + pStartup->finished = 0; +} + +static void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { + memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq)); + pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); +} + +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { + dDebug("startup req is received"); + SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); + dndGetStartup(pDnode, pStartup); + + dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); + SRpcMsg rpcRsp = { + .handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle}; + rpcSendResponse(&rpcRsp); +} diff --git a/source/dnode/mgmt/test/sut/src/server.cpp b/source/dnode/mgmt/test/sut/src/server.cpp index c5379c6d17..332dc9327c 100644 --- a/source/dnode/mgmt/test/sut/src/server.cpp +++ b/source/dnode/mgmt/test/sut/src/server.cpp @@ -68,7 +68,7 @@ bool TestServer::Start(const char* path, const char* fqdn, uint16_t port, const } void TestServer::Stop() { - dndHandleEvent(pDnode, DND_EVENT_STOP); + dndSetEvent(pDnode, DND_EVENT_STOP); taosThreadJoin(threadId, NULL); if (pDnode != NULL) {