From 855c4849cb0ab863d0697cd6475e0eafb0262297 Mon Sep 17 00:00:00 2001 From: hardy Date: Mon, 5 Feb 2024 10:09:56 +0800 Subject: [PATCH] feat: migration skip exists docs with bulk operation --- .../cluster_migration/cluster_migration.go | 9 +++++---- plugin/task_manager/model/pipeline.go | 13 +++++++------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/plugin/task_manager/cluster_migration/cluster_migration.go b/plugin/task_manager/cluster_migration/cluster_migration.go index b2d06079..95636135 100644 --- a/plugin/task_manager/cluster_migration/cluster_migration.go +++ b/plugin/task_manager/cluster_migration/cluster_migration.go @@ -200,15 +200,16 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { IdleTimeoutInSeconds: clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds, SliceSize: clusterMigrationTask.Settings.Bulk.SliceSize, Compress: clusterMigrationTask.Settings.Bulk.Compress, + Operation: clusterMigrationTask.Settings.Bulk.Operation, }, } if index.Partition != nil { partitionQ := &elastic.PartitionQuery{ - IndexName: index.Source.Name, - FieldName: index.Partition.FieldName, - FieldType: index.Partition.FieldType, - Step: index.Partition.Step, + IndexName: index.Source.Name, + FieldName: index.Partition.FieldName, + FieldType: index.Partition.FieldType, + Step: index.Partition.Step, UseEvenStrategy: index.Partition.UseEvenStrategy, } if source.QueryDSL != nil { diff --git a/plugin/task_manager/model/pipeline.go b/plugin/task_manager/model/pipeline.go index 6531a06c..823ff56f 100644 --- a/plugin/task_manager/model/pipeline.go +++ b/plugin/task_manager/model/pipeline.go @@ -23,12 +23,13 @@ type IndexDiffConfig struct { // tunable `bulk_indexing` configurations type BulkIndexingConfig struct { - Docs int `json:"docs"` - StoreSizeInMB int `json:"store_size_in_mb"` - MaxWorkerSize int `json:"max_worker_size"` - IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` - SliceSize int `json:"slice_size"` - Compress bool `json:"compress"` + Docs int `json:"docs"` + StoreSizeInMB int `json:"store_size_in_mb"` + MaxWorkerSize int `json:"max_worker_size"` + IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` + SliceSize int `json:"slice_size"` + Compress bool `json:"compress"` + Operation string `json:"operation"` } type PipelineTaskLoggingConfig struct {