Merge pull request #22395 from taosdata/fix/liaohj

fix(stream): execute the stream task directly
This commit is contained in:
Haojun Liao 2023-08-10 16:21:49 +08:00 committed by GitHub
commit 8341fa6878
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 5 deletions

View File

@ -592,14 +592,21 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (pTask->status.transferState) {
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
return code;
}
streamSchedExec(pTask);
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->status.schedStatus);
// the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by
// call this function (streamExecForAll) directly.
code = streamExecForAll(pTask);
if (code < 0) {
// do nothing
}
}
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),