diff --git a/source/dnode/mgmt/bnode/src/bmInt.c b/source/dnode/mgmt/bnode/src/bmInt.c index dbfcfd15f8..5a1b25a4ec 100644 --- a/source/dnode/mgmt/bnode/src/bmInt.c +++ b/source/dnode/mgmt/bnode/src/bmInt.c @@ -53,7 +53,7 @@ static bool bmRequire(SMgmtWrapper *pWrapper) { if (mgmt.dropped) { dInfo("bnode has been dropped and needs to be deleted"); - mndDestroy(mgmt.path); + taosRemoveDir(mgmt.path); return false; } @@ -66,7 +66,6 @@ static bool bmRequire(SMgmtWrapper *pWrapper) { void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { SDnode *pDnode = pMgmt->pDnode; - pOption->pWrapper = pMgmt->pWrapper; pOption->sendReqFp = dndSendReqToDnode; pOption->sendMnodeReqFp = dndSendReqToMnode; diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 050a437405..5de9d0938a 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -162,8 +162,7 @@ int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, cons void dndCleanupWorker(SDnodeWorker *pWorker); int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); -int32_t dndProcessCreateNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); -int32_t dndProcessDropNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); +int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index ca60b850bf..d173771d3d 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -41,8 +41,8 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); // dndExec.c -int32_t dndOpenNode(SMgmtWrapper *pWrapper); -void dndCloseNode(SMgmtWrapper *pWrapper); +int32_t dndOpenNode(SDnode *pDnode, ENodeType nodeType); +int32_t dndCloseNode(SDnode *pDnode, ENodeType nodeType); int32_t dndRun(SDnode *pDnode); // dndObj.c @@ -51,6 +51,7 @@ void dndClose(SDnode *pDnode); void dndHandleEvent(SDnode *pDnode, EDndEvent event); SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); +int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); void dndReleaseWrapper(SMgmtWrapper *pWrapper); // dndTransport.c diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index c2a2205d70..bc1359664f 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -35,37 +35,51 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) { return required; } -int32_t dndOpenNode(SMgmtWrapper *pWrapper) { +int32_t dndOpenNode(SDnode *pDnode, ENodeType ntype) { + SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); + if (pWrapper != NULL) { + dndReleaseWrapper(pWrapper); + terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED; + return -1; + } + + pWrapper = &pDnode->wrappers[ntype]; int32_t code = (*pWrapper->fp.openFp)(pWrapper); if (code != 0) { dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); - return -1; } else { dDebug("node:%s, has been opened", pWrapper->name); + pWrapper->deployed = true; } - pWrapper->deployed = true; - return 0; + return code; } -void dndCloseNode(SMgmtWrapper *pWrapper) { - taosWLockLatch(&pWrapper->latch); - if (pWrapper->deployed) { - (*pWrapper->fp.closeFp)(pWrapper); - pWrapper->deployed = false; +int32_t dndCloseNode(SDnode *pDnode, ENodeType ntype) { + SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); + if (pWrapper == NULL) { + terrno = TSDB_CODE_NODE_NOT_DEPLOYED; + return -1; } + + taosWLockLatch(&pWrapper->latch); + (*pWrapper->fp.closeFp)(pWrapper); + pWrapper->deployed = false; if (pWrapper->pProc) { taosProcCleanup(pWrapper->pProc); pWrapper->pProc = NULL; } taosWUnLockLatch(&pWrapper->latch); + + dndReleaseWrapper(pWrapper); + return 0; } static int32_t dndRunInSingleProcess(SDnode *pDnode) { dInfo("dnode run in single process mode"); - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; @@ -77,14 +91,14 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { dInfo("node:%s, will start in single process", pWrapper->name); pWrapper->procType = PROC_SINGLE; - if (dndOpenNode(pWrapper) != 0) { + if (dndOpenNode(pDnode, ntype) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); return -1; } } SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); - int32_t code = dmStart(pWrapper->pMgmt); + int32_t code = dmStart(pWrapper->pMgmt); if (code != 0) { dError("failed to start dnode worker since %s", terrstr()); } @@ -95,10 +109,9 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { dndCleanupServer(pDnode); - for (ENodeType n = 0; n < NODE_MAX; ++n) { - if (except == n) continue; - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - dndCloseNode(pWrapper); + for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) { + if (except == ntype) continue; + (void)dndCloseNode(pDnode, ntype); } } @@ -111,8 +124,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t int32_t code = (*msgFp)(pWrapper, pMsg); if (code != 0) { - bool isReq = (pRpc->msgType & 1U); - if (isReq) { + if (pRpc->msgType & 1U) { SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; dndSendRsp(pWrapper, &rsp); } @@ -133,8 +145,8 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t static int32_t dndRunInMultiProcess(SDnode *pDnode) { dInfo("dnode run in multi process mode"); - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; @@ -144,10 +156,10 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { return -1; } - if (n == DNODE) { + if (ntype == DNODE) { dInfo("node:%s, will start in parent process", pWrapper->name); pWrapper->procType = PROC_SINGLE; - if (dndOpenNode(pWrapper) != 0) { + if (dndOpenNode(pDnode, ntype) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); return -1; } @@ -183,10 +195,10 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { dndResetLog(pWrapper); dInfo("node:%s, clean up resources inherited from parent", pWrapper->name); - dndClearNodesExecpt(pDnode, n); + dndClearNodesExecpt(pDnode, ntype); dInfo("node:%s, will be initialized in child process", pWrapper->name); - dndOpenNode(pWrapper); + dndOpenNode(pDnode, ntype); } else { dInfo("node:%s, will not start in parent process", pWrapper->name); pWrapper->procType = PROC_PARENT; diff --git a/source/dnode/mgmt/container/src/dndMsg.c b/source/dnode/mgmt/container/src/dndMsg.c index 9497c9368d..f4b763725e 100644 --- a/source/dnode/mgmt/container/src/dndMsg.c +++ b/source/dnode/mgmt/container/src/dndMsg.c @@ -16,7 +16,24 @@ #define _DEFAULT_SOURCE #include "dndInt.h" -static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { +static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { + SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); + if (pWrapper != NULL) { + dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet); + } + dndReleaseWrapper(pWrapper); +} + +static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; + if (msgFp == NULL) { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + } + + return msgFp; +} + +static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) { SRpcConnInfo connInfo = {0}; if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; @@ -25,47 +42,34 @@ static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { } memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); - pMsg->rpcMsg = *pRpc; + memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg)); return 0; } void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { - dmUpdateMnodeEpSet(dndAcquireWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet); + dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet); } int32_t code = -1; SNodeMsg *pMsg = NULL; + NodeMsgFp msgFp = NULL; - NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; - if (msgFp == NULL) { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - goto _OVER; - } - - pMsg = taosAllocateQitem(sizeof(SNodeMsg)); - if (pMsg == NULL) { - goto _OVER; - } - - if (dndBuildMsg(pMsg, pRpc, pEpSet) != 0) { - goto _OVER; - } + if (dndMarkWrapper(pWrapper) != 0) goto _OVER; + if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER; + if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER; + if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER; dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user); - if (pWrapper->procType == PROC_SINGLE) { code = (*msgFp)(pWrapper->pMgmt, pMsg); } else if (pWrapper->procType == PROC_PARENT) { code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); } else { - terrno = TSDB_CODE_MEMORY_CORRUPTED; - dError("msg:%p, won't be processed for it is child process", pMsg); } _OVER: - if (code == 0) { if (pWrapper->procType == PROC_PARENT) { dTrace("msg:%p, is freed", pMsg); @@ -74,8 +78,7 @@ _OVER: } } else { dError("msg:%p, failed to process since %s", pMsg, terrstr()); - bool isReq = (pRpc->msgType & 1U); - if (isReq) { + if (pRpc->msgType & 1U) { SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; dndSendRsp(pWrapper, &rsp); } @@ -83,57 +86,29 @@ _OVER: taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); } + + dndReleaseWrapper(pWrapper); } -static SMgmtWrapper *dndGetWrapperFromMsg(SDnode *pDnode, SNodeMsg *pMsg) { - SMgmtWrapper *pWrapper = NULL; +int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { switch (pMsg->rpcMsg.msgType) { case TDMT_DND_CREATE_MNODE: - return dndAcquireWrapper(pDnode, MNODE); - case TDMT_DND_CREATE_QNODE: - return dndAcquireWrapper(pDnode, QNODE); - case TDMT_DND_CREATE_SNODE: - return dndAcquireWrapper(pDnode, SNODE); - case TDMT_DND_CREATE_BNODE: - return dndAcquireWrapper(pDnode, BNODE); - default: - return NULL; - } -} - -int32_t dndProcessCreateNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { - SMgmtWrapper *pWrapper = dndGetWrapperFromMsg(pDnode, pMsg); - if (pWrapper->procType == PROC_SINGLE) { - switch (pMsg->rpcMsg.msgType) { - case TDMT_DND_CREATE_MNODE: - return mmProcessCreateReq(pWrapper->pMgmt, pMsg); - case TDMT_DND_CREATE_QNODE: - return qmProcessCreateReq(pWrapper->pMgmt, pMsg); - case TDMT_DND_CREATE_SNODE: - return smProcessCreateReq(pWrapper->pMgmt, pMsg); - case TDMT_DND_CREATE_BNODE: - return bmProcessCreateReq(pWrapper->pMgmt, pMsg); - default: - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - return -1; - } - } else { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - return -1; - } -} - -int32_t dndProcessDropNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { - SMgmtWrapper *pWrapper = dndGetWrapperFromMsg(pDnode, pMsg); - switch (pMsg->rpcMsg.msgType) { + return dndOpenNode(pDnode, MNODE); case TDMT_DND_DROP_MNODE: - return mmProcessDropReq(pWrapper->pMgmt, pMsg); + return dndCloseNode(pDnode, MNODE); + case TDMT_DND_CREATE_QNODE: + return dndOpenNode(pDnode, QNODE); case TDMT_DND_DROP_QNODE: - return qmProcessDropReq(pWrapper->pMgmt, pMsg); + return dndCloseNode(pDnode, QNODE); + case TDMT_DND_CREATE_SNODE: + return dndOpenNode(pDnode, SNODE); case TDMT_DND_DROP_SNODE: - return smProcessDropReq(pWrapper->pMgmt, pMsg); + return dndCloseNode(pDnode, MNODE); + case TDMT_DND_CREATE_BNODE: + return dndOpenNode(pDnode, BNODE); case TDMT_DND_DROP_BNODE: - return bmProcessDropReq(pWrapper->pMgmt, pMsg); + return dndCloseNode(pDnode, BNODE); + default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; return -1; diff --git a/source/dnode/mgmt/container/src/dndObj.c b/source/dnode/mgmt/container/src/dndObj.c index e148710c1b..e6681a446d 100644 --- a/source/dnode/mgmt/container/src/dndObj.c +++ b/source/dnode/mgmt/container/src/dndObj.c @@ -124,9 +124,8 @@ void dndClose(SDnode *pDnode) { dndCleanupServer(pDnode); dndCleanupClient(pDnode); - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - dndCloseNode(pWrapper); + for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) { + (void)dndCloseNode(pDnode, ntype); } dndClearMemory(pDnode); @@ -138,8 +137,8 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) { pDnode->event = event; } -SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType]; +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pRetWrapper = pWrapper; taosRLockLatch(&pWrapper->latch); @@ -155,6 +154,22 @@ SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType) { return pRetWrapper; } +int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) { + int32_t code = 0; + + taosRLockLatch(&pWrapper->latch); + if (pWrapper->deployed) { + int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); + dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount); + } else { + terrno = TSDB_CODE_NODE_NOT_DEPLOYED; + code = -1; + } + taosRUnLockLatch(&pWrapper->latch); + + return code; +} + void dndReleaseWrapper(SMgmtWrapper *pWrapper) { if (pWrapper == NULL) return; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index c135473d25..9f8e30e11b 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -63,13 +63,12 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { case TDMT_DND_CREATE_QNODE: case TDMT_DND_CREATE_SNODE: case TDMT_DND_CREATE_BNODE: - code = dndProcessCreateNodeMsg(pMgmt->pDnode, pMsg); - case TDMT_DND_DROP_MNODE: case TDMT_DND_DROP_QNODE: case TDMT_DND_DROP_SNODE: case TDMT_DND_DROP_BNODE: - code = dndProcessDropNodeMsg(pMgmt->pDnode, pMsg); + code = dndProcessNodeMsg(pMgmt->pDnode, pMsg); + break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; code = -1;