From 23fb26798f40967fa69a65c3efbdbb0d6a153869 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 Oct 2023 19:47:49 +0800 Subject: [PATCH] fix(stream): sleep for 10ms --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/streamQueue.c | 15 +-------------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5c5a2e6adb..17c3fbf9c6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -358,7 +358,7 @@ typedef struct STaskOutputInfo { STaskSinkFetch fetchSink; }; - void* pTimer; // timer for launch sink tasks +// void* pTimer; // timer for launch sink tasks int8_t type; STokenBucket* pTokenBucket; } STaskOutputInfo; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 7a1ac052a6..776b69848c 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -169,20 +169,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 50ms", id); - taosMsleep(50); - -// if (streamTaskAllUpstreamClosed(pTask)) { -// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); -// stDebug("s-task:%s try start task in %dms in tmr, since all upstream inputQ is closed, ref:%d", pTask->id.idStr, -// SINK_TASK_IDLE_DURATION, ref); -// -// if (pTask->outputInfo.pTimer == NULL) { -// pTask->outputInfo.pTimer = taosTmrStart(doLaunchSinkTask, SINK_TASK_IDLE_DURATION, pTask, streamEnv.timer); -// } else { -// taosTmrReset(doLaunchSinkTask, SINK_TASK_IDLE_DURATION, pTask, streamEnv.timer, &pTask->outputInfo.pTimer); -// } -// } - + taosMsleep(10); return TSDB_CODE_SUCCESS; }