From 182a5ee4b5a0d8742c180e2432c6ec4c94cfbd38 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 29 Mar 2022 11:37:29 +0800 Subject: [PATCH] shm --- include/common/tmsgcb.h | 14 +++++++++----- include/libs/transport/trpc.h | 18 +++++++++--------- include/util/tprocess.h | 4 ++-- source/common/src/tmsgcb.c | 8 ++++++-- source/dnode/mgmt/container/inc/dnd.h | 4 ++-- source/dnode/mgmt/container/inc/dndInt.h | 2 +- source/dnode/mgmt/container/src/dndTransport.c | 8 ++++---- source/dnode/mgmt/dnode/inc/dm.h | 2 +- source/dnode/mgmt/dnode/src/dmInt.c | 2 +- source/util/src/tprocess.c | 12 ++++++------ 10 files changed, 41 insertions(+), 33 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 192b4cebbc..50ff735f86 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -25,6 +25,7 @@ extern "C" { typedef struct SRpcMsg SRpcMsg; typedef struct SEpSet SEpSet; typedef struct SMgmtWrapper SMgmtWrapper; + typedef enum { QUERY_QUEUE, FETCH_QUEUE, @@ -38,10 +39,11 @@ typedef enum { typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); -typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); +typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); -typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); -typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg *pMsg); +typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp); +typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); +typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type); typedef struct { SMgmtWrapper* pWrapper; @@ -51,14 +53,16 @@ typedef struct { SendMnodeReqFp sendMnodeReqFp; SendRspFp sendRspFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp; + ReleaseHandleFp releaseHandleFp; } SMsgCb; int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); -int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); +int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); -void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); +void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); +void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type); #ifdef __cplusplus } diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 54813a77a3..157e0cb721 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -107,20 +107,20 @@ void rpcClose(void *); void * rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void * rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +// Because taosd supports multi-process mode +// These functions should not be used on the server side +// Please use tmsg functions, which are defined in tmsgcb.h +void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); +void rpcRegisterBrokenLinkArg(SRpcMsg *msg); +void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock + +// These functions will not be called in the child process void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); +void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcReportProgress(void *pConn, char *pCont, int contLen); -void rpcCancelRequest(int64_t rid); -void rpcRegisterBrokenLinkArg(SRpcMsg *msg); -// just release client conn to rpc instance, no close sock -void rpcReleaseHandle(void *handle, int8_t type); // -void rpcRefHandle(void *handle, int8_t type); -void rpcUnrefHandle(void *handle, int8_t type); #ifdef __cplusplus } diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 052a6619c8..be6d58615e 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -55,9 +55,9 @@ int32_t taosProcRun(SProcObj *pProc); void taosProcStop(SProcObj *pProc); bool taosProcIsChild(SProcObj *pProc); int32_t taosProcChildId(SProcObj *pProc); -int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, +int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); -int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, +int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); #ifdef __cplusplus diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index 328a0c2b0c..2b68aab95a 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -24,7 +24,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype); } -int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) { +int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); } @@ -32,8 +32,12 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); } -void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } +void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); +} + +void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type) { + (*pMsgCb->releaseHandleFp)(pMsgCb->pWrapper, handle, type); } \ No newline at end of file diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 6968ae4609..3cdde09532 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -137,8 +137,8 @@ void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc void dndSendMonitorReport(SDnode *pDnode); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); -void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); +int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); +void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index d10835b67f..0f0cc78a1d 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -56,7 +56,7 @@ void dndCleanupServer(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode); -void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); +void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 797e2535d6..baf25b73d6 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -309,7 +309,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { return 0; } -static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { +static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *pReq) { if (pMgmt->clientRpc == NULL) { terrno = TSDB_CODE_DND_OFFLINE; return -1; @@ -319,7 +319,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { return 0; } -int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pReq) { +int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { if (pWrapper->procType == PROC_CHILD) { } else { SDnode *pDnode = pWrapper->pDnode; @@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { } } -void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { +void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { if (pRsp->code == TSDB_CODE_APP_NOT_READY) { SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); if (pDnodeWrapper != NULL) { @@ -362,7 +362,7 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { } } -void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { +void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { if (pWrapper->procType == PROC_CHILD) { int32_t code = -1; do { diff --git a/source/dnode/mgmt/dnode/inc/dm.h b/source/dnode/mgmt/dnode/inc/dm.h index 6c18d7969c..3984e6dbd4 100644 --- a/source/dnode/mgmt/dnode/inc/dm.h +++ b/source/dnode/mgmt/dnode/inc/dm.h @@ -29,7 +29,7 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper); void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); -void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); +void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/src/dmInt.c b/source/dnode/mgmt/dnode/src/dmInt.c index 53049f7e78..b729888a72 100644 --- a/source/dnode/mgmt/dnode/src/dmInt.c +++ b/source/dnode/mgmt/dnode/src/dmInt.c @@ -54,7 +54,7 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd taosRUnLockLatch(&pMgmt->latch); } -void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) { +void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pReq) { SDnode *pDnode = pMgmt->pDnode; SEpSet epSet = {0}; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 4dce5f560f..2675c6cdf2 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -207,8 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { } } -static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int16_t rawHeadLen, char *pBody, int32_t rawBodyLen, - ProcFuncType funcType) { +static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, + int32_t rawBodyLen, ProcFuncType funcType) { const int32_t headLen = CEIL8(rawHeadLen); const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; @@ -471,12 +471,12 @@ void taosProcCleanup(SProcObj *pProc) { } } -int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, - ProcFuncType funcType) { +int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, + ProcFuncType funcType) { return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType); } -int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, - ProcFuncType funcType) { +int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, + ProcFuncType funcType) { return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType); }