diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index c7b0bc8f11..300b3bdb4d 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -32,11 +32,12 @@ typedef struct SQueueReader { static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id); static void streamTaskPutbackToken(STokenBucket* pBucket); static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes); +static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id); static void streamQueueCleanup(SStreamQueue* pQueue) { SStreamQueueItem* qItem = NULL; while (1) { - streamQueueNextItem(pQueue, &qItem); + streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, NULL); if (qItem == NULL) { break; } @@ -250,7 +251,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte while (1) { ETaskStatus status = streamTaskGetStatus(pTask).state; - if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP || status == TASK_STATUS__STOP) { + if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) { stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks); return EXEC_CONTINUE; }