fix(stream): sleep for 10ms

This commit is contained in:
Haojun Liao 2023-10-07 19:47:49 +08:00
parent 9af8ce21ed
commit 9a7167aaba
2 changed files with 2 additions and 15 deletions

View File

@ -358,7 +358,7 @@ typedef struct STaskOutputInfo {
STaskSinkFetch fetchSink; STaskSinkFetch fetchSink;
}; };
void* pTimer; // timer for launch sink tasks // void* pTimer; // timer for launch sink tasks
int8_t type; int8_t type;
STokenBucket* pTokenBucket; STokenBucket* pTokenBucket;
} STaskOutputInfo; } STaskOutputInfo;

View File

@ -169,20 +169,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
// no available token in bucket for sink task, let's wait for a little bit // no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
stDebug("s-task:%s no available token in bucket for sink data, wait for 50ms", id); stDebug("s-task:%s no available token in bucket for sink data, wait for 50ms", id);
taosMsleep(50); taosMsleep(10);
// if (streamTaskAllUpstreamClosed(pTask)) {
// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
// stDebug("s-task:%s try start task in %dms in tmr, since all upstream inputQ is closed, ref:%d", pTask->id.idStr,
// SINK_TASK_IDLE_DURATION, ref);
//
// if (pTask->outputInfo.pTimer == NULL) {
// pTask->outputInfo.pTimer = taosTmrStart(doLaunchSinkTask, SINK_TASK_IDLE_DURATION, pTask, streamEnv.timer);
// } else {
// taosTmrReset(doLaunchSinkTask, SINK_TASK_IDLE_DURATION, pTask, streamEnv.timer, &pTask->outputInfo.pTimer);
// }
// }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }