fix(stream): limit sink quota.

This commit is contained in:
Haojun Liao 2023-09-27 00:30:38 +08:00
parent 27dcbcb96f
commit 9ab9de0ad3
1 changed files with 8 additions and 8 deletions

View File

@ -20,7 +20,7 @@
#define STREAM_TASK_QUEUE_CAPACITY 20480 #define STREAM_TASK_QUEUE_CAPACITY 20480
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) #define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50) #define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)
#define MAX_SMOOTH_BURST_RATIO 10 // 20 sec #define MAX_SMOOTH_BURST_RATIO 5 // 20 sec
// todo refactor: // todo refactor:
// read data from input queue // read data from input queue
@ -401,20 +401,20 @@ static void fillTokenBucket(STokenBucket* pBucket) {
int32_t incNum = (delta / 1000.0) * pBucket->numRate; int32_t incNum = (delta / 1000.0) * pBucket->numRate;
if (incNum > 0) { if (incNum > 0) {
if ((pBucket->numOfToken + incNum) < pBucket->numCapacity) { pBucket->numOfToken = MIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
pBucket->numOfToken += incNum;
} else {
pBucket->numOfToken = pBucket->numCapacity;
}
pBucket->fillTimestamp = now; pBucket->fillTimestamp = now;
stDebug("new token available, current:%d, inc:%d ts:%" PRId64, pBucket->numOfToken, incNum, now);
} }
// increase the new available quota as time goes on // increase the new available quota as time goes on
double incSize = (delta / 1000.0) * pBucket->bytesRate; double incSize = (delta / 1000.0) * pBucket->bytesRate;
if (incSize > 0) { if (incSize > 0) {
pBucket->bytesRemain = MIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity); pBucket->bytesRemain = MIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity);
stDebug("new bytes token available, current:%.2fKiB, inc:%.2fKiB ts:%" PRId64, pBucket->bytesRemain, incSize, now); }
if (incNum > 0) {
stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64
" wait for %.2f Sec",
pBucket->numOfToken, incNum, pBucket->bytesRemain, incSize, now, delta / 1000.0);
} }
} }