refactor(stream): remove unnecessary streamschedTask function to improve the performance.

This commit is contained in:
Haojun Liao 2023-09-13 11:02:38 +08:00
parent a678e722ce
commit 01d104706e
2 changed files with 14 additions and 14 deletions

View File

@ -256,8 +256,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
} }
tDeleteStreamDispatchReq(pReq); tDeleteStreamDispatchReq(pReq);
streamSchedExec(pTask); streamTryExec(pTask);
return 0; return 0;
} }

View File

@ -621,19 +621,20 @@ int32_t streamTryExec(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
if (schedStatus == TASK_SCHED_STATUS__WAITING) { if (schedStatus == TASK_SCHED_STATUS__WAITING) {
while (1) {
int32_t code = streamExecForAll(pTask); int32_t code = streamExecForAll(pTask);
if (code < 0) { // todo this status shoudl be removed if (code < 0) { // todo this status shoudl be removed
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1; return -1;
} }
if (taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) ||
streamTaskShouldPause(&pTask->status)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
pTask->status.schedStatus); streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
return 0;
if (!(taosQueueEmpty(pTask->inputInfo.queue->pQueue) || streamTaskShouldStop(&pTask->status) || }
streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
} }
} else { } else {
qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,