fix(stream): fix error in generating token in bucket.

This commit is contained in:
Haojun Liao 2023-11-20 15:26:50 +08:00
parent db6cbeb674
commit e78b04be51
2 changed files with 14 additions and 9 deletions

View File

@ -75,7 +75,8 @@ struct STokenBucket {
double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second
double quotaRemain; // not consumed bytes per second double quotaRemain; // not consumed bytes per second
double quotaRate; // number of token per second double quotaRate; // number of token per second
int64_t fillTimestamp; // fill timestamp int64_t tokenFillTimestamp; // fill timestamp
int64_t quotaFillTimestamp; // fill timestamp
}; };
struct SStreamQueue { struct SStreamQueue {

View File

@ -388,32 +388,36 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO; pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO;
pBucket->quotaRemain = pBucket->quotaCapacity; pBucket->quotaRemain = pBucket->quotaCapacity;
pBucket->fillTimestamp = taosGetTimestampMs(); pBucket->tokenFillTimestamp = taosGetTimestampMs();
pBucket->quotaFillTimestamp = taosGetTimestampMs();
stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate); stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void fillTokenBucket(STokenBucket* pBucket, const char* id) { static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int64_t delta = now - pBucket->fillTimestamp;
int64_t deltaToken = now - pBucket->tokenFillTimestamp;
ASSERT(pBucket->numOfToken >= 0); ASSERT(pBucket->numOfToken >= 0);
int32_t incNum = (delta / 1000.0) * pBucket->numRate; int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
if (incNum > 0) { if (incNum > 0) {
pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity); pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity);
pBucket->fillTimestamp = now; pBucket->tokenFillTimestamp = now;
} }
// increase the new available quota as time goes on // increase the new available quota as time goes on
double incSize = (delta / 1000.0) * pBucket->quotaRate; int64_t deltaQuota = now - pBucket->quotaFillTimestamp;
double incSize = (deltaQuota / 1000.0) * pBucket->quotaRate;
if (incSize > 0) { if (incSize > 0) {
pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
pBucket->fillTimestamp = now; pBucket->quotaFillTimestamp = now;
} }
if (incNum > 0 || incSize > 0) { if (incNum > 0 || incSize > 0) {
stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s", stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64
pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id); ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s",
pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id);
} }
} }