diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 91a721d80f..f8b3893782 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -217,6 +217,21 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) void transDestroyAsyncPool(SAsyncPool* pool); int transSendAsync(SAsyncPool* pool, queue* mq); +#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \ + do { \ + for (int i = 0; i < pool->nAsync; i++) { \ + uv_async_t* async = &(pool->asyncs[i]); \ + SAsyncItem* item = async->data; \ + while (!QUEUE_IS_EMPTY(&item->qmsg)) { \ + queue* h = QUEUE_HEAD(&item->qmsg); \ + QUEUE_REMOVE(h); \ + msgType* msg = QUEUE_DATA(h, msgType, q); \ + if (msg != NULL) { \ + freeFunc(msg); \ + } \ + } \ + } \ + } while (0) int transInitBuffer(SConnBuffer* buf); int transClearBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f0af0d99c9..3220e229a6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -892,6 +892,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { taosThreadJoin(pThrd->thread, NULL); CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); + TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); transDestroyAsyncPool(pThrd->asyncPool); transDQDestroy(pThrd->delayQueue); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 526f896ad2..be07fbd264 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -190,6 +190,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) } return pool; } + void transDestroyAsyncPool(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 12cfff093c..9018eaacf6 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -1036,6 +1036,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { } taosThreadJoin(pThrd->thread, NULL); SRV_RELEASE_UV(pThrd->loop); + TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSrvMsg, destroySmsg); transDestroyAsyncPool(pThrd->asyncPool); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd);