Merge pull request #24964 from taosdata/fix/fixMemleakWhileQuit

Fix/fix memleak while taosd quit
This commit is contained in:
Hongze Cheng 2024-03-01 09:26:14 +08:00 committed by GitHub
commit ddea43e5d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 44 additions and 21 deletions

View File

@ -345,6 +345,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.dfp = destroyAhandle;
rpcInit.retryMinInterval = tsRedirectPeriod;
rpcInit.retryStepFactor = tsRedirectFactor;

View File

@ -256,21 +256,21 @@ void transAsyncPoolDestroy(SAsyncPool* pool);
int transAsyncSend(SAsyncPool* pool, queue* mq);
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \
do { \
for (int i = 0; i < pool->nAsync; i++) { \
uv_async_t* async = &(pool->asyncs[i]); \
SAsyncItem* item = async->data; \
while (!QUEUE_IS_EMPTY(&item->qmsg)) { \
tTrace("destroy msg in async pool "); \
queue* h = QUEUE_HEAD(&item->qmsg); \
QUEUE_REMOVE(h); \
msgType* msg = QUEUE_DATA(h, msgType, q); \
if (msg != NULL) { \
freeFunc(msg); \
} \
} \
} \
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \
do { \
for (int i = 0; i < pool->nAsync; i++) { \
uv_async_t* async = &(pool->asyncs[i]); \
SAsyncItem* item = async->data; \
while (!QUEUE_IS_EMPTY(&item->qmsg)) { \
tTrace("destroy msg in async pool "); \
queue* h = QUEUE_HEAD(&item->qmsg); \
QUEUE_REMOVE(h); \
msgType* msg = QUEUE_DATA(h, msgType, q); \
if (msg != NULL) { \
freeFunc(msg, param); \
} \
} \
} \
} while (0)
#define ASYNC_CHECK_HANDLE(exh1, id) \

View File

@ -191,6 +191,15 @@ static void httpDestroyMsg(SHttpMsg* msg) {
taosMemoryFree(msg->cont);
taosMemoryFree(msg);
}
static void httpDestroyMsgWrapper(void* cont, void* param) {
httpDestroyMsg((SHttpMsg*)cont);
// if (msg == NULL) return;
// taosMemoryFree(msg->server);
// taosMemoryFree(msg->uri);
// taosMemoryFree(msg->cont);
// taosMemoryFree(msg);
}
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
SHttpMsg *msg = NULL, *quitMsg = NULL;
@ -554,7 +563,7 @@ void transHttpEnvDestroy() {
httpSendQuit();
taosThreadJoin(load->thread, NULL);
TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg);
TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL);
transAsyncPoolDestroy(load->asyncPool);
uv_loop_close(load->loop);
taosMemoryFree(load->loop);

View File

@ -219,6 +219,8 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq,
/// NULL,cliHandleUpdate};
static FORCE_INLINE void destroyCmsg(void* cmsg);
static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param);
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
@ -1963,7 +1965,17 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
transFreeMsg(pMsg->msg.pCont);
taosMemoryFree(pMsg);
}
static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) {
SCliMsg* pMsg = arg;
if (pMsg == NULL) {
return;
}
if (param != NULL) {
SCliThrd* pThrd = param;
if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle);
}
destroyCmsg(pMsg);
}
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
if (param == NULL) return;
@ -2057,7 +2069,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosThreadJoin(pThrd->thread, NULL);
CLI_RELEASE_UV(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsgWrapper, (void*)pThrd);
transAsyncPoolDestroy(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);

View File

@ -159,7 +159,7 @@ static void uvStartSendResp(SSvrMsg* msg);
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
@ -671,7 +671,8 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) {
transFreeMsg(smsg->msg.pCont);
taosMemoryFree(smsg);
}
static void destroyAllConn(SWorkThrd* pThrd) {
static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); }
static void destroyAllConn(SWorkThrd* pThrd) {
tTrace("thread %p destroy all conn ", pThrd);
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
queue* h = QUEUE_HEAD(&pThrd->conn);
@ -1394,7 +1395,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
}
taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsgWrapper, NULL);
transAsyncPoolDestroy(pThrd->asyncPool);
uvWhiteListDestroy(pThrd->pWhiteList);