diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index b213c207..70ff0484 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -25,11 +25,6 @@ type processor struct { LogIndexName string } -var ( - // task types with pipline as children - parentTaskTypes = []string{"index_migration", "index_comparison"} -) - func NewProcessor(elasticsearch, indexName, logIndexName string) migration_model.Processor { return &processor{ Elasticsearch: elasticsearch, @@ -187,13 +182,12 @@ func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.In } func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance model.Instance, err error) { - parentTask, err := p.getParentTask(taskItem) + instanceID := taskItem.Metadata.Labels["execution_instance_id"] + instance.ID, err = util.ExtractString(instanceID) if err != nil { + log.Error("failed to get execution_instance_id") return } - // Use sub task's execution instance - instanceID := parentTask.Metadata.Labels["execution_instance_id"] - instance.ID, _ = util.ExtractString(instanceID) _, err = orm.Get(&instance) if err != nil { log.Errorf("failed to get instance, err: %v", err) @@ -202,53 +196,6 @@ func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance return } -func (p *processor) getParentTask(taskItem *task.Task) (*task.Task, error) { - queryDsl := util.MapStr{ - "size": 1, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "_id": taskItem.ParentId, - }, - }, - { - "terms": util.MapStr{ - "metadata.type": parentTaskTypes, - }, - }, - }, - }, - }, - } - - esClient := elastic.GetClient(p.Elasticsearch) - res, err := esClient.SearchWithRawQueryDSL(p.IndexName, util.MustToJSONBytes(queryDsl)) - if err != nil { - log.Errorf("query tasks from es failed, err: %v", err) - return nil, err - } - if res.GetTotal() == 0 { - return nil, errors.New("no parent task found") - } - for _, hit := range res.Hits.Hits { - buf, err := util.ToJSONBytes(hit.Source) - if err != nil { - log.Errorf("marshal task json failed, err: %v", err) - return nil, err - } - tk := task.Task{} - err = util.FromJSONBytes(buf, &tk) - if err != nil { - log.Errorf("unmarshal task json failed, err: %v", err) - return nil, err - } - return &tk, nil - } - return nil, errors.New("not reachable") -} - func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]util.MapStr, error) { query := util.MapStr{ "size": 999,