diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 190c60289f..00fe1207dd 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1395,25 +1395,23 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without // initialization , when the operation of check downstream tasks status is executed far quickly. - for(int32_t i = 0; i < numOfTasks; ++i) { + for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); continue; } - if (pTask->info.fillHistory == 1) { - if (pTask->pBackend == NULL) { // TODO: add test cases for this - code = expandFn(pTask); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); - streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); - } - } else { - stDebug("s-task:0x%x vgId:%d fill-history task backend has initializied already", pTaskId->taskId, vgId); + if (pTask->pBackend == NULL) { // TODO: add test cases for this + code = expandFn(pTask); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); + streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); } + } else { + stDebug("s-task:0x%x vgId:%d fill-history task backend has initialized already", pTaskId->taskId, vgId); } streamMetaReleaseTask(pMeta, pTask); @@ -1430,15 +1428,6 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa } STaskExecStatisInfo* pInfo = &pTask->execInfo; - if (pTask->pBackend == NULL) { - code = expandFn(pTask); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); - streamMetaReleaseTask(pMeta, pTask); - continue; - } - } // fill-history task can only be launched by related stream tasks. if (pTask->info.fillHistory == 1) {