diff --git a/plugin/task_manager/index_migration/index_migration.go b/plugin/task_manager/index_migration/index_migration.go index e53f6bf5..952671b2 100644 --- a/plugin/task_manager/index_migration/index_migration.go +++ b/plugin/task_manager/index_migration/index_migration.go @@ -129,11 +129,12 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error { Processor: []util.MapStr{ { "es_scroll": util.MapStr{ - "remove_type": docType == "", - "slice_size": cfg.Source.SliceSize, - "batch_size": cfg.Source.BatchSize, - "indices": indexName, - "elasticsearch": sourceClusterID, + "remove_type": docType == "", + "slice_size": cfg.Source.SliceSize, + "batch_size": cfg.Source.BatchSize, + "indices": indexName, + "elasticsearch": sourceClusterID, + "skip_exist_documents": cfg.Target.Bulk.SkipExistDocuments, "queue": util.MapStr{ "name": scrollID, "labels": util.MapStr{ @@ -186,10 +187,11 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error { "bulk_indexing": util.MapStr{ "detect_active_queue": false, "bulk": util.MapStr{ - "batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB, - "batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs, - "invalid_queue": "bulk_indexing_400", - "compress": cfg.Target.Bulk.Compress, + "batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB, + "batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs, + "invalid_queue": "bulk_indexing_400", + "compress": cfg.Target.Bulk.Compress, + "skip_exist_documents": cfg.Target.Bulk.SkipExistDocuments, }, "max_worker_size": cfg.Target.Bulk.MaxWorkerSize, "num_of_slices": cfg.Target.Bulk.SliceSize, diff --git a/plugin/task_manager/model/migration.go b/plugin/task_manager/model/migration.go index e8e9e1ab..0fb967d1 100644 --- a/plugin/task_manager/model/migration.go +++ b/plugin/task_manager/model/migration.go @@ -9,8 +9,8 @@ import ( ) type ClusterMigrationTaskConfig struct { - Name string `json:"name"` - Tags []string `json:"tags"` + Name string `json:"name"` + Tags []string `json:"tags"` Cluster struct { Source ClusterInfo `json:"source"` Target ClusterInfo `json:"target"` @@ -42,7 +42,7 @@ type ClusterMigrationIndexConfig struct { Percent float64 `json:"percent,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` RunningChildren int `json:"running_children,omitempty"` - ExportedPercent float64 `json:"exported_percent,omitempty"` + ExportedPercent float64 `json:"exported_percent,omitempty"` } type ClusterMigrationTaskState struct { @@ -92,6 +92,7 @@ type IndexMigrationBulkConfig struct { IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` SliceSize int `json:"slice_size"` Compress bool `json:"compress"` + SkipExistDocuments bool `json:"skip_exist_documents"` } type IndexMigrationTargetConfig struct {