From b23f20a4507c78673b848175946acc5c4a3c3576 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Feb 2025 14:29:49 +0800 Subject: [PATCH] fix(stream): fix memory leak in checkpt queue. --- source/libs/stream/src/streamQueue.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index c7b0bc8f11..300b3bdb4d 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -32,11 +32,12 @@ typedef struct SQueueReader { static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id); static void streamTaskPutbackToken(STokenBucket* pBucket); static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes); +static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id); static void streamQueueCleanup(SStreamQueue* pQueue) { SStreamQueueItem* qItem = NULL; while (1) { - streamQueueNextItem(pQueue, &qItem); + streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, NULL); if (qItem == NULL) { break; } @@ -250,7 +251,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte while (1) { ETaskStatus status = streamTaskGetStatus(pTask).state; - if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP || status == TASK_STATUS__STOP) { + if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) { stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks); return EXEC_CONTINUE; }