diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 8b595c2593..ce90f29451 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -28,7 +28,7 @@ typedef struct SQueueReader { int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms } SQueueReader; -static bool streamTaskExtractAvailableToken(STokenBucket* pBucket); +static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id); static void streamTaskPutbackToken(STokenBucket* pBucket); static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes); @@ -166,7 +166,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *blockSize = 0; // no available token in bucket for sink task, let's wait for a little bit - if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket))) { + if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait", id); return TSDB_CODE_SUCCESS; } @@ -422,8 +422,8 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) { } } -bool streamTaskExtractAvailableToken(STokenBucket* pBucket) { - fillTokenBucket(pBucket); +bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) { + fillTokenBucket(pBucket, id); if (pBucket->numOfToken > 0) { if (pBucket->quotaRemain > 0) {