From a0c77f5baafeeb44ea9588da2577c35368b2b6ad Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 29 Feb 2024 18:40:21 +0800 Subject: [PATCH 1/5] fix mem leak while taosd quit --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 9 +++--- source/libs/transport/inc/transComm.h | 30 +++++++++---------- source/libs/transport/src/transCli.c | 18 +++++++++-- source/libs/transport/src/transSvr.c | 7 +++-- 4 files changed, 40 insertions(+), 24 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 1a31f08801..c54da860dd 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -87,9 +87,9 @@ static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) { } } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SDnodeTrans *pTrans = &pDnode->trans; + SDnodeTrans * pTrans = &pDnode->trans; int32_t code = -1; - SRpcMsg *pMsg = NULL; + SRpcMsg * pMsg = NULL; SMgmtWrapper *pWrapper = NULL; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; @@ -256,11 +256,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SArray *pArray = (*pWrapper->func.getHandlesFp)(); + SArray * pArray = (*pWrapper->func.getHandlesFp)(); if (pArray == NULL) return -1; for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SMgmtHandle *pMgmt = taosArrayGet(pArray, i); + SMgmtHandle * pMgmt = taosArrayGet(pArray, i); SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; if (pMgmt->needCheckVgId) { pHandle->needCheckVgId = pMgmt->needCheckVgId; @@ -345,6 +345,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.parent = pDnode; rpcInit.rfp = rpcRfp; rpcInit.compressSize = tsCompressMsgSize; + rpcInit.dfp = destroyAhandle; rpcInit.retryMinInterval = tsRedirectPeriod; rpcInit.retryStepFactor = tsRedirectFactor; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5b18d56d70..c010e31320 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -256,21 +256,21 @@ void transAsyncPoolDestroy(SAsyncPool* pool); int transAsyncSend(SAsyncPool* pool, queue* mq); bool transAsyncPoolIsEmpty(SAsyncPool* pool); -#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \ - do { \ - for (int i = 0; i < pool->nAsync; i++) { \ - uv_async_t* async = &(pool->asyncs[i]); \ - SAsyncItem* item = async->data; \ - while (!QUEUE_IS_EMPTY(&item->qmsg)) { \ - tTrace("destroy msg in async pool "); \ - queue* h = QUEUE_HEAD(&item->qmsg); \ - QUEUE_REMOVE(h); \ - msgType* msg = QUEUE_DATA(h, msgType, q); \ - if (msg != NULL) { \ - freeFunc(msg); \ - } \ - } \ - } \ +#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \ + do { \ + for (int i = 0; i < pool->nAsync; i++) { \ + uv_async_t* async = &(pool->asyncs[i]); \ + SAsyncItem* item = async->data; \ + while (!QUEUE_IS_EMPTY(&item->qmsg)) { \ + tTrace("destroy msg in async pool "); \ + queue* h = QUEUE_HEAD(&item->qmsg); \ + QUEUE_REMOVE(h); \ + msgType* msg = QUEUE_DATA(h, msgType, q); \ + if (msg != NULL) { \ + freeFunc(msg, param); \ + } \ + } \ + } \ } while (0) #define ASYNC_CHECK_HANDLE(exh1, id) \ diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e937d2e65e..e369bf8f7d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -219,6 +219,8 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, /// NULL,cliHandleUpdate}; static FORCE_INLINE void destroyCmsg(void* cmsg); + +static FORCE_INLINE void destroyQueuedMsg(void* arg, void* param); static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); @@ -1963,7 +1965,19 @@ static FORCE_INLINE void destroyCmsg(void* arg) { transFreeMsg(pMsg->msg.pCont); taosMemoryFree(pMsg); } - +static FORCE_INLINE void destroyQueuedMsg(void* arg, void* param) { + SCliMsg* pMsg = arg; + if (pMsg == NULL) { + return; + } + if (param != NULL) { + SCliThrd* pThrd = param; + if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle); + } + transDestroyConnCtx(pMsg->ctx); + transFreeMsg(pMsg->msg.pCont); + taosMemoryFree(pMsg); +} static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { if (param == NULL) return; @@ -2057,7 +2071,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosThreadJoin(pThrd->thread, NULL); CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); - TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); + TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyQueuedMsg, (void*)pThrd); transAsyncPoolDestroy(pThrd->asyncPool); transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index b324ca5f91..5a1ef31b7d 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -159,7 +159,7 @@ static void uvStartSendResp(SSvrMsg* msg); static void uvNotifyLinkBrokenToApp(SSvrConn* conn); -static FORCE_INLINE void destroySmsg(SSvrMsg* smsg); +static FORCE_INLINE void destroySmsg(SSvrMsg* smsg); static FORCE_INLINE SSvrConn* createConn(void* hThrd); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); @@ -671,7 +671,8 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) { transFreeMsg(smsg->msg.pCont); taosMemoryFree(smsg); } -static void destroyAllConn(SWorkThrd* pThrd) { +static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); } +static void destroyAllConn(SWorkThrd* pThrd) { tTrace("thread %p destroy all conn ", pThrd); while (!QUEUE_IS_EMPTY(&pThrd->conn)) { queue* h = QUEUE_HEAD(&pThrd->conn); @@ -1394,7 +1395,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { } taosThreadJoin(pThrd->thread, NULL); SRV_RELEASE_UV(pThrd->loop); - TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); + TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsgWrapper, NULL); transAsyncPoolDestroy(pThrd->asyncPool); uvWhiteListDestroy(pThrd->pWhiteList); From c3220aec44583595a5c2aae8c55675849c2bb79a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 29 Feb 2024 10:46:09 +0000 Subject: [PATCH 2/5] fix mem leak while taosd quit --- source/libs/transport/src/thttp.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 6de10cbb9e..e76f073d63 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -183,7 +183,8 @@ static void* httpThread(void* arg) { return NULL; } -static void httpDestroyMsg(SHttpMsg* msg) { +static void httpDestroyMsg(void* cont, void* param) { + SHttpMsg* msg = cont; if (msg == NULL) return; taosMemoryFree(msg->server); @@ -554,7 +555,7 @@ void transHttpEnvDestroy() { httpSendQuit(); taosThreadJoin(load->thread, NULL); - TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg); + TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg, NULL); transAsyncPoolDestroy(load->asyncPool); uv_loop_close(load->loop); taosMemoryFree(load->loop); From 881ffacb4f6050ec04fdedca85de2e86a692e9f5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 29 Feb 2024 10:52:50 +0000 Subject: [PATCH 3/5] fix mem leak while taosd quit --- source/libs/transport/src/thttp.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index e76f073d63..c4ca39c323 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -183,8 +183,7 @@ static void* httpThread(void* arg) { return NULL; } -static void httpDestroyMsg(void* cont, void* param) { - SHttpMsg* msg = cont; +static void httpDestroyMsg(SHttpMsg* msg) { if (msg == NULL) return; taosMemoryFree(msg->server); @@ -192,6 +191,15 @@ static void httpDestroyMsg(void* cont, void* param) { taosMemoryFree(msg->cont); taosMemoryFree(msg); } +static void httpDestroyMsgWrapper(void* cont, void* param) { + httpDestroyMsg((SHttpMsg*)cont); + // if (msg == NULL) return; + + // taosMemoryFree(msg->server); + // taosMemoryFree(msg->uri); + // taosMemoryFree(msg->cont); + // taosMemoryFree(msg); +} static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { SHttpMsg *msg = NULL, *quitMsg = NULL; @@ -555,7 +563,7 @@ void transHttpEnvDestroy() { httpSendQuit(); taosThreadJoin(load->thread, NULL); - TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg, NULL); + TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL); transAsyncPoolDestroy(load->asyncPool); uv_loop_close(load->loop); taosMemoryFree(load->loop); From e0b1b5a1c62958e3f9c9a41d497bf9c107fff3aa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 29 Feb 2024 10:54:35 +0000 Subject: [PATCH 4/5] fix mem leak while taosd quit --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index c54da860dd..77760b16f4 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -87,9 +87,9 @@ static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) { } } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SDnodeTrans * pTrans = &pDnode->trans; + SDnodeTrans *pTrans = &pDnode->trans; int32_t code = -1; - SRpcMsg * pMsg = NULL; + SRpcMsg *pMsg = NULL; SMgmtWrapper *pWrapper = NULL; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; @@ -256,11 +256,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SArray * pArray = (*pWrapper->func.getHandlesFp)(); + SArray *pArray = (*pWrapper->func.getHandlesFp)(); if (pArray == NULL) return -1; for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SMgmtHandle * pMgmt = taosArrayGet(pArray, i); + SMgmtHandle *pMgmt = taosArrayGet(pArray, i); SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; if (pMgmt->needCheckVgId) { pHandle->needCheckVgId = pMgmt->needCheckVgId; From 867e81c5f5f07b60cec275ad457fe2b8e1075dc2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 29 Feb 2024 10:58:49 +0000 Subject: [PATCH 5/5] fix mem leak while taosd quit --- source/libs/transport/src/transCli.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e369bf8f7d..6ae72eac14 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -220,7 +220,7 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, static FORCE_INLINE void destroyCmsg(void* cmsg); -static FORCE_INLINE void destroyQueuedMsg(void* arg, void* param); +static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param); static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); @@ -1965,7 +1965,7 @@ static FORCE_INLINE void destroyCmsg(void* arg) { transFreeMsg(pMsg->msg.pCont); taosMemoryFree(pMsg); } -static FORCE_INLINE void destroyQueuedMsg(void* arg, void* param) { +static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) { SCliMsg* pMsg = arg; if (pMsg == NULL) { return; @@ -1974,9 +1974,7 @@ static FORCE_INLINE void destroyQueuedMsg(void* arg, void* param) { SCliThrd* pThrd = param; if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle); } - transDestroyConnCtx(pMsg->ctx); - transFreeMsg(pMsg->msg.pCont); - taosMemoryFree(pMsg); + destroyCmsg(pMsg); } static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { if (param == NULL) return; @@ -2071,7 +2069,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosThreadJoin(pThrd->thread, NULL); CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); - TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyQueuedMsg, (void*)pThrd); + TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsgWrapper, (void*)pThrd); transAsyncPoolDestroy(pThrd->asyncPool); transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);