From 76bf0aea56ac3d2e7bafb93d1abac0e77b210931 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Sat, 6 May 2023 10:14:27 +0800 Subject: [PATCH] opt stream input queue --- source/libs/stream/src/streamExec.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 663cdb4d7c..7884463ebb 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -15,7 +15,8 @@ #include "streamInc.h" -#define STREAM_EXEC_MAX_BATCH_NUM 10240 +#define MAX_STREAM_EXEC_BATCH_NUM 10240 +#define MIN_STREAM_EXEC_BATCH_NUM 16 bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); @@ -260,12 +261,18 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchSize = 1; void* pInput = NULL; + int16_t times = 0; // merge multiple input data if possible in the input queue. while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { -// qDebug("s-task:%s extract data from input queue, queue is empty, abort", pTask->id.idStr); + if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { + times++; + taosMsleep(1); + qDebug("===stream===try agian batchSize:%d", batchSize); + continue; + } break; } @@ -284,7 +291,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { batchSize++; pInput = newRet; streamQueueProcessSuccess(pTask->inputQueue); - if (batchSize > STREAM_EXEC_MAX_BATCH_NUM) { + if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { break; } }