diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 663cdb4d7c..7884463ebb 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -15,7 +15,8 @@ #include "streamInc.h" -#define STREAM_EXEC_MAX_BATCH_NUM 10240 +#define MAX_STREAM_EXEC_BATCH_NUM 10240 +#define MIN_STREAM_EXEC_BATCH_NUM 16 bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); @@ -260,12 +261,18 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchSize = 1; void* pInput = NULL; + int16_t times = 0; // merge multiple input data if possible in the input queue. while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { -// qDebug("s-task:%s extract data from input queue, queue is empty, abort", pTask->id.idStr); + if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { + times++; + taosMsleep(1); + qDebug("===stream===try agian batchSize:%d", batchSize); + continue; + } break; } @@ -284,7 +291,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { batchSize++; pInput = newRet; streamQueueProcessSuccess(pTask->inputQueue); - if (batchSize > STREAM_EXEC_MAX_BATCH_NUM) { + if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { break; } }