From ac6b121348d2e772be9984146e5eff9b42fe740e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 29 Mar 2022 13:39:55 +0800 Subject: [PATCH] shm --- include/common/tmsgcb.h | 3 +- include/util/tprocess.h | 2 +- source/common/src/tmsgcb.c | 8 ++- source/dnode/mgmt/container/inc/dnd.h | 3 ++ source/dnode/mgmt/container/src/dndExec.c | 11 +++- .../dnode/mgmt/container/src/dndTransport.c | 51 ++++++++++++------- source/libs/qworker/src/qworker.c | 4 +- 7 files changed, 57 insertions(+), 25 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 50ff735f86..3ccbbf4c47 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -56,13 +56,14 @@ typedef struct { ReleaseHandleFp releaseHandleFp; } SMsgCb; +void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); -void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type); +void tmsgReleaseHandle(void* handle, int8_t type); #ifdef __cplusplus } diff --git a/include/util/tprocess.h b/include/util/tprocess.h index be6d58615e..51ce0243b7 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,7 +22,7 @@ extern "C" { #endif -typedef enum { PROC_REQ, PROC_RSP, PROC_REGISTER } ProcFuncType; +typedef enum { PROC_REQ, PROC_RSP, PROC_REG, PROC_RELEASE } ProcFuncType; typedef struct SProcQueue SProcQueue; typedef struct SProcObj SProcObj; diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 2b68aab95a..857a501625 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -16,6 +16,10 @@ #define _DEFAULT_SOURCE #include "tmsgcb.h" +static SMsgCb tsDefaultMsgCb; + +void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } + int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); } @@ -38,6 +42,6 @@ void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); } -void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type) { - (*pMsgCb->releaseHandleFp)(pMsgCb->pWrapper, handle, type); +void tmsgReleaseHandle(void* handle, int8_t type) { + (*tsDefaultMsgCb.releaseHandleFp)(tsDefaultMsgCb.pWrapper, handle, type); } \ No newline at end of file diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 3cdde09532..4c6a9cf1ab 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -33,6 +33,7 @@ #include "tthread.h" #include "ttime.h" #include "tworker.h" +#include "tmsgcb.h" #include "dnode.h" #include "monitor.h" @@ -140,6 +141,8 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type); +SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index 412fa388b6..56e64d3aed 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -77,6 +77,8 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; + SMsgCb msgCb = dndCreateMsgcb(pWrapper); + tmsgSetDefaultMsgCb(&msgCb); if (taosMkDir(pWrapper->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -146,9 +148,13 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t pMsg->ahandle); switch (ftype) { - case PROC_REGISTER: + case PROC_REG: rpcRegisterBrokenLinkArg(pMsg); break; + case PROC_RELEASE: + rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); + rpcFreeCont(pCont); + break; default: dndSendRpcRsp(pWrapper, pMsg); break; @@ -164,6 +170,9 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; + SMsgCb msgCb = dndCreateMsgcb(pWrapper); + tmsgSetDefaultMsgCb(&msgCb); + if (taosMkDir(pWrapper->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index baf25b73d6..626dd6babb 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -363,29 +363,44 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { } void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { - if (pWrapper->procType == PROC_CHILD) { - int32_t code = -1; - do { - code = taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP); - if (code != 0) { - taosMsleep(10); - } - } while (code != 0); - } else { + if (pWrapper->procType != PROC_CHILD) { dndSendRpcRsp(pWrapper, pRsp); + } else { + while (taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP) != 0) { + taosMsleep(1); + } } } void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { - if (pWrapper->procType == PROC_CHILD) { - int32_t code = -1; - do { - code = taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGISTER); - if (code != 0) { - taosMsleep(10); - } - } while (code != 0); - } else { + if (pWrapper->procType != PROC_CHILD) { rpcRegisterBrokenLinkArg(pMsg); + } else { + while (taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REG) != 0) { + taosMsleep(1); + } } +} + +void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) { + if (pWrapper->procType != PROC_CHILD) { + rpcReleaseHandle(handle, type); + } else { + SRpcMsg msg = {.handle = handle, .code = type}; + while (taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE) != 0) { + taosMsleep(1); + } + } +} + +SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { + SMsgCb msgCb = { + .pWrapper = pWrapper, + .registerBrokenLinkArgFp = dndRegisterBrokenLinkArg, + .releaseHandleFp = dndReleaseHandle, + .sendMnodeReqFp = dndSendReqToMnode, + .sendReqFp = dndSendReqToDnode, + .sendRspFp = dndSendRsp, + }; + return msgCb; } \ No newline at end of file diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 133a80b154..7612ca5704 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -432,7 +432,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { - rpcReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER); + tmsgReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER); ctx->connInfo.handle = NULL; qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle); @@ -1282,7 +1282,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { QW_LOCK(QW_WRITE, &sch->hbConnLock); if (sch->hbConnInfo.handle) { - rpcReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); + tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); } memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));