From 4a5ab10b3df7dd13828b48b6e1f1d25dbada06c2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 Jan 2024 20:28:43 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 4 ++-- source/dnode/mnode/impl/src/mndStream.c | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index a1af11f2ec..7a372a56cc 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -84,8 +84,8 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ef804f87b5..5143515a55 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -169,6 +169,7 @@ void mndCleanupStream(SMnode *pMnode) { taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.transMgmt.pWaitingList); + taosHashCleanup(execInfo.pTransferStateStreams); taosThreadMutexDestroy(&execInfo.lock); mDebug("mnd stream exec info cleanup"); } @@ -3077,7 +3078,9 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); // remove this entry + taosArrayDestroy(*(SArray**)pReqTaskList); taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); + int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams); } else {