From a1e554fbf32b68496f63a5a9c9a0d5fe41d25dfd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 15:05:13 +0800 Subject: [PATCH] refactor: exec directly not asynchnoized. --- source/dnode/vnode/src/tq/tq.c | 3 +-- source/libs/stream/src/streamExec.c | 36 ++++++----------------------- 2 files changed, 8 insertions(+), 31 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ddd0c49649..815e9647b5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1274,8 +1274,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->tsInfo.step2Start = taosGetTimestampMs(); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); appendTranstateIntoInputQ(pTask); - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - streamSchedExec(pTask); + streamTryExec(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 37c5808e02..ccfa331661 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -661,36 +661,14 @@ int32_t streamTryExec(SStreamTask* pTask) { } // todo the task should be commit here -// if (taosQueueEmpty(pTask->inputQueue->queue)) { - // fill-history WAL scan has completed -// if (pTask->status.transferState) { -// code = streamTransferStateToStreamTask(pTask); -// if (code != TSDB_CODE_SUCCESS) { -// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); -// return code; -// } + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->status.schedStatus); - // the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by - // call this function (streamExecForAll) directly. - // code = streamExecForAll(pTask); - // if (code < 0) { - // do nothing - // } -// } - -// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); -// qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, -// streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); -// } else { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->status.schedStatus); - - if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) || - streamTaskShouldPause(&pTask->status))) { - streamSchedExec(pTask); - } -// } + if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) || + streamTaskShouldPause(&pTask->status))) { + streamSchedExec(pTask); + } } else { qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);