From b263c480c7aa4eb9f40a702836732939f89d6b11 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Sat, 22 Apr 2023 14:58:59 +0800 Subject: [PATCH] [migration] add back instance init error check --- plugin/migration/pipeline.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 49b76b0e..18832f6f 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -91,8 +91,12 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro processor := DispatcherProcessor{ id: util.GetUUID(), config: &cfg, + state: map[string]DispatcherState{}, + } + err := processor.refreshInstanceJobsFromES() + if err != nil { + return nil, err } - processor.refreshInstanceJobsFromES() processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName) return &processor, nil @@ -1311,7 +1315,7 @@ func (p *DispatcherProcessor) getInstanceState(instanceID string) DispatcherStat return p.state[instanceID] } -func (p *DispatcherProcessor) refreshInstanceJobsFromES() { +func (p *DispatcherProcessor) refreshInstanceJobsFromES() error { log.Debug("refreshing instance state from ES") p.stateLock.Lock() defer p.stateLock.Unlock() @@ -1319,9 +1323,11 @@ func (p *DispatcherProcessor) refreshInstanceJobsFromES() { state, err := p.getInstanceTaskState() if err != nil { log.Errorf("failed to get instance task state, err: %v", err) - return + return err } p.state = state + + return nil } func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) {