refactor: exec directly not asynchnoized.
This commit is contained in:
parent
91710b0c0f
commit
a1e554fbf3
|
@ -1274,8 +1274,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
|
||||
appendTranstateIntoInputQ(pTask);
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
streamSchedExec(pTask);
|
||||
streamTryExec(pTask); // exec directly
|
||||
} else {
|
||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
||||
|
|
|
@ -661,36 +661,14 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
// todo the task should be commit here
|
||||
// if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||
// fill-history WAL scan has completed
|
||||
// if (pTask->status.transferState) {
|
||||
// code = streamTransferStateToStreamTask(pTask);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
// return code;
|
||||
// }
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||
pTask->status.schedStatus);
|
||||
|
||||
// the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by
|
||||
// call this function (streamExecForAll) directly.
|
||||
// code = streamExecForAll(pTask);
|
||||
// if (code < 0) {
|
||||
// do nothing
|
||||
// }
|
||||
// }
|
||||
|
||||
// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
// qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
|
||||
// streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
||||
// } else {
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||
pTask->status.schedStatus);
|
||||
|
||||
if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
|
||||
streamTaskShouldPause(&pTask->status))) {
|
||||
streamSchedExec(pTask);
|
||||
}
|
||||
// }
|
||||
if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
|
||||
streamTaskShouldPause(&pTask->status))) {
|
||||
streamSchedExec(pTask);
|
||||
}
|
||||
} else {
|
||||
qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
|
||||
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
||||
|
|
Loading…
Reference in New Issue