diff --git a/plugin/migration/api.go b/plugin/migration/api.go index 6a697a84..a999bc2b 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -17,7 +17,6 @@ import ( migration_util "infini.sh/console/plugin/migration/util" "infini.sh/framework/core/api" - "infini.sh/framework/core/api/rbac" "infini.sh/framework/core/api/rbac/enum" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" @@ -34,6 +33,8 @@ func InitAPI() { api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionMigrationTaskWrite)) api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionMigrationTaskWrite)) api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_pause", handler.RequirePermission(handler.pauseTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionMigrationTaskWrite)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead)) @@ -44,6 +45,8 @@ func InitAPI() { api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info/:index", handler.RequirePermission(handler.getDataComparisonTaskOfIndex, enum.PermissionComparisonTaskRead)) api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionComparisonTaskWrite)) api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_pause", handler.RequirePermission(handler.pauseTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionComparisonTaskWrite)) api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration)) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo) @@ -57,63 +60,6 @@ type APIHandler struct { api.Handler } -func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - clusterTaskConfig := &migration_model.ClusterMigrationTaskConfig{} - err := h.DecodeJSON(req, clusterTaskConfig) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if len(clusterTaskConfig.Indices) == 0 { - h.WriteError(w, "indices must not be empty", http.StatusInternalServerError) - return - } - user, err := rbac.FromUserContext(req.Context()) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if user != nil { - clusterTaskConfig.Creator.Name = user.Username - clusterTaskConfig.Creator.Id = user.UserId - } - - var totalDocs int64 - for _, index := range clusterTaskConfig.Indices { - totalDocs += index.Source.Docs - } - - srcClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Source.Id) - clusterTaskConfig.Cluster.Source.Distribution = srcClusterCfg.Distribution - dstClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Target.Id) - clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution - t := task2.Task{ - Metadata: task2.Metadata{ - Type: "cluster_migration", - Labels: util.MapStr{ - "business_id": "cluster_migration", - "source_cluster_id": clusterTaskConfig.Cluster.Source.Id, - "target_cluster_id": clusterTaskConfig.Cluster.Target.Id, - "source_total_docs": totalDocs, - }, - }, - Cancellable: true, - Runnable: false, - Status: task2.StatusInit, - ConfigString: util.MustToJSON(clusterTaskConfig), - } - t.ID = util.GetUUID() - err = orm.Create(nil, &t) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - h.WriteCreatedOKJSON(w, t.ID) -} - func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var ( index = ps.MustGetParameter("index") @@ -175,6 +121,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R if percent > 100 { percent = 100 } + taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceDocs taskConfig.Indices[i].Target.Docs = count taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions @@ -206,6 +153,7 @@ type TaskInfoResponse struct { DataPartition int `json:"data_partition"` CompletedPartitions int `json:"completed_partitions"` Partitions []util.MapStr `json:"partitions"` + Repeating bool `json:"repeating"` } func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -219,12 +167,21 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt return } + taskConfig := &migration_model.ClusterMigrationTaskConfig{} + err = migration_util.GetTaskConfig(&majorTask, taskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + taskInfo := &TaskInfoResponse{ TaskID: id, StartTime: majorTask.StartTimeInMillis, + Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels), } - subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName) + subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName) taskInfo.DataPartition = len(subTasks) if len(subTasks) == 0 { @@ -246,7 +203,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt } } - startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks) + startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating) var partitionTaskInfos []util.MapStr @@ -487,7 +444,7 @@ func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps h }, 200) } -func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexName string) (subTasks []task2.Task, runningPipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) { +func (h *APIHandler) getChildTaskInfosByIndex(id string, uniqueIndexName string) (subTasks []task2.Task, runningPipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) { queryDsl := util.MapStr{ "size": 9999, "sort": []util.MapStr{ @@ -503,7 +460,7 @@ func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexN { "term": util.MapStr{ "parent_id": util.MapStr{ - "value": taskItem.ID, + "value": id, }, }, }, @@ -518,10 +475,7 @@ func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexN }, }, } - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(queryDsl), - } - err, result := orm.Search(task2.Task{}, q) + allTasks, err := migration_util.GetTasks(queryDsl) if err != nil { return } @@ -530,25 +484,16 @@ func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexN pipelineSubParentIDs = map[string]string{} parentIDPipelineTasks = map[string][]task2.Task{} - for _, row := range result.Result { - buf := util.MustToJSONBytes(row) - subTask := task2.Task{} - err = util.FromJSONBytes(buf, &subTask) - if err != nil { - log.Error(err) - continue - } - + for _, subTask := range allTasks { if subTask.Metadata.Type != "pipeline" { subTasks = append(subTasks, subTask) continue } - // TODO: use more robust logic if pl := len(subTask.ParentId); pl != 2 { continue } - parentID := subTask.ParentId[1] + parentID := migration_util.GetDirectParentId(subTask.ParentId) pipelineSubParentIDs[subTask.ID] = parentID instID := migration_util.GetMapStringValue(util.MapStr(subTask.Metadata.Labels), "execution_instance_id") @@ -590,7 +535,7 @@ func (h *APIHandler) getChildPipelineInfosFromGateway(pipelineTaskIDs map[string return } -func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task) (startTime int64, completedTime int64, duration int64, completedPartitions int) { +func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task, repeating bool) (startTime int64, completedTime int64, duration int64, completedPartitions int) { if len(subTasks) == 0 { return } @@ -615,7 +560,7 @@ func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task) (startTime int64, completedPartitions++ } } - if len(subTasks) != completedPartitions { + if len(subTasks) != completedPartitions || repeating { completedTime = 0 duration = time.Now().UnixMilli() - startTime } else { diff --git a/plugin/migration/cluster_comparison/cluster_comparison.go b/plugin/migration/cluster_comparison/cluster_comparison.go index e2f39eb3..bfeb1669 100644 --- a/plugin/migration/cluster_comparison/cluster_comparison.go +++ b/plugin/migration/cluster_comparison/cluster_comparison.go @@ -1,6 +1,7 @@ package cluster_comparison import ( + "context" "fmt" "time" @@ -76,13 +77,27 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { esSourceClient := elastic.GetClient(clusterComparisonTask.Cluster.Source.Id) esTargetClient := elastic.GetClient(clusterComparisonTask.Cluster.Target.Id) - err = migration_util.DeleteChildTasks(taskItem.ID) + err = migration_util.DeleteChildTasks(taskItem.ID, "index_comparison") if err != nil { log.Warnf("failed to clear child tasks, err: %v", err) return nil } - for _, index := range clusterComparisonTask.Indices { + current := migration_util.GetMapIntValue(taskItem.Metadata.Labels, "next_run_time") + var step time.Duration + if clusterComparisonTask.Settings.Execution.Repeat != nil { + step = clusterComparisonTask.Settings.Execution.Repeat.Interval + } + + var pids []string + pids = append(pids, taskItem.ParentId...) + pids = append(pids, taskItem.ID) + + var sourceTotalDocs int64 + var targetTotalDocs int64 + ctx := context.Background() + + for i, index := range clusterComparisonTask.Indices { sourceDump := migration_model.IndexComparisonDumpConfig{ ClusterId: clusterComparisonTask.Cluster.Source.Id, Indices: index.Source.Name, @@ -105,6 +120,13 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { if index.RawFilter != nil { must = append(must, index.RawFilter) } + if index.Incremental != nil { + incrementalFilter, err := index.Incremental.BuildFilter(current, step) + if err != nil { + return err + } + must = append(must, incrementalFilter) + } if len(must) > 0 { sourceDump.QueryDSL = util.MapStr{ "bool": util.MapStr{ @@ -119,9 +141,31 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { targetDump.Indices = index.Target.Name targetDump.DocCount = index.Target.Docs + var countQuery = util.MapStr{} + if sourceDump.QueryDSL != nil { + countQuery = util.MapStr{ + "query": sourceDump.QueryDSL, + } + } + sourceCount, err := esSourceClient.Count(ctx, index.Source.Name, util.MustToJSONBytes(countQuery)) + if err != nil { + return err + } + clusterComparisonTask.Indices[i].Source.Docs = sourceCount.Count + sourceTotalDocs += sourceCount.Count + sourceDump.DocCount = sourceCount.Count + + targetCount, err := esTargetClient.Count(ctx, index.Target.Name, util.MustToJSONBytes(countQuery)) + if err != nil { + return err + } + clusterComparisonTask.Indices[i].Target.Docs = targetCount.Count + targetTotalDocs += targetCount.Count + targetDump.DocCount = targetCount.Count + if index.Partition == nil { indexComparisonTask := task.Task{ - ParentId: []string{taskItem.ID}, + ParentId: pids, Cancellable: true, Runnable: false, Status: task.StatusReady, @@ -182,7 +226,7 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { partitions := elastic.MergePartitions(sourcePartitions, targetPartitions, index.Partition.FieldName, index.Partition.FieldType, targetPartitionQ.Filter) if len(partitions) == 0 { - return fmt.Errorf("empty partitions") + continue } var ( @@ -204,7 +248,7 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { partitionTargetDump.Indices = index.Target.Name partitionComparisonTask := task.Task{ - ParentId: []string{taskItem.ID}, + ParentId: pids, Cancellable: false, Runnable: true, Status: task.StatusReady, @@ -232,7 +276,10 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { } } + taskItem.ConfigString = util.MustToJSON(clusterComparisonTask) taskItem.Metadata.Labels["is_split"] = true + taskItem.Metadata.Labels["source_total_docs"] = sourceTotalDocs + taskItem.Metadata.Labels["target_total_docs"] = targetTotalDocs p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, fmt.Sprintf("major task [%s] splitted", taskItem.ID)) @@ -336,7 +383,7 @@ func (p *processor) handlePendingStopMajorTask(taskItem *task.Task) error { return nil } - tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "index_comparison") + tasks, err := migration_util.GetPendingChildTasks(taskItem.ID, "index_comparison") if err != nil { log.Errorf("failed to get sub tasks, err: %v", err) return nil diff --git a/plugin/migration/cluster_comparison/orm.go b/plugin/migration/cluster_comparison/orm.go new file mode 100644 index 00000000..e7267fa8 --- /dev/null +++ b/plugin/migration/cluster_comparison/orm.go @@ -0,0 +1,124 @@ +package cluster_comparison + +import ( + "errors" + "fmt" + + migration_model "infini.sh/console/plugin/migration/model" + migration_util "infini.sh/console/plugin/migration/util" + "infini.sh/framework/core/api/rbac" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" +) + +func RepeatTask(oldTask *task.Task) (*task.Task, error) { + config := migration_model.ClusterComparisonTaskConfig{} + err := migration_util.GetTaskConfig(oldTask, &config) + if err != nil { + return nil, err + } + + t, err := buildTask(&config, &rbac.ShortUser{ + Username: config.Creator.Name, + UserId: config.Creator.Id, + }, true) + if err != nil { + return nil, err + } + t.ParentId = []string{oldTask.ID} + if len(oldTask.ParentId) > 0 { + t.ParentId = oldTask.ParentId + } + + migration_util.CopyRepeatState(oldTask.Metadata.Labels, t.Metadata.Labels) + err = migration_util.UpdateRepeatState(config.Settings.Execution.Repeat, t.Metadata.Labels) + if err != nil { + return nil, fmt.Errorf("repeat invalid: %v", err) + } + + err = orm.Create(nil, t) + if err != nil { + return nil, err + } + return t, nil +} + +func CreateTask(config *migration_model.ClusterComparisonTaskConfig, creator *rbac.ShortUser) (*task.Task, error) { + t, err := buildTask(config, creator, false) + if err != nil { + return nil, err + } + + err = migration_util.UpdateRepeatState(config.Settings.Execution.Repeat, t.Metadata.Labels) + if err != nil { + return nil, fmt.Errorf("repeat invalid: %v", err) + } + + err = orm.Create(nil, t) + if err != nil { + return nil, err + } + return t, nil +} + +func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rbac.ShortUser, repeat bool) (*task.Task, error) { + if len(config.Indices) == 0 { + return nil, errors.New("indices must not be empty") + } + if creator == nil { + return nil, errors.New("missing creator info") + } + config.Creator.Name = creator.Username + config.Creator.Id = creator.UserId + + srcClusterCfg := elastic.GetConfig(config.Cluster.Source.Id) + config.Cluster.Source.Distribution = srcClusterCfg.Distribution + dstClusterCfg := elastic.GetConfig(config.Cluster.Target.Id) + config.Cluster.Target.Distribution = dstClusterCfg.Distribution + + clearTaskConfig(config) + + var sourceTotalDocs int64 + var targetTotalDocs int64 + + for _, index := range config.Indices { + if index.Incremental != nil { + if repeat { + index.Incremental.Full = false + } else { + index.Incremental.Full = true + } + } + sourceTotalDocs += index.Source.Docs + targetTotalDocs += index.Target.Docs + } + + t := task.Task{ + Metadata: task.Metadata{ + Type: "cluster_comparison", + Labels: util.MapStr{ + "business_id": "cluster_comparison", + "source_cluster_id": config.Cluster.Source.Id, + "target_cluster_id": config.Cluster.Target.Id, + "source_total_docs": sourceTotalDocs, + "target_total_docs": targetTotalDocs, + }, + }, + Cancellable: true, + Runnable: false, + Status: task.StatusInit, + ConfigString: util.MustToJSON(config), + } + t.ID = util.GetUUID() + return &t, nil +} + +// sync with getDataComparisonTaskInfo +func clearTaskConfig(config *migration_model.ClusterComparisonTaskConfig) { + for i := range config.Indices { + config.Indices[i].ScrollPercent = 0 + config.Indices[i].ErrorPartitions = 0 + } +} diff --git a/plugin/migration/cluster_migration/cluster_migration.go b/plugin/migration/cluster_migration/cluster_migration.go index 95269115..3dec3b8f 100644 --- a/plugin/migration/cluster_migration/cluster_migration.go +++ b/plugin/migration/cluster_migration/cluster_migration.go @@ -1,6 +1,7 @@ package cluster_migration import ( + "context" "fmt" "time" @@ -78,21 +79,36 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id) targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id) - err = migration_util.DeleteChildTasks(taskItem.ID) + err = migration_util.DeleteChildTasks(taskItem.ID, "index_migration") if err != nil { log.Warnf("failed to clear child tasks, err: %v", err) return nil } - for _, index := range clusterMigrationTask.Indices { + current := migration_util.GetMapIntValue(taskItem.Metadata.Labels, "next_run_time") + var step time.Duration + if clusterMigrationTask.Settings.Execution.Repeat != nil { + step = clusterMigrationTask.Settings.Execution.Repeat.Interval + } + + var pids []string + pids = append(pids, taskItem.ParentId...) + pids = append(pids, taskItem.ID) + + var totalDocs int64 + ctx := context.Background() + + for i, index := range clusterMigrationTask.Indices { source := migration_model.IndexMigrationSourceConfig{ ClusterId: clusterMigrationTask.Cluster.Source.Id, Indices: index.Source.Name, + DocCount: index.Source.Docs, SliceSize: clusterMigrationTask.Settings.Scroll.SliceSize, BatchSize: clusterMigrationTask.Settings.Scroll.Docs, ScrollTime: clusterMigrationTask.Settings.Scroll.Timeout, SkipCountCheck: clusterMigrationTask.Settings.SkipScrollCountCheck, } + if index.IndexRename != nil { source.IndexRename = index.IndexRename } @@ -112,6 +128,13 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { if index.RawFilter != nil { must = append(must, index.RawFilter) } + if index.Incremental != nil { + incrementalFilter, err := index.Incremental.BuildFilter(current, step) + if err != nil { + return err + } + must = append(must, incrementalFilter) + } if index.Source.DocType != "" { if index.Target.DocType != "" { source.TypeRename = util.MapStr{ @@ -139,6 +162,25 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { } } + // NOTE: for repeating tasks, frontend can't get the accurate doc count + // update here before we split the task, if the index has incremental config and correct delay value, + // the doc count will not change and will be stable + var countQuery = util.MapStr{} + if source.QueryDSL != nil { + countQuery = util.MapStr{ + "query": source.QueryDSL, + } + } + sourceCount, err := esSourceClient.Count(ctx, index.Source.Name, util.MustToJSONBytes(countQuery)) + if err != nil { + log.Errorf("failed to count docs, err: %v", err) + return err + } + + clusterMigrationTask.Indices[i].Source.Docs = sourceCount.Count + source.DocCount = sourceCount.Count + totalDocs += sourceCount.Count + target := migration_model.IndexMigrationTargetConfig{ ClusterId: clusterMigrationTask.Cluster.Target.Id, SkipCountCheck: clusterMigrationTask.Settings.SkipBulkCountCheck, @@ -166,8 +208,8 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { if err != nil { return err } - if partitions == nil || len(partitions) == 0 { - return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter)) + if len(partitions) == 0 { + continue } var ( partitionID int @@ -188,7 +230,7 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { partitionSource.QueryString = "" partitionMigrationTask := task.Task{ - ParentId: []string{taskItem.ID}, + ParentId: pids, Cancellable: false, Runnable: true, Status: task.StatusReady, @@ -217,10 +259,8 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { } } else { - source.DocCount = index.Source.Docs - indexMigrationTask := task.Task{ - ParentId: []string{taskItem.ID}, + ParentId: pids, Cancellable: true, Runnable: false, Status: task.StatusReady, @@ -251,7 +291,9 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { } } } + taskItem.ConfigString = util.MustToJSON(clusterMigrationTask) taskItem.Metadata.Labels["is_split"] = true + taskItem.Metadata.Labels["source_total_docs"] = totalDocs p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, fmt.Sprintf("major task [%s] splitted", taskItem.ID)) @@ -368,7 +410,7 @@ func (p *processor) handlePendingStopMajorTask(taskItem *task.Task) error { return nil } - tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "index_migration") + tasks, err := migration_util.GetPendingChildTasks(taskItem.ID, "index_migration") if err != nil { log.Errorf("failed to get sub tasks, err: %v", err) return nil diff --git a/plugin/migration/cluster_migration/orm.go b/plugin/migration/cluster_migration/orm.go new file mode 100644 index 00000000..2616c6fa --- /dev/null +++ b/plugin/migration/cluster_migration/orm.go @@ -0,0 +1,122 @@ +package cluster_migration + +import ( + "errors" + "fmt" + + migration_model "infini.sh/console/plugin/migration/model" + migration_util "infini.sh/console/plugin/migration/util" + "infini.sh/framework/core/api/rbac" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" +) + +func RepeatTask(oldTask *task.Task) (*task.Task, error) { + config := migration_model.ClusterMigrationTaskConfig{} + err := migration_util.GetTaskConfig(oldTask, &config) + if err != nil { + return nil, err + } + + t, err := buildTask(&config, &rbac.ShortUser{ + Username: config.Creator.Name, + UserId: config.Creator.Id, + }, true) + if err != nil { + return nil, err + } + t.ParentId = []string{oldTask.ID} + if len(oldTask.ParentId) > 0 { + t.ParentId = oldTask.ParentId + } + + migration_util.CopyRepeatState(oldTask.Metadata.Labels, t.Metadata.Labels) + err = migration_util.UpdateRepeatState(config.Settings.Execution.Repeat, t.Metadata.Labels) + if err != nil { + return nil, fmt.Errorf("repeat invalid: %v", err) + } + + err = orm.Create(nil, t) + if err != nil { + return nil, err + } + return t, nil +} + +func CreateTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac.ShortUser) (*task.Task, error) { + t, err := buildTask(config, creator, false) + if err != nil { + return nil, err + } + + err = migration_util.UpdateRepeatState(config.Settings.Execution.Repeat, t.Metadata.Labels) + if err != nil { + return nil, fmt.Errorf("repeat invalid: %v", err) + } + + err = orm.Create(nil, t) + if err != nil { + return nil, err + } + return t, nil +} + +func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac.ShortUser, repeat bool) (*task.Task, error) { + if len(config.Indices) == 0 { + return nil, errors.New("indices must not be empty") + } + if creator == nil { + return nil, errors.New("missing creator info") + } + + config.Creator.Name = creator.Username + config.Creator.Id = creator.UserId + + srcClusterCfg := elastic.GetConfig(config.Cluster.Source.Id) + config.Cluster.Source.Distribution = srcClusterCfg.Distribution + dstClusterCfg := elastic.GetConfig(config.Cluster.Target.Id) + config.Cluster.Target.Distribution = dstClusterCfg.Distribution + + clearTaskConfig(config) + + var totalDocs int64 + for _, index := range config.Indices { + if index.Incremental != nil { + if repeat { + index.Incremental.Full = false + } else { + index.Incremental.Full = true + } + } + totalDocs += index.Source.Docs + } + + t := task.Task{ + Metadata: task.Metadata{ + Type: "cluster_migration", + Labels: util.MapStr{ + "business_id": "cluster_migration", + "source_cluster_id": config.Cluster.Source.Id, + "target_cluster_id": config.Cluster.Target.Id, + "source_total_docs": totalDocs, + }, + }, + Cancellable: true, + Runnable: false, + Status: task.StatusInit, + ConfigString: util.MustToJSON(config), + } + t.ID = util.GetUUID() + return &t, nil +} + +// sync with getDataMigrationTaskInfo +func clearTaskConfig(config *migration_model.ClusterMigrationTaskConfig) { + for i := range config.Indices { + config.Indices[i].Target.Docs = 0 + config.Indices[i].Percent = 0 + config.Indices[i].ErrorPartitions = 0 + } +} diff --git a/plugin/migration/common_api.go b/plugin/migration/common_api.go index f46caf41..561dd89d 100644 --- a/plugin/migration/common_api.go +++ b/plugin/migration/common_api.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strconv" + "time" log "github.com/cihub/seelog" @@ -64,6 +65,11 @@ func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req "query": util.MapStr{ "bool": util.MapStr{ "must": mustQ, + "must_not": util.MapStr{ + "exists": util.MapStr{ + "field": "parent_id", + }, + }, }, }, } @@ -110,6 +116,14 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { return } sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs) + sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs) + sourceM.Put("metadata.labels.error_partitions", ts.ErrorPartitions) + count, err := migration_util.CountRunningChildren(taskID, "index_migration") + if err != nil { + log.Warnf("failed to count running children, err: %v", err) + return + } + sourceM.Put("running_children", count) case "cluster_comparison": ts, _, err := h.getComparisonMajorTaskInfo(taskID) if err != nil { @@ -117,8 +131,23 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { return } sourceM.Put("metadata.labels.source_scroll_docs", ts.SourceScrollDocs) + sourceM.Put("metadata.labels.source_total_docs", ts.SourceTotalDocs) + sourceM.Put("metadata.labels.target_total_docs", ts.TargetTotalDocs) sourceM.Put("metadata.labels.target_scroll_docs", ts.TargetScrollDocs) + sourceM.Put("metadata.labels.total_diff_docs", ts.TotalDiffDocs) + count, err := migration_util.CountRunningChildren(taskID, "index_comparison") + if err != nil { + log.Warnf("failed to count running children, err: %v", err) + return + } + sourceM.Put("running_children", count) } + _, repeatStatus, err := h.calcRepeatingStatus(&majorTask) + if err != nil { + log.Warnf("failed to calc repeat info, err: %v", err) + return + } + sourceM.Put("repeat", repeatStatus) } func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -136,6 +165,13 @@ func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps http return } obj.Status = task.StatusReady + if _, ok := obj.Metadata.Labels["next_run_time"]; !ok { + startTime := time.Now().UnixMilli() + // only set for initial cluster-level tasks + if len(obj.ParentId) == 0 { + obj.Metadata.Labels["next_run_time"] = startTime + } + } err = orm.Update(nil, &obj) if err != nil { @@ -172,41 +208,6 @@ func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps http }, 200) } -func (h *APIHandler) stopTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - obj := task.Task{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - if task.IsEnded(obj.Status) { - h.WriteJSON(w, util.MapStr{ - "success": true, - }, 200) - return - } - obj.Status = task.StatusPendingStop - err = orm.Update(nil, &obj) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - migration_util.WriteLog(&obj, &task.TaskResult{ - Success: true, - }, "task status manually set to pending stop") - - h.WriteJSON(w, util.MapStr{ - "success": true, - }, 200) -} - // delete task and all sub tasks func (h *APIHandler) deleteTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("task_id") @@ -266,3 +267,172 @@ func (h *APIHandler) deleteTask(w http.ResponseWriter, req *http.Request, ps htt "result": "deleted", }, 200) } + +// resume an repeating task +func (h *APIHandler) resumeTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + taskID := ps.MustGetParameter("task_id") + obj := task.Task{} + + obj.ID = taskID + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) + return + } + if len(obj.ParentId) > 0 { + h.WriteError(w, fmt.Sprintf("can't resume on a child task", taskID), http.StatusInternalServerError) + return + } + lastRepeatingChild, repeatStatus, err := h.calcRepeatingStatus(&obj) + if err != nil { + h.WriteError(w, fmt.Sprintf("failed to get repeating status", taskID), http.StatusInternalServerError) + return + } + if !repeatStatus.IsRepeat { + h.WriteError(w, fmt.Sprintf("not a repeating task", taskID), http.StatusInternalServerError) + return + } + if repeatStatus.Done { + h.WriteError(w, fmt.Sprintf("repeat task done", taskID), http.StatusInternalServerError) + return + } + if repeatStatus.Repeating { + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) + return + } + + lastRepeatingChild.Metadata.Labels["repeat_triggered"] = false + err = orm.Update(nil, lastRepeatingChild) + if err != nil { + log.Errorf("failed to update last child, err: %v", err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + migration_util.WriteLog(&obj, nil, fmt.Sprintf("repeating task [%s] manually resumed", obj.Metadata.Type, taskID)) + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) + return +} + +type RepeatStatus struct { + IsRepeat bool `json:"is_repeat"` + Done bool `json:"done"` + Repeating bool `json:"repeating"` +} + +func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *RepeatStatus, error) { + ret := &RepeatStatus{} + lastRepeatingChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type) + if err != nil { + return nil, nil, err + } + if lastRepeatingChild == nil { + lastRepeatingChild = taskItem + } + + isRepeat := migration_util.GetMapBoolValue(lastRepeatingChild.Metadata.Labels, "is_repeat") + if !isRepeat { + return lastRepeatingChild, ret, nil + } + ret.IsRepeat = isRepeat + + repeatDone := migration_util.GetMapBoolValue(lastRepeatingChild.Metadata.Labels, "repeat_done") + if repeatDone { + ret.Done = true + return lastRepeatingChild, ret, nil + } + repeatTriggered := migration_util.GetMapBoolValue(lastRepeatingChild.Metadata.Labels, "repeat_triggered") + if !repeatTriggered { + ret.Repeating = true + } + return lastRepeatingChild, ret, nil +} + +func (h *APIHandler) stopTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + obj := task.Task{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + if task.IsEnded(obj.Status) { + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) + return + } + obj.Status = task.StatusPendingStop + err = orm.Update(nil, &obj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + migration_util.WriteLog(&obj, &task.TaskResult{ + Success: true, + }, "task status manually set to pending stop") + + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) +} + +// pause an repeating task +func (h *APIHandler) pauseTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + taskID := ps.MustGetParameter("task_id") + obj := task.Task{} + + obj.ID = taskID + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) + return + } + if len(obj.ParentId) > 0 { + h.WriteError(w, fmt.Sprintf("can't pause on a child task", taskID), http.StatusInternalServerError) + return + } + lastRepeatingChild, repeatStatus, err := h.calcRepeatingStatus(&obj) + if err != nil { + h.WriteError(w, fmt.Sprintf("failed to get repeating status", taskID), http.StatusInternalServerError) + return + } + if !repeatStatus.IsRepeat { + h.WriteError(w, fmt.Sprintf("not a repeating task", taskID), http.StatusInternalServerError) + return + } + if repeatStatus.Done { + h.WriteError(w, fmt.Sprintf("repeat task done", taskID), http.StatusInternalServerError) + return + } + if !repeatStatus.Repeating { + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) + return + } + + lastRepeatingChild.Metadata.Labels["repeat_triggered"] = true + err = orm.Update(nil, lastRepeatingChild) + if err != nil { + log.Errorf("failed to update last child, err: %v", err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + migration_util.WriteLog(&obj, nil, fmt.Sprintf("repeating task [%s] manually paused", taskID)) + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) + return +} diff --git a/plugin/migration/comparison_api.go b/plugin/migration/comparison_api.go index c60d8d74..231569ca 100644 --- a/plugin/migration/comparison_api.go +++ b/plugin/migration/comparison_api.go @@ -7,12 +7,12 @@ import ( log "github.com/cihub/seelog" + "infini.sh/console/plugin/migration/cluster_comparison" migration_model "infini.sh/console/plugin/migration/model" migration_util "infini.sh/console/plugin/migration/util" "infini.sh/framework/core/api/rbac" httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/task" @@ -27,50 +27,13 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - if len(clusterTaskConfig.Indices) == 0 { - h.WriteError(w, "indices must not be empty", http.StatusInternalServerError) - return - } user, err := rbac.FromUserContext(req.Context()) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - if user != nil { - clusterTaskConfig.Creator.Name = user.Username - clusterTaskConfig.Creator.Id = user.UserId - } - - var sourceTotalDocs int64 - var targetTotalDocs int64 - for _, index := range clusterTaskConfig.Indices { - sourceTotalDocs += index.Source.Docs - targetTotalDocs += index.Target.Docs - } - - srcClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Source.Id) - clusterTaskConfig.Cluster.Source.Distribution = srcClusterCfg.Distribution - dstClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Target.Id) - clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution - t := task.Task{ - Metadata: task.Metadata{ - Type: "cluster_comparison", - Labels: util.MapStr{ - "business_id": "cluster_comparison", - "source_cluster_id": clusterTaskConfig.Cluster.Source.Id, - "target_cluster_id": clusterTaskConfig.Cluster.Target.Id, - "source_total_docs": sourceTotalDocs, - "target_total_docs": targetTotalDocs, - }, - }, - Cancellable: true, - Runnable: false, - Status: task.StatusInit, - ConfigString: util.MustToJSON(clusterTaskConfig), - } - t.ID = util.GetUUID() - err = orm.Create(nil, &t) + t, err := cluster_comparison.CreateTask(clusterTaskConfig, user) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -116,6 +79,8 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. if percent > 100 { percent = 100 } + taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs + taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions if count == index.Source.Docs+index.Target.Docs { @@ -137,14 +102,27 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. h.WriteJSON(w, obj, http.StatusOK) } +type ClusterComparisonTaskState struct { + SourceTotalDocs int64 + SourceScrollDocs int64 + TargetTotalDocs int64 + TargetScrollDocs int64 + TotalDiffDocs int64 +} + type ComparisonIndexStateInfo struct { ErrorPartitions int + SourceTotalDocs int64 SourceScrollDocs int64 + TargetTotalDocs int64 TargetScrollDocs int64 + TotalDiffDocs int64 } // TODO: calc realtime info from instance -func (h *APIHandler) getComparisonMajorTaskInfo(majorTaskID string) (taskStats migration_model.ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) { +func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) { + indexState = map[string]ComparisonIndexStateInfo{} + taskQuery := util.MapStr{ "size": 500, "query": util.MapStr{ @@ -153,89 +131,62 @@ func (h *APIHandler) getComparisonMajorTaskInfo(majorTaskID string) (taskStats m { "term": util.MapStr{ "parent_id": util.MapStr{ - "value": majorTaskID, + "value": taskID, }, }, }, { - "bool": util.MapStr{ - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "dump_hash", - }, - }, - }, - { - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_comparison", - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task.StatusComplete, task.StatusError}, - }, - }, - }, - }, - }, + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_comparison", }, }, }, + { + "terms": util.MapStr{ + "status": []string{task.StatusComplete, task.StatusError}, + }, + }, }, }, }, } - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(taskQuery), - } - err, result := orm.Search(task.Task{}, q) + subTasks, err := migration_util.GetTasks(taskQuery) if err != nil { return taskStats, indexState, err } - var pipelineIndexNames = map[string]string{} - indexState = map[string]ComparisonIndexStateInfo{} - for _, row := range result.Result { - buf := util.MustToJSONBytes(row) - subTask := task.Task{} - err := util.FromJSONBytes(buf, &subTask) - if err != nil { - log.Errorf("failed to unmarshal task, err: %v", err) - continue - } - if subTask.Metadata.Labels == nil { - continue - } + for _, subTask := range subTasks { taskLabels := util.MapStr(subTask.Metadata.Labels) indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") if indexName == "" { continue } - // add indexDocs of already complete/error - if subTask.Metadata.Type == "index_comparison" { - sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") - targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") - taskStats.SourceScrollDocs += sourceDocs - taskStats.TargetScrollDocs += targetDocs - st := indexState[indexName] - st.SourceScrollDocs += sourceDocs - st.TargetScrollDocs += targetDocs - if subTask.Status == task.StatusError { - st.ErrorPartitions += 1 - } - indexState[indexName] = st + cfg := migration_model.IndexComparisonTaskConfig{} + err = migration_util.GetTaskConfig(&subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) continue } - pipelineIndexNames[subTask.ID] = indexName + sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") + targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") + totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") + taskStats.SourceTotalDocs += cfg.Source.DocCount + taskStats.SourceScrollDocs += sourceDocs + taskStats.TargetTotalDocs += cfg.Target.DocCount + taskStats.TargetScrollDocs += targetDocs + taskStats.TotalDiffDocs += totalDiffDocs + st := indexState[indexName] + st.SourceTotalDocs += cfg.Source.DocCount + st.SourceScrollDocs += sourceDocs + st.TargetTotalDocs += cfg.Target.DocCount + st.TargetScrollDocs += targetDocs + st.TotalDiffDocs += totalDiffDocs + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 + } + indexState[indexName] = st } return taskStats, indexState, nil @@ -252,12 +203,21 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht return } + taskConfig := &migration_model.ClusterComparisonTaskConfig{} + err = migration_util.GetTaskConfig(&majorTask, taskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + taskInfo := &TaskInfoResponse{ TaskID: id, StartTime: majorTask.StartTimeInMillis, + Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels), } - subTasks, pipelineTaskIDs, _, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName) + subTasks, pipelineTaskIDs, _, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName) taskInfo.DataPartition = len(subTasks) if len(subTasks) == 0 { @@ -266,7 +226,7 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht } pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) - startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks) + startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating) var partitionTaskInfos []util.MapStr diff --git a/plugin/migration/index_comparison/index_comparison.go b/plugin/migration/index_comparison/index_comparison.go index 1875e32b..2d838fb8 100644 --- a/plugin/migration/index_comparison/index_comparison.go +++ b/plugin/migration/index_comparison/index_comparison.go @@ -63,7 +63,7 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error { return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) } - err = migration_util.DeleteChildTasks(taskItem.ID) + err = migration_util.DeleteChildTasks(taskItem.ID, "pipeline") if err != nil { log.Warnf("failed to clear child tasks, err: %v", err) return nil @@ -344,6 +344,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { taskItem.Metadata.Labels["target_scrolled"] = targetDocs if sourceDocs != targetDocs { now := time.Now() + taskItem.Metadata.Labels["total_diff_docs"] = sourceDocs taskItem.CompletedTime = &now taskItem.Status = task.StatusError p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ @@ -372,6 +373,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { ) if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 { now := time.Now() + taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth taskItem.CompletedTime = &now taskItem.Status = task.StatusError p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ @@ -416,7 +418,7 @@ func (p *processor) handlePendingStopSubTask(taskItem *task.Task) error { return nil } - tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline") + tasks, err := migration_util.GetPendingChildTasks(taskItem.ID, "pipeline") if err != nil { log.Errorf("failed to get sub tasks, err: %v", err) return nil @@ -431,7 +433,7 @@ func (p *processor) handlePendingStopSubTask(taskItem *task.Task) error { } func (p *processor) getPipelineTasks(taskItem *task.Task, cfg *migration_model.IndexComparisonTaskConfig) (sourceDumpTask *task.Task, targetDumpTask *task.Task, diffTask *task.Task, err error) { - ptasks, err := migration_util.GetChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline", nil) + ptasks, err := migration_util.GetChildTasks(taskItem.ID, "pipeline", nil) if err != nil { log.Errorf("failed to get pipeline tasks, err: %v", err) return diff --git a/plugin/migration/index_migration/index_migration.go b/plugin/migration/index_migration/index_migration.go index 1783e277..21a1e859 100644 --- a/plugin/migration/index_migration/index_migration.go +++ b/plugin/migration/index_migration/index_migration.go @@ -53,7 +53,7 @@ func (p *processor) handlePendingStopSubTask(taskItem *task.Task) error { return nil } - tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline") + tasks, err := migration_util.GetPendingChildTasks(taskItem.ID, "pipeline") if err != nil { log.Errorf("failed to get sub tasks, err: %v", err) return nil @@ -93,7 +93,7 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error { return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) } - err = migration_util.DeleteChildTasks(taskItem.ID) + err = migration_util.DeleteChildTasks(taskItem.ID, "pipeline") if err != nil { log.Warnf("failed to clear child tasks, err: %v", err) return nil @@ -474,7 +474,7 @@ func (p *processor) checkBulkPipelineTaskStatus(bulkTask *task.Task, cfg *migrat } func (p *processor) getExecutionConfigFromMajorTask(taskItem *task.Task) (config migration_model.ExecutionConfig, err error) { - majorTaskID := taskItem.ParentId[0] + majorTaskID := migration_util.GetDirectParentId(taskItem.ParentId) majorTask := task.Task{} majorTask.ID = majorTaskID _, err = orm.Get(&majorTask) @@ -493,7 +493,7 @@ func (p *processor) getExecutionConfigFromMajorTask(taskItem *task.Task) (config } func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask *task.Task, bulkTask *task.Task, err error) { - ptasks, err := migration_util.GetChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline", nil) + ptasks, err := migration_util.GetChildTasks(taskItem.ID, "pipeline", nil) if err != nil { log.Errorf("failed to get pipeline tasks, err: %v", err) return diff --git a/plugin/migration/migration_api.go b/plugin/migration/migration_api.go index 9698c4c0..146c98c6 100644 --- a/plugin/migration/migration_api.go +++ b/plugin/migration/migration_api.go @@ -1,19 +1,48 @@ package migration import ( + "net/http" + log "github.com/cihub/seelog" + "infini.sh/console/plugin/migration/cluster_migration" migration_model "infini.sh/console/plugin/migration/model" migration_util "infini.sh/console/plugin/migration/util" - "infini.sh/framework/core/orm" + "infini.sh/framework/core/api/rbac" + httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/task" "infini.sh/framework/core/util" ) +func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + clusterTaskConfig := &migration_model.ClusterMigrationTaskConfig{} + err := h.DecodeJSON(req, clusterTaskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + user, err := rbac.FromUserContext(req.Context()) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + t, err := cluster_migration.CreateTask(clusterTaskConfig, user) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.WriteCreatedOKJSON(w, t.ID) +} + type MigrationIndexStateInfo struct { ErrorPartitions int IndexDocs int64 + SourceDocs int64 } /* @@ -22,7 +51,7 @@ We count data from two sources: - plus index_migration.index_docs with realtime bulk indexing info - realtime bulk indexing info is only available for running index_migrations */ -func (h *APIHandler) getMigrationMajorTaskInfo(majorTaskID string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) { +func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) { taskQuery := util.MapStr{ "size": 500, "query": util.MapStr{ @@ -31,39 +60,14 @@ func (h *APIHandler) getMigrationMajorTaskInfo(majorTaskID string) (taskStats mi { "term": util.MapStr{ "parent_id": util.MapStr{ - "value": majorTaskID, + "value": id, }, }, }, { - "bool": util.MapStr{ - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "bulk_indexing", - }, - }, - }, - { - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task.StatusComplete, task.StatusError}, - }, - }, - }, - }, - }, + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_migration", }, }, }, @@ -71,46 +75,80 @@ func (h *APIHandler) getMigrationMajorTaskInfo(majorTaskID string) (taskStats mi }, }, } - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(taskQuery), - } - err, result := orm.Search(task.Task{}, q) + subTasks, err := migration_util.GetTasks(taskQuery) if err != nil { return taskStats, indexState, err } - var pipelineTaskIDs = map[string][]string{} - var pipelineIndexNames = map[string]string{} + var indexMigrationTaskIDs []string indexState = map[string]MigrationIndexStateInfo{} - for _, row := range result.Result { - buf := util.MustToJSONBytes(row) - subTask := task.Task{} - err := util.FromJSONBytes(buf, &subTask) - if err != nil { - log.Errorf("failed to unmarshal task, err: %v", err) - continue - } - if subTask.Metadata.Labels == nil { - continue - } + for _, subTask := range subTasks { taskLabels := util.MapStr(subTask.Metadata.Labels) indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") if indexName == "" { continue } - // add indexDocs of already complete/error - if subTask.Metadata.Type == "index_migration" { - indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs") - taskStats.IndexDocs += indexDocs - st := indexState[indexName] - st.IndexDocs += indexDocs - if subTask.Status == task.StatusError { - st.ErrorPartitions += 1 - } - indexState[indexName] = st + if subTask.Status == task.StatusRunning { + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) continue } + + cfg := migration_model.IndexMigrationTaskConfig{} + err = migration_util.GetTaskConfig(&subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + continue + } + indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs") + taskStats.IndexDocs += indexDocs + taskStats.SourceDocs += cfg.Source.DocCount + st := indexState[indexName] + st.IndexDocs += indexDocs + st.SourceDocs += cfg.Source.DocCount + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 + taskStats.ErrorPartitions += 1 + } + indexState[indexName] = st + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) + } + + taskQuery = util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "parent_id": indexMigrationTaskIDs, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "bulk_indexing", + }, + }, + }, + }, + }, + }, + } + subTasks, err = migration_util.GetTasks(taskQuery) + if err != nil { + return taskStats, indexState, err + } + + var pipelineTaskIDs = map[string][]string{} + var pipelineIndexNames = map[string]string{} + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + pipelineIndexNames[subTask.ID] = indexName if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { diff --git a/plugin/migration/model/common.go b/plugin/migration/model/common.go index e6d47180..b5d2a517 100644 --- a/plugin/migration/model/common.go +++ b/plugin/migration/model/common.go @@ -1,9 +1,15 @@ package model -import "fmt" +import ( + "fmt" + "time" + + "infini.sh/framework/core/util" +) type ExecutionConfig struct { TimeWindow []TimeWindowItem `json:"time_window"` + Repeat *Repeat `json:"repeat"` Nodes struct { Permit []ExecutionNode `json:"permit"` } `json:"nodes"` @@ -14,6 +20,12 @@ type ExecutionNode struct { Name string `json:"name"` } +type Repeat struct { + NextRunTime *time.Time `json:"next_run_time"` + Interval time.Duration `json:"interval"` + TotalRun int64 `json:"total_run"` +} + type TimeWindowItem struct { Start string `json:"start"` End string `json:"end"` @@ -25,6 +37,14 @@ type IndexPartition struct { Step interface{} `json:"step"` } +type IndexIncremental struct { + FieldName string `json:"field_name"` + // Optional, data ingest delay + Delay time.Duration `json:"delay"` + // If full, run the data from -inf, else from current - step + Full bool `json:"full"` +} + type IndexInfo struct { Name string `json:"name"` DocType string `json:"doc_type"` @@ -42,3 +62,23 @@ type ClusterInfo struct { Name string `json:"name"` Distribution string `json:"distribution,omitempty"` } + +// BuildFilter generate a query filter, used by split task +func (incremental *IndexIncremental) BuildFilter(current int64, step time.Duration) (util.MapStr, error) { + if incremental == nil { + return util.MapStr{}, nil + } + + rv := util.MapStr{ + "lt": int64(current) - incremental.Delay.Milliseconds(), + "format": "epoch_millis", + } + if !incremental.Full { + rv["gte"] = int64(current) - step.Milliseconds() - incremental.Delay.Milliseconds() + } + return util.MapStr{ + "range": util.MapStr{ + incremental.FieldName: rv, + }, + }, nil +} diff --git a/plugin/migration/model/comparison.go b/plugin/migration/model/comparison.go index c595b5b6..ba9d046a 100644 --- a/plugin/migration/model/comparison.go +++ b/plugin/migration/model/comparison.go @@ -1,6 +1,8 @@ package model -import "infini.sh/framework/core/util" +import ( + "infini.sh/framework/core/util" +) type ClusterComparisonTaskConfig struct { Cluster struct { @@ -20,22 +22,17 @@ type ClusterComparisonTaskConfig struct { } type ClusterComparisonIndexConfig struct { - Source IndexInfo `json:"source"` - Target IndexInfo `json:"target"` - RawFilter interface{} `json:"raw_filter"` - Partition *IndexPartition `json:"partition,omitempty"` + Source IndexInfo `json:"source"` + Target IndexInfo `json:"target"` + RawFilter interface{} `json:"raw_filter"` + Incremental *IndexIncremental `json:"incremental"` + Partition *IndexPartition `json:"partition,omitempty"` // only used in API ScrollPercent float64 `json:"scroll_percent,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` } -type ClusterComparisonTaskState struct { - SourceScrollDocs int64 - TargetScrollDocs int64 - Status string -} - type IndexComparisonTaskConfig struct { Source IndexComparisonDumpConfig `json:"source"` Target IndexComparisonDumpConfig `json:"target"` diff --git a/plugin/migration/model/migration.go b/plugin/migration/model/migration.go index 399cf642..5e011112 100644 --- a/plugin/migration/model/migration.go +++ b/plugin/migration/model/migration.go @@ -33,6 +33,7 @@ type ClusterMigrationIndexConfig struct { RawFilter interface{} `json:"raw_filter"` IndexRename map[string]interface{} `json:"index_rename"` TypeRename map[string]interface{} `json:"type_rename"` + Incremental *IndexIncremental `json:"incremental"` Partition *IndexPartition `json:"partition,omitempty"` // only used in API @@ -41,8 +42,10 @@ type ClusterMigrationIndexConfig struct { } type ClusterMigrationTaskState struct { - IndexDocs int64 - Status string + IndexDocs int64 + SourceDocs int64 + ErrorPartitions int64 + Status string } const ( diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index f5185276..f2780809 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -110,6 +110,10 @@ func (p *DispatcherProcessor) Name() string { } func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { + // handle repeating tasks + p.handleRepeatingTasks(ctx, "cluster_comparison") + p.handleRepeatingTasks(ctx, "cluster_migration") + // handle pipeline task p.handleTasks(ctx, "pipeline", []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, p.pipelineTaskProcessor.Process) @@ -203,7 +207,7 @@ func (p *DispatcherProcessor) getMigrationTasks(taskType string, taskStatus []st }, }, } - return migration_util.GetTasks(p.config.Elasticsearch, p.config.IndexName, queryDsl) + return migration_util.GetTasks(queryDsl) } func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) { diff --git a/plugin/migration/repeat.go b/plugin/migration/repeat.go new file mode 100644 index 00000000..420009e1 --- /dev/null +++ b/plugin/migration/repeat.go @@ -0,0 +1,130 @@ +package migration + +import ( + "errors" + "fmt" + "time" + + log "github.com/cihub/seelog" + "infini.sh/console/plugin/migration/cluster_comparison" + "infini.sh/console/plugin/migration/cluster_migration" + migration_util "infini.sh/console/plugin/migration/util" + "infini.sh/framework/core/pipeline" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" +) + +func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskType string) { + tasks, err := p.getPendingExecutionTasks(taskType, p.config.TaskBatchSize) + if err != nil { + log.Errorf("failed to get pending [%s] tasks, err: %v", taskType, err) + return + } + if len(tasks) == 0 { + return + } + log.Debugf("handling pending [%s] tasks, count: %d", taskType, len(tasks)) + // refresh index after each batch + defer func() { + p.refreshTask() + }() + for i := range tasks { + if ctx.IsCanceled() { + return + } + taskItem := &tasks[i] + err := p.handleTask(taskItem, p.handleRepeatingTask) + if err != nil { + log.Errorf("failed to handle task [%s]: [%v]", taskItem.ID, err) + + taskItem.Status = task.StatusError + tn := time.Now() + taskItem.CompletedTime = &tn + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: false, + Error: err.Error(), + }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID)) + } + } + return +} + +func (p *DispatcherProcessor) getPendingExecutionTasks(taskType string, size int) ([]task.Task, error) { + queryDsl := util.MapStr{ + "size": size, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "asc", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.type": taskType, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.repeat_triggered": util.MapStr{ + "value": false, + }, + }, + }, + { + "range": util.MapStr{ + "metadata.labels.next_run_time": util.MapStr{ + "lte": time.Now().UnixMilli(), + }, + }, + }, + }, + }, + }, + } + return migration_util.GetTasks(queryDsl) +} + +// NOTE: we handle repeating in two iterations: +// - the first iteration will mark the task as ready to run +// - the second iteration will trigger the next repeat, and update the status accordingly +// This will make the second step automatically retryable +func (p *DispatcherProcessor) handleRepeatingTask(taskItem *task.Task) error { + if taskItem.Status == task.StatusInit { + taskItem.Status = task.StatusReady + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: true, + }, fmt.Sprintf("task started automatically")) + return nil + } + repeatDone := migration_util.GetMapBoolValue(taskItem.Metadata.Labels, "repeat_done") + if repeatDone { + taskItem.Metadata.Labels["repeat_triggered"] = true + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: true, + }, fmt.Sprintf("task repeat ended")) + return nil + } + var nextTask *task.Task + var err error + switch taskItem.Metadata.Type { + case "cluster_migration": + nextTask, err = cluster_migration.RepeatTask(taskItem) + case "cluster_comparison": + nextTask, err = cluster_comparison.RepeatTask(taskItem) + default: + return errors.New("invalid type") + } + if err != nil { + log.Errorf("failed to repeat task [%s], err: %v", taskItem.ID, err) + return nil + } + taskItem.Metadata.Labels["repeat_triggered"] = true + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: true, + }, fmt.Sprintf("next repeat task [%s] created", nextTask.ID)) + return nil +} diff --git a/plugin/migration/util/orm.go b/plugin/migration/util/orm.go index f9a02141..55992057 100644 --- a/plugin/migration/util/orm.go +++ b/plugin/migration/util/orm.go @@ -4,13 +4,13 @@ import ( "fmt" log "github.com/cihub/seelog" - "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" "infini.sh/framework/core/task" "infini.sh/framework/core/util" ) -func DeleteChildTasks(taskID string) error { +func DeleteChildTasks(taskID string, taskType string) error { q := util.MapStr{ "query": util.MapStr{ "bool": util.MapStr{ @@ -22,6 +22,11 @@ func DeleteChildTasks(taskID string) error { }, }, }, + { + "term": util.MapStr{ + "metadata.type": taskType, + }, + }, }, }, }, @@ -33,11 +38,78 @@ func DeleteChildTasks(taskID string) error { return nil } -func GetPendingChildTasks(elasticsearch, indexName string, taskID string, taskType string) ([]task.Task, error) { - return GetChildTasks(elasticsearch, indexName, taskID, taskType, []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady}) +func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, error) { + queryDsl := util.MapStr{ + "size": 1, + "sort": []util.MapStr{ + { + "metadata.labels.next_run_time": util.MapStr{ + "order": "desc", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.type": taskType, + }, + }, + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskID, + }, + }, + }, + }, + }, + }, + } + tasks, err := GetTasks(queryDsl) + if err != nil { + return nil, err + } + if len(tasks) == 0 { + return nil, nil + } + return &tasks[0], nil } -func GetChildTasks(elasticsearch, indexName string, taskID string, taskType string, status []string) ([]task.Task, error) { +func GetPendingChildTasks(taskID string, taskType string) ([]task.Task, error) { + return GetChildTasks(taskID, taskType, []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady}) +} + +func CountRunningChildren(taskID string, taskType string) (int64, error) { + return CountTasks(util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.type": taskType, + }, + }, + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskID, + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady}, + }, + }, + }, + }, + }, + }) +} + +func GetChildTasks(taskID string, taskType string, status []string) ([]task.Task, error) { musts := []util.MapStr{ { "term": util.MapStr{ @@ -67,22 +139,27 @@ func GetChildTasks(elasticsearch, indexName string, taskID string, taskType stri }, }, } - return GetTasks(elasticsearch, indexName, queryDsl) + return GetTasks(queryDsl) } -func GetTasks(elasticsearch, indexName string, query util.MapStr) ([]task.Task, error) { - esClient := elastic.GetClient(elasticsearch) - res, err := esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(query)) +func CountTasks(query util.MapStr) (int64, error) { + return orm.Count(task.Task{}, util.MustToJSONBytes(query)) +} + +func GetTasks(query util.MapStr) ([]task.Task, error) { + err, res := orm.Search(task.Task{}, &orm.Query{ + RawQuery: util.MustToJSONBytes(query), + }) if err != nil { log.Errorf("query tasks from es failed, err: %v", err) return nil, err } - if res.GetTotal() == 0 { + if res.Total == 0 { return nil, nil } var tasks []task.Task - for _, hit := range res.Hits.Hits { - buf, err := util.ToJSONBytes(hit.Source) + for _, row := range res.Result { + buf, err := util.ToJSONBytes(row) if err != nil { log.Errorf("marshal task json failed, err: %v", err) return nil, err @@ -93,6 +170,9 @@ func GetTasks(elasticsearch, indexName string, query util.MapStr) ([]task.Task, log.Errorf("unmarshal task json failed, err: %v", err) return nil, err } + if tk.Metadata.Labels == nil { + continue + } tasks = append(tasks, tk) } return tasks, nil diff --git a/plugin/migration/util/repeat.go b/plugin/migration/util/repeat.go new file mode 100644 index 00000000..708c62f0 --- /dev/null +++ b/plugin/migration/util/repeat.go @@ -0,0 +1,75 @@ +package util + +import ( + "time" + + migration_model "infini.sh/console/plugin/migration/model" + "infini.sh/framework/core/util" +) + +func UpdateRepeatState(repeat *migration_model.Repeat, labels util.MapStr) error { + if labels == nil { + return nil + } + + if !isValidRepeat(repeat) { + labels["repeat_done"] = true + return nil + } + labels["is_repeat"] = true + + runTimes := GetMapIntValue(labels, "run_times") + runTimes += 1 + labels["run_times"] = runTimes + + if repeat.TotalRun >= 1 && runTimes >= repeat.TotalRun { + labels["repeat_done"] = true + } + if _, ok := labels["next_run_time"]; !ok { + if repeat.NextRunTime != nil { + labels["next_run_time"] = repeat.NextRunTime.UnixMilli() + } + } else { + nextRunTime := GetMapIntValue(labels, "next_run_time") + labels["next_run_time"] = nextRunTime + repeat.Interval.Milliseconds() + } + labels["repeat_triggered"] = false + return nil +} + +func CopyRepeatState(oldLabels, newLabels util.MapStr) { + newLabels["run_times"] = oldLabels["run_times"] + newLabels["next_run_time"] = oldLabels["next_run_time"] +} + +func IsRepeating(repeat *migration_model.Repeat, labels map[string]interface{}) bool { + if !isValidRepeat(repeat) { + return false + } + if repeat.TotalRun < 1 { + return true + } + if repeat.TotalRun == 1 { + return false + } + nextRunTime := GetMapIntValue(labels, "next_run_time") + // not started yet + if nextRunTime == 0 { + return false + } + endTime := time.UnixMilli(nextRunTime).Add(time.Duration(repeat.TotalRun-1) * repeat.Interval) + if time.Now().Before(endTime) { + return true + } + return false +} + +func isValidRepeat(repeat *migration_model.Repeat) bool { + if repeat == nil { + return false + } + if repeat.Interval < time.Minute { + return false + } + return true +} diff --git a/plugin/migration/util/util.go b/plugin/migration/util/util.go index 91d11a8a..f0235fa2 100644 --- a/plugin/migration/util/util.go +++ b/plugin/migration/util/util.go @@ -47,6 +47,13 @@ func IsPendingState(status string) bool { return util.StringInArray(pendingTaskStatus, status) } +func GetDirectParentId(parentIDs []string) string { + if len(parentIDs) == 0 { + return "" + } + return parentIDs[len(parentIDs)-1] +} + func GetTaskConfig(task *task.Task, config interface{}) error { if task.Config_ == nil { return util.FromJSONBytes([]byte(task.ConfigString), config) @@ -71,6 +78,19 @@ func GetMapIntValue(m util.MapStr, key string) int64 { return vv } +func GetMapBoolValue(m util.MapStr, key string) bool { + v, err := m.GetValue(key) + if err != nil { + return false + } + vv, err := util.ExtractBool(v) + if err != nil { + log.Errorf("got %s but failed to extract, err: %v", key, err) + return false + } + return vv +} + func GetMapStringValue(m util.MapStr, key string) string { v, err := m.GetValue(key) if err != nil {