fix(stream): initialization tasks before start all tasks.
This commit is contained in:
parent
61b08259ba
commit
69567799eb
|
@ -1395,25 +1395,23 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
|
||||||
|
|
||||||
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
|
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
|
||||||
// initialization , when the operation of check downstream tasks status is executed far quickly.
|
// initialization , when the operation of check downstream tasks status is executed far quickly.
|
||||||
for(int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
||||||
streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->info.fillHistory == 1) {
|
if (pTask->pBackend == NULL) { // TODO: add test cases for this
|
||||||
if (pTask->pBackend == NULL) { // TODO: add test cases for this
|
code = expandFn(pTask);
|
||||||
code = expandFn(pTask);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
|
||||||
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
|
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
|
||||||
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
stDebug("s-task:0x%x vgId:%d fill-history task backend has initializied already", pTaskId->taskId, vgId);
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:0x%x vgId:%d fill-history task backend has initialized already", pTaskId->taskId, vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -1430,15 +1428,6 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskExecStatisInfo* pInfo = &pTask->execInfo;
|
STaskExecStatisInfo* pInfo = &pTask->execInfo;
|
||||||
if (pTask->pBackend == NULL) {
|
|
||||||
code = expandFn(pTask);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
|
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// fill-history task can only be launched by related stream tasks.
|
// fill-history task can only be launched by related stream tasks.
|
||||||
if (pTask->info.fillHistory == 1) {
|
if (pTask->info.fillHistory == 1) {
|
||||||
|
|
Loading…
Reference in New Issue