fix(stream): remove the invalid set of scheduler status.
This commit is contained in:
parent
2806fe1c56
commit
e8c9a019a4
|
@ -593,7 +593,6 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
||||||
bool streamTaskShouldStop(const SStreamStatus* pStatus);
|
bool streamTaskShouldStop(const SStreamStatus* pStatus);
|
||||||
bool streamTaskShouldPause(const SStreamStatus* pStatus);
|
bool streamTaskShouldPause(const SStreamStatus* pStatus);
|
||||||
bool streamTaskIsIdle(const SStreamTask* pTask);
|
bool streamTaskIsIdle(const SStreamTask* pTask);
|
||||||
int32_t streamTaskEndScanWAL(SStreamTask* pTask);
|
|
||||||
|
|
||||||
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
|
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
|
||||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
|
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
|
||||||
|
|
|
@ -646,16 +646,6 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
|
||||||
pTask->status.taskStatus == TASK_STATUS__DROPPING);
|
pTask->status.taskStatus == TASK_STATUS__DROPPING);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
|
|
||||||
const char* id = pTask->id.idStr;
|
|
||||||
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
|
|
||||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
|
||||||
|
|
||||||
// 1. notify all downstream tasks to transfer executor state after handle all history blocks.
|
|
||||||
appendTranstateIntoInputQ(pTask);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTryExec(SStreamTask* pTask) {
|
int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
// this function may be executed by multi-threads, so status check is required.
|
// this function may be executed by multi-threads, so status check is required.
|
||||||
int8_t schedStatus =
|
int8_t schedStatus =
|
||||||
|
|
|
@ -400,9 +400,7 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->status.appendTranstateBlock = true;
|
pTask->status.appendTranstateBlock = true;
|
||||||
|
|
||||||
qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus);
|
qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus);
|
||||||
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
|
||||||
streamSchedExec(pTask);
|
streamSchedExec(pTask);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue