diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index f6b89186..9ce2c9c5 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -1333,12 +1333,11 @@ func (p *DispatcherProcessor) refreshInstanceJobsFromES() error { func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) { var err error instance := model.Instance{} - instanceID := taskItem.Metadata.Labels["execution_instance_id"] - if instanceID == "" { + instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + if instance.ID == "" { log.Debugf("task [%s] not scheduled yet, skip cleaning queue", taskItem.ID) return } - instance.ID, _ = util.ExtractString(instanceID) _, err = orm.Get(&instance) if err != nil { log.Errorf("failed to get instance, err: %v", err) diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index c34d084f..8e579bff 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -218,8 +218,9 @@ func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.In return } err = instance.DeletePipeline(taskItem.ID) - if err != nil { + if err != nil && !strings.Contains(err.Error(), "task not found") { log.Errorf("delete pipeline failed, err: %v", err) + return } return instance, nil