[migration] pipeline_task use execution_instance_id from labels

This commit is contained in:
Kassian Sun 2023-05-19 15:31:42 +08:00
parent be09c69212
commit a00bc59d6b
1 changed files with 3 additions and 56 deletions

View File

@ -25,11 +25,6 @@ type processor struct {
LogIndexName string
}
var (
// task types with pipline as children
parentTaskTypes = []string{"index_migration", "index_comparison"}
)
func NewProcessor(elasticsearch, indexName, logIndexName string) migration_model.Processor {
return &processor{
Elasticsearch: elasticsearch,
@ -187,13 +182,12 @@ func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.In
}
func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance model.Instance, err error) {
parentTask, err := p.getParentTask(taskItem)
instanceID := taskItem.Metadata.Labels["execution_instance_id"]
instance.ID, err = util.ExtractString(instanceID)
if err != nil {
log.Error("failed to get execution_instance_id")
return
}
// Use sub task's execution instance
instanceID := parentTask.Metadata.Labels["execution_instance_id"]
instance.ID, _ = util.ExtractString(instanceID)
_, err = orm.Get(&instance)
if err != nil {
log.Errorf("failed to get instance, err: %v", err)
@ -202,53 +196,6 @@ func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance
return
}
func (p *processor) getParentTask(taskItem *task.Task) (*task.Task, error) {
queryDsl := util.MapStr{
"size": 1,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"_id": taskItem.ParentId,
},
},
{
"terms": util.MapStr{
"metadata.type": parentTaskTypes,
},
},
},
},
},
}
esClient := elastic.GetClient(p.Elasticsearch)
res, err := esClient.SearchWithRawQueryDSL(p.IndexName, util.MustToJSONBytes(queryDsl))
if err != nil {
log.Errorf("query tasks from es failed, err: %v", err)
return nil, err
}
if res.GetTotal() == 0 {
return nil, errors.New("no parent task found")
}
for _, hit := range res.Hits.Hits {
buf, err := util.ToJSONBytes(hit.Source)
if err != nil {
log.Errorf("marshal task json failed, err: %v", err)
return nil, err
}
tk := task.Task{}
err = util.FromJSONBytes(buf, &tk)
if err != nil {
log.Errorf("unmarshal task json failed, err: %v", err)
return nil, err
}
return &tk, nil
}
return nil, errors.New("not reachable")
}
func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]util.MapStr, error) {
query := util.MapStr{
"size": 999,