From de7231e59e839c8d6fc58439bc0bdf2b2c805e75 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 May 2023 15:35:59 +0800 Subject: [PATCH] fix(stream): fix memory leak in handling dispatch msg when output buffer is full. --- source/libs/stream/src/stream.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 8cc1ef1dd3..d3e4b23ad1 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -129,11 +129,11 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr); } else { - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock) == 0) { - status = TASK_INPUT_STATUS__NORMAL; - } else { // input queue is full, upstream is blocked now - status = TASK_INPUT_STATUS__BLOCKED; - } + int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); + // input queue is full, upstream is blocked now + status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED; + + } // rsp by input status @@ -303,6 +303,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { taosFreeQitem(pItem); return -1; } + taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { @@ -310,6 +311,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); + destroyStreamDataBlock((SStreamDataBlock*) pItem); + taosFreeQitem(pItem); return -1; }