checking instance is avaiable when there is no pipeline logs

This commit is contained in:
liugq 2023-03-29 16:17:19 +08:00
parent 0f5626c41d
commit 4871fbcb12
1 changed files with 28 additions and 9 deletions

View File

@ -429,6 +429,21 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
return err return err
} }
if len(searchRes.Hits.Hits) == 0 { if len(searchRes.Hits.Hits) == 0 {
//check instance available
if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok {
inst := model.Instance{}
inst.ID = instID
_, err = orm.Get(&inst)
if err != nil {
return err
}
err = inst.TryConnectWithTimeout(time.Second)
if err != nil {
if errors.Is(err, syscall.ECONNREFUSED) {
return fmt.Errorf("stoping task [%s] error: %w", taskItem.ID, err)
}
}
}
return nil return nil
} }
MainLoop: MainLoop:
@ -930,7 +945,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
taskItem.Config = clusterMigrationTask taskItem.Config = clusterMigrationTask
}() }()
esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id) esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id)
esTargetClient := elastic.GetClient(clusterMigrationTask.Cluster.Target.Id) targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id)
for _, index := range clusterMigrationTask.Indices { for _, index := range clusterMigrationTask.Indices {
source := util.MapStr{ source := util.MapStr{
@ -971,23 +986,25 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
}, },
}) })
} else { } else {
if esSourceClient.GetMajorVersion() >= 8 { if targetType != "" {
source["type_rename"] = util.MapStr{ source["type_rename"] = util.MapStr{
"*": index.Target.DocType, "*": index.Target.DocType,
} }
} }
} }
source["query_dsl"] = util.MapStr{ if len(must) > 0 {
"bool": util.MapStr{ source["query_dsl"] = util.MapStr{
"must": must, "bool": util.MapStr{
}, "must": must,
},
}
} }
} }
var targetMust []interface{} var targetMust []interface{}
if index.RawFilter != nil { if index.RawFilter != nil {
targetMust = append(targetMust, index.RawFilter) targetMust = append(targetMust, index.RawFilter)
} }
if index.Target.DocType != "" && esTargetClient.GetMajorVersion() < 8 { if index.Target.DocType != "" && targetType != "" {
targetMust = append(targetMust, util.MapStr{ targetMust = append(targetMust, util.MapStr{
"terms": util.MapStr{ "terms": util.MapStr{
"_type": []string{index.Target.DocType}, "_type": []string{index.Target.DocType},
@ -1090,8 +1107,10 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
} }
if len(must) > 0 { if len(must) > 0 {
target["query_dsl"] = util.MapStr{ target["query_dsl"] = util.MapStr{
"bool": util.MapStr{ "query": util.MapStr{
"must": must, "bool": util.MapStr{
"must": must,
},
}, },
} }
} }