fix(stream): limit sink quota.
This commit is contained in:
parent
2c0e54f4f7
commit
ecb3b44026
|
@ -20,7 +20,7 @@
|
|||
#define STREAM_TASK_QUEUE_CAPACITY 20480
|
||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
|
||||
#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50)
|
||||
#define MAX_SMOOTH_BURST_RATIO 10 // 20 sec
|
||||
#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec
|
||||
|
||||
// todo refactor:
|
||||
// read data from input queue
|
||||
|
@ -401,20 +401,20 @@ static void fillTokenBucket(STokenBucket* pBucket) {
|
|||
|
||||
int32_t incNum = (delta / 1000.0) * pBucket->numRate;
|
||||
if (incNum > 0) {
|
||||
if ((pBucket->numOfToken + incNum) < pBucket->numCapacity) {
|
||||
pBucket->numOfToken += incNum;
|
||||
} else {
|
||||
pBucket->numOfToken = pBucket->numCapacity;
|
||||
}
|
||||
pBucket->numOfToken = MIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
|
||||
pBucket->fillTimestamp = now;
|
||||
stDebug("new token available, current:%d, inc:%d ts:%" PRId64, pBucket->numOfToken, incNum, now);
|
||||
}
|
||||
|
||||
// increase the new available quota as time goes on
|
||||
double incSize = (delta / 1000.0) * pBucket->bytesRate;
|
||||
if (incSize > 0) {
|
||||
pBucket->bytesRemain = MIN(pBucket->bytesRemain + incSize, pBucket->bytesCapacity);
|
||||
stDebug("new bytes token available, current:%.2fKiB, inc:%.2fKiB ts:%" PRId64, pBucket->bytesRemain, incSize, now);
|
||||
}
|
||||
|
||||
if (incNum > 0) {
|
||||
stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64
|
||||
" wait for %.2f Sec",
|
||||
pBucket->numOfToken, incNum, pBucket->bytesRemain, incSize, now, delta / 1000.0);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue