fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2024-01-19 20:28:43 +08:00
parent b1ac1deedb
commit 4a5ab10b3d
2 changed files with 5 additions and 2 deletions

View File

@ -84,8 +84,8 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -169,6 +169,7 @@ void mndCleanupStream(SMnode *pMnode) {
taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.pTaskMap);
taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosHashCleanup(execInfo.transMgmt.pWaitingList); taosHashCleanup(execInfo.transMgmt.pWaitingList);
taosHashCleanup(execInfo.pTransferStateStreams);
taosThreadMutexDestroy(&execInfo.lock); taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup"); mDebug("mnd stream exec info cleanup");
} }
@ -3077,7 +3078,9 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
// remove this entry // remove this entry
taosArrayDestroy(*(SArray**)pReqTaskList);
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t)); taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams); int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams); mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams);
} else { } else {