From 9ab9de0ad35c0acc0f8067536531501c4adafd63 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 00:30:38 +0800 Subject: [PATCH] fix(stream): limit sink quota. --- source/libs/stream/src/streamQueue.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 708b0572a4..dc0eb949da 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -20,7 +20,7 @@ #define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50) -#define MAX_SMOOTH_BURST_RATIO 10 // 20 sec +#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec // todo refactor: // read data from input queue @@ -401,20 +401,20 @@ static void fillTokenBucket(STokenBucket* pBucket) { int32_t incNum = (delta / 1000.0) * pBucket->numRate; if (incNum > 0) { - if ((pBucket->numOfToken + incNum) < pBucket->numCapacity) { - pBucket->numOfToken += incNum; - } else { - pBucket->numOfToken = pBucket->numCapacity; - } + pBucket->numOfToken = MIN(pBucket->numOfToken + incNum, pBucket->numCapacity); pBucket->fillTimestamp = now; - stDebug("new token available, current:%d, inc:%d ts:%" PRId64, pBucket->numOfToken, incNum, now); } // increase the new available quota as time goes on double incSize = (delta / 1000.0) * pBucket->bytesRate; if (incSize > 0) { pBucket->bytesRemain = MIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity); - stDebug("new bytes token available, current:%.2fKiB, inc:%.2fKiB ts:%" PRId64, pBucket->bytesRemain, incSize, now); + } + + if (incNum > 0) { + stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64 + " wait for %.2f Sec", + pBucket->numOfToken, incNum, pBucket->bytesRemain, incSize, now, delta / 1000.0); } }