fix(stream): initialization fill-history tasks before start all other stream tasks.

This commit is contained in:
Haojun Liao 2024-05-23 15:38:16 +08:00
parent f146ae4198
commit 61b08259ba
1 changed files with 27 additions and 0 deletions

View File

@ -1392,6 +1392,33 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
// broadcast the check downstream tasks msg
numOfTasks = taosArrayGetSize(pTaskList);
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
// initialization , when the operation of check downstream tasks status is executed far quickly.
for(int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
continue;
}
if (pTask->info.fillHistory == 1) {
if (pTask->pBackend == NULL) { // TODO: add test cases for this
code = expandFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
}
} else {
stDebug("s-task:0x%x vgId:%d fill-history task backend has initializied already", pTaskId->taskId, vgId);
}
}
streamMetaReleaseTask(pMeta, pTask);
}
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);