diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 4a7d3a2a05..2850e8e733 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -75,7 +75,8 @@ struct STokenBucket { double quotaCapacity; // available capacity for maximum input size, KiloBytes per Second double quotaRemain; // not consumed bytes per second double quotaRate; // number of token per second - int64_t fillTimestamp; // fill timestamp + int64_t tokenFillTimestamp; // fill timestamp + int64_t quotaFillTimestamp; // fill timestamp }; struct SStreamQueue { diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 556de169b4..d19dfc13bf 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -388,32 +388,36 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t pBucket->quotaCapacity = quotaRate * MAX_SMOOTH_BURST_RATIO; pBucket->quotaRemain = pBucket->quotaCapacity; - pBucket->fillTimestamp = taosGetTimestampMs(); + pBucket->tokenFillTimestamp = taosGetTimestampMs(); + pBucket->quotaFillTimestamp = taosGetTimestampMs(); stDebug("s-task:%s sink quotaRate:%.2fMiB, numRate:%d", id, quotaRate, numRate); return TSDB_CODE_SUCCESS; } static void fillTokenBucket(STokenBucket* pBucket, const char* id) { int64_t now = taosGetTimestampMs(); - int64_t delta = now - pBucket->fillTimestamp; + + int64_t deltaToken = now - pBucket->tokenFillTimestamp; ASSERT(pBucket->numOfToken >= 0); - int32_t incNum = (delta / 1000.0) * pBucket->numRate; + int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate; if (incNum > 0) { pBucket->numOfToken = TMIN(pBucket->numOfToken + incNum, pBucket->numCapacity); - pBucket->fillTimestamp = now; + pBucket->tokenFillTimestamp = now; } // increase the new available quota as time goes on - double incSize = (delta / 1000.0) * pBucket->quotaRate; + int64_t deltaQuota = now - pBucket->quotaFillTimestamp; + double incSize = (deltaQuota / 1000.0) * pBucket->quotaRate; if (incSize > 0) { pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); - pBucket->fillTimestamp = now; + pBucket->quotaFillTimestamp = now; } if (incNum > 0 || incSize > 0) { - stTrace("token/quota available, token:%d inc:%d, quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle:%" PRId64 "ms, %s", - pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta, id); + stTrace("token/quota available, token:%d inc:%d, token_TsDelta:%" PRId64 + ", quota:%.2fMiB inc:%.3fMiB quotaTs:%" PRId64 " now:%" PRId64 "ms, %s", + pBucket->numOfToken, incNum, deltaToken, pBucket->quotaRemain, incSize, deltaQuota, now, id); } }