From 49bc3924fb06bd3958a2d2a70f338269b1806d7c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Jan 2024 15:17:14 +0800 Subject: [PATCH] fix(stream): update the check order. --- source/libs/stream/src/streamExec.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index eb5ce87b1c..27748c84a0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -776,29 +776,29 @@ int32_t streamResumeTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; while (1) { - /*int32_t code = */doStreamExecTask(pTask); + /*int32_t code = */ doStreamExecTask(pTask); taosThreadMutexLock(&pTask->lock); - // check if this task needs to be idle for a while - if (pTask->status.schedIdleTime > 0) { - schedTaskInFuture(pTask); - + int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); + if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + clearTaskSchedInfo(pTask); taosThreadMutexUnlock(&pTask->lock); + setLastExecTs(pTask, taosGetTimestampMs()); + + char* p = streamTaskGetStatus(pTask)->name; + stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p, + pTask->status.schedStatus, pTask->status.lastExecTs); + return 0; } else { - int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); + // check if this task needs to be idle for a while + if (pTask->status.schedIdleTime > 0) { + schedTaskInFuture(pTask); - if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); taosThreadMutexUnlock(&pTask->lock); - setLastExecTs(pTask, taosGetTimestampMs()); - - char* p = streamTaskGetStatus(pTask)->name; - stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p, - pTask->status.schedStatus, pTask->status.lastExecTs); - return 0; } }