From c73c44724df2194028e7b51dab1f0164d39c4bde Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Thu, 13 Apr 2023 11:25:40 +0800 Subject: [PATCH] [migration] index_migration & pipeline define struct --- plugin/migration/api.go | 43 +++--- plugin/migration/model.go | 83 +++++++++-- plugin/migration/pipeline.go | 259 ++++++++++++++--------------------- 3 files changed, 197 insertions(+), 188 deletions(-) diff --git a/plugin/migration/api.go b/plugin/migration/api.go index 45ff1057..6e3222dc 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -49,7 +49,7 @@ type APIHandler struct { } func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - clusterTaskConfig := &ElasticDataConfig{} + clusterTaskConfig := &ClusterMigrationTaskConfig{} err := h.DecodeJSON(req, clusterTaskConfig) if err != nil { log.Error(err) @@ -90,10 +90,10 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re "source_total_docs": totalDocs, }, }, - Cancellable: true, - Runnable: false, - Status: task2.StatusInit, - Config: clusterTaskConfig, + Cancellable: true, + Runnable: false, + Status: task2.StatusInit, + ConfigString: util.MustToJSON(clusterTaskConfig), } t.ID = util.GetUUID() err = orm.Create(nil, &t) @@ -174,7 +174,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re for _, hit := range searchRes.Hits.Hits { sourceM := util.MapStr(hit.Source) buf := util.MustToJSONBytes(sourceM["config"]) - dataConfig := ElasticDataConfig{} + dataConfig := ClusterMigrationTaskConfig{} err = util.FromJSONBytes(buf, &dataConfig) if err != nil { log.Error(err) @@ -317,12 +317,14 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ } func getTaskConfig(task *task2.Task, config interface{}) error { - configBytes, err := util.ToJSONBytes(task.Config) + if task.Config_ == nil { + return util.FromJSONBytes([]byte(task.ConfigString), config) + } + buf, err := util.ToJSONBytes(task.Config_) if err != nil { return err } - - return util.FromJSONBytes(configBytes, config) + return util.FromJSONBytes(buf, config) } func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API) (map[string]string, error) { @@ -377,7 +379,7 @@ func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.R }, http.StatusNotFound) return } - taskConfig := &ElasticDataConfig{} + taskConfig := &ClusterMigrationTaskConfig{} err = getTaskConfig(&obj, taskConfig) if err != nil { log.Error(err) @@ -415,7 +417,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R }, http.StatusNotFound) return } - taskConfig := &ElasticDataConfig{} + taskConfig := &ClusterMigrationTaskConfig{} err = getTaskConfig(&obj, taskConfig) if err != nil { log.Error(err) @@ -461,7 +463,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R "index_name": migrationConfig.LogIndexName, } } - obj.Config = taskConfig + obj.ConfigString = util.MustToJSON(taskConfig) obj.Metadata.Labels["completed_indices"] = completedIndices h.WriteJSON(w, obj, http.StatusOK) } @@ -735,17 +737,16 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt startTime = subTasks[0].StartTimeInMillis } for i, ptask := range subTasks { - var ( - cfg map[string]interface{} - ok bool - ) - if cfg, ok = ptask.Config.(map[string]interface{}); !ok { + cfg := IndexMigrationTaskConfig{} + err := getTaskConfig(&ptask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) continue } - start, _ := util.MapStr(cfg).GetValue("source.start") - end, _ := util.MapStr(cfg).GetValue("source.end") + start := cfg.Source.Start + end := cfg.Source.End if i == 0 { - step, _ := util.MapStr(cfg).GetValue("source.step") + step := cfg.Source.Step taskInfo["step"] = step } var durationInMS int64 = 0 @@ -786,7 +787,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt } } - partitionTotalDocs, _ := util.MapStr(cfg).GetValue("source.doc_count") + partitionTotalDocs := cfg.Source.DocCount partitionTaskInfos = append(partitionTaskInfos, util.MapStr{ "task_id": ptask.ID, "status": ptask.Status, diff --git a/plugin/migration/model.go b/plugin/migration/model.go index 26330703..c40f142a 100644 --- a/plugin/migration/model.go +++ b/plugin/migration/model.go @@ -4,9 +4,13 @@ package migration -import "fmt" +import ( + "fmt" -type ElasticDataConfig struct { + "infini.sh/framework/core/util" +) + +type ClusterMigrationTaskConfig struct { Cluster struct { Source ClusterInfo `json:"source"` Target ClusterInfo `json:"target"` @@ -20,15 +24,8 @@ type ElasticDataConfig struct { Docs int `json:"docs"` Timeout string `json:"timeout"` } `json:"scroll"` - Bulk struct { - Docs int `json:"docs"` - StoreSizeInMB int `json:"store_size_in_mb"` - MaxWorkerSize int `json:"max_worker_size"` - IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` - SliceSize int `json:"slice_size"` - Compress bool `json:"compress"` - } `json:"bulk"` - Execution ExecutionConfig `json:"execution"` + Bulk ClusterMigrationBulkConfig `json:"bulk"` + Execution ExecutionConfig `json:"execution"` } `json:"settings"` Creator struct { Name string `json:"name"` @@ -36,6 +33,15 @@ type ElasticDataConfig struct { } `json:"creator"` } +type ClusterMigrationBulkConfig struct { + Docs int `json:"docs"` + StoreSizeInMB int `json:"store_size_in_mb"` + MaxWorkerSize int `json:"max_worker_size"` + IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` + SliceSize int `json:"slice_size"` + Compress bool `json:"compress"` +} + type ExecutionConfig struct { TimeWindow []TimeWindowItem `json:"time_window"` Nodes struct { @@ -65,6 +71,7 @@ type IndexConfig struct { Percent float64 `json:"percent,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` } + type IndexPartition struct { FieldType string `json:"field_type"` FieldName string `json:"field_name"` @@ -108,3 +115,57 @@ type IndexStateInfo struct { ErrorPartitions int IndexDocs float64 } + +type IndexMigrationTaskConfig struct { + Source IndexMigrationSourceConfig `json:"source"` + Target IndexMigrationTargetConfig `json:"target"` + Execution ExecutionConfig `json:"execution"` +} + +type IndexMigrationSourceConfig struct { + ClusterId string `json:"cluster_id"` + Indices string `json:"indices"` + SliceSize int `json:"slice_size"` + BatchSize int `json:"batch_size"` + ScrollTime string `json:"scroll_time"` + IndexRename util.MapStr `json:"index_rename,omitempty"` + TypeRename util.MapStr `json:"type_rename,omitempty"` + QueryString string `json:"query_string,omitempty'` + QueryDSL util.MapStr `json:"query_dsl,omitempty"` + + // Parition configs + Start float64 `json:"start"` + End float64 `json:"end"` + Docs int64 `json:"docs"` + DocCount int64 `json:"doc_count"` + Step interface{} `json:"step"` + PartitionId int `json:"partition_id"` +} + +type IndexMigrationBulkConfig struct { + BatchSizeInDocs int `json:"batch_size_in_docs"` + BatchSizeInMB int `json:"batch_size_in_mb"` + MaxWorkerSize int `json:"max_worker_size"` + IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` + SliceSize int `json:"slice_size"` + Compress bool `json:"compress"` +} + +type IndexMigrationTargetConfig struct { + ClusterId string `json:"cluster_id"` + Bulk IndexMigrationBulkConfig `json:"bulk"` + QueryDSL util.MapStr `json:"query_dsl,omitempty"` +} + +type PipelineTaskLoggingConfig struct { + Enabled bool `json:"enabled"` +} + +type PipelineTaskConfig struct { + Name string `json:"name"` + Logging PipelineTaskLoggingConfig `json:"logging"` + Labels util.MapStr `json:"labels"` + AutoStart bool `json:"auto_start"` + KeepRunning bool `json:"keep_running"` + Processor []util.MapStr `json:"processor"` +} diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 31c0bce0..e5f4beec 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -13,7 +13,6 @@ import ( "time" log "github.com/cihub/seelog" - "github.com/mitchellh/mapstructure" "infini.sh/console/model" "infini.sh/framework/core/config" @@ -345,7 +344,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ Success: state.Error == "", Error: state.Error, - }, fmt.Sprintf("task [%s] completed", taskItem.ID)) + }, fmt.Sprintf("task [%s] finished with status [%s]", taskItem.ID, taskItem.Status)) p.cleanGatewayPipelines(taskItem, state.PipelineIds) } else { if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { @@ -374,7 +373,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { log.Errorf("failed to get instance, err: %v", err) return err } - err = inst.CreatePipeline(util.MustToJSONBytes(bulkTask.Config)) + err = inst.CreatePipeline([]byte(bulkTask.ConfigString)) if err != nil { log.Errorf("failed to create bulk_indexing pipeline, err: %v", err) return err @@ -516,10 +515,8 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { return nil } for i, t := range ptasks { + ptasks[i].RetryTimes = taskItem.RetryTimes + 1 if t.Metadata.Labels != nil { - if cfg, ok := ptasks[i].Config.(map[string]interface{}); ok { - util.MapStr(cfg).Put("labels.retry_no", taskItem.RetryTimes+1) - } if t.Metadata.Labels["pipeline_id"] == "es_scroll" { scrollTask = &ptasks[i] } else if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" { @@ -537,35 +534,24 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { pids = append(pids, taskItem.ParentId...) pids = append(pids, taskItem.ID) scrollID := util.GetUUID() - var ( - cfg map[string]interface{} - ok bool - ) - if cfg, ok = taskItem.Config.(map[string]interface{}); !ok { - return fmt.Errorf("got wrong config [%v] with task [%s]", taskItem.Config, taskItem.ID) - } - cfgm := util.MapStr(cfg) - var ( - sourceClusterID string - targetClusterID string - ) - if sourceClusterID, ok = getMapValue(cfgm, "source.cluster_id").(string); !ok { - return fmt.Errorf("got wrong source cluster id of task [%v]", *taskItem) - } - if targetClusterID, ok = getMapValue(cfgm, "target.cluster_id").(string); !ok { - return fmt.Errorf("got wrong target cluster id of task [%v]", *taskItem) + cfg := IndexMigrationTaskConfig{} + err := getTaskConfig(taskItem, &cfg) + if err != nil { + return fmt.Errorf("got wrong config [%v] with task [%s], err: %v", taskItem.ConfigString, taskItem.ID, err) } + sourceClusterID := cfg.Source.ClusterId + targetClusterID := cfg.Target.ClusterId esConfig := elastic.GetConfig(sourceClusterID) esTargetConfig := elastic.GetConfig(targetClusterID) docType := common.GetClusterDocType(targetClusterID) if len(taskItem.ParentId) == 0 { return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) } - queryDsl := getMapValue(cfgm, "source.query_dsl") + queryDsl := cfg.Source.QueryDSL scrollQueryDsl := util.MustToJSON(util.MapStr{ "query": queryDsl, }) - indexName := getMapValue(cfgm, "source.indices") + indexName := cfg.Source.Indices scrollTask = &task2.Task{ ParentId: pids, Runnable: true, @@ -579,24 +565,24 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], }, }, - Config: util.MapStr{ - "name": scrollID, - "logging": util.MapStr{ - "enabled": true, + RetryTimes: taskItem.RetryTimes, + ConfigString: util.MustToJSON(PipelineTaskConfig{ + Name: scrollID, + Logging: PipelineTaskLoggingConfig{ + Enabled: true, }, - "labels": util.MapStr{ + Labels: util.MapStr{ "parent_task_id": pids, "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - "retry_no": taskItem.RetryTimes, }, - "auto_start": true, - "keep_running": false, - "processor": []util.MapStr{ + AutoStart: true, + KeepRunning: false, + Processor: []util.MapStr{ { "es_scroll": util.MapStr{ "remove_type": docType == "", - "slice_size": getMapValue(cfgm, "source.slice_size"), - "batch_size": getMapValue(cfgm, "source.batch_size"), + "slice_size": cfg.Source.SliceSize, + "batch_size": cfg.Source.BatchSize, "indices": indexName, "elasticsearch": sourceClusterID, "elasticsearch_config": util.MapStr{ @@ -612,14 +598,14 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { }, }, "partition_size": 1, - "scroll_time": getMapValue(cfgm, "source.scroll_time"), + "scroll_time": cfg.Source.ScrollTime, "query_dsl": scrollQueryDsl, - "index_rename": getMapValue(cfgm, "source.index_rename"), - "type_rename": getMapValue(cfgm, "source.type_rename"), + "index_rename": cfg.Source.IndexRename, + "type_rename": cfg.Source.TypeRename, }, }, }, - }, + }), } scrollTask.ID = scrollID @@ -637,36 +623,31 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], }, }, - Config: util.MapStr{ - "name": bulkID, - "logging": util.MapStr{ - "enabled": true, + RetryTimes: taskItem.RetryTimes, + ConfigString: util.MustToJSON(PipelineTaskConfig{ + Name: bulkID, + Logging: PipelineTaskLoggingConfig{ + Enabled: true, }, - "labels": util.MapStr{ + Labels: util.MapStr{ "parent_task_id": pids, "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - "retry_no": taskItem.RetryTimes, }, - "auto_start": true, - "keep_running": false, - "processor": []util.MapStr{ + AutoStart: true, + KeepRunning: false, + Processor: []util.MapStr{ { "bulk_indexing": util.MapStr{ "detect_active_queue": false, "bulk": util.MapStr{ - "batch_size_in_mb": getMapValue(cfgm, "target.bulk.batch_size_in_mb"), - "batch_size_in_docs": getMapValue(cfgm, "target.bulk.batch_size_in_docs"), + "batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB, + "batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs, "invalid_queue": "bulk_indexing_400", - "compress": getMapValue(cfgm, "target.bulk.compress"), - //"retry_rules": util.MapStr{ - // "default": false, - // "retry_4xx": false, - // "retry_429": true, - //}, + "compress": cfg.Target.Bulk.Compress, }, - "max_worker_size": getMapValue(cfgm, "target.bulk.max_worker_size"), - "num_of_slices": getMapValue(cfgm, "target.bulk.slice_size"), - "idle_timeout_in_seconds": getMapValue(cfgm, "target.bulk.idle_timeout_in_seconds"), + "max_worker_size": cfg.Target.Bulk.MaxWorkerSize, + "num_of_slices": cfg.Target.Bulk.SliceSize, + "idle_timeout_in_seconds": cfg.Target.Bulk.IdleTimeoutInSeconds, "elasticsearch": targetClusterID, "elasticsearch_config": util.MapStr{ "name": targetClusterID, @@ -681,7 +662,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { }, }, }, - }, + }), } bulkTask.ID = bulkID } @@ -707,7 +688,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { } //call instance api to create pipeline task - err = instance.CreatePipeline(util.MustToJSONBytes(scrollTask.Config)) + err = instance.CreatePipeline([]byte(scrollTask.ConfigString)) if err != nil { log.Errorf("create scroll pipeline failed, err: %+v", err) return err @@ -765,15 +746,13 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc majorTask.ID = majorTaskID _, err = orm.Get(&majorTask) if err != nil { + log.Errorf("failed to get major task, err: %v", err) return } - cfg := ElasticDataConfig{} - buf, err := util.ToJSONBytes(majorTask.Config) - if err != nil { - return - } - err = util.FromJSONBytes(buf, &cfg) + cfg := ClusterMigrationTaskConfig{} + err = getTaskConfig(&majorTask, &cfg) if err != nil { + log.Errorf("failed to get task config, err: %v", err) return } var ( @@ -891,7 +870,7 @@ func writeLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string Fields: util.MapStr{ "task": util.MapStr{ "logging": util.MapStr{ - "config": util.MustToJSON(taskItem.Config), + "config": taskItem.ConfigString, "status": taskItem.Status, "message": message, "result": taskResult, @@ -913,40 +892,40 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro return nil } - buf := util.MustToJSONBytes(taskItem.Config) - clusterMigrationTask := ElasticDataConfig{} - err := util.FromJSONBytes(buf, &clusterMigrationTask) + clusterMigrationTask := ClusterMigrationTaskConfig{} + err := getTaskConfig(taskItem, &clusterMigrationTask) if err != nil { + log.Errorf("failed to get task config, err: %v", err) return err } defer func() { - taskItem.Config = clusterMigrationTask + taskItem.ConfigString = util.MustToJSON(clusterMigrationTask) }() esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id) targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id) for _, index := range clusterMigrationTask.Indices { - source := util.MapStr{ - "cluster_id": clusterMigrationTask.Cluster.Source.Id, - "indices": index.Source.Name, - "slice_size": clusterMigrationTask.Settings.Scroll.SliceSize, - "batch_size": clusterMigrationTask.Settings.Scroll.Docs, - "scroll_time": clusterMigrationTask.Settings.Scroll.Timeout, + source := IndexMigrationSourceConfig{ + ClusterId: clusterMigrationTask.Cluster.Source.Id, + Indices: index.Source.Name, + SliceSize: clusterMigrationTask.Settings.Scroll.SliceSize, + BatchSize: clusterMigrationTask.Settings.Scroll.Docs, + ScrollTime: clusterMigrationTask.Settings.Scroll.Timeout, } if index.IndexRename != nil { - source["index_rename"] = index.IndexRename + source.IndexRename = index.IndexRename } if index.Target.Name != "" { - source["index_rename"] = util.MapStr{ + source.IndexRename = util.MapStr{ index.Source.Name: index.Target.Name, } } if index.TypeRename != nil { - source["type_rename"] = index.TypeRename + source.TypeRename = index.TypeRename } if v, ok := index.RawFilter.(string); ok { - source["query_string"] = v + source.QueryString = v } else { var must []interface{} if index.RawFilter != nil { @@ -954,7 +933,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro } if index.Source.DocType != "" { if index.Target.DocType != "" { - source["type_rename"] = util.MapStr{ + source.TypeRename = util.MapStr{ index.Source.DocType: index.Target.DocType, } } @@ -965,13 +944,13 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro }) } else { if targetType != "" { - source["type_rename"] = util.MapStr{ + source.TypeRename = util.MapStr{ "*": index.Target.DocType, } } } if len(must) > 0 { - source["query_dsl"] = util.MapStr{ + source.QueryDSL = util.MapStr{ "bool": util.MapStr{ "must": must, }, @@ -990,20 +969,20 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro }) } - target := util.MapStr{ - "cluster_id": clusterMigrationTask.Cluster.Target.Id, - "bulk": util.MapStr{ - "batch_size_in_mb": clusterMigrationTask.Settings.Bulk.StoreSizeInMB, - "batch_size_in_docs": clusterMigrationTask.Settings.Bulk.Docs, - "max_worker_size": clusterMigrationTask.Settings.Bulk.MaxWorkerSize, - "idle_timeout_in_seconds": clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds, - "slice_size": clusterMigrationTask.Settings.Bulk.SliceSize, - "compress": clusterMigrationTask.Settings.Bulk.Compress, + target := IndexMigrationTargetConfig{ + ClusterId: clusterMigrationTask.Cluster.Target.Id, + Bulk: IndexMigrationBulkConfig{ + BatchSizeInMB: clusterMigrationTask.Settings.Bulk.StoreSizeInMB, + BatchSizeInDocs: clusterMigrationTask.Settings.Bulk.Docs, + MaxWorkerSize: clusterMigrationTask.Settings.Bulk.MaxWorkerSize, + IdleTimeoutInSeconds: clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds, + SliceSize: clusterMigrationTask.Settings.Bulk.SliceSize, + Compress: clusterMigrationTask.Settings.Bulk.Compress, }, } - indexParameters := util.MapStr{ - "source": source, - "target": target, + indexParameters := IndexMigrationTaskConfig{ + Source: source, + Target: target, } indexMigrationTask := task2.Task{ ParentId: []string{taskItem.ID}, @@ -1022,7 +1001,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro "unique_index_name": index.Source.GetUniqueIndexName(), }, }, - Config: indexParameters, + ConfigString: util.MustToJSON(indexParameters), } indexMigrationTask.ID = util.GetUUID() @@ -1034,7 +1013,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro FieldType: index.Partition.FieldType, Step: index.Partition.Step, //Filter: index.RawFilter, - Filter: source["query_dsl"], + Filter: source.QueryDSL, } partitions, err := elastic.GetPartitions(partitionQ, esSourceClient) if err != nil { @@ -1052,20 +1031,14 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro continue } partitionID++ - partitionSource := util.MapStr{ - "start": partition.Start, - "end": partition.End, - "doc_count": partition.Docs, - "step": index.Partition.Step, - "partition_id": partitionID, - } - for k, v := range source { - if k == "query_string" { - continue - } - partitionSource[k] = v - } - partitionSource["query_dsl"] = partition.Filter + partitionSource := source + partitionSource.Start = partition.Start + partitionSource.End = partition.End + partitionSource.DocCount = partition.Docs + partitionSource.Step = index.Partition.Step + partitionSource.PartitionId = partitionID + partitionSource.QueryDSL = partition.Filter + partitionSource.QueryString = "" var must []interface{} if partition.Other { @@ -1084,8 +1057,9 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro if targetMust != nil { must = append(must, targetMust...) } + partitionTarget := target if len(must) > 0 { - target["query_dsl"] = util.MapStr{ + partitionTarget.QueryDSL = util.MapStr{ "query": util.MapStr{ "bool": util.MapStr{ "must": must, @@ -1109,22 +1083,22 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro "unique_index_name": index.Source.GetUniqueIndexName(), }, }, - Config: util.MapStr{ - "source": partitionSource, - "target": target, - "execution": clusterMigrationTask.Settings.Execution, - }, + ConfigString: util.MustToJSON(IndexMigrationTaskConfig{ + Source: partitionSource, + Target: partitionTarget, + Execution: clusterMigrationTask.Settings.Execution, + }), } partitionMigrationTask.ID = util.GetUUID() err = orm.Create(nil, &partitionMigrationTask) - delete(target, "query_dsl") + target.QueryDSL = nil if err != nil { return fmt.Errorf("store index migration task(partition) error: %w", err) } } } else { - source["doc_count"] = index.Source.Docs + source.DocCount = index.Source.Docs err = orm.Create(nil, &indexMigrationTask) if err != nil { return fmt.Errorf("store index migration task error: %w", err) @@ -1237,23 +1211,13 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo log.Errorf("search task log from es failed, err: %v", err) return nil, err } - var ( - cfg map[string]interface{} - ok bool - ) - if cfg, ok = subTask.Config.(map[string]interface{}); !ok { + cfg := IndexMigrationTaskConfig{} + err = getTaskConfig(subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) return nil, fmt.Errorf("got wrong config of task %v", *subTask) } - totalDocsVal, err := util.MapStr(cfg).GetValue("source.doc_count") - if err != nil { - log.Errorf("failed to get source.doc_count, err: %v", err) - return nil, err - } - totalDocs, err := util.ExtractInt(totalDocsVal) - if err != nil { - log.Errorf("failed to extract source.doc_count, err: %v", err) - return nil, err - } + totalDocs := cfg.Source.DocCount var ( indexDocs int64 @@ -1457,7 +1421,8 @@ func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState } func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) { - config, err := p.extractTaskConfig(taskItem) + config := ClusterMigrationTaskConfig{} + err := getTaskConfig(taskItem, &config) if err != nil { log.Errorf("failed to parse config info from major task, id: %s, err: %v", taskItem.ID, err) return @@ -1500,21 +1465,3 @@ func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) { } return } - -func (p *DispatcherProcessor) extractTaskConfig(taskItem *task2.Task) (*ElasticDataConfig, error) { - origConfig, ok := taskItem.Config.(ElasticDataConfig) - if ok { - return &origConfig, nil - } - rawConfig, ok := taskItem.Config.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("failed to extract configuration from major task, id: %s, type: %T", taskItem.ID, taskItem.Config) - } - - config := &ElasticDataConfig{} - err := mapstructure.Decode(rawConfig, config) - if err != nil { - return nil, err - } - return config, nil -}