diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ab69a135f1..891e0aa142 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,6 +21,7 @@ #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data #define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms #define MIN_INVOKE_INTERVAL 50 // 50ms +#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); @@ -244,6 +245,10 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* } } +static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32_t idleTime) { + return (SScanhistoryDataInfo){code, idleTime}; +} + SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); @@ -260,7 +265,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { if (streamTaskShouldPause(pTask)) { stDebug("s-task:%s paused from the scan-history task", id); // quit from step1, not continue to handle the step2 - return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); @@ -275,7 +280,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { if(streamTaskShouldStop(pTask)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } // dispatch the generated results @@ -285,38 +290,21 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { // downstream task input queue is full, try in 5sec if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { - return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); } if (finished) { - return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0); } if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) { stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, pTask->info.fillHistory, el / 1000.0); - return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, 100); } } } -// wait for the stream task to be idle -static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { - const char* id = pTask->id.idStr; - - int64_t st = taosGetTimestampMs(); - while (!streamTaskIsIdle(pStreamTask)) { - stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel, - pStreamTask->id.idStr); - taosMsleep(100); - } - - double el = (taosGetTimestampMs() - st) / 1000.0; - if (el > 0) { - stDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", id, pStreamTask->id.idStr, el); - } -} - int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; const char* id = pTask->id.idStr;