diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index 7263268075..6c972e96ba 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -202,7 +202,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { } SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); - if (dmStart(pWrapper) != 0) { + if (dmStart(pWrapper->pMgmt) != 0) { dError("failed to start dnode worker since %s", terrstr()); return -1; } @@ -372,7 +372,7 @@ static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { - dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE), pEpSet); + dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet); } int32_t code = -1; @@ -396,7 +396,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user); if (pWrapper->procType == PROC_SINGLE) { - code = (*msgFp)(pWrapper, pMsg); + code = (*msgFp)(pWrapper->pMgmt, pMsg); } else if (pWrapper->procType == PROC_PARENT) { code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); } else { diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index bad0bc2bee..91bf0eda87 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -131,7 +131,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp STransMgmt *pMgmt = &pDnode->trans; SEpSet epSet = {0}; - dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet); + dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE)->pMgmt, &epSet); rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp); } @@ -297,14 +297,15 @@ int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pReq) { SDnode *pDnode = pWrapper->pDnode; STransMgmt *pTrans = &pDnode->trans; SEpSet epSet = {0}; - dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet); + dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE)->pMgmt, &epSet); return dndSendRpcReq(pTrans, &epSet, pReq); } } void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) { - dmSendRedirectRsp(dndGetWrapper(pWrapper->pDnode, DNODE), pRsp); + SMgmtWrapper *pDnodeWrapper = dndGetWrapper(pWrapper->pDnode, DNODE); + dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp); } else { rpcSendResponse(pRsp); } diff --git a/source/dnode/mgmt/dnode/inc/dm.h b/source/dnode/mgmt/dnode/inc/dm.h index 05de6e0436..8228b2df5f 100644 --- a/source/dnode/mgmt/dnode/inc/dm.h +++ b/source/dnode/mgmt/dnode/inc/dm.h @@ -22,13 +22,15 @@ extern "C" { #endif +typedef struct SDnodeMgmt SDnodeMgmt; + void dmGetMgmtFp(SMgmtWrapper *pWrapper); void dmInitMsgHandles(SMgmtWrapper *pWrapper); -void dmGetMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet); -void dmUpdateMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet); -void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t dmStart(SMgmtWrapper *pWrapper); +void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); +void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); +void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t dmStart(SDnodeMgmt *pMgmt); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/src/dmInt.c b/source/dnode/mgmt/dnode/src/dmInt.c index df82526ccd..c5867789b2 100644 --- a/source/dnode/mgmt/dnode/src/dmInt.c +++ b/source/dnode/mgmt/dnode/src/dmInt.c @@ -16,19 +16,16 @@ #define _DEFAULT_SOURCE #include "dmInt.h" -void dmGetMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet) { - SDnodeMgmt *pMgmt = pWrapper->pMgmt; +void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet) { taosRLockLatch(&pMgmt->latch); *pEpSet = pMgmt->mnodeEpSet; taosRUnLockLatch(&pMgmt->latch); } -void dmUpdateMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet) { +void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet) { dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); - SDnodeMgmt *pMgmt = pWrapper->pMgmt; taosWLockLatch(&pMgmt->latch); - pMgmt->mnodeEpSet = *pEpSet; for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); @@ -57,11 +54,11 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd taosRUnLockLatch(&pMgmt->latch); } -void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { - SDnode *pDnode = pWrapper->pDnode; +void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) { + SDnode *pDnode = pMgmt->pDnode; SEpSet epSet = {0}; - dmGetMnodeEpSet(pWrapper, &epSet); + dmGetMnodeEpSet(pMgmt, &epSet); dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { @@ -76,10 +73,7 @@ void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { rpcSendRedirectRsp(pReq->handle, &epSet); } -int32_t dmStart(SMgmtWrapper *pWrapper) { - SDnodeMgmt *pMgmt = pWrapper->pMgmt; - return dmStartWorker(pMgmt); -} +int32_t dmStart(SDnodeMgmt *pMgmt) { return dmStartWorker(pMgmt); } int32_t dmInit(SMgmtWrapper *pWrapper) { SDnode *pDnode = pWrapper->pDnode; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 4f440739cb..b64d1ea813 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -52,47 +52,57 @@ static void *dmThreadRoutine(void *param) { } } -static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pMsg) { - int32_t code = -1; - tmsg_t msgType = pMsg->rpcMsg.msgType; +static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; + SDnode *pDnode = pMgmt->pDnode; + SMgmtWrapper *pWrapper = NULL; dTrace("msg:%p, will be processed", pMsg); switch (msgType) { case TDMT_DND_CREATE_MNODE: - code = mmProcessCreateReq(dndGetWrapper(pDnode, MNODE)->pMgmt, pMsg); + pWrapper = dndGetWrapper(pDnode, MNODE); + code = mmProcessCreateReq(pWrapper->pMgmt, pMsg); break; case TDMT_DND_DROP_MNODE: - code = mmProcessDropReq(dndGetWrapper(pDnode, MNODE)->pMgmt, pMsg); + pWrapper = dndGetWrapper(pDnode, MNODE); + code = mmProcessDropReq(pWrapper->pMgmt, pMsg); break; case TDMT_DND_CREATE_QNODE: - code = qmProcessCreateReq(dndGetWrapper(pDnode, QNODE)->pMgmt, pMsg); + pWrapper = dndGetWrapper(pDnode, QNODE); + code = qmProcessCreateReq(pWrapper->pMgmt, pMsg); break; case TDMT_DND_DROP_QNODE: - code = qmProcessDropReq(dndGetWrapper(pDnode, QNODE)->pMgmt, pMsg); + pWrapper = dndGetWrapper(pDnode, QNODE); + code = qmProcessDropReq(pWrapper->pMgmt, pMsg); break; case TDMT_DND_CREATE_SNODE: - code = smProcessCreateReq(dndGetWrapper(pDnode, SNODE)->pMgmt, pMsg); + pWrapper = dndGetWrapper(pDnode, SNODE); + code = smProcessCreateReq(pWrapper->pMgmt, pMsg); break; case TDMT_DND_DROP_SNODE: - code = smProcessDropReq(dndGetWrapper(pDnode, SNODE)->pMgmt, pMsg); + pWrapper = dndGetWrapper(pDnode, SNODE); + code = smProcessDropReq(pWrapper->pMgmt, pMsg); break; case TDMT_DND_CREATE_BNODE: - code = bmProcessCreateReq(dndGetWrapper(pDnode, BNODE)->pMgmt, pMsg); + pWrapper = dndGetWrapper(pDnode, BNODE); + code = bmProcessCreateReq(pWrapper->pMgmt, pMsg); break; case TDMT_DND_DROP_BNODE: - code = bmProcessDropReq(dndGetWrapper(pDnode, BNODE)->pMgmt, pMsg); + pWrapper = dndGetWrapper(pDnode, BNODE); + code = bmProcessDropReq(pWrapper->pMgmt, pMsg); break; case TDMT_DND_CONFIG_DNODE: - code = dmProcessConfigReq(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); + code = dmProcessConfigReq(pMgmt, pMsg); break; case TDMT_MND_STATUS_RSP: - code = dmProcessStatusRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); + code = dmProcessStatusRsp(pMgmt, pMsg); break; case TDMT_MND_AUTH_RSP: - code = dmProcessAuthRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); + code = dmProcessAuthRsp(pMgmt, pMsg); break; case TDMT_MND_GRANT_RSP: - code = dmProcessGrantRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); + code = dmProcessGrantRsp(pMgmt, pMsg); break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; @@ -114,14 +124,12 @@ static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pMsg) { } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { - SDnode *pDnode = pMgmt->pDnode; - - if (dndInitWorker(pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) { + if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } - if (dndInitWorker(pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) { + if (dndInitWorker(pMgmt, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; }