[migration] add option to skip scroll/bulk count check

This commit is contained in:
Kassian Sun 2023-05-19 14:36:08 +08:00 committed by Gitea
parent 5000deefa9
commit 1789d74ece
3 changed files with 24 additions and 16 deletions

View File

@ -85,6 +85,7 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
SliceSize: clusterMigrationTask.Settings.Scroll.SliceSize,
BatchSize: clusterMigrationTask.Settings.Scroll.Docs,
ScrollTime: clusterMigrationTask.Settings.Scroll.Timeout,
SkipCountCheck: clusterMigrationTask.Settings.SkipScrollCountCheck,
}
if index.IndexRename != nil {
source.IndexRename = index.IndexRename
@ -134,6 +135,7 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
target := migration_model.IndexMigrationTargetConfig{
ClusterId: clusterMigrationTask.Cluster.Target.Id,
SkipCountCheck: clusterMigrationTask.Settings.SkipBulkCountCheck,
Bulk: migration_model.IndexMigrationBulkConfig{
BatchSizeInMB: clusterMigrationTask.Settings.Bulk.StoreSizeInMB,
BatchSizeInDocs: clusterMigrationTask.Settings.Bulk.Docs,

View File

@ -241,7 +241,7 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error {
instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
totalDocs := cfg.Source.DocCount
scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs)
scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, &cfg, totalDocs)
redoScroll := true
if cfg.Version >= migration_model.IndexMigrationV1 {
@ -351,7 +351,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
return errors.New("scroll/bulk pipeline task missing")
}
scrolled, scrolledDocs, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs)
scrolled, scrolledDocs, err := p.checkScrollPipelineTaskStatus(scrollTask, &cfg, totalDocs)
if !scrolled {
return nil
}
@ -374,7 +374,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
p.saveTaskAndWriteLog(taskItem, nil, "")
}
bulked, successDocs, err := p.checkBulkPipelineTaskStatus(bulkTask, totalDocs)
bulked, successDocs, err := p.checkBulkPipelineTaskStatus(bulkTask, &cfg, totalDocs)
if !bulked {
return nil
}
@ -400,7 +400,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
return nil
}
func (p *processor) checkScrollPipelineTaskStatus(scrollTask *task.Task, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) {
func (p *processor) checkScrollPipelineTaskStatus(scrollTask *task.Task, cfg *migration_model.IndexMigrationTaskConfig, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) {
if scrollTask.Status == task.StatusError {
return true, 0, errors.New("scroll pipeline failed")
}
@ -419,14 +419,14 @@ func (p *processor) checkScrollPipelineTaskStatus(scrollTask *task.Task, totalDo
)
scrolledDocs = migration_util.GetMapIntValue(scrollLabels, "scrolled_docs")
if scrolledDocs != totalDocs {
if !cfg.Source.SkipCountCheck && scrolledDocs != totalDocs {
return true, scrolledDocs, fmt.Errorf("scroll complete but docs count unmatch: %d / %d", scrolledDocs, totalDocs)
}
return true, scrolledDocs, nil
}
func (p *processor) checkBulkPipelineTaskStatus(bulkTask *task.Task, totalDocs int64) (bulked bool, successDocs int64, err error) {
func (p *processor) checkBulkPipelineTaskStatus(bulkTask *task.Task, cfg *migration_model.IndexMigrationTaskConfig, totalDocs int64) (bulked bool, successDocs int64, err error) {
// NOTE: old-version pipeline tasks has empty status
if bulkTask.Status == "" {
return true, 0, errors.New("task was started by an old-version console, need to manually restart it")
@ -455,7 +455,7 @@ func (p *processor) checkBulkPipelineTaskStatus(bulkTask *task.Task, totalDocs i
)
successDocs = migration_util.GetMapIntValue(bulkLabels, "success_docs")
if successDocs != totalDocs {
if !cfg.Target.SkipCountCheck && successDocs != totalDocs {
return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons)
}

View File

@ -17,6 +17,8 @@ type ClusterMigrationTaskConfig struct {
Settings struct {
Scroll EsScrollConfig `json:"scroll"`
Bulk BulkIndexingConfig `json:"bulk"`
SkipScrollCountCheck bool `json:"skip_scroll_count_check"`
SkipBulkCountCheck bool `json:"skip_bulk_count_check"`
Execution ExecutionConfig `json:"execution"`
} `json:"settings"`
Creator struct {
@ -56,6 +58,8 @@ type IndexMigrationTaskConfig struct {
}
type IndexMigrationSourceConfig struct {
SkipCountCheck bool `json:"skip_count_check"`
ClusterId string `json:"cluster_id"`
Indices string `json:"indices"`
SliceSize int `json:"slice_size"`
@ -84,6 +88,8 @@ type IndexMigrationBulkConfig struct {
}
type IndexMigrationTargetConfig struct {
SkipCountCheck bool `json:"skip_count_check"`
ClusterId string `json:"cluster_id"`
Bulk IndexMigrationBulkConfig `json:"bulk"`
}