From 9d0a4f88f370bc1f24cbba5905e5bb4ab20d0684 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Feb 2025 21:47:39 +0800 Subject: [PATCH] refactor(stream): idle for 1s when exec for 5s. --- source/libs/stream/src/streamExec.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5e099712ca..ee34648a47 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -807,6 +807,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } + int64_t st = taosGetTimestampMs(); + EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (ret == EXEC_AFTER_IDLE) { streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); @@ -841,8 +843,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { continue; } - int64_t st = taosGetTimestampMs(); - // here only handle the data block sink operation if (type == STREAM_INPUT__DATA_BLOCK) { pTask->execInfo.sink.dataSize += blockSize; @@ -873,6 +873,13 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (code) { return code; } + + double el = (taosGetTimestampMs() - st) / 1000.0; + if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore + stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id); + streamTaskSetIdleInfo(pTask, 500); + return code; + } } } }