From 4871fbcb12d34d6126c4e8a6d8f8b4518ec2b163 Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 29 Mar 2023 16:17:19 +0800 Subject: [PATCH] checking instance is avaiable when there is no pipeline logs --- plugin/migration/pipeline.go | 37 +++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 66e858a4..5e408cc0 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -429,6 +429,21 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err return err } if len(searchRes.Hits.Hits) == 0 { + //check instance available + if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { + inst := model.Instance{} + inst.ID = instID + _, err = orm.Get(&inst) + if err != nil { + return err + } + err = inst.TryConnectWithTimeout(time.Second) + if err != nil { + if errors.Is(err, syscall.ECONNREFUSED) { + return fmt.Errorf("stoping task [%s] error: %w", taskItem.ID, err) + } + } + } return nil } MainLoop: @@ -930,7 +945,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro taskItem.Config = clusterMigrationTask }() esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id) - esTargetClient := elastic.GetClient(clusterMigrationTask.Cluster.Target.Id) + targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id) for _, index := range clusterMigrationTask.Indices { source := util.MapStr{ @@ -971,23 +986,25 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro }, }) } else { - if esSourceClient.GetMajorVersion() >= 8 { + if targetType != "" { source["type_rename"] = util.MapStr{ "*": index.Target.DocType, } } } - source["query_dsl"] = util.MapStr{ - "bool": util.MapStr{ - "must": must, - }, + if len(must) > 0 { + source["query_dsl"] = util.MapStr{ + "bool": util.MapStr{ + "must": must, + }, + } } } var targetMust []interface{} if index.RawFilter != nil { targetMust = append(targetMust, index.RawFilter) } - if index.Target.DocType != "" && esTargetClient.GetMajorVersion() < 8 { + if index.Target.DocType != "" && targetType != "" { targetMust = append(targetMust, util.MapStr{ "terms": util.MapStr{ "_type": []string{index.Target.DocType}, @@ -1090,8 +1107,10 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro } if len(must) > 0 { target["query_dsl"] = util.MapStr{ - "bool": util.MapStr{ - "must": must, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": must, + }, }, } }