From 02243264e8d2603dd5434cd75fc89494ea845525 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Fri, 19 May 2023 14:15:11 +0800 Subject: [PATCH] [comparison] merge source/target dump partitions --- .../cluster_comparison/cluster_comparison.go | 159 ++++++++++-------- 1 file changed, 89 insertions(+), 70 deletions(-) diff --git a/plugin/migration/cluster_comparison/cluster_comparison.go b/plugin/migration/cluster_comparison/cluster_comparison.go index 29cf0d09..5d6531da 100644 --- a/plugin/migration/cluster_comparison/cluster_comparison.go +++ b/plugin/migration/cluster_comparison/cluster_comparison.go @@ -74,16 +74,19 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { return err } esSourceClient := elastic.GetClient(clusterComparisonTask.Cluster.Source.Id) + esTargetClient := elastic.GetClient(clusterComparisonTask.Cluster.Target.Id) for _, index := range clusterComparisonTask.Indices { sourceDump := migration_model.IndexComparisonDumpConfig{ ClusterId: clusterComparisonTask.Cluster.Source.Id, Indices: index.Source.Name, + DocCount: index.Source.Docs, SliceSize: clusterComparisonTask.Settings.Dump.SliceSize, BatchSize: clusterComparisonTask.Settings.Dump.Docs, PartitionSize: clusterComparisonTask.Settings.Dump.PartitionSize, ScrollTime: clusterComparisonTask.Settings.Dump.Timeout, } + // TODO: dump_hash can only handle 1G file if sourceDump.PartitionSize <= 0 { sourceDump.PartitionSize = 1 @@ -105,77 +108,12 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { } } - // TODO: parition twice for source & target, then merge - // if there's a partition missing from source but present in target - // ideally we can capture it in docs count, but this won't always work - if index.Partition != nil { - partitionQ := &elastic.PartitionQuery{ - IndexName: index.Source.Name, - FieldName: index.Partition.FieldName, - FieldType: index.Partition.FieldType, - Step: index.Partition.Step, - } - if sourceDump.QueryDSL != nil { - partitionQ.Filter = sourceDump.QueryDSL - } - partitions, err := elastic.GetPartitions(partitionQ, esSourceClient) - if err != nil { - return err - } - if partitions == nil || len(partitions) == 0 { - return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter)) - } - var ( - partitionID int - ) - for _, partition := range partitions { - partitionID++ - partitionSourceDump := sourceDump - partitionSourceDump.Start = partition.Start - partitionSourceDump.End = partition.End - partitionSourceDump.DocCount = partition.Docs - partitionSourceDump.Step = index.Partition.Step - partitionSourceDump.PartitionId = partitionID - partitionSourceDump.QueryDSL = partition.Filter - partitionSourceDump.QueryString = "" - - partitionTargetDump := partitionSourceDump - partitionTargetDump.Indices = index.Target.Name - - partitionComparisonTask := task.Task{ - ParentId: []string{taskItem.ID}, - Cancellable: false, - Runnable: true, - Status: task.StatusReady, - Metadata: task.Metadata{ - Type: "index_comparison", - Labels: util.MapStr{ - "business_id": "index_comparison", - "source_cluster_id": clusterComparisonTask.Cluster.Source.Id, - "target_cluster_id": clusterComparisonTask.Cluster.Target.Id, - "index_name": index.Source.Name, - "unique_index_name": index.Source.GetUniqueIndexName(), - }, - }, - ConfigString: util.MustToJSON(migration_model.IndexComparisonTaskConfig{ - Source: partitionSourceDump, - Target: partitionTargetDump, - Execution: clusterComparisonTask.Settings.Execution, - }), - } - partitionComparisonTask.ID = util.GetUUID() - err = orm.Create(nil, &partitionComparisonTask) - if err != nil { - return fmt.Errorf("store index comparison task (partition) error: %w", err) - } - - } - } else { - sourceDump.DocCount = index.Source.Docs - targetDump := sourceDump - targetDump.Indices = index.Target.Name - targetDump.DocCount = index.Target.Docs + targetDump := sourceDump + targetDump.ClusterId = clusterComparisonTask.Cluster.Target.Id + targetDump.Indices = index.Target.Name + targetDump.DocCount = index.Target.Docs + if index.Partition == nil { indexComparisonTask := task.Task{ ParentId: []string{taskItem.ID}, Cancellable: true, @@ -204,6 +142,87 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { if err != nil { return fmt.Errorf("store index comparison task error: %w", err) } + continue + } + + sourcePartitionQ := &elastic.PartitionQuery{ + IndexName: sourceDump.Indices, + FieldName: index.Partition.FieldName, + FieldType: index.Partition.FieldType, + Step: index.Partition.Step, + } + if sourceDump.QueryDSL != nil { + sourcePartitionQ.Filter = sourceDump.QueryDSL + } + sourcePartitions, err := elastic.GetPartitions(sourcePartitionQ, esSourceClient) + if err != nil { + return err + } + + targetPartitionQ := &elastic.PartitionQuery{ + IndexName: targetDump.Indices, + FieldName: index.Partition.FieldName, + FieldType: index.Partition.FieldType, + Step: index.Partition.Step, + } + if targetDump.QueryDSL != nil { + targetPartitionQ.Filter = targetDump.QueryDSL + } + targetPartitions, err := elastic.GetPartitions(targetPartitionQ, esTargetClient) + if err != nil { + return err + } + + partitions := elastic.MergePartitions(sourcePartitions, targetPartitions, index.Partition.FieldName, index.Partition.FieldType, targetPartitionQ.Filter) + + if len(partitions) == 0 { + return fmt.Errorf("empty partitions") + } + + var ( + partitionID int + ) + for _, partition := range partitions { + partitionID++ + partitionSourceDump := sourceDump + partitionSourceDump.Start = partition.Start + partitionSourceDump.End = partition.End + partitionSourceDump.DocCount = partition.Docs + partitionSourceDump.Step = index.Partition.Step + partitionSourceDump.PartitionId = partitionID + partitionSourceDump.QueryDSL = partition.Filter + partitionSourceDump.QueryString = "" + + partitionTargetDump := partitionSourceDump + partitionTargetDump.ClusterId = clusterComparisonTask.Cluster.Target.Id + partitionTargetDump.Indices = index.Target.Name + + partitionComparisonTask := task.Task{ + ParentId: []string{taskItem.ID}, + Cancellable: false, + Runnable: true, + Status: task.StatusReady, + Metadata: task.Metadata{ + Type: "index_comparison", + Labels: util.MapStr{ + "business_id": "index_comparison", + "source_cluster_id": clusterComparisonTask.Cluster.Source.Id, + "target_cluster_id": clusterComparisonTask.Cluster.Target.Id, + "index_name": index.Source.Name, + "unique_index_name": index.Source.GetUniqueIndexName(), + }, + }, + ConfigString: util.MustToJSON(migration_model.IndexComparisonTaskConfig{ + Source: partitionSourceDump, + Target: partitionTargetDump, + Execution: clusterComparisonTask.Settings.Execution, + }), + } + partitionComparisonTask.ID = util.GetUUID() + err = orm.Create(nil, &partitionComparisonTask) + if err != nil { + return fmt.Errorf("store index comparison task (partition) error: %w", err) + } } }