fix(stream): fix error in generating token in bucket.

This commit is contained in:
Haojun Liao 2023-11-20 15:26:50 +08:00
parent 7518103b96
commit 9f8a63099c
2 changed files with 14 additions and 9 deletions

View File

@ -75,7 +75,8 @@ struct STokenBucket {
double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second
double quotaRemain; // not consumed bytes per second
double quotaRate; // number of token per second
int64_t fillTimestamp; // fill timestamp
int64_t tokenFillTimestamp; // fill timestamp
int64_t quotaFillTimestamp; // fill timestamp
};
struct SStreamQueue {

View File

@ -388,32 +388,36 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
pBucket->quotaRemain = pBucket->quotaCapacity;
pBucket->fillTimestamp = taosGetTimestampMs();
pBucket->tokenFillTimestamp = taosGetTimestampMs();
pBucket->quotaFillTimestamp = taosGetTimestampMs();
stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
return TSDB_CODE_SUCCESS;
}
static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
int64_t now = taosGetTimestampMs();
int64_t delta = now - pBucket->fillTimestamp;
int64_t deltaToken = now - pBucket->tokenFillTimestamp;
ASSERT(pBucket->numOfToken >= 0);
int32_t incNum = (delta / 1000.0) * pBucket->numRate;
int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
if (incNum > 0) {
pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
pBucket->fillTimestamp = now;
pBucket->tokenFillTimestamp = now;
}
// increase the new available quota as time goes on
double incSize = (delta / 1000.0) * pBucket->quotaRate;
int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
double incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
if (incSize > 0) {
pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
pBucket->fillTimestamp = now;
pBucket->quotaFillTimestamp = now;
}
if (incNum > 0 || incSize > 0) {
stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s",
pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id);
stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
}
}