From 01d104706efde7e3ecd39c2fd0bf09e1820b3ed6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Sep 2023 11:02:38 +0800 Subject: [PATCH] refactor(stream): remove unnecessary streamschedTask function to improve the performance. --- source/libs/stream/src/stream.c | 3 +-- source/libs/stream/src/streamExec.c | 25 +++++++++++++------------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d1bf6a91c5..d16822be60 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -256,8 +256,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } tDeleteStreamDispatchReq(pReq); - streamSchedExec(pTask); - + streamTryExec(pTask); return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f03a6a32d4..16fc54d8be 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -621,19 +621,20 @@ int32_t streamTryExec(SStreamTask* pTask) { const char* id = pTask->id.idStr; if (schedStatus == TASK_SCHED_STATUS__WAITING) { - int32_t code = streamExecForAll(pTask); - if (code < 0) { // todo this status shoudl be removed - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); - return -1; - } + while (1) { + int32_t code = streamExecForAll(pTask); + if (code < 0) { // todo this status shoudl be removed + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); + return -1; + } - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->status.schedStatus); - - if (!(taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) || - streamTaskShouldPause(&pTask->status))) { - streamSchedExec(pTask); + if (taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) || + streamTaskShouldPause(&pTask->status)) { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, + streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); + return 0; + } } } else { qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,