refactor(stream): idle for 1s when exec for 5s.
This commit is contained in:
parent
81b718ec69
commit
9d0a4f88f3
|
@ -807,6 +807,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
|
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
|
||||||
if (ret == EXEC_AFTER_IDLE) {
|
if (ret == EXEC_AFTER_IDLE) {
|
||||||
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
|
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
|
||||||
|
@ -841,8 +843,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
|
||||||
|
|
||||||
// here only handle the data block sink operation
|
// here only handle the data block sink operation
|
||||||
if (type == STREAM_INPUT__DATA_BLOCK) {
|
if (type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
pTask->execInfo.sink.dataSize += blockSize;
|
pTask->execInfo.sink.dataSize += blockSize;
|
||||||
|
@ -873,6 +873,13 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
|
if (el > 5.0) { // elapsed more than 5 sec, not occupy the CPU anymore
|
||||||
|
stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id);
|
||||||
|
streamTaskSetIdleInfo(pTask, 500);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue