From 0979ff944b6627aaa63b3a9efb5938109e7b022a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Nov 2023 17:33:30 +0800 Subject: [PATCH] fix(stream): update the fill-time for quota limitation. --- source/libs/stream/src/streamQueue.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 239ab883ae..22a1c22be4 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -160,7 +160,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); - taosMsleep(10); +// taosMsleep(10); return TSDB_CODE_SUCCESS; } @@ -410,10 +410,11 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) { double incSize = (delta / 1000.0) * pBucket->quotaRate; if (incSize > 0) { pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); + pBucket->fillTimestamp = now; } if (incNum > 0 || incSize > 0) { - stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64 + stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.3fMiB, ts:%" PRId64 " idle for %.2f Sec, %s", pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id); }