diff --git a/plugin/task_manager/index_migration/index_migration.go b/plugin/task_manager/index_migration/index_migration.go index 952671b2..aac3b0a4 100644 --- a/plugin/task_manager/index_migration/index_migration.go +++ b/plugin/task_manager/index_migration/index_migration.go @@ -129,12 +129,12 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error { Processor: []util.MapStr{ { "es_scroll": util.MapStr{ - "remove_type": docType == "", - "slice_size": cfg.Source.SliceSize, - "batch_size": cfg.Source.BatchSize, - "indices": indexName, - "elasticsearch": sourceClusterID, - "skip_exist_documents": cfg.Target.Bulk.SkipExistDocuments, + "remove_type": docType == "", + "slice_size": cfg.Source.SliceSize, + "batch_size": cfg.Source.BatchSize, + "indices": indexName, + "elasticsearch": sourceClusterID, + "bulk_create_operation": cfg.Target.Bulk.CreateOperation, "queue": util.MapStr{ "name": scrollID, "labels": util.MapStr{ @@ -187,11 +187,11 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error { "bulk_indexing": util.MapStr{ "detect_active_queue": false, "bulk": util.MapStr{ - "batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB, - "batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs, - "invalid_queue": "bulk_indexing_400", - "compress": cfg.Target.Bulk.Compress, - "skip_exist_documents": cfg.Target.Bulk.SkipExistDocuments, + "batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB, + "batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs, + "invalid_queue": "bulk_indexing_400", + "compress": cfg.Target.Bulk.Compress, + "bulk_create_operation": cfg.Target.Bulk.CreateOperation, }, "max_worker_size": cfg.Target.Bulk.MaxWorkerSize, "num_of_slices": cfg.Target.Bulk.SliceSize, diff --git a/plugin/task_manager/model/migration.go b/plugin/task_manager/model/migration.go index 0fb967d1..01b46d27 100644 --- a/plugin/task_manager/model/migration.go +++ b/plugin/task_manager/model/migration.go @@ -92,7 +92,7 @@ type IndexMigrationBulkConfig struct { IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` SliceSize int `json:"slice_size"` Compress bool `json:"compress"` - SkipExistDocuments bool `json:"skip_exist_documents"` + CreateOperation bool `json:"bulk_create_operation"` } type IndexMigrationTargetConfig struct {