fix(stream): update the check order.

This commit is contained in:
Haojun Liao 2024-01-25 15:17:14 +08:00
parent 2ecc202cb3
commit 49bc3924fb
1 changed files with 14 additions and 14 deletions

View File

@ -779,18 +779,10 @@ int32_t streamResumeTask(SStreamTask* pTask) {
/*int32_t code = */ doStreamExecTask(pTask); /*int32_t code = */ doStreamExecTask(pTask);
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
// check if this task needs to be idle for a while
if (pTask->status.schedIdleTime > 0) {
schedTaskInFuture(pTask);
taosThreadMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs());
return 0;
} else {
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
clearTaskSchedInfo(pTask);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs()); setLastExecTs(pTask, taosGetTimestampMs());
@ -800,6 +792,14 @@ int32_t streamResumeTask(SStreamTask* pTask) {
pTask->status.schedStatus, pTask->status.lastExecTs); pTask->status.schedStatus, pTask->status.lastExecTs);
return 0; return 0;
} else {
// check if this task needs to be idle for a while
if (pTask->status.schedIdleTime > 0) {
schedTaskInFuture(pTask);
taosThreadMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs());
return 0;
} }
} }