diff --git a/plugin/migration/model.go b/plugin/migration/model.go index c40f142a..12329ae5 100644 --- a/plugin/migration/model.go +++ b/plugin/migration/model.go @@ -136,7 +136,6 @@ type IndexMigrationSourceConfig struct { // Parition configs Start float64 `json:"start"` End float64 `json:"end"` - Docs int64 `json:"docs"` DocCount int64 `json:"doc_count"` Step interface{} `json:"step"` PartitionId int `json:"partition_id"` @@ -154,7 +153,6 @@ type IndexMigrationBulkConfig struct { type IndexMigrationTargetConfig struct { ClusterId string `json:"cluster_id"` Bulk IndexMigrationBulkConfig `json:"bulk"` - QueryDSL util.MapStr `json:"query_dsl,omitempty"` } type PipelineTaskLoggingConfig struct { diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index e5f4beec..92798ccf 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -980,31 +980,6 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro Compress: clusterMigrationTask.Settings.Bulk.Compress, }, } - indexParameters := IndexMigrationTaskConfig{ - Source: source, - Target: target, - } - indexMigrationTask := task2.Task{ - ParentId: []string{taskItem.ID}, - Cancellable: true, - Runnable: false, - Status: task2.StatusReady, - StartTimeInMillis: time.Now().UnixMilli(), - Metadata: task2.Metadata{ - Type: "index_migration", - Labels: util.MapStr{ - "business_id": "index_migration", - "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, - "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, - "partition_count": 1, - "index_name": index.Source.Name, - "unique_index_name": index.Source.GetUniqueIndexName(), - }, - }, - ConfigString: util.MustToJSON(indexParameters), - } - - indexMigrationTask.ID = util.GetUUID() if index.Partition != nil { partitionQ := &elastic.PartitionQuery{ @@ -1039,34 +1014,6 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro partitionSource.PartitionId = partitionID partitionSource.QueryDSL = partition.Filter partitionSource.QueryString = "" - var must []interface{} - - if partition.Other { - must = append(must, partition.Filter) - } else { - must = append(must, util.MapStr{ - "range": util.MapStr{ - index.Partition.FieldName: util.MapStr{ - "gte": partition.Start, - "lt": partition.End, - }, - }, - }) - } - - if targetMust != nil { - must = append(must, targetMust...) - } - partitionTarget := target - if len(must) > 0 { - partitionTarget.QueryDSL = util.MapStr{ - "query": util.MapStr{ - "bool": util.MapStr{ - "must": must, - }, - }, - } - } partitionMigrationTask := task2.Task{ ParentId: []string{taskItem.ID}, @@ -1085,13 +1032,12 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro }, ConfigString: util.MustToJSON(IndexMigrationTaskConfig{ Source: partitionSource, - Target: partitionTarget, + Target: target, Execution: clusterMigrationTask.Settings.Execution, }), } partitionMigrationTask.ID = util.GetUUID() err = orm.Create(nil, &partitionMigrationTask) - target.QueryDSL = nil if err != nil { return fmt.Errorf("store index migration task(partition) error: %w", err) } @@ -1099,6 +1045,33 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro } } else { source.DocCount = index.Source.Docs + + indexParameters := IndexMigrationTaskConfig{ + Source: source, + Target: target, + } + indexMigrationTask := task2.Task{ + ParentId: []string{taskItem.ID}, + Cancellable: true, + Runnable: false, + Status: task2.StatusReady, + StartTimeInMillis: time.Now().UnixMilli(), + Metadata: task2.Metadata{ + Type: "index_migration", + Labels: util.MapStr{ + "business_id": "index_migration", + "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, + "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, + "partition_count": 1, + "index_name": index.Source.Name, + "unique_index_name": index.Source.GetUniqueIndexName(), + }, + }, + ConfigString: util.MustToJSON(indexParameters), + } + + indexMigrationTask.ID = util.GetUUID() + err = orm.Create(nil, &indexMigrationTask) if err != nil { return fmt.Errorf("store index migration task error: %w", err)