fix(stream): wait for a while when no available token.
This commit is contained in:
parent
45bb3f50d7
commit
e4e4c5a36e
|
@ -155,10 +155,6 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
|
|||
}
|
||||
}
|
||||
|
||||
static void doLaunchSinkTask(void* param, void* tmrId) {
|
||||
|
||||
}
|
||||
|
||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||
int32_t* blockSize) {
|
||||
int32_t retryTimes = 0;
|
||||
|
@ -172,7 +168,8 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
|
||||
// no available token in bucket for sink task, let's wait for a little bit
|
||||
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
|
||||
stDebug("s-task:%s no available token in bucket for sink data, wait", id);
|
||||
stDebug("s-task:%s no available token in bucket for sink data, wait for 50ms", id);
|
||||
taosMsleep(50);
|
||||
|
||||
// if (streamTaskAllUpstreamClosed(pTask)) {
|
||||
// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
|
@ -433,7 +430,7 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
|
|||
pBucket->quotaRemain = TMIN(pBucket->quotaRemain + incSize, pBucket->quotaCapacity);
|
||||
}
|
||||
|
||||
if (incNum > 0) {
|
||||
if (incNum > 0 || incSize > 0) {
|
||||
stDebug("new token and capacity available, current token:%d inc:%d, current quota:%.2fMiB inc:%.2fMiB, ts:%" PRId64
|
||||
" idle for %.2f Sec, %s",
|
||||
pBucket->numOfToken, incNum, pBucket->quotaRemain, incSize, now, delta / 1000.0, id);
|
||||
|
|
Loading…
Reference in New Issue