diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 676e85eadc..7a1ac052a6 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -155,10 +155,6 @@ const char* streamQueueItemGetTypeStr(int32_t type) { } } -static void doLaunchSinkTask(void* param, void* tmrId) { - -} - int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize) { int32_t retryTimes = 0; @@ -172,7 +168,8 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { - stDebug("s-task:%s no available token in bucket for sink data, wait", id); + stDebug("s-task:%s no available token in bucket for sink data, wait for 50ms", id); + taosMsleep(50); // if (streamTaskAllUpstreamClosed(pTask)) { // int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); @@ -433,7 +430,7 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) { pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity); } - if (incNum > 0) { + if (incNum > 0 || incSize > 0) { stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64 " idle for %.2f Sec, %s", pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id);