From f84bfc96fb5f1ffa30de4d17a98d04b09e246849 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 3 Jul 2023 14:30:11 +0800 Subject: [PATCH] fix(stream): fix error in pause in stream. --- source/libs/stream/src/streamExec.c | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9fddcf9155..9a53ffa088 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -325,7 +325,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { const char* id = pTask->id.idStr; while (1) { - int32_t batchSize = 1; + int32_t batchSize = 0; int16_t times = 0; SStreamQueueItem* pInput = NULL; @@ -335,8 +335,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { if (streamTaskShouldPause(&pTask->status)) { - qDebug("s-task:%s task should pause, input blocks:%s", pTask->id.idStr, batchSize); - if (batchSize > 1) { + qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, batchSize); + if (batchSize > 0) { break; } else { return 0; @@ -357,6 +357,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { } if (pInput == NULL) { + batchSize += 1; + pInput = qItem; streamQueueProcessSuccess(pTask->inputQueue); if (pTask->taskLevel == TASK_LEVEL__SINK) { @@ -364,18 +366,20 @@ int32_t streamExecForAll(SStreamTask* pTask) { } } else { // todo we need to sort the data block, instead of just appending into the array list. - void* newRet = NULL; - if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) { + ASSERT(batchSize >= 1); + + void* newRet = streamMergeQueueItem(pInput, qItem); + if (newRet == NULL) { streamQueueProcessFail(pTask->inputQueue); break; } else { - batchSize++; + batchSize += 1; + pInput = newRet; streamQueueProcessSuccess(pTask->inputQueue); - if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, - MAX_STREAM_EXEC_BATCH_NUM); + if (batchSize >= MAX_STREAM_EXEC_BATCH_NUM) { + qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); break; } } @@ -390,6 +394,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { // } if (pInput == NULL) { + ASSERT(batchSize == 0); break; }