From 43cf0a99287bfd02866f04905fa3aff671dda456 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Mar 2022 19:58:29 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/container/inc/dnd.h | 4 +- source/dnode/mgmt/container/inc/dndInt.h | 1 + source/dnode/mgmt/container/src/dndNode.c | 28 ----------- .../dnode/mgmt/container/src/dndTransport.c | 47 +++++++++++++++---- source/dnode/mgmt/dnode/src/dmInt.c | 3 +- source/dnode/mgmt/dnode/src/dmMsg.c | 3 +- source/dnode/mgmt/dnode/src/dmWorker.c | 2 +- 7 files changed, 45 insertions(+), 43 deletions(-) diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index ac2ad2ac8f..db33420631 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -147,11 +147,11 @@ void dndSetStatus(SDnode *pDnode, EDndStatus stat); SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType); void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp); void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); +void dndSendMonitorReport(SDnode *pDnode); -void dndSendMonitorReport(SDnode *pDnode); int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pMsg); int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pMsg); -void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); +void dndSendRsp(void *pWrapper, SRpcMsg *pRsp); int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, int32_t maxNum, void *queueFp); diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 4960b022f4..f87c4de244 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -51,6 +51,7 @@ void dndCleanupServer(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode); +void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index 709e44faf9..7263268075 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -219,34 +219,6 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { } } -static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { - if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) { - dmSendRedirectRsp(dndGetWrapper(pWrapper->pDnode, DNODE), pRsp); - } else { - rpcSendResponse(pRsp); - } -} - -void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { - int32_t code = -1; - - if (pWrapper->procType != PROC_CHILD) { - dndSendRpcRsp(pWrapper, pRsp); - } else { - do { - code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen); - if (code != 0) { - taosMsleep(10); - } - } while (code != 0); - } -} - -void dndSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { - pRsp->code = TSDB_CODE_APP_NOT_READY; - dndSendRsp(pWrapper, pRsp); -} - static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { dTrace("msg:%p, get from child queue", pMsg); SRpcMsg *pRpc = &pMsg->rpcMsg; diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 43e49678cb..bad0bc2bee 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -269,7 +269,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { return 0; } -static int32_t dndSetReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { +static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { if (pMgmt->clientRpc == NULL) { terrno = TSDB_CODE_DND_OFFLINE; return -1; @@ -281,16 +281,47 @@ static int32_t dndSetReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) { SMgmtWrapper *pWrapper = wrapper; - STransMgmt *pTrans = &pWrapper->pDnode->trans; - return dndSetReq(pTrans, pEpSet, pReq); + + if (pWrapper->procType == PROC_CHILD) { + } else { + STransMgmt *pTrans = &pWrapper->pDnode->trans; + return dndSendRpcReq(pTrans, pEpSet, pReq); + } } int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pReq) { SMgmtWrapper *pWrapper = wrapper; - SDnode *pDnode = pWrapper->pDnode; - STransMgmt *pTrans = &pDnode->trans; - SEpSet epSet = {0}; - dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet); - return dndSetReq(pTrans, &epSet, pReq); + if (pWrapper->procType == PROC_CHILD) { + } else { + SDnode *pDnode = pWrapper->pDnode; + STransMgmt *pTrans = &pDnode->trans; + SEpSet epSet = {0}; + dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet); + return dndSendRpcReq(pTrans, &epSet, pReq); + } +} + +void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { + if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) { + dmSendRedirectRsp(dndGetWrapper(pWrapper->pDnode, DNODE), pRsp); + } else { + rpcSendResponse(pRsp); + } +} + +void dndSendRsp(void *wrapper, SRpcMsg *pRsp) { + SMgmtWrapper *pWrapper = wrapper; + + if (pWrapper->procType == PROC_CHILD) { + int32_t code = -1; + do { + code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen); + if (code != 0) { + taosMsleep(10); + } + } while (code != 0); + } else { + dndSendRpcRsp(pWrapper, pRsp); + } } diff --git a/source/dnode/mgmt/dnode/src/dmInt.c b/source/dnode/mgmt/dnode/src/dmInt.c index f98c91fc6d..df82526ccd 100644 --- a/source/dnode/mgmt/dnode/src/dmInt.c +++ b/source/dnode/mgmt/dnode/src/dmInt.c @@ -59,12 +59,11 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { SDnode *pDnode = pWrapper->pDnode; - tmsg_t msgType = pReq->msgType; SEpSet epSet = {0}; dmGetMnodeEpSet(pWrapper, &epSet); - dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); + dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); if (strcmp(epSet.eps[i].fqdn, pDnode->localFqdn) == 0 && epSet.eps[i].port == pDnode->serverPort) { diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index 22551974d2..dbbc8f0411 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -53,7 +53,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { pMgmt->statusSent = 1; dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle); - dndSendReqToMnode(pMgmt->dnodeEps, &rpcMsg); + dndSendReqToMnode(pMgmt->pWrapper, &rpcMsg); } static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { @@ -115,7 +115,6 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { void dmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg); diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index ecb0b6a09d..4f440739cb 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -154,4 +154,4 @@ int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); return dndWriteMsgToWorker(pWorker, pMsg, 0); -} \ No newline at end of file +}