diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5e099712ca..ee34648a47 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -807,6 +807,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } + int64_t st = taosGetTimestampMs(); + EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (ret == EXEC_AFTER_IDLE) { streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); @@ -841,8 +843,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { continue; } - int64_t st = taosGetTimestampMs(); - // here only handle the data block sink operation if (type == STREAM_INPUT__DATA_BLOCK) { pTask->execInfo.sink.dataSize += blockSize; @@ -873,6 +873,13 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (code) { return code; } + + double el = (taosGetTimestampMs() - st) / 1000.0; + if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore + stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id); + streamTaskSetIdleInfo(pTask, 500); + return code; + } } } }