fix(stream): fix syntax error.

This commit is contained in:
Haojun Liao 2023-10-07 14:56:09 +08:00
parent 7ce3d2c200
commit d11c0a113c
1 changed files with 4 additions and 4 deletions

View File

@ -28,7 +28,7 @@ typedef struct SQueueReader {
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
} SQueueReader;
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket);
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
static void streamTaskPutbackToken(STokenBucket* pBucket);
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
@ -166,7 +166,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
*blockSize = 0;
// no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket))) {
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket, pTask->id.idStr))) {
stDebug("s-task:%s no available token in bucket for sink data, wait", id);
return TSDB_CODE_SUCCESS;
}
@ -422,8 +422,8 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
}
}
bool streamTaskExtractAvailableToken(STokenBucket* pBucket) {
fillTokenBucket(pBucket);
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
fillTokenBucket(pBucket, id);
if (pBucket->numOfToken > 0) {
if (pBucket->quotaRemain > 0) {