fix(stream): fix memory leak in checkpt queue.

This commit is contained in:
Haojun Liao 2025-02-25 14:29:49 +08:00
parent 6e1206bbc1
commit b23f20a450
1 changed files with 3 additions and 2 deletions

View File

@ -32,11 +32,12 @@ typedef struct SQueueReader {
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id); static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
static void streamTaskPutbackToken(STokenBucket* pBucket); static void streamTaskPutbackToken(STokenBucket* pBucket);
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes); static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
static void streamQueueCleanup(SStreamQueue* pQueue) { static void streamQueueCleanup(SStreamQueue* pQueue) {
SStreamQueueItem* qItem = NULL; SStreamQueueItem* qItem = NULL;
while (1) { while (1) {
streamQueueNextItem(pQueue, &qItem); streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, NULL);
if (qItem == NULL) { if (qItem == NULL) {
break; break;
} }
@ -250,7 +251,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
while (1) { while (1) {
ETaskStatus status = streamTaskGetStatus(pTask).state; ETaskStatus status = streamTaskGetStatus(pTask).state;
if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP || status == TASK_STATUS__STOP) { if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks); stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
return EXEC_CONTINUE; return EXEC_CONTINUE;
} }