From 61b08259ba2672c186dd760785660ffb4dd014bc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 May 2024 15:38:16 +0800 Subject: [PATCH] fix(stream): initialization fill-history tasks before start all other stream tasks. --- source/libs/stream/src/streamMeta.c | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 519800be28..190c60289f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1392,6 +1392,33 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa // broadcast the check downstream tasks msg numOfTasks = taosArrayGetSize(pTaskList); + + // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without + // initialization , when the operation of check downstream tasks status is executed far quickly. + for(int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); + if (pTask == NULL) { + stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); + streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId); + continue; + } + + if (pTask->info.fillHistory == 1) { + if (pTask->pBackend == NULL) { // TODO: add test cases for this + code = expandFn(pTask); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId); + streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs); + } + } else { + stDebug("s-task:0x%x vgId:%d fill-history task backend has initializied already", pTaskId->taskId, vgId); + } + } + + streamMetaReleaseTask(pMeta, pTask); + } + for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);