From cea647daf6905b12cab2e2ee606a0de8b315c6ba Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 Dec 2024 12:21:28 +0800 Subject: [PATCH] refactor: limit the number of items in inputq to be 5120 --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamSched.c | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 863bc76c79..8f9e4a311c 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -37,7 +37,7 @@ extern "C" { #define META_HB_CHECK_INTERVAL 200 #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) -#define STREAM_TASK_QUEUE_CAPACITY 20480 +#define STREAM_TASK_QUEUE_CAPACITY 5120 #define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) // clang-format off diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index bf402234ba..0436ae7ee4 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -225,7 +225,8 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig } pTask->status.latestForceWindow = w; - if (w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) { + if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) || + streamQueueIsFull(pTask->inputq.queue)) { int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); *pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now;