From 633d0333f25ea2bbd99c41774da9a89a45f15e41 Mon Sep 17 00:00:00 2001 From: sunjiacheng Date: Tue, 28 Mar 2023 20:12:23 +0800 Subject: [PATCH] [plugins][migration] status_log -> logging (#40) [plugins][migration] status_log -> logging Co-authored-by: Kassian Sun --- plugin/migration/pipeline.go | 313 +++++++++++++++++------------------ 1 file changed, 156 insertions(+), 157 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 42593863..f275f50a 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -7,6 +7,11 @@ package migration import ( "errors" "fmt" + "math" + "strings" + "syscall" + "time" + log "github.com/cihub/seelog" "infini.sh/console/model" "infini.sh/framework/core/config" @@ -19,25 +24,21 @@ import ( task2 "infini.sh/framework/core/task" "infini.sh/framework/core/util" "infini.sh/framework/modules/elastic/common" - "math" - "strings" - "syscall" - "time" ) type DispatcherProcessor struct { - id string + id string config *DispatcherConfig - state map[string]DispatcherState + state map[string]DispatcherState } type DispatcherConfig struct { - Elasticsearch string `config:"elasticsearch,omitempty"` - IndexName string `config:"index_name"` - LogIndexName string `config:"log_index_name"` - MaxTasksPerInstance int `config:"max_tasks_per_instance"` - CheckInstanceAvailable bool `config:"check_instance_available"` - TaskBatchSize int `config:"task_batch_size"` + Elasticsearch string `config:"elasticsearch,omitempty"` + IndexName string `config:"index_name"` + LogIndexName string `config:"log_index_name"` + MaxTasksPerInstance int `config:"max_tasks_per_instance"` + CheckInstanceAvailable bool `config:"check_instance_available"` + TaskBatchSize int `config:"task_batch_size"` } type DispatcherState struct { @@ -59,13 +60,13 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro ormConfig := common.ORMConfig{} ok, err := env.ParseConfig("elastic.orm", &ormConfig) if ok && err == nil { - if cfg.IndexName == ""{ + if cfg.IndexName == "" { cfg.IndexName = fmt.Sprintf("%stask", ormConfig.IndexPrefix) } if cfg.LogIndexName == "" { cfg.LogIndexName = fmt.Sprintf("%slogs", ormConfig.IndexPrefix) } - }else{ + } else { err = fmt.Errorf("parse config elastic.orm error: %w", err) log.Error(err) return nil, err @@ -83,7 +84,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro processor := DispatcherProcessor{ id: util.GetUUID(), config: &cfg, - state: map[string]DispatcherState{}, + state: map[string]DispatcherState{}, } state, err := processor.getInstanceTaskState() if err != nil { @@ -130,7 +131,7 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { case task2.StatusPendingStop: err = p.handlePendingStopMajorTask(&t) } - }else if t.Metadata.Labels["business_id"] == "index_migration" { + } else if t.Metadata.Labels["business_id"] == "index_migration" { //handle sub migration task switch t.Status { case task2.StatusReady: @@ -146,18 +147,18 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { tn := time.Now() t.CompletedTime = &tn p.saveTaskAndWriteLog(&t, &task2.Log{ - ID: util.GetUUID(), - TaskId: t.ID, - Status: task2.StatusError, - Type: t.Metadata.Type, - Config: t.Config, + ID: util.GetUUID(), + TaskId: t.ID, + Status: task2.StatusError, + Type: t.Metadata.Type, + Config: t.Config, Result: &task2.LogResult{ Success: false, - Error: err.Error(), + Error: err.Error(), }, Message: fmt.Sprintf("failed to handling task [%s]: [%v]", 
t.ID, err), Timestamp: time.Now().UTC(), - },"") + }, "") } } //es index refresh @@ -165,7 +166,7 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { } } -func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error{ +func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { if taskItem.Metadata.Labels == nil { return fmt.Errorf("got migration task with empty labels, skip handling: %v", taskItem) } @@ -175,7 +176,7 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error{ return err } taskItem.Metadata.Labels["is_split"] = true - }else{ + } else { taskItem.RetryTimes++ } //update status of subtask to ready @@ -212,7 +213,7 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error{ } esClient := elastic.GetClient(p.config.Elasticsearch) - _, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl) ) + _, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl)) if err != nil { return err } @@ -233,7 +234,7 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error{ return nil } -func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error{ +func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error { //check whether all pipeline task is stopped or not, then update task status q := util.MapStr{ "size": 200, @@ -269,11 +270,11 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e Config: taskItem.Config, Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID), Timestamp: time.Now().UTC(), - },"") + }, "") } return nil } -func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error{ +func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error { ts, err := p.getMajorTaskState(taskItem) if err != nil { return err @@ -291,12 +292,12 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error Config: taskItem.Config, Message: fmt.Sprintf("task [%s] is complete", taskItem.ID), Timestamp: time.Now().UTC(), - },"") + }, "") } return nil } -func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error{ +func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { state, err := p.getTaskCompleteState(taskItem) if err != nil { return err @@ -328,7 +329,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error{ } } } - if st, ok := p.state[instanceID]; ok { + if st, ok := p.state[instanceID]; ok { st.Total -= 1 p.state[instanceID] = st } @@ -337,30 +338,30 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error{ } if state.Error != "" && state.TotalDocs != state.SuccessDocs { taskItem.Status = task2.StatusError - }else { + } else { taskItem.Status = task2.StatusComplete } tn := time.Now() taskItem.CompletedTime = &tn p.saveTaskAndWriteLog(taskItem, &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: taskItem.Status, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, + ID: util.GetUUID(), + TaskId: taskItem.ID, + Status: taskItem.Status, + Type: taskItem.Metadata.Type, + Config: taskItem.Config, Result: &task2.LogResult{ Success: state.Error == "", - Error: state.Error, + Error: state.Error, }, Message: fmt.Sprintf("task [%s] is complete", taskItem.ID), Timestamp: time.Now().UTC(), - },"") - }else{ + }, "") + } else { if state.RunningPhase == 1 && 
taskItem.Metadata.Labels["running_phase"] == float64(1) { ptasks, err := p.getPipelineTasks(taskItem.ID) if err != nil { - return err + return err } var bulkTask *task2.Task for i, t := range ptasks { @@ -388,13 +389,13 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error{ taskItem.Metadata.Labels["running_phase"] = 2 } } - p.saveTaskAndWriteLog(taskItem,nil, "wait_for") + p.saveTaskAndWriteLog(taskItem, nil, "wait_for") } } return nil } -func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) error{ +func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) error { //check whether all pipeline task is stopped or not, then update task status ptasks, err := p.getPipelineTasks(taskItem.ID) if err != nil { @@ -409,7 +410,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err "size": len(taskIDs), "sort": []util.MapStr{ { - "payload.pipeline.status_log.steps": util.MapStr{ + "payload.pipeline.logging.steps": util.MapStr{ "order": "desc", }, }, @@ -430,9 +431,9 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err if len(searchRes.Hits.Hits) == 0 { return nil } - MainLoop: +MainLoop: for _, hit := range searchRes.Hits.Hits { - status, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.status_log.status") + status, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.status") if status != "STOPPED" { //call instance api to stop scroll/bulk_indexing pipeline task if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { @@ -446,7 +447,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err for _, pipelineID := range taskIDs { err = inst.StopPipelineWithTimeout(pipelineID, time.Second) if err != nil { - if !errors.Is(err, syscall.ECONNREFUSED) && !strings.Contains(err.Error(), "task not found"){ + if !errors.Is(err, syscall.ECONNREFUSED) && !strings.Contains(err.Error(), "task not found") { hasStopped = false break } @@ -487,7 +488,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err log.Error(err) } } - if st, ok := p.state[instanceID]; ok { + if st, ok := p.state[instanceID]; ok { st.Total -= 1 p.state[instanceID] = st } @@ -500,17 +501,17 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err Config: taskItem.Config, Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID), Timestamp: time.Now().UTC(), - },"") + }, "") return nil } -func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ +func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { if taskItem.Metadata.Labels == nil { return fmt.Errorf("empty labels") } var ( scrollTask *task2.Task - bulkTask *task2.Task + bulkTask *task2.Task ) if taskItem.Metadata.Labels["is_split"] == true { //query split pipeline task @@ -521,11 +522,11 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ for i, t := range ptasks { if t.Metadata.Labels != nil { if cfg, ok := ptasks[i].Config.(map[string]interface{}); ok { - util.MapStr(cfg).Put("labels.retry_no", taskItem.RetryTimes + 1) + util.MapStr(cfg).Put("labels.retry_no", taskItem.RetryTimes+1) } if t.Metadata.Labels["pipeline_id"] == "es_scroll" { scrollTask = &ptasks[i] - }else if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" { + } else if t.Metadata.Labels["pipeline_id"] == "bulk_indexing" { bulkTask = &ptasks[i] } } @@ -534,7 +535,7 @@ func (p *DispatcherProcessor) 
handleReadySubTask(taskItem *task2.Task) error{ return fmt.Errorf("es_scroll or bulk_indexing pipeline task not found") } taskItem.RetryTimes++ - }else { + } else { //split task to scroll/bulk_indexing pipeline and then persistent var pids []string pids = append(pids, taskItem.ParentId...) @@ -542,7 +543,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ scrollID := util.GetUUID() var ( cfg map[string]interface{} - ok bool + ok bool ) if cfg, ok = taskItem.Config.(map[string]interface{}); !ok { return fmt.Errorf("got wrong config [%v] with task [%s]", taskItem.Config, taskItem.ID) @@ -570,15 +571,15 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ }) indexName := getMapValue(cfgm, "source.indices") scrollTask = &task2.Task{ - ParentId: pids, - Runnable: true, + ParentId: pids, + Runnable: true, Cancellable: true, Metadata: task2.Metadata{ Type: "pipeline", Labels: util.MapStr{ - "cluster_id": sourceClusterID, - "pipeline_id": "es_scroll", - "index_name": indexName, + "cluster_id": sourceClusterID, + "pipeline_id": "es_scroll", + "index_name": indexName, "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], }, }, @@ -588,24 +589,24 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ "enabled": true, }, "labels": util.MapStr{ - "parent_task_id": pids, + "parent_task_id": pids, "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - "retry_no": taskItem.RetryTimes, + "retry_no": taskItem.RetryTimes, }, - "auto_start": true, + "auto_start": true, "keep_running": false, "processor": []util.MapStr{ { "es_scroll": util.MapStr{ - "remove_type": docType == "", - "slice_size": getMapValue(cfgm, "source.slice_size"), - "batch_size": getMapValue(cfgm, "source.batch_size"), - "indices": indexName, + "remove_type": docType == "", + "slice_size": getMapValue(cfgm, "source.slice_size"), + "batch_size": getMapValue(cfgm, "source.batch_size"), + "indices": indexName, "elasticsearch": sourceClusterID, "elasticsearch_config": util.MapStr{ - "name": sourceClusterID, - "enabled": true, - "endpoint": esConfig.Endpoint, + "name": sourceClusterID, + "enabled": true, + "endpoint": esConfig.Endpoint, "basic_auth": esConfig.BasicAuth, }, "queue": util.MapStr{ @@ -615,10 +616,10 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ }, }, "partition_size": 20, - "scroll_time": getMapValue(cfgm, "source.scroll_time"), - "query_dsl": scrollQueryDsl, - "index_rename": getMapValue(cfgm, "source.index_rename"), - "type_rename": getMapValue(cfgm, "source.type_rename"), + "scroll_time": getMapValue(cfgm, "source.scroll_time"), + "query_dsl": scrollQueryDsl, + "index_rename": getMapValue(cfgm, "source.index_rename"), + "type_rename": getMapValue(cfgm, "source.type_rename"), }, }, }, @@ -628,15 +629,15 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ bulkID := util.GetUUID() bulkTask = &task2.Task{ - ParentId: pids, - Runnable: true, + ParentId: pids, + Runnable: true, Cancellable: true, Metadata: task2.Metadata{ Type: "pipeline", Labels: util.MapStr{ - "cluster_id": targetClusterID, - "pipeline_id": "bulk_indexing", - "index_name": indexName, + "cluster_id": targetClusterID, + "pipeline_id": "bulk_indexing", + "index_name": indexName, "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], }, }, @@ -646,38 +647,38 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ "enabled": true, }, "labels": util.MapStr{ - 
"parent_task_id": pids, + "parent_task_id": pids, "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - "retry_no": taskItem.RetryTimes, + "retry_no": taskItem.RetryTimes, }, - "auto_start": true, + "auto_start": true, "keep_running": false, "processor": []util.MapStr{ { "bulk_indexing": util.MapStr{ "detect_active_queue": false, "bulk": util.MapStr{ - "batch_size_in_mb": getMapValue(cfgm, "target.bulk.batch_size_in_mb"), + "batch_size_in_mb": getMapValue(cfgm, "target.bulk.batch_size_in_mb"), "batch_size_in_docs": getMapValue(cfgm, "target.bulk.batch_size_in_docs"), - "invalid_queue": "bulk_indexing_400", + "invalid_queue": "bulk_indexing_400", //"retry_rules": util.MapStr{ // "default": false, // "retry_4xx": false, // "retry_429": true, //}, }, - "max_worker_size": getMapValue(cfgm, "target.bulk.max_worker_size"), - "num_of_slices": getMapValue(cfgm, "target.bulk.slice_size"), + "max_worker_size": getMapValue(cfgm, "target.bulk.max_worker_size"), + "num_of_slices": getMapValue(cfgm, "target.bulk.slice_size"), "idle_timeout_in_seconds": getMapValue(cfgm, "target.bulk.idle_timeout_in_seconds"), - "elasticsearch": targetClusterID, + "elasticsearch": targetClusterID, "elasticsearch_config": util.MapStr{ - "name": targetClusterID, - "enabled": true, - "endpoint": esTargetConfig.Endpoint, + "name": targetClusterID, + "enabled": true, + "endpoint": esTargetConfig.Endpoint, "basic_auth": esTargetConfig.BasicAuth, }, "queues": util.MapStr{ - "type": "scroll_docs", + "type": "scroll_docs", "migration_task_id": taskItem.ID, }, }, @@ -693,7 +694,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ } if p.state[instance.ID].Total >= p.config.MaxTasksPerInstance { log.Infof("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance) - return nil + return nil } scrollTask.Metadata.Labels["execution_instance_id"] = instance.ID bulkTask.Metadata.Labels["execution_instance_id"] = instance.ID @@ -727,7 +728,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ if err != nil { return fmt.Errorf("create bulk_indexing pipeline task error: %w", err) } - }else{ + } else { err = orm.Update(nil, scrollTask) if err != nil { return fmt.Errorf("update scroll pipeline task error: %w", err) @@ -754,7 +755,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ ID: util.GetUUID(), TaskId: taskItem.ID, Status: task2.StatusRunning, - Type: taskItem.Metadata.Type, + Type: taskItem.Metadata.Type, Config: taskItem.Config, Result: &task2.LogResult{ Success: true, @@ -766,7 +767,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error{ return nil } -func getMapValue(m util.MapStr, key string) interface{}{ +func getMapValue(m util.MapStr, key string) interface{} { v, _ := m.GetValue(key) return v } @@ -788,7 +789,7 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc return } var ( - total = math.MaxInt + total = math.MaxInt tempInst = model.Instance{} ) for _, node := range cfg.Settings.Execution.Nodes.Permit { @@ -819,7 +820,7 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc _, err = orm.Get(&instance) return } -func (p *DispatcherProcessor) getMigrationTasks(size int)([]task2.Task, error){ +func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) { majorTaskQ := util.MapStr{ "bool": util.MapStr{ "must": []util.MapStr{ @@ -876,8 +877,8 @@ func (p *DispatcherProcessor) 
getMigrationTasks(size int)([]task2.Task, error){ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem *task2.Log, refresh string) { esClient := elastic.GetClient(p.config.Elasticsearch) - _, err := esClient.Index(p.config.IndexName,"", taskItem.ID, taskItem, refresh ) - if err != nil{ + _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh) + if err != nil { log.Error(err) } if logItem != nil { @@ -933,10 +934,10 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro for _, index := range clusterMigrationTask.Indices { source := util.MapStr{ - "cluster_id": clusterMigrationTask.Cluster.Source.Id, - "indices": index.Source.Name, - "slice_size": clusterMigrationTask.Settings.Scroll.SliceSize, - "batch_size": clusterMigrationTask.Settings.Scroll.Docs, + "cluster_id": clusterMigrationTask.Cluster.Source.Id, + "indices": index.Source.Name, + "slice_size": clusterMigrationTask.Settings.Scroll.SliceSize, + "batch_size": clusterMigrationTask.Settings.Scroll.Docs, "scroll_time": clusterMigrationTask.Settings.Scroll.Timeout, } if index.IndexRename != nil { @@ -953,7 +954,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro if v, ok := index.RawFilter.(string); ok { source["query_string"] = v - }else{ + } else { source["query_dsl"] = index.RawFilter if index.Source.DocType != "" { if index.Target.DocType != "" { @@ -976,7 +977,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro "must": must, }, } - }else{ + } else { if esSourceClient.GetMajorVersion() >= 8 { source["type_rename"] = util.MapStr{ "*": index.Target.DocType, @@ -999,11 +1000,11 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro target := util.MapStr{ "cluster_id": clusterMigrationTask.Cluster.Target.Id, "bulk": util.MapStr{ - "batch_size_in_mb": clusterMigrationTask.Settings.Bulk.StoreSizeInMB, - "batch_size_in_docs": clusterMigrationTask.Settings.Bulk.Docs, - "max_worker_size": clusterMigrationTask.Settings.Bulk.MaxWorkerSize, + "batch_size_in_mb": clusterMigrationTask.Settings.Bulk.StoreSizeInMB, + "batch_size_in_docs": clusterMigrationTask.Settings.Bulk.Docs, + "max_worker_size": clusterMigrationTask.Settings.Bulk.MaxWorkerSize, "idle_timeout_in_seconds": clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds, - "slice_size": clusterMigrationTask.Settings.Bulk.SliceSize, + "slice_size": clusterMigrationTask.Settings.Bulk.SliceSize, }, } indexParameters := util.MapStr{ @@ -1011,19 +1012,19 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro "target": target, } indexMigrationTask := task2.Task{ - ParentId: []string{taskItem.ID}, - Cancellable: true, - Runnable: false, - Status: task2.StatusReady, + ParentId: []string{taskItem.ID}, + Cancellable: true, + Runnable: false, + Status: task2.StatusReady, StartTimeInMillis: time.Now().UnixMilli(), Metadata: task2.Metadata{ Type: "pipeline", Labels: util.MapStr{ - "business_id": "index_migration", + "business_id": "index_migration", "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, - "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, - "partition_count": 1, - "index_name": index.Source.Name, + "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, + "partition_count": 1, + "index_name": index.Source.Name, "unique_index_name": index.Source.GetUniqueIndexName(), }, }, @@ -1037,7 +1038,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem 
*task2.Task) erro IndexName: index.Source.Name, FieldName: index.Partition.FieldName, FieldType: index.Partition.FieldType, - Step: index.Partition.Step, + Step: index.Partition.Step, //Filter: index.RawFilter, Filter: source["query_dsl"], } @@ -1045,7 +1046,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro if err != nil { return err } - if partitions == nil || len(partitions) == 0{ + if partitions == nil || len(partitions) == 0 { return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter)) } var ( @@ -1058,14 +1059,14 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro } partitionID++ partitionSource := util.MapStr{ - "start": partition.Start, - "end": partition.End, - "doc_count": partition.Docs, - "step": index.Partition.Step, + "start": partition.Start, + "end": partition.End, + "doc_count": partition.Docs, + "step": index.Partition.Step, "partition_id": partitionID, } - for k, v := range source{ - if k == "query_string"{ + for k, v := range source { + if k == "query_string" { continue } partitionSource[k] = v @@ -1075,12 +1076,12 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro if partition.Other { must = append(must, partition.Filter) - }else{ + } else { must = append(must, util.MapStr{ "range": util.MapStr{ index.Partition.FieldName: util.MapStr{ "gte": partition.Start, - "lt": partition.End, + "lt": partition.End, }, }, }) @@ -1098,23 +1099,23 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro } partitionMigrationTask := task2.Task{ - ParentId: []string{taskItem.ID}, + ParentId: []string{taskItem.ID}, Cancellable: false, - Runnable: true, - Status: task2.StatusReady, - Metadata: task2.Metadata{ + Runnable: true, + Status: task2.StatusReady, + Metadata: task2.Metadata{ Type: "pipeline", Labels: util.MapStr{ - "business_id": "index_migration", + "business_id": "index_migration", "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, - "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, - "index_name": index.Source.Name, + "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, + "index_name": index.Source.Name, "unique_index_name": index.Source.GetUniqueIndexName(), }, }, Config: util.MapStr{ - "source": partitionSource, - "target": target, + "source": partitionSource, + "target": target, "execution": clusterMigrationTask.Settings.Execution, }, } @@ -1126,7 +1127,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro } } - }else{ + } else { source["doc_count"] = index.Source.Docs err = orm.Create(nil, &indexMigrationTask) if err != nil { @@ -1137,7 +1138,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro return nil } -func (p *DispatcherProcessor) getPipelineTasks(subTaskID string) ([]task2.Task, error){ +func (p *DispatcherProcessor) getPipelineTasks(subTaskID string) ([]task2.Task, error) { queryDsl := util.MapStr{ "size": 2, "query": util.MapStr{ @@ -1157,7 +1158,7 @@ func (p *DispatcherProcessor) getPipelineTasks(subTaskID string) ([]task2.Task, return p.getTasks(queryDsl) } -func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error){ +func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error) { esClient := elastic.GetClient(p.config.Elasticsearch) res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) if err != nil { @@ -1182,7 +1183,7 @@ func (p 
*DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error){ return migrationTasks, nil } -func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCompleteState, error){ +func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCompleteState, error) { ptasks, err := p.getPipelineTasks(subTask.ID) if err != nil { return nil, err @@ -1228,7 +1229,6 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo }, }, }, - }, } esClient := elastic.GetClient(p.config.Elasticsearch) @@ -1238,7 +1238,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo } var ( cfg map[string]interface{} - ok bool + ok bool ) if cfg, ok = subTask.Config.(map[string]interface{}); !ok { return nil, fmt.Errorf("got wrong config of task %v", *subTask) @@ -1249,10 +1249,10 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo } var ( - indexDocs float64 - successDocs float64 + indexDocs float64 + successDocs float64 scrolledDocs interface{} - state TaskCompleteState + state TaskCompleteState ) state.TotalDocs = totalDocs state.PipelineIds = pids @@ -1263,7 +1263,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo state.IsComplete = true state.ClearPipeline = true } - for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"}{ + for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"} { v, err := util.MapStr(hit.Source).GetValue(key) if err == nil { if fv, ok := v.(float64); ok { @@ -1273,7 +1273,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo state.SuccessDocs = successDocs } } - }else{ + } else { break } } @@ -1293,7 +1293,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo if state.Error == "" { if successDocs > 0 { state.Error = "partial complete" - }else{ + } else { state.Error = "invalid request" } } @@ -1320,7 +1320,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo return &state, nil } -func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskState MajorTaskState, err error){ +func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskState MajorTaskState, err error) { query := util.MapStr{ "size": 0, "aggs": util.MapStr{ @@ -1354,11 +1354,10 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat }, }, }, - }, } esClient := elastic.GetClient(p.config.Elasticsearch) - res, err := esClient.SearchWithRawQueryDSL( p.config.IndexName, util.MustToJSONBytes(query)) + res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) if err != nil { return taskState, err } @@ -1379,20 +1378,20 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat } if hasError { taskState.Status = task2.StatusError - }else { + } else { taskState.Status = task2.StatusComplete } return taskState, nil } -func (p *DispatcherProcessor) getInstanceTaskState()(map[string]DispatcherState, error){ +func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState, error) { query := util.MapStr{ "size": 0, "aggs": 
util.MapStr{ "grp": util.MapStr{ "terms": util.MapStr{ "field": "metadata.labels.execution_instance_id", - "size": 1000, + "size": 1000, }, }, }, @@ -1425,7 +1424,7 @@ func (p *DispatcherProcessor) getInstanceTaskState()(map[string]DispatcherState, state := map[string]DispatcherState{} for _, bk := range res.Aggregations["grp"].Buckets { if key, ok := bk["key"].(string); ok { - if v, ok := bk["doc_count"].(float64); ok { + if v, ok := bk["doc_count"].(float64); ok { state[key] = DispatcherState{ Total: int(v), } @@ -1433,4 +1432,4 @@ func (p *DispatcherProcessor) getInstanceTaskState()(map[string]DispatcherState, } } return state, nil -} \ No newline at end of file +}
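
Note on the renamed payload path: with this patch the dispatcher reads pipeline status and bulk counters from "payload.pipeline.logging.*" rather than "payload.pipeline.status_log.*". A minimal sketch of how a consumer reads the renamed status field, assuming the same framework packages already imported by pipeline.go; the helper name isPipelineStopped is illustrative and not part of the patch:

package migration

import (
	"infini.sh/framework/core/util"
)

// isPipelineStopped reports whether a pipeline task document (the _source of
// a search hit) carries the STOPPED status under the renamed logging path.
// Illustrative sketch only; mirrors the GetValue usage in handlePendingStopSubTask.
func isPipelineStopped(source map[string]interface{}) bool {
	status, _ := util.MapStr(source).GetValue("payload.pipeline.logging.status")
	return status == "STOPPED"
}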