fix(stream): disable the time slice usage limitation for stream task.

This commit is contained in:
Haojun Liao 2023-11-28 09:28:43 +08:00
parent 57b8388dad
commit 1df29b5d6c
1 changed files with 3 additions and 3 deletions

View File

@ -281,12 +281,12 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
} }
// dispatch the generated results // dispatch the generated results
int32_t code = handleResultBlocks(pTask, pRes, size); /*int32_t code = */handleResultBlocks(pTask, pRes, size);
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
// downstream task input queue is full, try in 5sec // downstream task input queue is full, try in 5sec
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000};
} }
@ -294,7 +294,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};
} }
if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
pTask->info.fillHistory, el / 1000.0); pTask->info.fillHistory, el / 1000.0);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100};