From 6d64d0e081866d93f1482e80efc28e586a73ae5d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 1 Jul 2024 15:34:04 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- source/libs/stream/inc/streamInt.h | 1 + source/libs/stream/src/streamHb.c | 17 +++++++++++++++++ source/libs/stream/src/streamMeta.c | 3 ++- 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 7ec453a68d..008d066717 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -220,6 +220,7 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); void streamMetaRemoveDB(void* arg, char* key); void streamMetaHbToMnode(void* param, void* tmrId); SMetaHbInfo* createMetaHbInfo(int64_t* pRid); +void* destroyMetaHbInfo(SMetaHbInfo* pInfo); void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta); void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount); int32_t streamMetaSendHbHelper(SStreamMeta* pMeta); diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 7bee454bd4..1ddcd87959 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -279,6 +279,21 @@ SMetaHbInfo* createMetaHbInfo(int64_t* pRid) { return pInfo; } +void* destroyMetaHbInfo(SMetaHbInfo* pInfo) { + if (pInfo != NULL) { + tCleanupStreamHbMsg(&pInfo->hbMsg); + + if (pInfo->hbTmr != NULL) { + taosTmrStop(pInfo->hbTmr); + pInfo->hbTmr = NULL; + } + + taosMemoryFree(pInfo); + } + + return NULL; +} + void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) { // wait for the stream meta hb function stopping if (pMeta->role == NODE_ROLE_LEADER) { @@ -315,6 +330,8 @@ int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) { pInfo->hbCount += 1; pInfo->msgSendTs = -1; + + tCleanupStreamHbMsg(&pInfo->hbMsg); } else { stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d9b78094a0..7b94f642e2 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -515,7 +515,8 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pFailedTaskSet); - taosMemoryFree(pMeta->pHbInfo); + pMeta->pHbInfo = destroyMetaHbInfo(pMeta->pHbInfo); + taosMemoryFree(pMeta->path); taosThreadMutexDestroy(&pMeta->backendMutex);