From 9e9d83d4a13b7bdfbce7731c60362174df3b2a3c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 17 Aug 2023 11:57:04 +0800 Subject: [PATCH] refactor: do some internal refactor --- source/libs/stream/src/streamExec.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d1827993a5..20facb5952 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -90,6 +90,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return 0; } + if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr); + taosMsleep(1000); + continue; + } + SSDataBlock* output = NULL; uint64_t ts = 0; if ((code = qExecTask(pExecutor, &output, &ts)) < 0) { @@ -570,12 +576,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } - if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr); - taosMsleep(1000); - continue; - } - // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id);