From 2eb30ee0f0a6e486a6b3c87471b2e4d7ca4f8b8c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 2 Apr 2022 18:08:38 +0800 Subject: [PATCH] refact transport --- source/dnode/mgmt/dm/inc/dmInt.h | 1 + source/dnode/mgmt/dm/src/dmInt.c | 14 +-- source/dnode/mgmt/dm/src/dmMsg.c | 4 +- source/dnode/mgmt/main/inc/dnd.h | 12 +-- source/dnode/mgmt/main/inc/dndInt.h | 1 - source/dnode/mgmt/main/src/dndExec.c | 63 ----------- source/dnode/mgmt/main/src/dndInt.c | 1 + source/dnode/mgmt/main/src/dndTransport.c | 125 ++++++++++++++++------ 8 files changed, 106 insertions(+), 115 deletions(-) diff --git a/source/dnode/mgmt/dm/inc/dmInt.h b/source/dnode/mgmt/dm/inc/dmInt.h index 3036d1f5ad..79d7e926ce 100644 --- a/source/dnode/mgmt/dm/inc/dmInt.h +++ b/source/dnode/mgmt/dm/inc/dmInt.h @@ -33,6 +33,7 @@ typedef struct SDnodeMgmt { SRWLatch latch; SSingleWorker mgmtWorker; SSingleWorker statusWorker; + SMsgCb msgCb; const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; diff --git a/source/dnode/mgmt/dm/src/dmInt.c b/source/dnode/mgmt/dm/src/dmInt.c index 3c5f394d5e..8c46c402ab 100644 --- a/source/dnode/mgmt/dm/src/dmInt.c +++ b/source/dnode/mgmt/dm/src/dmInt.c @@ -112,17 +112,14 @@ int32_t dmInit(SMgmtWrapper *pWrapper) { return -1; } - if (dndInitServer(pDnode) != 0) { - dError("failed to init trans server since %s", terrstr()); - return -1; - } - - if (dndInitClient(pDnode) != 0) { - dError("failed to init trans client since %s", terrstr()); + if (dndInitTrans(pDnode) != 0) { + dError("failed to init transport since %s", terrstr()); return -1; } pWrapper->pMgmt = pMgmt; + pMgmt->msgCb = dndCreateMsgcb(pWrapper); + dInfo("dnode-mgmt is initialized"); return 0; } @@ -151,8 +148,7 @@ void dmCleanup(SMgmtWrapper *pWrapper) { taosMemoryFree(pMgmt); pWrapper->pMgmt = NULL; - dndCleanupServer(pDnode); - dndCleanupClient(pDnode); + dndCleanupTrans(pDnode); dInfo("dnode-mgmt is cleaned up"); } diff --git a/source/dnode/mgmt/dm/src/dmMsg.c b/source/dnode/mgmt/dm/src/dmMsg.c index 3795cffbfa..02a288e2fd 100644 --- a/source/dnode/mgmt/dm/src/dmMsg.c +++ b/source/dnode/mgmt/dm/src/dmMsg.c @@ -57,7 +57,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { pMgmt->statusSent = 1; dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle); - dndSendReqToMnode(pMgmt->pWrapper, &rpcMsg); + SEpSet epSet = {0}; + dmGetMnodeEpSet(pMgmt, &epSet); + tmsgSendReq(&pMgmt->msgCb, &epSet, &rpcMsg); } static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { diff --git a/source/dnode/mgmt/main/inc/dnd.h b/source/dnode/mgmt/main/inc/dnd.h index cf33787108..d948aa305c 100644 --- a/source/dnode/mgmt/main/inc/dnd.h +++ b/source/dnode/mgmt/main/inc/dnd.h @@ -92,6 +92,7 @@ typedef struct SMgmtWrapper { char *path; int32_t refCount; SRWLatch latch; + ENodeType ntype; bool deployed; bool required; EProcType procType; @@ -160,13 +161,10 @@ const char *dndNodeProcStr(ENodeType ntype); const char *dndEventStr(EDndEvent ev); // dndTransport.h -int32_t dndInitServer(SDnode *pDnode); -void dndCleanupServer(SDnode *pDnode); -int32_t dndInitClient(SDnode *pDnode); -void dndCleanupClient(SDnode *pDnode); -int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); -SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); +int32_t dndInitTrans(SDnode *pDnode); +void dndCleanupTrans(SDnode *pDnode); +SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); +SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/main/inc/dndInt.h b/source/dnode/mgmt/main/inc/dndInt.h index 0c4ada1df3..b4d9de5184 100644 --- a/source/dnode/mgmt/main/inc/dndInt.h +++ b/source/dnode/mgmt/main/inc/dndInt.h @@ -49,7 +49,6 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); // dndTransport.c int32_t dndInitMsgHandle(SDnode *pDnode); -void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); // dndFile.c TdFilePtr dndCheckRunning(const char *dataDir); diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index 3bbdb1a686..a98c0954fe 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -65,53 +65,6 @@ void dndCloseNode(SMgmtWrapper *pWrapper) { dDebug("node:%s, mgmt has been closed", pWrapper->name); } -static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, - ProcFuncType ftype) { - SRpcMsg *pRpc = &pMsg->rpcMsg; - pRpc->pCont = pCont; - dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle); - - NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; - int32_t code = (*msgFp)(pWrapper, pMsg); - - if (code != 0) { - dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - if (pRpc->msgType & 1U) { - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - tmsgSendRsp(&rsp); - } - - dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pCont); - } -} - -static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, - ProcFuncType ftype) { - pMsg->pCont = pCont; - dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle); - - switch (ftype) { - case PROC_REGIST: - rpcRegisterBrokenLinkArg(pMsg); - break; - case PROC_RELEASE: - rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); - rpcFreeCont(pCont); - break; - case PROC_REQ: - dndSendReqToMnode(pWrapper, pMsg); - // dndSendReq(pWrapper, (const SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); - break; - case PROC_RSP: - dndSendRpcRsp(pWrapper, pMsg); - break; - default: - break; - } - taosMemoryFree(pMsg); -} static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { char tstr[8] = {0}; @@ -135,22 +88,6 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { return 0; } -static SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) { - SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, - .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, - .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, - .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, - .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .shm = pWrapper->shm, - .pParent = pWrapper, - .name = pWrapper->name}; - return cfg; -} static int32_t dndRunInSingleProcess(SDnode *pDnode) { dInfo("dnode run in single process"); diff --git a/source/dnode/mgmt/main/src/dndInt.c b/source/dnode/mgmt/main/src/dndInt.c index 25e4d89c5b..5e4b0a1f7b 100644 --- a/source/dnode/mgmt/main/src/dndInt.c +++ b/source/dnode/mgmt/main/src/dndInt.c @@ -95,6 +95,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { pWrapper->path = strdup(path); pWrapper->shm.id = -1; pWrapper->pDnode = pDnode; + pWrapper->ntype = n; if (pWrapper->path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; diff --git a/source/dnode/mgmt/main/src/dndTransport.c b/source/dnode/mgmt/main/src/dndTransport.c index f863db2a0c..cefa77890d 100644 --- a/source/dnode/mgmt/main/src/dndTransport.c +++ b/source/dnode/mgmt/main/src/dndTransport.c @@ -61,7 +61,7 @@ static void dndProcessResponse(SDnode *pDnode, SRpcMsg *pRsp, SEpSet *pEpSet) { } } -int32_t dndInitClient(SDnode *pDnode) { +static int32_t dndInitClient(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->trans; SRpcInit rpcInit; @@ -91,7 +91,7 @@ int32_t dndInitClient(SDnode *pDnode) { return 0; } -void dndCleanupClient(SDnode *pDnode) { +static void dndCleanupClient(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->trans; if (pMgmt->clientRpc) { rpcClose(pMgmt->clientRpc); @@ -227,7 +227,7 @@ static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, ch return rpcRsp.code; } -int32_t dndInitServer(SDnode *pDnode) { +static int32_t dndInitServer(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->trans; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); @@ -257,7 +257,7 @@ int32_t dndInitServer(SDnode *pDnode) { return 0; } -void dndCleanupServer(SDnode *pDnode) { +static void dndCleanupServer(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->trans; if (pMgmt->serverRpc) { rpcClose(pMgmt->serverRpc); @@ -266,6 +266,17 @@ void dndCleanupServer(SDnode *pDnode) { } } +int32_t dndInitTrans(SDnode *pDnode) { + if (dndInitServer(pDnode) != 0) return -1; + if (dndInitClient(pDnode) != 0) return -1; + return 0; +} + +void dndCleanupTrans(SDnode *pDnode) { + dndCleanupServer(pDnode); + dndCleanupClient(pDnode); +} + int32_t dndInitMsgHandle(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->trans; @@ -315,46 +326,28 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p return 0; } -int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { - SDnode *pDnode = pWrapper->pDnode; - STransMgmt *pTrans = &pDnode->trans; - SEpSet epSet = {0}; - - SMgmtWrapper *pWrapper2 = dndAcquireWrapper(pDnode, DNODE); - if (pWrapper2 != NULL) { - dmGetMnodeEpSet(pWrapper2->pMgmt, &epSet); - dndReleaseWrapper(pWrapper2); - } - return dndSendRpcReq(pTrans, &epSet, pReq); -} - -void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { +static void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { if (pRsp->code == TSDB_CODE_APP_NOT_READY) { - SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); - if (pDnodeWrapper != NULL) { - dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp); - dndReleaseWrapper(pDnodeWrapper); - } else { - rpcSendResponse(pRsp); + if (pWrapper->ntype == MNODE) { + dmSendRedirectRsp(pWrapper->pMgmt, pRsp); + return; } - } else { - rpcSendResponse(pRsp); } + + rpcSendResponse(pRsp); } -int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { - SDnode *pDnode = pWrapper->pDnode; - if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { +static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { + if (dndGetStatus(pWrapper->pDnode) != DND_STAT_RUNNING) { terrno = TSDB_CODE_DND_OFFLINE; dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle); return -1; } if (pWrapper->procType != PROC_CHILD) { - return dndSendRpcReq(&pDnode->trans, pEpSet, pReq); + return dndSendRpcReq(&pWrapper->pDnode->trans, pEpSet, pReq); } else { - int32_t headLen = sizeof(SRpcMsg) + sizeof(SEpSet); - char *pHead = taosMemoryMalloc(headLen); + char *pHead = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet)); if (pHead == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -362,8 +355,8 @@ int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) memcpy(pHead, pReq, sizeof(SRpcMsg)); memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet)); - - taosProcPutToParentQ(pWrapper->pProc, pReq, headLen, pReq->pCont, pReq->contLen, PROC_REQ); + taosProcPutToParentQ(pWrapper->pProc, pHead, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen, + PROC_REQ); taosMemoryFree(pHead); return 0; } @@ -404,3 +397,67 @@ SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { }; return msgCb; } + +static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, + ProcFuncType ftype) { + SRpcMsg *pRpc = &pMsg->rpcMsg; + pRpc->pCont = pCont; + dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle); + + NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; + int32_t code = (*msgFp)(pWrapper, pMsg); + + if (code != 0) { + dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + if (pRpc->msgType & 1U) { + SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; + dndSendRsp(pWrapper, &rsp); + } + + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pCont); + } +} + +static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, + ProcFuncType ftype) { + pMsg->pCont = pCont; + dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle); + + switch (ftype) { + case PROC_REGIST: + rpcRegisterBrokenLinkArg(pMsg); + break; + case PROC_RELEASE: + rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); + rpcFreeCont(pCont); + break; + case PROC_REQ: + dndSendRpcReq(&pWrapper->pDnode->trans, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); + break; + case PROC_RSP: + dndSendRpcRsp(pWrapper, pMsg); + break; + default: + break; + } + taosMemoryFree(pMsg); +} + +SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) { + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = pWrapper->shm, + .pParent = pWrapper, + .name = pWrapper->name}; + return cfg; +} \ No newline at end of file