diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e0ee01d345..8ab8f3852e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -281,12 +281,12 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { } // dispatch the generated results - int32_t code = handleResultBlocks(pTask, pRes, size); + /*int32_t code = */handleResultBlocks(pTask, pRes, size); int64_t el = taosGetTimestampMs() - st; // downstream task input queue is full, try in 5sec - if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; } @@ -294,7 +294,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; } - if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { + if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) { stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, pTask->info.fillHistory, el / 1000.0); return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};