From b5218b85164f3e84123b7604b0bb26ab383cd0b5 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 10 Apr 2023 14:17:18 +0800 Subject: [PATCH] add compress settings of migration --- plugin/migration/model.go | 1 + plugin/migration/pipeline.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/plugin/migration/model.go b/plugin/migration/model.go index 8e41952f..5bed1192 100644 --- a/plugin/migration/model.go +++ b/plugin/migration/model.go @@ -26,6 +26,7 @@ type ElasticDataConfig struct { MaxWorkerSize int `json:"max_worker_size"` IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` SliceSize int `json:"slice_size"` + Compress bool `json:"compress"` } `json:"bulk"` Execution ExecutionConfig `json:"execution"` } `json:"settings"` diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 44d79042..22e20d2e 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -675,6 +675,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { "batch_size_in_mb": getMapValue(cfgm, "target.bulk.batch_size_in_mb"), "batch_size_in_docs": getMapValue(cfgm, "target.bulk.batch_size_in_docs"), "invalid_queue": "bulk_indexing_400", + "compress": getMapValue(cfgm, "target.bulk.compress"), //"retry_rules": util.MapStr{ // "default": false, // "retry_4xx": false, @@ -1014,6 +1015,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro "max_worker_size": clusterMigrationTask.Settings.Bulk.MaxWorkerSize, "idle_timeout_in_seconds": clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds, "slice_size": clusterMigrationTask.Settings.Bulk.SliceSize, + "compress": clusterMigrationTask.Settings.Bulk.Compress, }, } indexParameters := util.MapStr{