From 82cde4661465de71b2071f44599a0be8ac4c56ea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Apr 2024 19:48:50 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- source/libs/stream/src/streamCheckpoint.c | 1 + source/libs/stream/src/streamExec.c | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 67b68f73ad..36886329ac 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -190,6 +190,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); + streamFreeQitem((SStreamQueueItem*)pBlock); return code; } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 93ede2707b..ab69a135f1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -46,6 +46,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); code = streamTaskPutDataIntoOutputQ(pTask, pBlock); if (code != TSDB_CODE_SUCCESS) { + destroyStreamDataBlock(pBlock); return code; } @@ -76,7 +77,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { // back pressure and record position - destroyStreamDataBlock(pStreamBlocks); return code; }