From 5015a6a78b252bbe523e5cd45c8a3e12f2a5df45 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 1 Apr 2022 21:27:47 +0800 Subject: [PATCH] shm --- include/util/tprocess.h | 4 +- source/dnode/mgmt/bm/src/bmInt.c | 6 +-- source/dnode/mgmt/main/inc/dnd.h | 2 +- source/dnode/mgmt/main/src/dndExec.c | 13 +++-- source/dnode/mgmt/main/src/dndTransport.c | 58 ++++++++++++----------- source/util/src/tprocess.c | 8 ++-- 6 files changed, 49 insertions(+), 42 deletions(-) diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 3a47450eec..bc0c8fe5a4 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,7 +22,7 @@ extern "C" { #endif -typedef enum { PROC_REQ, PROC_RSP, PROC_REG, PROC_RELEASE } ProcFuncType; +typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; typedef struct SProcQueue SProcQueue; typedef struct SProcObj SProcObj; @@ -53,7 +53,7 @@ void taosProcCleanup(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc); int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); -int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, +void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); #ifdef __cplusplus diff --git a/source/dnode/mgmt/bm/src/bmInt.c b/source/dnode/mgmt/bm/src/bmInt.c index cbaf247a5b..4ca7afbb6d 100644 --- a/source/dnode/mgmt/bm/src/bmInt.c +++ b/source/dnode/mgmt/bm/src/bmInt.c @@ -19,11 +19,7 @@ static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/main/inc/dnd.h b/source/dnode/mgmt/main/inc/dnd.h index b9c760c980..255a739add 100644 --- a/source/dnode/mgmt/main/inc/dnd.h +++ b/source/dnode/mgmt/main/inc/dnd.h @@ -149,7 +149,7 @@ int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index 54cadb177a..8837da6d48 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -90,10 +90,10 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, ProcFuncType ftype) { pMsg->pCont = pCont; - dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle); + dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle); switch (ftype) { - case PROC_REG: + case PROC_REGIST: rpcRegisterBrokenLinkArg(pMsg); break; case PROC_RELEASE: @@ -101,11 +101,14 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t rpcFreeCont(pCont); break; case PROC_REQ: - // todo send to dnode dndSendReqToMnode(pWrapper, pMsg); - default: + // dndSendReq(pWrapper, (const SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); + break; + case PROC_RSP: dndSendRpcRsp(pWrapper, pMsg); break; + default: + break; } taosMemoryFree(pMsg); } @@ -325,6 +328,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { } } + dndSetStatus(pDnode, DND_STAT_RUNNING); + if (taosProcRun(pWrapper->pProc) != 0) { dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); return -1; diff --git a/source/dnode/mgmt/main/src/dndTransport.c b/source/dnode/mgmt/main/src/dndTransport.c index a87937bc8d..d463f2ba7a 100644 --- a/source/dnode/mgmt/main/src/dndTransport.c +++ b/source/dnode/mgmt/main/src/dndTransport.c @@ -319,22 +319,6 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p return 0; } -int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { - if (pWrapper->procType != PROC_CHILD) { - SDnode *pDnode = pWrapper->pDnode; - if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { - terrno = TSDB_CODE_DND_OFFLINE; - dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle); - return -1; - } - return dndSendRpcReq(&pDnode->trans, pEpSet, pReq); - } else { - while (taosProcPutToParentQ(pWrapper->pProc, pReq, sizeof(SRpcMsg), pReq->pCont, pReq->contLen, PROC_REQ) != 0) { - taosMsleep(1); - } - } -} - int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { SDnode *pDnode = pWrapper->pDnode; STransMgmt *pTrans = &pDnode->trans; @@ -362,13 +346,37 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { } } +int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { + SDnode *pDnode = pWrapper->pDnode; + if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { + terrno = TSDB_CODE_DND_OFFLINE; + dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle); + return -1; + } + + if (pWrapper->procType != PROC_CHILD) { + return dndSendRpcReq(&pDnode->trans, pEpSet, pReq); + } else { + int32_t headLen = sizeof(SRpcMsg) + sizeof(SEpSet); + char *pHead = taosMemoryMalloc(headLen); + if (pHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(pHead, pReq, sizeof(SRpcMsg)); + memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet)); + + taosProcPutToParentQ(pWrapper->pProc, pReq, headLen, pReq->pCont, pReq->contLen, PROC_REQ); + taosMemoryFree(pHead); + return 0; + } +} + void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { if (pWrapper->procType != PROC_CHILD) { dndSendRpcRsp(pWrapper, pRsp); } else { - while (taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP) != 0) { - taosMsleep(1); - } + taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP); } } @@ -376,9 +384,7 @@ void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { if (pWrapper->procType != PROC_CHILD) { rpcRegisterBrokenLinkArg(pMsg); } else { - while (taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REG) != 0) { - taosMsleep(1); - } + taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST); } } @@ -387,19 +393,17 @@ static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) rpcReleaseHandle(handle, type); } else { SRpcMsg msg = {.handle = handle, .code = type}; - while (taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE) != 0) { - taosMsleep(1); - } + taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE); } } SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { SMsgCb msgCb = { .pWrapper = pWrapper, + .sendReqFp = dndSendReq, + .sendRspFp = dndSendRsp, .registerBrokenLinkArgFp = dndRegisterBrokenLinkArg, .releaseHandleFp = dndReleaseHandle, - .sendReqFp = dndSendReqToDnode, - .sendRspFp = dndSendRsp, }; return msgCb; } \ No newline at end of file diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 1d41bd4a48..600413068c 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -434,7 +434,9 @@ int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype); } -int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType ftype) { - return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype); +void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, + ProcFuncType ftype) { + while (taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype) != 0) { + taosMsleep(1); + } }