[migration] add back instance init error check

This commit is contained in:
Kassian Sun 2023-04-22 14:58:59 +08:00
parent 28a6c55671
commit b263c480c7
1 changed files with 9 additions and 3 deletions

View File

@ -91,8 +91,12 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
processor := DispatcherProcessor{
id: util.GetUUID(),
config: &cfg,
state: map[string]DispatcherState{},
}
err := processor.refreshInstanceJobsFromES()
if err != nil {
return nil, err
}
processor.refreshInstanceJobsFromES()
processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName)
return &processor, nil
@ -1311,7 +1315,7 @@ func (p *DispatcherProcessor) getInstanceState(instanceID string) DispatcherStat
return p.state[instanceID]
}
func (p *DispatcherProcessor) refreshInstanceJobsFromES() {
func (p *DispatcherProcessor) refreshInstanceJobsFromES() error {
log.Debug("refreshing instance state from ES")
p.stateLock.Lock()
defer p.stateLock.Unlock()
@ -1319,9 +1323,11 @@ func (p *DispatcherProcessor) refreshInstanceJobsFromES() {
state, err := p.getInstanceTaskState()
if err != nil {
log.Errorf("failed to get instance task state, err: %v", err)
return
return err
}
p.state = state
return nil
}
func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) {