fix(stream): fix syntax error.
This commit is contained in:
parent
59e284c332
commit
e95aea84ae
|
@ -28,7 +28,7 @@ typedef struct SQueueReader {
|
|||
int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms
|
||||
} SQueueReader;
|
||||
|
||||
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket);
|
||||
static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id);
|
||||
static void streamTaskPutbackToken(STokenBucket* pBucket);
|
||||
static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes);
|
||||
|
||||
|
@ -166,7 +166,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
*blockSize = 0;
|
||||
|
||||
// no available token in bucket for sink task, let's wait for a little bit
|
||||
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket))) {
|
||||
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket, pTask->id.idStr))) {
|
||||
stDebug("s-task:%s no available token in bucket for sink data, wait", id);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -422,8 +422,8 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
|
|||
}
|
||||
}
|
||||
|
||||
bool streamTaskExtractAvailableToken(STokenBucket* pBucket) {
|
||||
fillTokenBucket(pBucket);
|
||||
bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id) {
|
||||
fillTokenBucket(pBucket, id);
|
||||
|
||||
if (pBucket->numOfToken > 0) {
|
||||
if (pBucket->quotaRemain > 0) {
|
||||
|
|
Loading…
Reference in New Issue