fix(stream): fix memory leak in checkpt queue.
This commit is contained in:
parent
c7d9a1cd73
commit
e46a063c2f
|
@ -32,11 +32,12 @@ typedef struct SQueueReader {
|
|||
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
|
||||
static void streamTaskPutbackToken(STokenBucket* pBucket);
|
||||
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
|
||||
static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id);
|
||||
|
||||
static void streamQueueCleanup(SStreamQueue* pQueue) {
|
||||
SStreamQueueItem* qItem = NULL;
|
||||
while (1) {
|
||||
streamQueueNextItem(pQueue, &qItem);
|
||||
streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, NULL);
|
||||
if (qItem == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -250,7 +251,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
|||
|
||||
while (1) {
|
||||
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
||||
if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP || status == TASK_STATUS__STOP) {
|
||||
if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) {
|
||||
stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks);
|
||||
return EXEC_CONTINUE;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue