From e8c9a019a4938671fbc0a77fe4e523139c987dd8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 11:41:24 +0800 Subject: [PATCH] fix(stream): remove the invalid set of scheduler status. --- include/libs/stream/tstream.h | 1 - source/libs/stream/src/streamExec.c | 10 ---------- source/libs/stream/src/streamRecover.c | 2 -- 3 files changed, 13 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d3b670d0ec..02bb65b762 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -593,7 +593,6 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); -int32_t streamTaskEndScanWAL(SStreamTask* pTask); SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a3ff752bc5..37c5808e02 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -646,16 +646,6 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { pTask->status.taskStatus == TASK_STATUS__DROPPING); } -int32_t streamTaskEndScanWAL(SStreamTask* pTask) { - const char* id = pTask->id.idStr; - double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; - qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); - - // 1. notify all downstream tasks to transfer executor state after handle all history blocks. - appendTranstateIntoInputQ(pTask); - return TSDB_CODE_SUCCESS; -} - int32_t streamTryExec(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. int8_t schedStatus = diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 72dae735e1..42ff9b9b4e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -400,9 +400,7 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { } pTask->status.appendTranstateBlock = true; - qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus); - pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; streamSchedExec(pTask); return TSDB_CODE_SUCCESS;