From 31cfa1fa5b55fbc5e0f71c40da523543cd45c546 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 20 Jul 2022 14:14:46 +0800 Subject: [PATCH] fix: avoid rpc mem leak --- include/libs/transport/trpc.h | 18 +++++++--------- source/libs/transport/inc/transComm.h | 14 ++++++------ source/libs/transport/src/trans.c | 31 +++++++++++---------------- source/libs/transport/src/transCli.c | 29 ++++++++++++++----------- source/libs/transport/src/transSvr.c | 23 ++++++++++---------- 5 files changed, 56 insertions(+), 59 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 2ae1f7b854..50f9959177 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -124,18 +124,16 @@ void *rpcReallocCont(void *ptr, int32_t contLen); // Because taosd supports multi-process mode // These functions should not be used on the server side // Please use tmsg functions, which are defined in tmsgcb.h -void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); -void rpcSendResponse(const SRpcMsg *pMsg); -void rpcRegisterBrokenLinkArg(SRpcMsg *msg); -void rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock +int rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +int rpcSendResponse(const SRpcMsg *pMsg); +int rpcRegisterBrokenLinkArg(SRpcMsg *msg); +int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock // These functions will not be called in the child process -void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); -void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); -int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); -void* rpcAllocHandle(); +int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); +void *rpcAllocHandle(); #ifdef __cplusplus } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 2972f512f1..8af3c8b7fd 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -289,14 +289,14 @@ void transUnrefSrvHandle(void* handle); void transRefCliHandle(void* handle); void transUnrefCliHandle(void* handle); -void transReleaseCliHandle(void* handle); -void transReleaseSrvHandle(void* handle); +int transReleaseCliHandle(void* handle); +int transReleaseSrvHandle(void* handle); -void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); -void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); -void transSendResponse(const STransMsg* msg); -void transRegisterMsg(const STransMsg* msg); -void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); +int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); +int transSendResponse(const STransMsg* msg); +int transRegisterMsg(const STransMsg* msg); +int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); int64_t transAllocHandle(); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 79f2f17a6e..7633820292 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -25,7 +25,7 @@ void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; -void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; +int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle}; static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { *ip = taosGetIpv4FromFqdn(localFqdn); @@ -129,25 +129,20 @@ void* rpcReallocCont(void* ptr, int32_t contLen) { return st + TRANS_MSG_OVERHEAD; } -void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { - // deprecated api - assert(0); -} - int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; } void rpcCancelRequest(int64_t rid) { return; } -void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { - transSendRequest(shandle, pEpSet, pMsg, NULL); +int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { + return transSendRequest(shandle, pEpSet, pMsg, NULL); } -void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { - transSendRequest(shandle, pEpSet, pMsg, pCtx); +int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) { + return transSendRequest(shandle, pEpSet, pMsg, pCtx); } -void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { - transSendRecv(shandle, pEpSet, pMsg, pRsp); +int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { + return transSendRecv(shandle, pEpSet, pMsg, pRsp); } -void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } +int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); @@ -159,15 +154,15 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } -void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { transRegisterMsg(msg); } -void rpcReleaseHandle(void* handle, int8_t type) { +int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); } +int rpcReleaseHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); - (*transReleaseHandle[type])(handle); + return (*transReleaseHandle[type])(handle); } -void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { +int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { // later - transSetDefaultAddr(thandle, ip, fqdn); + return transSetDefaultAddr(thandle, ip, fqdn); } void* rpcAllocHandle() { return (void*)transAllocHandle(); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index efb2434779..0a4b7ed9ab 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1224,13 +1224,13 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) { } return pThrd; } -void transReleaseCliHandle(void* handle) { +int transReleaseCliHandle(void* handle) { int idx = -1; bool valid = false; SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle, &valid); if (pThrd == NULL) { - return; + return -1; } STransMsg tmsg = {.info.handle = handle}; SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); @@ -1238,14 +1238,14 @@ void transReleaseCliHandle(void* handle) { cmsg->type = Release; transAsyncSend(pThrd->asyncPool, &cmsg->q); - return; + return 0; } -void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - return; + return -1; } bool valid = false; @@ -1253,7 +1253,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra if (pThrd == NULL && valid == false) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return; + return -1; } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); @@ -1280,14 +1280,14 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return; + return 0; } -void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { +int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - return; + return -1; } bool valid = false; @@ -1295,7 +1295,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM if (pThrd == NULL && valid == false) { transFreeMsg(pReq->pCont); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return; + return -1; } tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); @@ -1328,14 +1328,16 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM taosMemoryFree(sem); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return; + return 0; } /* * **/ -void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { +int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) return; + if (pTransInst == NULL) { + return -1; + } SCvtAddr cvtAddr = {0}; if (ip != NULL && fqdn != NULL) { @@ -1358,6 +1360,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { transAsyncSend(thrd->asyncPool, &(cliMsg->q)); } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return 0; } int64_t transAllocHandle() { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 9a511adf9b..7b9402f954 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1034,7 +1034,7 @@ void transUnrefSrvHandle(void* handle) { } } -void transReleaseSrvHandle(void* handle) { +int transReleaseSrvHandle(void* handle) { SRpcHandleInfo* info = handle; SExHandle* exh = info->handle; int64_t refId = info->refId; @@ -1053,16 +1053,16 @@ void transReleaseSrvHandle(void* handle) { tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(transGetRefMgt(), refId); - return; + return 0; _return1: tTrace("handle %p failed to send to release handle", exh); transReleaseExHandle(transGetRefMgt(), refId); - return; + return -1; _return2: tTrace("handle %p failed to send to release handle", exh); - return; + return -1; } -void transSendResponse(const STransMsg* msg) { +int transSendResponse(const STransMsg* msg) { SExHandle* exh = msg->info.handle; int64_t refId = msg->info.refId; ASYNC_CHECK_HANDLE(exh, refId); @@ -1082,18 +1082,18 @@ void transSendResponse(const STransMsg* msg) { tGTrace("conn %p start to send resp (1/2)", exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(transGetRefMgt(), refId); - return; + return 0; _return1: tTrace("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); transReleaseExHandle(transGetRefMgt(), refId); - return; + return -1; _return2: tTrace("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - return; + return -1; } -void transRegisterMsg(const STransMsg* msg) { +int transRegisterMsg(const STransMsg* msg) { SExHandle* exh = msg->info.handle; int64_t refId = msg->info.refId; ASYNC_CHECK_HANDLE(exh, refId); @@ -1112,16 +1112,17 @@ void transRegisterMsg(const STransMsg* msg) { tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(transGetRefMgt(), refId); - return; + return 0; _return1: tTrace("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); transReleaseExHandle(transGetRefMgt(), refId); - return; + return -1; _return2: tTrace("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); + return -1; } int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }