Merge branch 'master' into init_indices

This commit is contained in:
liugq 2023-04-17 13:57:49 +08:00
commit f18a2bc054
2 changed files with 28 additions and 57 deletions

View File

@ -141,7 +141,6 @@ type IndexMigrationSourceConfig struct {
// Parition configs
Start float64 `json:"start"`
End float64 `json:"end"`
Docs int64 `json:"docs"`
DocCount int64 `json:"doc_count"`
Step interface{} `json:"step"`
PartitionId int `json:"partition_id"`
@ -159,7 +158,6 @@ type IndexMigrationBulkConfig struct {
type IndexMigrationTargetConfig struct {
ClusterId string `json:"cluster_id"`
Bulk IndexMigrationBulkConfig `json:"bulk"`
QueryDSL util.MapStr `json:"query_dsl,omitempty"`
}
type PipelineTaskLoggingConfig struct {

View File

@ -980,31 +980,6 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
Compress: clusterMigrationTask.Settings.Bulk.Compress,
},
}
indexParameters := IndexMigrationTaskConfig{
Source: source,
Target: target,
}
indexMigrationTask := task2.Task{
ParentId: []string{taskItem.ID},
Cancellable: true,
Runnable: false,
Status: task2.StatusReady,
StartTimeInMillis: time.Now().UnixMilli(),
Metadata: task2.Metadata{
Type: "index_migration",
Labels: util.MapStr{
"business_id": "index_migration",
"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
"target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
"partition_count": 1,
"index_name": index.Source.Name,
"unique_index_name": index.Source.GetUniqueIndexName(),
},
},
ConfigString: util.MustToJSON(indexParameters),
}
indexMigrationTask.ID = util.GetUUID()
if index.Partition != nil {
partitionQ := &elastic.PartitionQuery{
@ -1039,34 +1014,6 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
partitionSource.PartitionId = partitionID
partitionSource.QueryDSL = partition.Filter
partitionSource.QueryString = ""
var must []interface{}
if partition.Other {
must = append(must, partition.Filter)
} else {
must = append(must, util.MapStr{
"range": util.MapStr{
index.Partition.FieldName: util.MapStr{
"gte": partition.Start,
"lt": partition.End,
},
},
})
}
if targetMust != nil {
must = append(must, targetMust...)
}
partitionTarget := target
if len(must) > 0 {
partitionTarget.QueryDSL = util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": must,
},
},
}
}
partitionMigrationTask := task2.Task{
ParentId: []string{taskItem.ID},
@ -1085,13 +1032,12 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
},
ConfigString: util.MustToJSON(IndexMigrationTaskConfig{
Source: partitionSource,
Target: partitionTarget,
Target: target,
Execution: clusterMigrationTask.Settings.Execution,
}),
}
partitionMigrationTask.ID = util.GetUUID()
err = orm.Create(nil, &partitionMigrationTask)
target.QueryDSL = nil
if err != nil {
return fmt.Errorf("store index migration task(partition) error: %w", err)
}
@ -1099,6 +1045,33 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
}
} else {
source.DocCount = index.Source.Docs
indexParameters := IndexMigrationTaskConfig{
Source: source,
Target: target,
}
indexMigrationTask := task2.Task{
ParentId: []string{taskItem.ID},
Cancellable: true,
Runnable: false,
Status: task2.StatusReady,
StartTimeInMillis: time.Now().UnixMilli(),
Metadata: task2.Metadata{
Type: "index_migration",
Labels: util.MapStr{
"business_id": "index_migration",
"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
"target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
"partition_count": 1,
"index_name": index.Source.Name,
"unique_index_name": index.Source.GetUniqueIndexName(),
},
},
ConfigString: util.MustToJSON(indexParameters),
}
indexMigrationTask.ID = util.GetUUID()
err = orm.Create(nil, &indexMigrationTask)
if err != nil {
return fmt.Errorf("store index migration task error: %w", err)