fix(stream): disable token bucket.
This commit is contained in:
parent
9a559b209a
commit
e4ac835554
|
@ -199,6 +199,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
|
||||||
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
||||||
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
|
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
|
||||||
|
|
||||||
|
// todo: this may be not the first one
|
||||||
pTask->chkInfo.startTs = taosGetTimestampMs();
|
pTask->chkInfo.startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
// update the child Id for downstream tasks
|
// update the child Id for downstream tasks
|
||||||
|
|
|
@ -942,7 +942,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayClear(pTask->pRspMsgList);
|
taosArrayClear(pTask->pRspMsgList);
|
||||||
stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
|
stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
|
||||||
num);
|
num);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -580,7 +580,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer);
|
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer);
|
||||||
|
|
||||||
if (ver != pTask->chkInfo.checkpointVer) {
|
if (ver != pTask->chkInfo.checkpointVer) {
|
||||||
stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 " , currentVer:%" PRId64,
|
stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 ", nextProcessVer:%" PRId64,
|
||||||
pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer);
|
pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer);
|
||||||
pTask->chkInfo.checkpointVer = ver;
|
pTask->chkInfo.checkpointVer = ver;
|
||||||
}
|
}
|
||||||
|
|
|
@ -191,11 +191,11 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
||||||
}
|
}
|
||||||
|
|
||||||
STokenBucket* pBucket = pTask->pTokenBucket;
|
STokenBucket* pBucket = pTask->pTokenBucket;
|
||||||
if (!streamTaskHasAvailableToken(pBucket)) { // no available token in th bucket, ignore this execution
|
// if (!streamTaskHasAvailableToken(pBucket)) { // no available token in th bucket, ignore this execution
|
||||||
// stInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr,
|
// stInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr,
|
||||||
// pBucket->capacity, pBucket->rate);
|
// pBucket->capacity, pBucket->rate);
|
||||||
return TSDB_CODE_SUCCESS;
|
// return TSDB_CODE_SUCCESS;
|
||||||
}
|
// }
|
||||||
|
|
||||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
|
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
|
||||||
if (qItem == NULL) {
|
if (qItem == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue