diff --git a/plugin/migration/cluster_migration/cluster_migration.go b/plugin/migration/cluster_migration/cluster_migration.go index 7bd79d5c..fead588c 100644 --- a/plugin/migration/cluster_migration/cluster_migration.go +++ b/plugin/migration/cluster_migration/cluster_migration.go @@ -80,11 +80,12 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { for _, index := range clusterMigrationTask.Indices { source := migration_model.IndexMigrationSourceConfig{ - ClusterId: clusterMigrationTask.Cluster.Source.Id, - Indices: index.Source.Name, - SliceSize: clusterMigrationTask.Settings.Scroll.SliceSize, - BatchSize: clusterMigrationTask.Settings.Scroll.Docs, - ScrollTime: clusterMigrationTask.Settings.Scroll.Timeout, + ClusterId: clusterMigrationTask.Cluster.Source.Id, + Indices: index.Source.Name, + SliceSize: clusterMigrationTask.Settings.Scroll.SliceSize, + BatchSize: clusterMigrationTask.Settings.Scroll.Docs, + ScrollTime: clusterMigrationTask.Settings.Scroll.Timeout, + SkipCountCheck: clusterMigrationTask.Settings.SkipScrollCountCheck, } if index.IndexRename != nil { source.IndexRename = index.IndexRename @@ -133,7 +134,8 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { } target := migration_model.IndexMigrationTargetConfig{ - ClusterId: clusterMigrationTask.Cluster.Target.Id, + ClusterId: clusterMigrationTask.Cluster.Target.Id, + SkipCountCheck: clusterMigrationTask.Settings.SkipBulkCountCheck, Bulk: migration_model.IndexMigrationBulkConfig{ BatchSizeInMB: clusterMigrationTask.Settings.Bulk.StoreSizeInMB, BatchSizeInDocs: clusterMigrationTask.Settings.Bulk.Docs, diff --git a/plugin/migration/index_migration/index_migration.go b/plugin/migration/index_migration/index_migration.go index 954874c0..46ad2f23 100644 --- a/plugin/migration/index_migration/index_migration.go +++ b/plugin/migration/index_migration/index_migration.go @@ -241,7 +241,7 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error { instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) totalDocs := cfg.Source.DocCount - scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs) + scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, &cfg, totalDocs) redoScroll := true if cfg.Version >= migration_model.IndexMigrationV1 { @@ -351,7 +351,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { return errors.New("scroll/bulk pipeline task missing") } - scrolled, scrolledDocs, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs) + scrolled, scrolledDocs, err := p.checkScrollPipelineTaskStatus(scrollTask, &cfg, totalDocs) if !scrolled { return nil } @@ -374,7 +374,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { p.saveTaskAndWriteLog(taskItem, nil, "") } - bulked, successDocs, err := p.checkBulkPipelineTaskStatus(bulkTask, totalDocs) + bulked, successDocs, err := p.checkBulkPipelineTaskStatus(bulkTask, &cfg, totalDocs) if !bulked { return nil } @@ -400,7 +400,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { return nil } -func (p *processor) checkScrollPipelineTaskStatus(scrollTask *task.Task, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) { +func (p *processor) checkScrollPipelineTaskStatus(scrollTask *task.Task, cfg *migration_model.IndexMigrationTaskConfig, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) { if scrollTask.Status == task.StatusError { return true, 0, errors.New("scroll pipeline failed") } @@ -419,14 +419,14 @@ func (p *processor) checkScrollPipelineTaskStatus(scrollTask *task.Task, totalDo ) scrolledDocs = migration_util.GetMapIntValue(scrollLabels, "scrolled_docs") - if scrolledDocs != totalDocs { + if !cfg.Source.SkipCountCheck && scrolledDocs != totalDocs { return true, scrolledDocs, fmt.Errorf("scroll complete but docs count unmatch: %d / %d", scrolledDocs, totalDocs) } return true, scrolledDocs, nil } -func (p *processor) checkBulkPipelineTaskStatus(bulkTask *task.Task, totalDocs int64) (bulked bool, successDocs int64, err error) { +func (p *processor) checkBulkPipelineTaskStatus(bulkTask *task.Task, cfg *migration_model.IndexMigrationTaskConfig, totalDocs int64) (bulked bool, successDocs int64, err error) { // NOTE: old-version pipeline tasks has empty status if bulkTask.Status == "" { return true, 0, errors.New("task was started by an old-version console, need to manually restart it") @@ -455,7 +455,7 @@ func (p *processor) checkBulkPipelineTaskStatus(bulkTask *task.Task, totalDocs i ) successDocs = migration_util.GetMapIntValue(bulkLabels, "success_docs") - if successDocs != totalDocs { + if !cfg.Target.SkipCountCheck && successDocs != totalDocs { return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons) } diff --git a/plugin/migration/model/migration.go b/plugin/migration/model/migration.go index d57b2e6f..399cf642 100644 --- a/plugin/migration/model/migration.go +++ b/plugin/migration/model/migration.go @@ -15,9 +15,11 @@ type ClusterMigrationTaskConfig struct { } `json:"cluster"` Indices []ClusterMigrationIndexConfig `json:"indices"` Settings struct { - Scroll EsScrollConfig `json:"scroll"` - Bulk BulkIndexingConfig `json:"bulk"` - Execution ExecutionConfig `json:"execution"` + Scroll EsScrollConfig `json:"scroll"` + Bulk BulkIndexingConfig `json:"bulk"` + SkipScrollCountCheck bool `json:"skip_scroll_count_check"` + SkipBulkCountCheck bool `json:"skip_bulk_count_check"` + Execution ExecutionConfig `json:"execution"` } `json:"settings"` Creator struct { Name string `json:"name"` @@ -56,6 +58,8 @@ type IndexMigrationTaskConfig struct { } type IndexMigrationSourceConfig struct { + SkipCountCheck bool `json:"skip_count_check"` + ClusterId string `json:"cluster_id"` Indices string `json:"indices"` SliceSize int `json:"slice_size"` @@ -84,6 +88,8 @@ type IndexMigrationBulkConfig struct { } type IndexMigrationTargetConfig struct { + SkipCountCheck bool `json:"skip_count_check"` + ClusterId string `json:"cluster_id"` Bulk IndexMigrationBulkConfig `json:"bulk"` }