From 14b9d920bacac018552b7e75c7b056b4ad575c72 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jul 2023 16:02:41 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- source/libs/stream/src/streamCheckpoint.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e90e974318..ff7d7a0963 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -119,6 +119,7 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock; taosArrayPush(pChkpoint->blocks, pBlock); + taosMemoryFree(pBlock); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) { taosFreeQitem(pChkpoint); return TSDB_CODE_OUT_OF_MEMORY; @@ -163,6 +164,8 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // set the task status pTask->checkpointingId = checkpointId; + + // set task status pTask->status.taskStatus = TASK_STATUS__CK; //todo fix race condition: set the status and append checkpoint block @@ -211,6 +214,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc code = continueDispatchCheckpointBlock(pBlock, pTask); } + streamFreeQitem((SStreamQueueItem*)pBlock); return code; }