diff --git a/plugin/migration/api.go b/plugin/migration/api.go
index 434fe385..70202bc0 100644
--- a/plugin/migration/api.go
+++ b/plugin/migration/api.go
@@ -453,6 +453,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
 	obj.Metadata.Labels["completed_indices"] = completedIndices
 	h.WriteJSON(w, obj, http.StatusOK)
 }
+
 func getMajorTaskInfoByIndex(taskID string) (map[string]migration_model.IndexStateInfo, error) {
 	query := util.MapStr{
 		"size": 0,
@@ -547,7 +548,7 @@ func getMajorTaskInfoByIndex(taskID string) (map[string]migration_model.IndexSta
 	return resBody, nil
 }
 
-func getIndexTaskDocCount(ctx context.Context, index *migration_model.IndexConfig, targetESClient elastic.API) (int64, error) {
+func getIndexTaskDocCount(ctx context.Context, index *migration_model.ClusterMigrationIndexConfig, targetESClient elastic.API) (int64, error) {
 	targetIndexName := index.Target.Name
 	if targetIndexName == "" {
 		if v, ok := index.IndexRename[index.Source.Name].(string); ok {
@@ -1066,7 +1067,7 @@ func (h *APIHandler) deleteDataMigrationTask(w http.ResponseWriter, req *http.Re
 	}, 200)
 }
 
-func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
+func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	var (
 		index     = ps.MustGetParameter("index")
 		clusterID = ps.MustGetParameter("id")
diff --git a/plugin/migration/cluster_migration/cluster_migration.go b/plugin/migration/cluster_migration/cluster_migration.go
new file mode 100644
index 00000000..861c7c0b
--- /dev/null
+++ b/plugin/migration/cluster_migration/cluster_migration.go
@@ -0,0 +1,481 @@
+package cluster_migration
+
+import (
+	"fmt"
+	"time"
+
+	log "github.com/cihub/seelog"
+
+	"infini.sh/console/model"
+	migration_model "infini.sh/console/plugin/migration/model"
+	migration_util "infini.sh/console/plugin/migration/util"
+
+	"infini.sh/framework/core/elastic"
+	"infini.sh/framework/core/orm"
+	"infini.sh/framework/core/task"
+	"infini.sh/framework/core/util"
+	"infini.sh/framework/modules/elastic/common"
+)
+
+type processor struct {
+	Elasticsearch string
+	IndexName     string
+	scheduler     migration_model.Scheduler
+}
+
+func NewProcessor(elasticsearch, indexName string, scheduler migration_model.Scheduler) migration_model.Processor {
+	return &processor{
+		Elasticsearch: elasticsearch,
+		IndexName:     indexName,
+		scheduler:     scheduler,
+	}
+}
+
+func (p *processor) Process(t *task.Task) (err error) {
+	switch t.Status {
+	case task.StatusReady:
+		// split & schedule index_migration tasks
+		err = p.handleReadyMajorTask(t)
+	case task.StatusRunning:
+		// check index_migration tasks status
+		err = p.handleRunningMajorTask(t)
+	case task.StatusPendingStop:
+		// mark index_migrations as pending_stop
+		err = p.handlePendingStopMajorTask(t)
+	}
+	return err
+}
+
+func (p *processor) handleReadyMajorTask(taskItem *task.Task) error {
+	if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok {
+		return p.splitMajorMigrationTask(taskItem)
+	}
+	//update status of subtask to ready
+	query := util.MapStr{
+		"bool": util.MapStr{
+			"must": []util.MapStr{
+				{
+					"term": util.MapStr{
+						"parent_id": util.MapStr{
+							"value": taskItem.ID,
+						},
+					},
+				},
+				{
+					"terms": util.MapStr{
+						"status": []string{task.StatusError, task.StatusStopped},
+					},
+				},
+				{
+					"term": util.MapStr{
+						"metadata.type": util.MapStr{
+							"value": "index_migration",
+						},
+					},
+				},
+			},
+		},
+	}
+	queryDsl := util.MapStr{
+		"query": query,
+		"script": util.MapStr{
+
"source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusReady), + }, + } + + esClient := elastic.GetClient(p.Elasticsearch) + _, err := esClient.UpdateByQuery(p.IndexName, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Errorf("failed to update sub task status, err: %v", err) + return nil + } + taskItem.RetryTimes++ + taskItem.Status = task.StatusRunning + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: true, + }, fmt.Sprintf("major task [%s] started", taskItem.ID)) + p.sendMajorTaskNotification(taskItem) + return nil +} + +func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { + clusterMigrationTask := migration_model.ClusterMigrationTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &clusterMigrationTask) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + return err + } + defer func() { + taskItem.ConfigString = util.MustToJSON(clusterMigrationTask) + }() + esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id) + targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id) + + for _, index := range clusterMigrationTask.Indices { + source := migration_model.IndexMigrationSourceConfig{ + ClusterId: clusterMigrationTask.Cluster.Source.Id, + Indices: index.Source.Name, + SliceSize: clusterMigrationTask.Settings.Scroll.SliceSize, + BatchSize: clusterMigrationTask.Settings.Scroll.Docs, + ScrollTime: clusterMigrationTask.Settings.Scroll.Timeout, + } + if index.IndexRename != nil { + source.IndexRename = index.IndexRename + } + if index.Target.Name != "" { + source.IndexRename = util.MapStr{ + index.Source.Name: index.Target.Name, + } + } + if index.TypeRename != nil { + source.TypeRename = index.TypeRename + } + + if v, ok := index.RawFilter.(string); ok { + source.QueryString = v + } else { + var must []interface{} + if index.RawFilter != nil { + must = append(must, index.RawFilter) + } + if index.Source.DocType != "" { + if index.Target.DocType != "" { + source.TypeRename = util.MapStr{ + index.Source.DocType: index.Target.DocType, + } + } + must = append(must, util.MapStr{ + "terms": util.MapStr{ + "_type": []string{index.Source.DocType}, + }, + }) + } else { + if targetType != "" { + source.TypeRename = util.MapStr{ + "*": index.Target.DocType, + } + } + } + if len(must) > 0 { + source.QueryDSL = util.MapStr{ + "bool": util.MapStr{ + "must": must, + }, + } + } + } + var targetMust []interface{} + if index.RawFilter != nil { + targetMust = append(targetMust, index.RawFilter) + } + if index.Target.DocType != "" && targetType != "" { + targetMust = append(targetMust, util.MapStr{ + "terms": util.MapStr{ + "_type": []string{index.Target.DocType}, + }, + }) + } + + target := migration_model.IndexMigrationTargetConfig{ + ClusterId: clusterMigrationTask.Cluster.Target.Id, + Bulk: migration_model.IndexMigrationBulkConfig{ + BatchSizeInMB: clusterMigrationTask.Settings.Bulk.StoreSizeInMB, + BatchSizeInDocs: clusterMigrationTask.Settings.Bulk.Docs, + MaxWorkerSize: clusterMigrationTask.Settings.Bulk.MaxWorkerSize, + IdleTimeoutInSeconds: clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds, + SliceSize: clusterMigrationTask.Settings.Bulk.SliceSize, + Compress: clusterMigrationTask.Settings.Bulk.Compress, + }, + } + + if index.Partition != nil { + partitionQ := &elastic.PartitionQuery{ + IndexName: index.Source.Name, + FieldName: index.Partition.FieldName, + FieldType: index.Partition.FieldType, + Step: index.Partition.Step, + } + if source.QueryDSL != nil { + partitionQ.Filter 
= source.QueryDSL
+			}
+			partitions, err := elastic.GetPartitions(partitionQ, esSourceClient)
+			if err != nil {
+				return err
+			}
+			if len(partitions) == 0 {
+				return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter))
+			}
+			var (
+				partitionID int
+			)
+			for _, partition := range partitions {
+				//skip empty partition
+				if partition.Docs <= 0 {
+					continue
+				}
+				partitionID++
+				partitionSource := source
+				partitionSource.Start = partition.Start
+				partitionSource.End = partition.End
+				partitionSource.DocCount = partition.Docs
+				partitionSource.Step = index.Partition.Step
+				partitionSource.PartitionId = partitionID
+				partitionSource.QueryDSL = partition.Filter
+				partitionSource.QueryString = ""
+
+				partitionMigrationTask := task.Task{
+					ParentId:    []string{taskItem.ID},
+					Cancellable: false,
+					Runnable:    true,
+					Status:      task.StatusReady,
+					Metadata: task.Metadata{
+						Type: "index_migration",
+						Labels: util.MapStr{
+							"business_id":       "index_migration",
+							"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
+							"target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
+							"index_name":        index.Source.Name,
+							"unique_index_name": index.Source.GetUniqueIndexName(),
+						},
+					},
+					ConfigString: util.MustToJSON(migration_model.IndexMigrationTaskConfig{
+						Source:    partitionSource,
+						Target:    target,
+						Execution: clusterMigrationTask.Settings.Execution,
+						Version:   migration_model.IndexMigrationV1,
+					}),
+				}
+				partitionMigrationTask.ID = util.GetUUID()
+				err = orm.Create(nil, &partitionMigrationTask)
+				if err != nil {
+					return fmt.Errorf("store index migration task(partition) error: %w", err)
+				}
+
+			}
+		} else {
+			source.DocCount = index.Source.Docs
+
+			indexParameters := migration_model.IndexMigrationTaskConfig{
+				Source:    source,
+				Target:    target,
+				Execution: clusterMigrationTask.Settings.Execution,
+				Version:   migration_model.IndexMigrationV1,
+			}
+			indexMigrationTask := task.Task{
+				ParentId:          []string{taskItem.ID},
+				Cancellable:       true,
+				Runnable:          false,
+				Status:            task.StatusReady,
+				StartTimeInMillis: time.Now().UnixMilli(),
+				Metadata: task.Metadata{
+					Type: "index_migration",
+					Labels: util.MapStr{
+						"business_id":       "index_migration",
+						"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
+						"target_cluster_id": clusterMigrationTask.Cluster.Target.Id,
+						"partition_count":   1,
+						"index_name":        index.Source.Name,
+						"unique_index_name": index.Source.GetUniqueIndexName(),
+					},
+				},
+				ConfigString: util.MustToJSON(indexParameters),
+			}
+
+			indexMigrationTask.ID = util.GetUUID()
+
+			err = orm.Create(nil, &indexMigrationTask)
+			if err != nil {
+				return fmt.Errorf("store index migration task error: %w", err)
+			}
+		}
+	}
+	taskItem.Metadata.Labels["is_split"] = true
+	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+		Success: true,
+	}, fmt.Sprintf("major task [%s] split", taskItem.ID))
+	return nil
+}
+
+func (p *processor) handleRunningMajorTask(taskItem *task.Task) error {
+	ts, err := p.getMajorTaskState(taskItem)
+	if err != nil {
+		return err
+	}
+	if !(ts.Status == task.StatusComplete || ts.Status == task.StatusError) {
+		return nil
+	}
+
+	totalDocs := migration_util.GetMapIntValue(util.MapStr(taskItem.Metadata.Labels), "source_total_docs")
+	var errMsg string
+	if ts.Status == task.StatusError {
+		errMsg = "index migration(s) failed"
+	}
+
+	if errMsg == "" {
+		if totalDocs != ts.IndexDocs {
+			errMsg = fmt.Sprintf("cluster migration completed but doc count mismatch: %d / %d", ts.IndexDocs, totalDocs)
+		}
+	}
+
+	if errMsg == "" {
+		taskItem.Status =
task.StatusComplete + } else { + taskItem.Status = task.StatusError + } + taskItem.Metadata.Labels["target_total_docs"] = ts.IndexDocs + tn := time.Now() + taskItem.CompletedTime = &tn + p.sendMajorTaskNotification(taskItem) + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: errMsg == "", + Error: errMsg, + }, fmt.Sprintf("major task [%s] finished with status [%s]", taskItem.ID, taskItem.Status)) + return nil +} + +func (p *processor) getMajorTaskState(majorTask *task.Task) (taskState migration_model.MajorTaskState, err error) { + query := util.MapStr{ + "size": 0, + "aggs": util.MapStr{ + "total_docs": util.MapStr{ + "sum": util.MapStr{ + "field": "metadata.labels.index_docs", + }, + }, + "grp": util.MapStr{ + "terms": util.MapStr{ + "field": "status", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": majorTask.ID, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_migration", + }, + }, + }, + }, + }, + }, + } + esClient := elastic.GetClient(p.Elasticsearch) + res, err := esClient.SearchWithRawQueryDSL(p.IndexName, util.MustToJSONBytes(query)) + if err != nil { + log.Errorf("search es failed, err: %v", err) + return taskState, nil + } + if v, err := util.ExtractInt(res.Aggregations["total_docs"].Value); err == nil { + taskState.IndexDocs = v + } + var ( + hasError bool + ) + for _, bk := range res.Aggregations["grp"].Buckets { + status, _ := util.ExtractString(bk["key"]) + if migration_util.IsRunningState(status) { + taskState.Status = task.StatusRunning + return taskState, nil + } + if status == task.StatusError { + hasError = true + } + } + if hasError { + taskState.Status = task.StatusError + } else { + taskState.Status = task.StatusComplete + } + return taskState, nil +} + +func (p *processor) handlePendingStopMajorTask(taskItem *task.Task) error { + err := migration_util.UpdatePendingChildTasksToPendingStop(taskItem, "index_migration") + if err != nil { + log.Errorf("failed to update sub task status, err: %v", err) + return nil + } + + tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "index_migration") + if err != nil { + log.Errorf("failed to get sub tasks, err: %v", err) + return nil + } + + // all subtask stopped or error or complete + if len(tasks) == 0 { + taskItem.Status = task.StatusStopped + p.sendMajorTaskNotification(taskItem) + p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) + // NOTE: we don't know how many running index_migration's stopped, so do a refresh from ES + p.scheduler.RefreshInstanceJobsFromES() + } + return nil +} + +func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) { + esClient := elastic.GetClient(p.Elasticsearch) + _, err := esClient.Index(p.IndexName, "", taskItem.ID, taskItem, "") + if err != nil { + log.Errorf("failed to update task, err: %v", err) + } + if message != "" { + migration_util.WriteLog(taskItem, taskResult, message) + } +} + +func (p *processor) sendMajorTaskNotification(taskItem *task.Task) { + config := migration_model.ClusterMigrationTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &config) + if err != nil { + log.Errorf("failed to parse config info from major task, id: %s, err: %v", taskItem.ID, err) + return + } + + creatorID := config.Creator.Id + + var title, body string + body = fmt.Sprintf("From Cluster: [%s (%s)], To Cluster: [%s 
(%s)]", config.Cluster.Source.Id, config.Cluster.Source.Name, config.Cluster.Target.Id, config.Cluster.Target.Name) + link := fmt.Sprintf("/#/data_tools/migration/%s/detail", taskItem.ID) + switch taskItem.Status { + case task.StatusReady: + log.Debugf("skip sending notification for ready task, id: %s", taskItem.ID) + return + case task.StatusStopped: + title = fmt.Sprintf("Data Migration Stopped") + case task.StatusComplete: + title = fmt.Sprintf("Data Migration Completed") + case task.StatusError: + title = fmt.Sprintf("Data Migration Failed") + case task.StatusRunning: + title = fmt.Sprintf("Data Migration Started") + default: + log.Warnf("skip sending notification for invalid task status, id: %s", taskItem.ID) + return + } + notification := &model.Notification{ + UserId: util.ToString(creatorID), + Type: model.NotificationTypeNotification, + MessageType: model.MessageTypeMigration, + Status: model.NotificationStatusNew, + Title: title, + Body: body, + Link: link, + } + err = orm.Create(nil, notification) + if err != nil { + log.Errorf("failed to create notification, err: %v", err) + return + } + return +} diff --git a/plugin/migration/index_migration/index_migration.go b/plugin/migration/index_migration/index_migration.go index ac8a9010..9a3aaeda 100644 --- a/plugin/migration/index_migration/index_migration.go +++ b/plugin/migration/index_migration/index_migration.go @@ -21,15 +21,13 @@ import ( type processor struct { Elasticsearch string IndexName string - LogIndexName string scheduler migration_model.Scheduler } -func NewProcessor(elasticsearch, indexName, logIndexName string, scheduler migration_model.Scheduler) migration_model.Processor { +func NewProcessor(elasticsearch, indexName string, scheduler migration_model.Scheduler) migration_model.Processor { return &processor{ Elasticsearch: elasticsearch, IndexName: indexName, - LogIndexName: logIndexName, scheduler: scheduler, } } diff --git a/plugin/migration/model/common.go b/plugin/migration/model/common.go new file mode 100644 index 00000000..b4ae0e97 --- /dev/null +++ b/plugin/migration/model/common.go @@ -0,0 +1,43 @@ +package model + +import "fmt" + +type ExecutionConfig struct { + TimeWindow []TimeWindowItem `json:"time_window"` + Nodes struct { + Permit []ExecutionNode `json:"permit"` + } `json:"nodes"` +} + +type ExecutionNode struct { + ID string `json:"id"` + Name string `json:"name"` +} + +type TimeWindowItem struct { + Start string `json:"start"` + End string `json:"end"` +} + +type IndexPartition struct { + FieldType string `json:"field_type"` + FieldName string `json:"field_name"` + Step interface{} `json:"step"` +} + +type IndexInfo struct { + Name string `json:"name"` + DocType string `json:"doc_type"` + Docs int64 `json:"docs"` + StoreSizeInBytes int `json:"store_size_in_bytes"` +} + +func (ii *IndexInfo) GetUniqueIndexName() string { + return fmt.Sprintf("%s:%s", ii.Name, ii.DocType) +} + +type ClusterInfo struct { + Id string `json:"id"` + Name string `json:"name"` + Distribution string `json:"distribution,omitempty"` +} diff --git a/plugin/migration/model/model.go b/plugin/migration/model/model.go index 6d9b9085..53010327 100644 --- a/plugin/migration/model/model.go +++ b/plugin/migration/model/model.go @@ -5,8 +5,6 @@ package model import ( - "fmt" - "infini.sh/framework/core/util" ) @@ -15,7 +13,7 @@ type ClusterMigrationTaskConfig struct { Source ClusterInfo `json:"source"` Target ClusterInfo `json:"target"` } `json:"cluster"` - Indices []IndexConfig `json:"indices"` + Indices []ClusterMigrationIndexConfig 
`json:"indices"` Settings struct { ParallelIndices int `json:"parallel_indices"` ParallelTaskPerIndex int `json:"parallel_task_per_index"` @@ -42,57 +40,15 @@ type ClusterMigrationBulkConfig struct { Compress bool `json:"compress"` } -type ExecutionConfig struct { - TimeWindow []TimeWindowItem `json:"time_window"` - Nodes struct { - Permit []ExecutionNode `json:"permit"` - } `json:"nodes"` -} - -type ExecutionNode struct { - ID string `json:"id"` - Name string `json:"name"` -} - -type TimeWindowItem struct { - Start string `json:"start"` - End string `json:"end"` -} - -type IndexConfig struct { - Source IndexInfo `json:"source"` - Target IndexInfo `json:"target"` - RawFilter interface{} `json:"raw_filter"` - IndexRename map[string]interface{} `json:"index_rename"` - TypeRename map[string]interface{} `json:"type_rename"` - Partition *IndexPartition `json:"partition,omitempty"` - //TaskID string `json:"task_id,omitempty"` - //Status string `json:"status,omitempty"` - Percent float64 `json:"percent,omitempty"` - ErrorPartitions int `json:"error_partitions,omitempty"` -} - -type IndexPartition struct { - FieldType string `json:"field_type"` - FieldName string `json:"field_name"` - Step interface{} `json:"step"` -} - -type IndexInfo struct { - Name string `json:"name"` - DocType string `json:"doc_type"` - Docs int64 `json:"docs"` - StoreSizeInBytes int `json:"store_size_in_bytes"` -} - -func (ii *IndexInfo) GetUniqueIndexName() string { - return fmt.Sprintf("%s:%s", ii.Name, ii.DocType) -} - -type ClusterInfo struct { - Id string `json:"id"` - Name string `json:"name"` - Distribution string `json:"distribution,omitempty"` +type ClusterMigrationIndexConfig struct { + Source IndexInfo `json:"source"` + Target IndexInfo `json:"target"` + RawFilter interface{} `json:"raw_filter"` + IndexRename map[string]interface{} `json:"index_rename"` + TypeRename map[string]interface{} `json:"type_rename"` + Partition *IndexPartition `json:"partition,omitempty"` + Percent float64 `json:"percent,omitempty"` + ErrorPartitions int `json:"error_partitions,omitempty"` } type MajorTaskState struct { @@ -149,16 +105,3 @@ type IndexMigrationTargetConfig struct { ClusterId string `json:"cluster_id"` Bulk IndexMigrationBulkConfig `json:"bulk"` } - -type PipelineTaskLoggingConfig struct { - Enabled bool `json:"enabled"` -} - -type PipelineTaskConfig struct { - Name string `json:"name"` - Logging PipelineTaskLoggingConfig `json:"logging"` - Labels util.MapStr `json:"labels"` - AutoStart bool `json:"auto_start"` - KeepRunning bool `json:"keep_running"` - Processor []util.MapStr `json:"processor"` -} diff --git a/plugin/migration/model/pipeline.go b/plugin/migration/model/pipeline.go new file mode 100644 index 00000000..99660262 --- /dev/null +++ b/plugin/migration/model/pipeline.go @@ -0,0 +1,16 @@ +package model + +import "infini.sh/framework/core/util" + +type PipelineTaskLoggingConfig struct { + Enabled bool `json:"enabled"` +} + +type PipelineTaskConfig struct { + Name string `json:"name"` + Logging PipelineTaskLoggingConfig `json:"logging"` + Labels util.MapStr `json:"labels"` + AutoStart bool `json:"auto_start"` + KeepRunning bool `json:"keep_running"` + Processor []util.MapStr `json:"processor"` +} diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 5b132293..cc7fe756 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -11,7 +11,7 @@ import ( log "github.com/cihub/seelog" - "infini.sh/console/model" + "infini.sh/console/plugin/migration/cluster_migration" 
"infini.sh/console/plugin/migration/index_migration" migration_model "infini.sh/console/plugin/migration/model" "infini.sh/console/plugin/migration/pipeline_task" @@ -22,7 +22,6 @@ import ( "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" "infini.sh/framework/core/global" - "infini.sh/framework/core/orm" "infini.sh/framework/core/pipeline" task2 "infini.sh/framework/core/task" "infini.sh/framework/core/util" @@ -33,9 +32,10 @@ type DispatcherProcessor struct { id string config *DispatcherConfig - scheduler migration_model.Scheduler - pipelineTaskProcessor migration_model.Processor - indexMigrationTaskProcessor migration_model.Processor + scheduler migration_model.Scheduler + pipelineTaskProcessor migration_model.Processor + clusterMigrationTaskProcessor migration_model.Processor + indexMigrationTaskProcessor migration_model.Processor } type DispatcherConfig struct { @@ -93,7 +93,8 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro return nil, err } processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName) - processor.indexMigrationTaskProcessor = index_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName, processor.scheduler) + processor.indexMigrationTaskProcessor = index_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) + processor.clusterMigrationTaskProcessor = cluster_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) return &processor, nil } @@ -106,7 +107,7 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { // handle pipeline task p.handleTasks(ctx, "pipeline", []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, p.pipelineTaskProcessor.Process) // mark index_migrations as pending_stop - p.handleTasks(ctx, "cluster_migration", []string{task2.StatusPendingStop}, p.handlePendingStopMajorTask) + p.handleTasks(ctx, "cluster_migration", []string{task2.StatusPendingStop}, p.clusterMigrationTaskProcessor.Process) // mark pipeline tasks as pending_stop p.handleTasks(ctx, "index_migration", []string{task2.StatusPendingStop}, p.indexMigrationTaskProcessor.Process) // check pipeline tasks status @@ -114,9 +115,9 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { // split & schedule pipline tasks p.handleTasks(ctx, "index_migration", []string{task2.StatusReady}, p.indexMigrationTaskProcessor.Process) // check index_migration tasks status - p.handleTasks(ctx, "cluster_migration", []string{task2.StatusRunning}, p.handleRunningMajorTask) + p.handleTasks(ctx, "cluster_migration", []string{task2.StatusRunning}, p.clusterMigrationTaskProcessor.Process) // split & schedule index_migration tasks - p.handleTasks(ctx, "cluster_migration", []string{task2.StatusReady}, p.handleReadyMajorTask) + p.handleTasks(ctx, "cluster_migration", []string{task2.StatusReady}, p.clusterMigrationTaskProcessor.Process) return nil } @@ -163,119 +164,6 @@ func (p *DispatcherProcessor) handleTask(taskItem *task2.Task, taskHandler func( return taskHandler(taskItem) } -func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { - if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok { - return p.splitMajorMigrationTask(taskItem) - } - //update status of subtask to ready - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskItem.ID, - }, - }, - }, - { - "terms": 
util.MapStr{ - "status": []string{task2.StatusError, task2.StatusStopped}, - }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_migration", - }, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusReady), - }, - } - - esClient := elastic.GetClient(p.config.Elasticsearch) - _, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl)) - if err != nil { - log.Errorf("failed to update sub task status, err: %v", err) - return nil - } - taskItem.RetryTimes++ - taskItem.Status = task2.StatusRunning - p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ - Success: true, - }, fmt.Sprintf("major task [%s] started", taskItem.ID)) - p.sendMajorTaskNotification(taskItem) - return nil -} - -func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error { - err := migration_util.UpdatePendingChildTasksToPendingStop(taskItem, "index_migration") - if err != nil { - log.Errorf("failed to update sub task status, err: %v", err) - return nil - } - - tasks, err := migration_util.GetPendingChildTasks(p.config.Elasticsearch, p.config.IndexName, taskItem.ID, "index_migration") - if err != nil { - log.Errorf("failed to get sub tasks, err: %v", err) - return nil - } - - // all subtask stopped or error or complete - if len(tasks) == 0 { - taskItem.Status = task2.StatusStopped - p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) - // NOTE: we don't know how many running index_migration's stopped, so do a refresh from ES - p.scheduler.RefreshInstanceJobsFromES() - } - return nil -} - -func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error { - ts, err := p.getMajorTaskState(taskItem) - if err != nil { - return err - } - if !(ts.Status == task2.StatusComplete || ts.Status == task2.StatusError) { - return nil - } - - totalDocs := migration_util.GetMapIntValue(util.MapStr(taskItem.Metadata.Labels), "source_total_docs") - var errMsg string - if ts.Status == task2.StatusError { - errMsg = "index migration(s) failed" - } - - if errMsg == "" { - if totalDocs != ts.IndexDocs { - errMsg = fmt.Sprintf("cluster migration completed but docs count unmatch: %d / %d", ts.IndexDocs, totalDocs) - } - } - - if errMsg == "" { - taskItem.Status = task2.StatusComplete - } else { - taskItem.Status = task2.StatusError - } - taskItem.Metadata.Labels["target_total_docs"] = ts.IndexDocs - tn := time.Now() - taskItem.CompletedTime = &tn - p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ - Success: errMsg == "", - Error: errMsg, - }, fmt.Sprintf("major task [%s] finished with status [%s]", taskItem.ID, taskItem.Status)) - return nil -} - func (p *DispatcherProcessor) getMigrationTasks(taskType string, taskStatus []string, size int) ([]task2.Task, error) { queryDsl := util.MapStr{ "size": size, @@ -326,313 +214,3 @@ func (p *DispatcherProcessor) refreshTask() { log.Errorf("failed to refresh state, err: %v", err) } } - -func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error { - clusterMigrationTask := migration_model.ClusterMigrationTaskConfig{} - err := migration_util.GetTaskConfig(taskItem, &clusterMigrationTask) - if err != nil { - log.Errorf("failed to get task config, err: %v", err) - return err - } - defer func() { - taskItem.ConfigString = util.MustToJSON(clusterMigrationTask) - }() - 
esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id) - targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id) - - for _, index := range clusterMigrationTask.Indices { - source := migration_model.IndexMigrationSourceConfig{ - ClusterId: clusterMigrationTask.Cluster.Source.Id, - Indices: index.Source.Name, - SliceSize: clusterMigrationTask.Settings.Scroll.SliceSize, - BatchSize: clusterMigrationTask.Settings.Scroll.Docs, - ScrollTime: clusterMigrationTask.Settings.Scroll.Timeout, - } - if index.IndexRename != nil { - source.IndexRename = index.IndexRename - } - if index.Target.Name != "" { - source.IndexRename = util.MapStr{ - index.Source.Name: index.Target.Name, - } - } - if index.TypeRename != nil { - source.TypeRename = index.TypeRename - } - - if v, ok := index.RawFilter.(string); ok { - source.QueryString = v - } else { - var must []interface{} - if index.RawFilter != nil { - must = append(must, index.RawFilter) - } - if index.Source.DocType != "" { - if index.Target.DocType != "" { - source.TypeRename = util.MapStr{ - index.Source.DocType: index.Target.DocType, - } - } - must = append(must, util.MapStr{ - "terms": util.MapStr{ - "_type": []string{index.Source.DocType}, - }, - }) - } else { - if targetType != "" { - source.TypeRename = util.MapStr{ - "*": index.Target.DocType, - } - } - } - if len(must) > 0 { - source.QueryDSL = util.MapStr{ - "bool": util.MapStr{ - "must": must, - }, - } - } - } - var targetMust []interface{} - if index.RawFilter != nil { - targetMust = append(targetMust, index.RawFilter) - } - if index.Target.DocType != "" && targetType != "" { - targetMust = append(targetMust, util.MapStr{ - "terms": util.MapStr{ - "_type": []string{index.Target.DocType}, - }, - }) - } - - target := migration_model.IndexMigrationTargetConfig{ - ClusterId: clusterMigrationTask.Cluster.Target.Id, - Bulk: migration_model.IndexMigrationBulkConfig{ - BatchSizeInMB: clusterMigrationTask.Settings.Bulk.StoreSizeInMB, - BatchSizeInDocs: clusterMigrationTask.Settings.Bulk.Docs, - MaxWorkerSize: clusterMigrationTask.Settings.Bulk.MaxWorkerSize, - IdleTimeoutInSeconds: clusterMigrationTask.Settings.Bulk.IdleTimeoutInSeconds, - SliceSize: clusterMigrationTask.Settings.Bulk.SliceSize, - Compress: clusterMigrationTask.Settings.Bulk.Compress, - }, - } - - if index.Partition != nil { - partitionQ := &elastic.PartitionQuery{ - IndexName: index.Source.Name, - FieldName: index.Partition.FieldName, - FieldType: index.Partition.FieldType, - Step: index.Partition.Step, - } - if source.QueryDSL != nil { - partitionQ.Filter = source.QueryDSL - } - partitions, err := elastic.GetPartitions(partitionQ, esSourceClient) - if err != nil { - return err - } - if partitions == nil || len(partitions) == 0 { - return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter)) - } - var ( - partitionID int - ) - for _, partition := range partitions { - //skip empty partition - if partition.Docs <= 0 { - continue - } - partitionID++ - partitionSource := source - partitionSource.Start = partition.Start - partitionSource.End = partition.End - partitionSource.DocCount = partition.Docs - partitionSource.Step = index.Partition.Step - partitionSource.PartitionId = partitionID - partitionSource.QueryDSL = partition.Filter - partitionSource.QueryString = "" - - partitionMigrationTask := task2.Task{ - ParentId: []string{taskItem.ID}, - Cancellable: false, - Runnable: true, - Status: task2.StatusReady, - Metadata: task2.Metadata{ - Type: "index_migration", - Labels: 
util.MapStr{ - "business_id": "index_migration", - "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, - "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, - "index_name": index.Source.Name, - "unique_index_name": index.Source.GetUniqueIndexName(), - }, - }, - ConfigString: util.MustToJSON(migration_model.IndexMigrationTaskConfig{ - Source: partitionSource, - Target: target, - Execution: clusterMigrationTask.Settings.Execution, - Version: migration_model.IndexMigrationV1, - }), - } - partitionMigrationTask.ID = util.GetUUID() - err = orm.Create(nil, &partitionMigrationTask) - if err != nil { - return fmt.Errorf("store index migration task(partition) error: %w", err) - } - - } - } else { - source.DocCount = index.Source.Docs - - indexParameters := migration_model.IndexMigrationTaskConfig{ - Source: source, - Target: target, - Execution: clusterMigrationTask.Settings.Execution, - Version: migration_model.IndexMigrationV1, - } - indexMigrationTask := task2.Task{ - ParentId: []string{taskItem.ID}, - Cancellable: true, - Runnable: false, - Status: task2.StatusReady, - StartTimeInMillis: time.Now().UnixMilli(), - Metadata: task2.Metadata{ - Type: "index_migration", - Labels: util.MapStr{ - "business_id": "index_migration", - "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, - "target_cluster_id": clusterMigrationTask.Cluster.Target.Id, - "partition_count": 1, - "index_name": index.Source.Name, - "unique_index_name": index.Source.GetUniqueIndexName(), - }, - }, - ConfigString: util.MustToJSON(indexParameters), - } - - indexMigrationTask.ID = util.GetUUID() - - err = orm.Create(nil, &indexMigrationTask) - if err != nil { - return fmt.Errorf("store index migration task error: %w", err) - } - } - } - taskItem.Metadata.Labels["is_split"] = true - p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ - Success: true, - }, fmt.Sprintf("major task [%s] splitted", taskItem.ID)) - return nil -} - -func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskState migration_model.MajorTaskState, err error) { - query := util.MapStr{ - "size": 0, - "aggs": util.MapStr{ - "total_docs": util.MapStr{ - "sum": util.MapStr{ - "field": "metadata.labels.index_docs", - }, - }, - "grp": util.MapStr{ - "terms": util.MapStr{ - "field": "status", - }, - }, - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": majorTask.ID, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_migration", - }, - }, - }, - }, - }, - }, - } - esClient := elastic.GetClient(p.config.Elasticsearch) - res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) - if err != nil { - log.Errorf("search es failed, err: %v", err) - return taskState, nil - } - if v, err := util.ExtractInt(res.Aggregations["total_docs"].Value); err == nil { - taskState.IndexDocs = v - } - var ( - hasError bool - ) - for _, bk := range res.Aggregations["grp"].Buckets { - status, _ := util.ExtractString(bk["key"]) - if migration_util.IsRunningState(status) { - taskState.Status = task2.StatusRunning - return taskState, nil - } - if status == task2.StatusError { - hasError = true - } - } - if hasError { - taskState.Status = task2.StatusError - } else { - taskState.Status = task2.StatusComplete - } - return taskState, nil -} - -func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) { - config := migration_model.ClusterMigrationTaskConfig{} - err := 
migration_util.GetTaskConfig(taskItem, &config) - if err != nil { - log.Errorf("failed to parse config info from major task, id: %s, err: %v", taskItem.ID, err) - return - } - - creatorID := config.Creator.Id - - var title, body string - body = fmt.Sprintf("From Cluster: [%s (%s)], To Cluster: [%s (%s)]", config.Cluster.Source.Id, config.Cluster.Source.Name, config.Cluster.Target.Id, config.Cluster.Target.Name) - link := fmt.Sprintf("/#/data_tools/migration/%s/detail", taskItem.ID) - switch taskItem.Status { - case task2.StatusReady: - log.Debugf("skip sending notification for ready task, id: %s", taskItem.ID) - return - case task2.StatusStopped: - title = fmt.Sprintf("Data Migration Stopped") - case task2.StatusComplete: - title = fmt.Sprintf("Data Migration Completed") - case task2.StatusError: - title = fmt.Sprintf("Data Migration Failed") - case task2.StatusRunning: - title = fmt.Sprintf("Data Migration Started") - default: - log.Warnf("skip sending notification for invalid task status, id: %s", taskItem.ID) - return - } - notification := &model.Notification{ - UserId: util.ToString(creatorID), - Type: model.NotificationTypeNotification, - MessageType: model.MessageTypeMigration, - Status: model.NotificationStatusNew, - Title: title, - Body: body, - Link: link, - } - err = orm.Create(nil, notification) - if err != nil { - log.Errorf("failed to create notification, err: %v", err) - return - } - return -}