[comparison] merge source/target dump partitions

This commit is contained in:
Kassian Sun 2023-05-19 14:15:11 +08:00
parent d4202897e1
commit 02243264e8
1 changed file with 89 additions and 70 deletions

View File

@ -74,16 +74,19 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
return err return err
} }
esSourceClient := elastic.GetClient(clusterComparisonTask.Cluster.Source.Id) esSourceClient := elastic.GetClient(clusterComparisonTask.Cluster.Source.Id)
esTargetClient := elastic.GetClient(clusterComparisonTask.Cluster.Target.Id)
for _, index := range clusterComparisonTask.Indices { for _, index := range clusterComparisonTask.Indices {
sourceDump := migration_model.IndexComparisonDumpConfig{ sourceDump := migration_model.IndexComparisonDumpConfig{
ClusterId: clusterComparisonTask.Cluster.Source.Id, ClusterId: clusterComparisonTask.Cluster.Source.Id,
Indices: index.Source.Name, Indices: index.Source.Name,
DocCount: index.Source.Docs,
SliceSize: clusterComparisonTask.Settings.Dump.SliceSize, SliceSize: clusterComparisonTask.Settings.Dump.SliceSize,
BatchSize: clusterComparisonTask.Settings.Dump.Docs, BatchSize: clusterComparisonTask.Settings.Dump.Docs,
PartitionSize: clusterComparisonTask.Settings.Dump.PartitionSize, PartitionSize: clusterComparisonTask.Settings.Dump.PartitionSize,
ScrollTime: clusterComparisonTask.Settings.Dump.Timeout, ScrollTime: clusterComparisonTask.Settings.Dump.Timeout,
} }
// TODO: dump_hash can only handle 1G file // TODO: dump_hash can only handle 1G file
if sourceDump.PartitionSize <= 0 { if sourceDump.PartitionSize <= 0 {
sourceDump.PartitionSize = 1 sourceDump.PartitionSize = 1
@ -105,77 +108,12 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
} }
} }
// TODO: partition twice for source & target, then merge targetDump := sourceDump
// if there's a partition missing from source but present in target targetDump.ClusterId = clusterComparisonTask.Cluster.Target.Id
// ideally we can capture it in docs count, but this won't always work targetDump.Indices = index.Target.Name
if index.Partition != nil { targetDump.DocCount = index.Target.Docs
partitionQ := &elastic.PartitionQuery{
IndexName: index.Source.Name,
FieldName: index.Partition.FieldName,
FieldType: index.Partition.FieldType,
Step: index.Partition.Step,
}
if sourceDump.QueryDSL != nil {
partitionQ.Filter = sourceDump.QueryDSL
}
partitions, err := elastic.GetPartitions(partitionQ, esSourceClient)
if err != nil {
return err
}
if partitions == nil || len(partitions) == 0 {
return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter))
}
var (
partitionID int
)
for _, partition := range partitions {
partitionID++
partitionSourceDump := sourceDump
partitionSourceDump.Start = partition.Start
partitionSourceDump.End = partition.End
partitionSourceDump.DocCount = partition.Docs
partitionSourceDump.Step = index.Partition.Step
partitionSourceDump.PartitionId = partitionID
partitionSourceDump.QueryDSL = partition.Filter
partitionSourceDump.QueryString = ""
partitionTargetDump := partitionSourceDump
partitionTargetDump.Indices = index.Target.Name
partitionComparisonTask := task.Task{
ParentId: []string{taskItem.ID},
Cancellable: false,
Runnable: true,
Status: task.StatusReady,
Metadata: task.Metadata{
Type: "index_comparison",
Labels: util.MapStr{
"business_id": "index_comparison",
"source_cluster_id": clusterComparisonTask.Cluster.Source.Id,
"target_cluster_id": clusterComparisonTask.Cluster.Target.Id,
"index_name": index.Source.Name,
"unique_index_name": index.Source.GetUniqueIndexName(),
},
},
ConfigString: util.MustToJSON(migration_model.IndexComparisonTaskConfig{
Source: partitionSourceDump,
Target: partitionTargetDump,
Execution: clusterComparisonTask.Settings.Execution,
}),
}
partitionComparisonTask.ID = util.GetUUID()
err = orm.Create(nil, &partitionComparisonTask)
if err != nil {
return fmt.Errorf("store index comparison task (partition) error: %w", err)
}
}
} else {
sourceDump.DocCount = index.Source.Docs
targetDump := sourceDump
targetDump.Indices = index.Target.Name
targetDump.DocCount = index.Target.Docs
if index.Partition == nil {
indexComparisonTask := task.Task{ indexComparisonTask := task.Task{
ParentId: []string{taskItem.ID}, ParentId: []string{taskItem.ID},
Cancellable: true, Cancellable: true,
@ -204,6 +142,87 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
if err != nil { if err != nil {
return fmt.Errorf("store index comparison task error: %w", err) return fmt.Errorf("store index comparison task error: %w", err)
} }
continue
}
sourcePartitionQ := &elastic.PartitionQuery{
IndexName: sourceDump.Indices,
FieldName: index.Partition.FieldName,
FieldType: index.Partition.FieldType,
Step: index.Partition.Step,
}
if sourceDump.QueryDSL != nil {
sourcePartitionQ.Filter = sourceDump.QueryDSL
}
sourcePartitions, err := elastic.GetPartitions(sourcePartitionQ, esSourceClient)
if err != nil {
return err
}
targetPartitionQ := &elastic.PartitionQuery{
IndexName: targetDump.Indices,
FieldName: index.Partition.FieldName,
FieldType: index.Partition.FieldType,
Step: index.Partition.Step,
}
if targetDump.QueryDSL != nil {
targetPartitionQ.Filter = targetDump.QueryDSL
}
targetPartitions, err := elastic.GetPartitions(targetPartitionQ, esTargetClient)
if err != nil {
return err
}
partitions := elastic.MergePartitions(sourcePartitions, targetPartitions, index.Partition.FieldName, index.Partition.FieldType, targetPartitionQ.Filter)
if len(partitions) == 0 {
return fmt.Errorf("empty partitions")
}
var (
partitionID int
)
for _, partition := range partitions {
partitionID++
partitionSourceDump := sourceDump
partitionSourceDump.Start = partition.Start
partitionSourceDump.End = partition.End
partitionSourceDump.DocCount = partition.Docs
partitionSourceDump.Step = index.Partition.Step
partitionSourceDump.PartitionId = partitionID
partitionSourceDump.QueryDSL = partition.Filter
partitionSourceDump.QueryString = ""
partitionTargetDump := partitionSourceDump
partitionTargetDump.ClusterId = clusterComparisonTask.Cluster.Target.Id
partitionTargetDump.Indices = index.Target.Name
partitionComparisonTask := task.Task{
ParentId: []string{taskItem.ID},
Cancellable: false,
Runnable: true,
Status: task.StatusReady,
Metadata: task.Metadata{
Type: "index_comparison",
Labels: util.MapStr{
"business_id": "index_comparison",
"source_cluster_id": clusterComparisonTask.Cluster.Source.Id,
"target_cluster_id": clusterComparisonTask.Cluster.Target.Id,
"index_name": index.Source.Name,
"unique_index_name": index.Source.GetUniqueIndexName(),
},
},
ConfigString: util.MustToJSON(migration_model.IndexComparisonTaskConfig{
Source: partitionSourceDump,
Target: partitionTargetDump,
Execution: clusterComparisonTask.Settings.Execution,
}),
}
partitionComparisonTask.ID = util.GetUUID()
err = orm.Create(nil, &partitionComparisonTask)
if err != nil {
return fmt.Errorf("store index comparison task (partition) error: %w", err)
}
} }
} }