From 523a6eeb379bbce075d9da9fdf94b50fbd0cca07 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Tue, 6 Jun 2023 10:25:34 +0800 Subject: [PATCH] [task_manager] rename from migration, do final cleanups --- plugin/migration/api.go | 579 ------------------ plugin/migration/migration_api.go | 173 ------ plugin/task_manager/api.go | 44 ++ .../cluster_comparison/cluster_comparison.go | 4 +- .../cluster_comparison/orm.go | 4 +- .../cluster_migration/cluster_migration.go | 4 +- .../cluster_migration/orm.go | 4 +- .../{migration => task_manager}/common_api.go | 183 +++++- .../comparison_api.go | 8 +- plugin/task_manager/es_api.go | 156 +++++ .../index_comparison/index_comparison.go | 4 +- .../index_migration/index_migration.go | 4 +- plugin/task_manager/migration_api.go | 372 +++++++++++ .../model/common.go | 0 .../model/comparison.go | 0 .../model/migration.go | 0 .../model/pipeline.go | 0 .../model/processor.go | 0 .../model/scheduler.go | 0 plugin/{migration => task_manager}/module.go | 2 +- .../{migration => task_manager}/pipeline.go | 58 +- .../pipeline_task/bulk_indexing.go | 2 +- .../pipeline_task/dump_hash.go | 2 +- .../pipeline_task/es_scroll.go | 2 +- .../pipeline_task/index_diff.go | 2 +- .../pipeline_task/pipeline_task.go | 4 +- plugin/{migration => task_manager}/repeat.go | 8 +- .../scheduler/scheduler.go | 2 +- .../{migration => task_manager}/util/orm.go | 0 .../util/pipeline.go | 2 +- .../util/repeat.go | 2 +- .../{migration => task_manager}/util/util.go | 0 32 files changed, 812 insertions(+), 813 deletions(-) delete mode 100644 plugin/migration/api.go delete mode 100644 plugin/migration/migration_api.go create mode 100644 plugin/task_manager/api.go rename plugin/{migration => task_manager}/cluster_comparison/cluster_comparison.go (99%) rename plugin/{migration => task_manager}/cluster_comparison/orm.go (96%) rename plugin/{migration => task_manager}/cluster_migration/cluster_migration.go (99%) rename plugin/{migration => task_manager}/cluster_migration/orm.go (96%) rename plugin/{migration => task_manager}/common_api.go (70%) rename plugin/{migration => task_manager}/comparison_api.go (97%) create mode 100644 plugin/task_manager/es_api.go rename plugin/{migration => task_manager}/index_comparison/index_comparison.go (99%) rename plugin/{migration => task_manager}/index_migration/index_migration.go (99%) create mode 100644 plugin/task_manager/migration_api.go rename plugin/{migration => task_manager}/model/common.go (100%) rename plugin/{migration => task_manager}/model/comparison.go (100%) rename plugin/{migration => task_manager}/model/migration.go (100%) rename plugin/{migration => task_manager}/model/pipeline.go (100%) rename plugin/{migration => task_manager}/model/processor.go (100%) rename plugin/{migration => task_manager}/model/scheduler.go (100%) rename plugin/{migration => task_manager}/module.go (96%) rename plugin/{migration => task_manager}/pipeline.go (71%) rename plugin/{migration => task_manager}/pipeline_task/bulk_indexing.go (98%) rename plugin/{migration => task_manager}/pipeline_task/dump_hash.go (97%) rename plugin/{migration => task_manager}/pipeline_task/es_scroll.go (97%) rename plugin/{migration => task_manager}/pipeline_task/index_diff.go (98%) rename plugin/{migration => task_manager}/pipeline_task/pipeline_task.go (98%) rename plugin/{migration => task_manager}/repeat.go (94%) rename plugin/{migration => task_manager}/scheduler/scheduler.go (99%) rename plugin/{migration => task_manager}/util/orm.go (100%) rename plugin/{migration => task_manager}/util/pipeline.go (94%) rename plugin/{migration => task_manager}/util/repeat.go (96%) rename plugin/{migration => task_manager}/util/util.go (100%) diff --git a/plugin/migration/api.go b/plugin/migration/api.go deleted file mode 100644 index a134a51a..00000000 --- a/plugin/migration/api.go +++ /dev/null @@ -1,579 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package migration - -import ( - "fmt" - "net/http" - "strings" - "time" - - log "github.com/cihub/seelog" - - "infini.sh/console/model" - migration_model "infini.sh/console/plugin/migration/model" - migration_util "infini.sh/console/plugin/migration/util" - - "infini.sh/framework/core/api" - "infini.sh/framework/core/api/rbac/enum" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/global" - "infini.sh/framework/core/orm" - task2 "infini.sh/framework/core/task" - "infini.sh/framework/core/util" -) - -func InitAPI() { - handler := APIHandler{} - api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchTask("cluster_migration"), enum.PermissionMigrationTaskRead)) - api.HandleAPIMethod(api.POST, "/migration/data", handler.RequirePermission(handler.createDataMigrationTask, enum.PermissionMigrationTaskWrite)) - api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionMigrationTaskWrite)) - api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionMigrationTaskWrite)) - api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite)) - api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_pause", handler.RequirePermission(handler.pauseTask, enum.PermissionMigrationTaskWrite)) - api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionMigrationTaskWrite)) - api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead)) - api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead)) - - api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead)) - api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite)) - api.HandleAPIMethod(api.DELETE, "/comparison/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionComparisonTaskWrite)) - api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info", handler.RequirePermission(handler.getDataComparisonTaskInfo, enum.PermissionComparisonTaskRead)) - api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info/:index", handler.RequirePermission(handler.getDataComparisonTaskOfIndex, enum.PermissionComparisonTaskRead)) - api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionComparisonTaskWrite)) - api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionComparisonTaskWrite)) - api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_pause", handler.RequirePermission(handler.pauseTask, enum.PermissionComparisonTaskWrite)) - api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionComparisonTaskWrite)) - - api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration)) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_count", handler.countDocuments) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_refresh", handler.refreshIndex) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_init", handler.initIndex) - -} - -type APIHandler struct { - api.Handler -} - -func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - index = ps.MustGetParameter("index") - clusterID = ps.MustGetParameter("id") - ) - client := elastic.GetClient(clusterID) - pq := &elastic.PartitionQuery{ - IndexName: index, - } - err := h.DecodeJSON(req, pq) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - partitions, err := elastic.GetPartitions(pq, client) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.WriteJSON(w, partitions, http.StatusOK) -} - -func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - - obj := task2.Task{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - taskConfig := &migration_model.ClusterMigrationTaskConfig{} - err = migration_util.GetTaskConfig(&obj, taskConfig) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - _, indexState, err := h.getMigrationMajorTaskInfo(id) - if err != nil { - log.Errorf("failed to get major task info, err: %v", err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - var completedIndices int - for i, index := range taskConfig.Indices { - indexName := index.Source.GetUniqueIndexName() - count := indexState[indexName].IndexDocs - percent := float64(count) / float64(index.Source.Docs) * 100 - if percent > 100 { - percent = 100 - } - taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceDocs - taskConfig.Indices[i].Target.Docs = count - taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) - taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions - if count == index.Source.Docs { - completedIndices++ - } - } - - cfg := global.MustLookup("cluster_migration_config") - if migrationConfig, ok := cfg.(*DispatcherConfig); ok { - if obj.Metadata.Labels == nil { - obj.Metadata.Labels = util.MapStr{} - } - obj.Metadata.Labels["log_info"] = util.MapStr{ - "cluster_id": migrationConfig.Elasticsearch, - "index_name": migrationConfig.LogIndexName, - } - } - - _, repeatStatus, err := h.calcRepeatingStatus(&obj) - if err != nil { - log.Warnf("failed to calc repeat info, err: %v", err) - } - obj.Metadata.Labels["repeat"] = repeatStatus - - obj.ConfigString = util.MustToJSON(taskConfig) - obj.Metadata.Labels["completed_indices"] = completedIndices - h.WriteJSON(w, obj, http.StatusOK) -} - -type TaskInfoResponse struct { - TaskID string `json:"task_id"` - Step interface{} `json:"step"` - StartTime int64 `json:"start_time"` - CompletedTime int64 `json:"completed_time"` - Duration int64 `json:"duration"` - DataPartition int `json:"data_partition"` - CompletedPartitions int `json:"completed_partitions"` - Partitions []util.MapStr `json:"partitions"` - Repeating bool `json:"repeating"` -} - -func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - uniqueIndexName := ps.MustGetParameter("index") - majorTask := task2.Task{} - majorTask.ID = id - exists, err := orm.Get(&majorTask) - if !exists || err != nil { - h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError) - return - } - - taskConfig := &migration_model.ClusterMigrationTaskConfig{} - err = migration_util.GetTaskConfig(&majorTask, taskConfig) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - taskInfo := &TaskInfoResponse{ - TaskID: id, - StartTime: majorTask.StartTimeInMillis, - Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels), - } - - subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName) - - taskInfo.DataPartition = len(subTasks) - if len(subTasks) == 0 { - h.WriteJSON(w, taskInfo, http.StatusOK) - return - } - - var scrollStats = map[string]int64{} - var bulkStats = map[string]int64{} - pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) - for pipelineID, pipelineContext := range pipelineContexts { - if pid, ok := pipelineSubParentIDs[pipelineID]; ok { - if vv := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs"); vv > 0 { - scrollStats[pid] = vv - } - if vv := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count"); vv > 0 { - bulkStats[pid] = vv - } - } - } - - startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating) - - var partitionTaskInfos []util.MapStr - - for i, ptask := range subTasks { - cfg := migration_model.IndexMigrationTaskConfig{} - err := migration_util.GetTaskConfig(&ptask, &cfg) - if err != nil { - log.Errorf("failed to get task config, err: %v", err) - continue - } - if i == 0 { - taskInfo.Step = cfg.Source.Step - } - - var durationInMS int64 - var subCompletedTime int64 - if ptask.StartTimeInMillis > 0 { - if migration_util.IsPendingState(ptask.Status) { - durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis - } else if ptask.CompletedTime != nil { - subCompletedTime = ptask.CompletedTime.UnixMilli() - durationInMS = subCompletedTime - ptask.StartTimeInMillis - } - } - var ( - scrollDocs int64 - indexDocs int64 - ) - ptaskLabels := util.MapStr(ptask.Metadata.Labels) - if vv, ok := scrollStats[ptask.ID]; ok { - scrollDocs = vv - } else { - scrollDocs = migration_util.GetMapIntValue(ptaskLabels, "scrolled_docs") - } - if vv, ok := bulkStats[ptask.ID]; ok { - indexDocs = vv - } else { - indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs") - } - - partitionTotalDocs := cfg.Source.DocCount - partitionTaskInfo := util.MapStr{ - "task_id": ptask.ID, - "status": ptask.Status, - "start_time": ptask.StartTimeInMillis, - "completed_time": subCompletedTime, - "start": cfg.Source.Start, - "end": cfg.Source.End, - "duration": durationInMS, - "scroll_docs": scrollDocs, - "index_docs": indexDocs, - "total_docs": partitionTotalDocs, - } - scrollTask, bulkTask := migration_util.SplitIndexMigrationTasks(parentIDPipelineTasks[ptask.ID]) - if scrollTask != nil { - partitionTaskInfo["scroll_task"] = util.MapStr{ - "id": scrollTask.ID, - "status": scrollTask.Status, - } - } - if bulkTask != nil { - partitionTaskInfo["bulk_task"] = util.MapStr{ - "id": bulkTask.ID, - "status": bulkTask.Status, - } - } - partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo) - } - taskInfo.CompletedTime = completedTime - taskInfo.Duration = duration - // NOTE: overwrite major task start time with the first started sub task - if taskInfo.StartTime == 0 { - taskInfo.StartTime = startTime - } - taskInfo.Partitions = partitionTaskInfos - taskInfo.CompletedPartitions = completedPartitions - h.WriteJSON(w, taskInfo, http.StatusOK) -} - -func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - index = ps.MustGetParameter("index") - clusterID = ps.MustGetParameter("id") - ) - client := elastic.GetClient(clusterID) - reqBody := struct { - Filter interface{} `json:"filter"` - }{} - err := h.DecodeJSON(req, &reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - var query []byte - if reqBody.Filter != nil { - query = util.MustToJSONBytes(util.MapStr{ - "query": reqBody.Filter, - }) - } - - ctx := req.Context() - - countRes, err := client.Count(ctx, index, query) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.WriteJSON(w, countRes, http.StatusOK) -} - -func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - typ := h.GetParameter(req, "type") - switch typ { - case "multi_type": - h.validateMultiType(w, req, ps) - return - } - h.WriteError(w, "unknown parameter type", http.StatusOK) -} - -func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var reqBody = struct { - Cluster struct { - SourceID string `json:"source_id"` - TargetID string `json:"target_id"` - } `json:"cluster"` - Indices []string - }{} - err := h.DecodeJSON(req, &reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - sourceClient := elastic.GetClient(reqBody.Cluster.SourceID) - // get source type - indexNames := strings.Join(reqBody.Indices, ",") - typeInfo, err := elastic.GetIndexTypes(sourceClient, indexNames) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - h.WriteJSON(w, util.MapStr{ - "result": typeInfo, - }, http.StatusOK) -} - -type InitIndexRequest struct { - Mappings map[string]interface{} `json:"mappings"` - Settings map[string]interface{} `json:"settings"` -} - -func (h *APIHandler) initIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - targetClusterID := ps.MustGetParameter("id") - indexName := ps.MustGetParameter("index") - reqBody := &InitIndexRequest{} - err := h.DecodeJSON(req, reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - client := elastic.GetClient(targetClusterID) - exists, err := client.Exists(indexName) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - if exists { - if len(reqBody.Settings) > 0 { - err = client.UpdateIndexSettings(indexName, reqBody.Settings) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - } - if ml := len(reqBody.Mappings); ml > 0 { - var ( - docType = "" - mapping interface{} = reqBody.Mappings - ) - if ml == 1 { - for key, _ := range reqBody.Mappings { - if key != "properties" { - docType = key - mapping = reqBody.Mappings[key] - } - } - } - mappingBytes := util.MustToJSONBytes(mapping) - _, err = client.UpdateMapping(indexName, docType, mappingBytes) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - } - } else { - indexSettings := map[string]interface{}{} - if len(reqBody.Settings) > 0 { - indexSettings["settings"] = reqBody.Settings - } - if len(reqBody.Mappings) > 0 { - indexSettings["mappings"] = reqBody.Mappings - } - err = client.CreateIndex(indexName, indexSettings) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - } - h.WriteJSON(w, util.MapStr{ - "success": true, - }, http.StatusOK) -} - -func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - index = ps.MustGetParameter("index") - clusterID = ps.MustGetParameter("id") - ) - client := elastic.GetClient(clusterID) - err := client.Refresh(index) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - h.WriteJSON(w, util.MapStr{ - "success": true, - }, 200) -} - -func (h *APIHandler) getChildTaskInfosByIndex(id string, uniqueIndexName string) (subTasks []task2.Task, runningPipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) { - queryDsl := util.MapStr{ - "size": 9999, - "sort": []util.MapStr{ - { - "created": util.MapStr{ - "order": "asc", - }, - }, - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.unique_index_name": util.MapStr{ - "value": uniqueIndexName, - }, - }, - }, - }, - }, - }, - } - allTasks, err := migration_util.GetTasks(queryDsl) - if err != nil { - return - } - - runningPipelineTaskIDs = map[string][]string{} - pipelineSubParentIDs = map[string]string{} - parentIDPipelineTasks = map[string][]task2.Task{} - - for _, subTask := range allTasks { - if subTask.Metadata.Type != "pipeline" { - subTasks = append(subTasks, subTask) - continue - } - - if pl := len(subTask.ParentId); pl != 2 { - continue - } - parentID := migration_util.GetDirectParentId(subTask.ParentId) - - pipelineSubParentIDs[subTask.ID] = parentID - instID := migration_util.GetMapStringValue(util.MapStr(subTask.Metadata.Labels), "execution_instance_id") - if instID == "" { - continue - } - if subTask.Status == task2.StatusRunning { - runningPipelineTaskIDs[instID] = append(runningPipelineTaskIDs[instID], subTask.ID) - } - parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask) - } - - return -} - -func (h *APIHandler) getChildPipelineInfosFromGateway(pipelineTaskIDs map[string][]string) (pipelineContexts map[string]util.MapStr) { - pipelineContexts = map[string]util.MapStr{} - var err error - - for instID, taskIDs := range pipelineTaskIDs { - inst := &model.Instance{} - inst.ID = instID - _, err = orm.Get(inst) - if err != nil { - log.Errorf("failed to get instance info, id: %s, err: %v", instID, err) - continue - } - pipelines, err := inst.GetPipelinesByIDs(taskIDs) - if err != nil { - log.Errorf("failed to get pipelines info, err: %v", err) - continue - } - - for pipelineID, status := range pipelines { - pipelineContexts[pipelineID] = status.Context - } - } - - return -} - -func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task, repeating bool) (startTime int64, completedTime int64, duration int64, completedPartitions int) { - if len(subTasks) == 0 { - return - } - - for _, subTask := range subTasks { - if subTask.StartTimeInMillis > 0 { - if startTime == 0 { - startTime = subTask.StartTimeInMillis - } - if subTask.StartTimeInMillis < startTime { - startTime = subTask.StartTimeInMillis - } - } - if subTask.CompletedTime != nil { - subCompletedTime := subTask.CompletedTime.UnixMilli() - if subCompletedTime > completedTime { - completedTime = subCompletedTime - } - } - - if subTask.Status == task2.StatusComplete || subTask.Status == task2.StatusError { - completedPartitions++ - } - } - if len(subTasks) != completedPartitions || repeating { - completedTime = 0 - duration = time.Now().UnixMilli() - startTime - } else { - duration = completedTime - startTime - } - - return -} diff --git a/plugin/migration/migration_api.go b/plugin/migration/migration_api.go deleted file mode 100644 index 56ce9778..00000000 --- a/plugin/migration/migration_api.go +++ /dev/null @@ -1,173 +0,0 @@ -package migration - -import ( - "net/http" - - log "github.com/cihub/seelog" - - "infini.sh/console/plugin/migration/cluster_migration" - migration_model "infini.sh/console/plugin/migration/model" - migration_util "infini.sh/console/plugin/migration/util" - - "infini.sh/framework/core/api/rbac" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/task" - "infini.sh/framework/core/util" -) - -func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - clusterTaskConfig := &migration_model.ClusterMigrationTaskConfig{} - err := h.DecodeJSON(req, clusterTaskConfig) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - user, err := rbac.FromUserContext(req.Context()) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - t, err := cluster_migration.CreateTask(clusterTaskConfig, user) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - h.WriteCreatedOKJSON(w, t.ID) -} - -type MigrationIndexStateInfo struct { - ErrorPartitions int - IndexDocs int64 - SourceDocs int64 -} - -/* -We count data from two sources: - - index_migrations with complete/error status - - plus index_migration.index_docs with realtime bulk indexing info - - realtime bulk indexing info is only available for running index_migrations -*/ -func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) { - taskQuery := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_migration", - }, - }, - }, - }, - }, - }, - } - subTasks, err := migration_util.GetTasks(taskQuery) - if err != nil { - return taskStats, indexState, err - } - - var indexMigrationTaskIDs []string - indexState = map[string]MigrationIndexStateInfo{} - for _, subTask := range subTasks { - taskLabels := util.MapStr(subTask.Metadata.Labels) - indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") - if indexName == "" { - continue - } - - cfg := migration_model.IndexMigrationTaskConfig{} - err = migration_util.GetTaskConfig(&subTask, &cfg) - if err != nil { - log.Errorf("failed to get task config, err: %v", err) - continue - } - - taskStats.SourceDocs += cfg.Source.DocCount - st := indexState[indexName] - st.SourceDocs += cfg.Source.DocCount - indexState[indexName] = st - - if subTask.Status == task.StatusRunning { - indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) - continue - } - - indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs") - taskStats.IndexDocs += indexDocs - st.IndexDocs += indexDocs - if subTask.Status == task.StatusError { - st.ErrorPartitions += 1 - taskStats.ErrorPartitions += 1 - } - indexState[indexName] = st - indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) - } - - taskQuery = util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "parent_id": indexMigrationTaskIDs, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "bulk_indexing", - }, - }, - }, - }, - }, - }, - } - subTasks, err = migration_util.GetTasks(taskQuery) - if err != nil { - return taskStats, indexState, err - } - - var pipelineTaskIDs = map[string][]string{} - var pipelineIndexNames = map[string]string{} - for _, subTask := range subTasks { - taskLabels := util.MapStr(subTask.Metadata.Labels) - indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") - if indexName == "" { - continue - } - - pipelineIndexNames[subTask.ID] = indexName - - if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { - pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) - } - } - - pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) - for pipelineID, pipelineContext := range pipelineContexts { - // add indexDocs of running tasks - indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count") - taskStats.IndexDocs += indexDocs - indexName := pipelineIndexNames[pipelineID] - st := indexState[indexName] - st.IndexDocs += indexDocs - indexState[indexName] = st - } - return taskStats, indexState, nil -} diff --git a/plugin/task_manager/api.go b/plugin/task_manager/api.go new file mode 100644 index 00000000..e4f4cace --- /dev/null +++ b/plugin/task_manager/api.go @@ -0,0 +1,44 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package task_manager + +import ( + "infini.sh/framework/core/api" + "infini.sh/framework/core/api/rbac/enum" +) + +func InitAPI() { + handler := APIHandler{} + api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchTask("cluster_migration"), enum.PermissionMigrationTaskRead)) + api.HandleAPIMethod(api.POST, "/migration/data", handler.RequirePermission(handler.createDataMigrationTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_pause", handler.RequirePermission(handler.pauseTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead)) + api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead)) + + api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead)) + api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.DELETE, "/comparison/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info", handler.RequirePermission(handler.getDataComparisonTaskInfo, enum.PermissionComparisonTaskRead)) + api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info/:index", handler.RequirePermission(handler.getDataComparisonTaskOfIndex, enum.PermissionComparisonTaskRead)) + api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_pause", handler.RequirePermission(handler.pauseTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionComparisonTaskWrite)) + + api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration)) + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo) + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_count", handler.countDocuments) + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_refresh", handler.refreshIndex) + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_init", handler.initIndex) + +} + +type APIHandler struct { + api.Handler +} diff --git a/plugin/migration/cluster_comparison/cluster_comparison.go b/plugin/task_manager/cluster_comparison/cluster_comparison.go similarity index 99% rename from plugin/migration/cluster_comparison/cluster_comparison.go rename to plugin/task_manager/cluster_comparison/cluster_comparison.go index 2bdf3b43..ef7ac7ea 100644 --- a/plugin/migration/cluster_comparison/cluster_comparison.go +++ b/plugin/task_manager/cluster_comparison/cluster_comparison.go @@ -7,8 +7,8 @@ import ( log "github.com/cihub/seelog" "infini.sh/console/model" - migration_model "infini.sh/console/plugin/migration/model" - migration_util "infini.sh/console/plugin/migration/util" + migration_model "infini.sh/console/plugin/task_manager/model" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" diff --git a/plugin/migration/cluster_comparison/orm.go b/plugin/task_manager/cluster_comparison/orm.go similarity index 96% rename from plugin/migration/cluster_comparison/orm.go rename to plugin/task_manager/cluster_comparison/orm.go index e7267fa8..3137bf4e 100644 --- a/plugin/migration/cluster_comparison/orm.go +++ b/plugin/task_manager/cluster_comparison/orm.go @@ -4,8 +4,8 @@ import ( "errors" "fmt" - migration_model "infini.sh/console/plugin/migration/model" - migration_util "infini.sh/console/plugin/migration/util" + migration_model "infini.sh/console/plugin/task_manager/model" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/api/rbac" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" diff --git a/plugin/migration/cluster_migration/cluster_migration.go b/plugin/task_manager/cluster_migration/cluster_migration.go similarity index 99% rename from plugin/migration/cluster_migration/cluster_migration.go rename to plugin/task_manager/cluster_migration/cluster_migration.go index 80b136c7..c3f57291 100644 --- a/plugin/migration/cluster_migration/cluster_migration.go +++ b/plugin/task_manager/cluster_migration/cluster_migration.go @@ -8,8 +8,8 @@ import ( log "github.com/cihub/seelog" "infini.sh/console/model" - migration_model "infini.sh/console/plugin/migration/model" - migration_util "infini.sh/console/plugin/migration/util" + migration_model "infini.sh/console/plugin/task_manager/model" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" diff --git a/plugin/migration/cluster_migration/orm.go b/plugin/task_manager/cluster_migration/orm.go similarity index 96% rename from plugin/migration/cluster_migration/orm.go rename to plugin/task_manager/cluster_migration/orm.go index 2616c6fa..728f60df 100644 --- a/plugin/migration/cluster_migration/orm.go +++ b/plugin/task_manager/cluster_migration/orm.go @@ -4,8 +4,8 @@ import ( "errors" "fmt" - migration_model "infini.sh/console/plugin/migration/model" - migration_util "infini.sh/console/plugin/migration/util" + migration_model "infini.sh/console/plugin/task_manager/model" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/api/rbac" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" diff --git a/plugin/migration/common_api.go b/plugin/task_manager/common_api.go similarity index 70% rename from plugin/migration/common_api.go rename to plugin/task_manager/common_api.go index 561dd89d..08241dda 100644 --- a/plugin/migration/common_api.go +++ b/plugin/task_manager/common_api.go @@ -1,15 +1,17 @@ -package migration +package task_manager import ( "errors" "fmt" "net/http" "strconv" + "strings" "time" log "github.com/cihub/seelog" - migration_util "infini.sh/console/plugin/migration/util" + "infini.sh/console/model" + migration_util "infini.sh/console/plugin/task_manager/util" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" @@ -19,6 +21,18 @@ import ( elastic2 "infini.sh/framework/modules/elastic" ) +type TaskInfoResponse struct { + TaskID string `json:"task_id"` + Step interface{} `json:"step"` + StartTime int64 `json:"start_time"` + CompletedTime int64 `json:"completed_time"` + Duration int64 `json:"duration"` + DataPartition int `json:"data_partition"` + CompletedPartitions int `json:"completed_partitions"` + Partitions []util.MapStr `json:"partitions"` + Repeating bool `json:"repeating"` +} + func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var ( @@ -436,3 +450,168 @@ func (h *APIHandler) pauseTask(w http.ResponseWriter, req *http.Request, ps http }, 200) return } + +func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + typ := h.GetParameter(req, "type") + switch typ { + case "multi_type": + h.validateMultiType(w, req, ps) + return + } + h.WriteError(w, "unknown parameter type", http.StatusOK) +} + +func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var reqBody = struct { + Cluster struct { + SourceID string `json:"source_id"` + TargetID string `json:"target_id"` + } `json:"cluster"` + Indices []string + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + sourceClient := elastic.GetClient(reqBody.Cluster.SourceID) + // get source type + indexNames := strings.Join(reqBody.Indices, ",") + typeInfo, err := elastic.GetIndexTypes(sourceClient, indexNames) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.WriteJSON(w, util.MapStr{ + "result": typeInfo, + }, http.StatusOK) +} + +func (h *APIHandler) getChildTaskInfosByIndex(id string, uniqueIndexName string) (subTasks []task.Task, runningPipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task.Task, err error) { + queryDsl := util.MapStr{ + "size": 9999, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "asc", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": id, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.unique_index_name": util.MapStr{ + "value": uniqueIndexName, + }, + }, + }, + }, + }, + }, + } + allTasks, err := migration_util.GetTasks(queryDsl) + if err != nil { + return + } + + runningPipelineTaskIDs = map[string][]string{} + pipelineSubParentIDs = map[string]string{} + parentIDPipelineTasks = map[string][]task.Task{} + + for _, subTask := range allTasks { + if subTask.Metadata.Type != "pipeline" { + subTasks = append(subTasks, subTask) + continue + } + + if pl := len(subTask.ParentId); pl != 2 { + continue + } + parentID := migration_util.GetDirectParentId(subTask.ParentId) + + pipelineSubParentIDs[subTask.ID] = parentID + instID := migration_util.GetMapStringValue(util.MapStr(subTask.Metadata.Labels), "execution_instance_id") + if instID == "" { + continue + } + if subTask.Status == task.StatusRunning { + runningPipelineTaskIDs[instID] = append(runningPipelineTaskIDs[instID], subTask.ID) + } + parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask) + } + + return +} + +func (h *APIHandler) getChildPipelineInfosFromGateway(pipelineTaskIDs map[string][]string) (pipelineContexts map[string]util.MapStr) { + pipelineContexts = map[string]util.MapStr{} + var err error + + for instID, taskIDs := range pipelineTaskIDs { + inst := &model.Instance{} + inst.ID = instID + _, err = orm.Get(inst) + if err != nil { + log.Errorf("failed to get instance info, id: %s, err: %v", instID, err) + continue + } + pipelines, err := inst.GetPipelinesByIDs(taskIDs) + if err != nil { + log.Errorf("failed to get pipelines info, err: %v", err) + continue + } + + for pipelineID, status := range pipelines { + pipelineContexts[pipelineID] = status.Context + } + } + + return +} + +func (h *APIHandler) calcMajorTaskInfo(subTasks []task.Task, repeating bool) (startTime int64, completedTime int64, duration int64, completedPartitions int) { + if len(subTasks) == 0 { + return + } + + for _, subTask := range subTasks { + if subTask.StartTimeInMillis > 0 { + if startTime == 0 { + startTime = subTask.StartTimeInMillis + } + if subTask.StartTimeInMillis < startTime { + startTime = subTask.StartTimeInMillis + } + } + if subTask.CompletedTime != nil { + subCompletedTime := subTask.CompletedTime.UnixMilli() + if subCompletedTime > completedTime { + completedTime = subCompletedTime + } + } + + if subTask.Status == task.StatusComplete || subTask.Status == task.StatusError { + completedPartitions++ + } + } + if len(subTasks) != completedPartitions || repeating { + completedTime = 0 + duration = time.Now().UnixMilli() - startTime + } else { + duration = completedTime - startTime + } + + return +} diff --git a/plugin/migration/comparison_api.go b/plugin/task_manager/comparison_api.go similarity index 97% rename from plugin/migration/comparison_api.go rename to plugin/task_manager/comparison_api.go index 11df430f..439aa0c6 100644 --- a/plugin/migration/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -1,4 +1,4 @@ -package migration +package task_manager import ( "fmt" @@ -7,9 +7,9 @@ import ( log "github.com/cihub/seelog" - "infini.sh/console/plugin/migration/cluster_comparison" - migration_model "infini.sh/console/plugin/migration/model" - migration_util "infini.sh/console/plugin/migration/util" + "infini.sh/console/plugin/task_manager/cluster_comparison" + migration_model "infini.sh/console/plugin/task_manager/model" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/api/rbac" httprouter "infini.sh/framework/core/api/router" diff --git a/plugin/task_manager/es_api.go b/plugin/task_manager/es_api.go new file mode 100644 index 00000000..ae77ed41 --- /dev/null +++ b/plugin/task_manager/es_api.go @@ -0,0 +1,156 @@ +package task_manager + +import ( + "net/http" + + log "github.com/cihub/seelog" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/util" +) + +func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ( + index = ps.MustGetParameter("index") + clusterID = ps.MustGetParameter("id") + ) + client := elastic.GetClient(clusterID) + pq := &elastic.PartitionQuery{ + IndexName: index, + } + err := h.DecodeJSON(req, pq) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + partitions, err := elastic.GetPartitions(pq, client) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, partitions, http.StatusOK) +} + +func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ( + index = ps.MustGetParameter("index") + clusterID = ps.MustGetParameter("id") + ) + client := elastic.GetClient(clusterID) + reqBody := struct { + Filter interface{} `json:"filter"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var query []byte + if reqBody.Filter != nil { + query = util.MustToJSONBytes(util.MapStr{ + "query": reqBody.Filter, + }) + } + + ctx := req.Context() + + countRes, err := client.Count(ctx, index, query) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, countRes, http.StatusOK) +} + +func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ( + index = ps.MustGetParameter("index") + clusterID = ps.MustGetParameter("id") + ) + client := elastic.GetClient(clusterID) + err := client.Refresh(index) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) +} + +type InitIndexRequest struct { + Mappings map[string]interface{} `json:"mappings"` + Settings map[string]interface{} `json:"settings"` +} + +func (h *APIHandler) initIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + targetClusterID := ps.MustGetParameter("id") + indexName := ps.MustGetParameter("index") + reqBody := &InitIndexRequest{} + err := h.DecodeJSON(req, reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + client := elastic.GetClient(targetClusterID) + exists, err := client.Exists(indexName) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if exists { + if len(reqBody.Settings) > 0 { + err = client.UpdateIndexSettings(indexName, reqBody.Settings) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + if ml := len(reqBody.Mappings); ml > 0 { + var ( + docType = "" + mapping interface{} = reqBody.Mappings + ) + if ml == 1 { + for key, _ := range reqBody.Mappings { + if key != "properties" { + docType = key + mapping = reqBody.Mappings[key] + } + } + } + mappingBytes := util.MustToJSONBytes(mapping) + _, err = client.UpdateMapping(indexName, docType, mappingBytes) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + } else { + indexSettings := map[string]interface{}{} + if len(reqBody.Settings) > 0 { + indexSettings["settings"] = reqBody.Settings + } + if len(reqBody.Mappings) > 0 { + indexSettings["mappings"] = reqBody.Mappings + } + err = client.CreateIndex(indexName, indexSettings) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + h.WriteJSON(w, util.MapStr{ + "success": true, + }, http.StatusOK) +} diff --git a/plugin/migration/index_comparison/index_comparison.go b/plugin/task_manager/index_comparison/index_comparison.go similarity index 99% rename from plugin/migration/index_comparison/index_comparison.go rename to plugin/task_manager/index_comparison/index_comparison.go index 2d838fb8..4d9e2c6d 100644 --- a/plugin/migration/index_comparison/index_comparison.go +++ b/plugin/task_manager/index_comparison/index_comparison.go @@ -5,8 +5,8 @@ import ( "time" log "github.com/cihub/seelog" - migration_model "infini.sh/console/plugin/migration/model" - migration_util "infini.sh/console/plugin/migration/util" + migration_model "infini.sh/console/plugin/task_manager/model" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" diff --git a/plugin/migration/index_migration/index_migration.go b/plugin/task_manager/index_migration/index_migration.go similarity index 99% rename from plugin/migration/index_migration/index_migration.go rename to plugin/task_manager/index_migration/index_migration.go index 21a1e859..ec9af750 100644 --- a/plugin/migration/index_migration/index_migration.go +++ b/plugin/task_manager/index_migration/index_migration.go @@ -7,8 +7,8 @@ import ( log "github.com/cihub/seelog" - migration_model "infini.sh/console/plugin/migration/model" - migration_util "infini.sh/console/plugin/migration/util" + migration_model "infini.sh/console/plugin/task_manager/model" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" diff --git a/plugin/task_manager/migration_api.go b/plugin/task_manager/migration_api.go new file mode 100644 index 00000000..afd28a69 --- /dev/null +++ b/plugin/task_manager/migration_api.go @@ -0,0 +1,372 @@ +package task_manager + +import ( + "fmt" + "net/http" + "time" + + log "github.com/cihub/seelog" + + "infini.sh/console/plugin/task_manager/cluster_migration" + migration_model "infini.sh/console/plugin/task_manager/model" + migration_util "infini.sh/console/plugin/task_manager/util" + + "infini.sh/framework/core/api/rbac" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/global" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" +) + +func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + clusterTaskConfig := &migration_model.ClusterMigrationTaskConfig{} + err := h.DecodeJSON(req, clusterTaskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + user, err := rbac.FromUserContext(req.Context()) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + t, err := cluster_migration.CreateTask(clusterTaskConfig, user) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.WriteCreatedOKJSON(w, t.ID) +} + +func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + + obj := task.Task{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + taskConfig := &migration_model.ClusterMigrationTaskConfig{} + err = migration_util.GetTaskConfig(&obj, taskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + _, indexState, err := h.getMigrationMajorTaskInfo(id) + if err != nil { + log.Errorf("failed to get major task info, err: %v", err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + var completedIndices int + for i, index := range taskConfig.Indices { + indexName := index.Source.GetUniqueIndexName() + count := indexState[indexName].IndexDocs + percent := float64(count) / float64(index.Source.Docs) * 100 + if percent > 100 { + percent = 100 + } + taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceDocs + taskConfig.Indices[i].Target.Docs = count + taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) + taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + if count == index.Source.Docs { + completedIndices++ + } + } + + cfg := global.MustLookup("cluster_migration_config") + if migrationConfig, ok := cfg.(*DispatcherConfig); ok { + if obj.Metadata.Labels == nil { + obj.Metadata.Labels = util.MapStr{} + } + obj.Metadata.Labels["log_info"] = util.MapStr{ + "cluster_id": migrationConfig.Elasticsearch, + "index_name": migrationConfig.LogIndexName, + } + } + + _, repeatStatus, err := h.calcRepeatingStatus(&obj) + if err != nil { + log.Warnf("failed to calc repeat info, err: %v", err) + } + obj.Metadata.Labels["repeat"] = repeatStatus + + obj.ConfigString = util.MustToJSON(taskConfig) + obj.Metadata.Labels["completed_indices"] = completedIndices + h.WriteJSON(w, obj, http.StatusOK) +} + +func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + uniqueIndexName := ps.MustGetParameter("index") + majorTask := task.Task{} + majorTask.ID = id + exists, err := orm.Get(&majorTask) + if !exists || err != nil { + h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError) + return + } + + taskConfig := &migration_model.ClusterMigrationTaskConfig{} + err = migration_util.GetTaskConfig(&majorTask, taskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + taskInfo := &TaskInfoResponse{ + TaskID: id, + StartTime: majorTask.StartTimeInMillis, + Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels), + } + + subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName) + + taskInfo.DataPartition = len(subTasks) + if len(subTasks) == 0 { + h.WriteJSON(w, taskInfo, http.StatusOK) + return + } + + var scrollStats = map[string]int64{} + var bulkStats = map[string]int64{} + pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) + for pipelineID, pipelineContext := range pipelineContexts { + if pid, ok := pipelineSubParentIDs[pipelineID]; ok { + if vv := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs"); vv > 0 { + scrollStats[pid] = vv + } + if vv := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count"); vv > 0 { + bulkStats[pid] = vv + } + } + } + + startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating) + + var partitionTaskInfos []util.MapStr + + for i, ptask := range subTasks { + cfg := migration_model.IndexMigrationTaskConfig{} + err := migration_util.GetTaskConfig(&ptask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + continue + } + if i == 0 { + taskInfo.Step = cfg.Source.Step + } + + var durationInMS int64 + var subCompletedTime int64 + if ptask.StartTimeInMillis > 0 { + if migration_util.IsPendingState(ptask.Status) { + durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis + } else if ptask.CompletedTime != nil { + subCompletedTime = ptask.CompletedTime.UnixMilli() + durationInMS = subCompletedTime - ptask.StartTimeInMillis + } + } + var ( + scrollDocs int64 + indexDocs int64 + ) + ptaskLabels := util.MapStr(ptask.Metadata.Labels) + if vv, ok := scrollStats[ptask.ID]; ok { + scrollDocs = vv + } else { + scrollDocs = migration_util.GetMapIntValue(ptaskLabels, "scrolled_docs") + } + if vv, ok := bulkStats[ptask.ID]; ok { + indexDocs = vv + } else { + indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs") + } + + partitionTotalDocs := cfg.Source.DocCount + partitionTaskInfo := util.MapStr{ + "task_id": ptask.ID, + "status": ptask.Status, + "start_time": ptask.StartTimeInMillis, + "completed_time": subCompletedTime, + "start": cfg.Source.Start, + "end": cfg.Source.End, + "duration": durationInMS, + "scroll_docs": scrollDocs, + "index_docs": indexDocs, + "total_docs": partitionTotalDocs, + } + scrollTask, bulkTask := migration_util.SplitIndexMigrationTasks(parentIDPipelineTasks[ptask.ID]) + if scrollTask != nil { + partitionTaskInfo["scroll_task"] = util.MapStr{ + "id": scrollTask.ID, + "status": scrollTask.Status, + } + } + if bulkTask != nil { + partitionTaskInfo["bulk_task"] = util.MapStr{ + "id": bulkTask.ID, + "status": bulkTask.Status, + } + } + partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo) + } + taskInfo.CompletedTime = completedTime + taskInfo.Duration = duration + // NOTE: overwrite major task start time with the first started sub task + if taskInfo.StartTime == 0 { + taskInfo.StartTime = startTime + } + taskInfo.Partitions = partitionTaskInfos + taskInfo.CompletedPartitions = completedPartitions + h.WriteJSON(w, taskInfo, http.StatusOK) +} + +type MigrationIndexStateInfo struct { + ErrorPartitions int + IndexDocs int64 + SourceDocs int64 +} + +/* +We count data from two sources: + - index_migrations with complete/error status + - plus index_migration.index_docs with realtime bulk indexing info + - realtime bulk indexing info is only available for running index_migrations +*/ +func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) { + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": id, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_migration", + }, + }, + }, + }, + }, + }, + } + subTasks, err := migration_util.GetTasks(taskQuery) + if err != nil { + return taskStats, indexState, err + } + + var indexMigrationTaskIDs []string + indexState = map[string]MigrationIndexStateInfo{} + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + cfg := migration_model.IndexMigrationTaskConfig{} + err = migration_util.GetTaskConfig(&subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + continue + } + + taskStats.SourceDocs += cfg.Source.DocCount + st := indexState[indexName] + st.SourceDocs += cfg.Source.DocCount + indexState[indexName] = st + + if subTask.Status == task.StatusRunning { + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) + continue + } + + indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs") + taskStats.IndexDocs += indexDocs + st.IndexDocs += indexDocs + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 + taskStats.ErrorPartitions += 1 + } + indexState[indexName] = st + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) + } + + taskQuery = util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "parent_id": indexMigrationTaskIDs, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "bulk_indexing", + }, + }, + }, + }, + }, + }, + } + subTasks, err = migration_util.GetTasks(taskQuery) + if err != nil { + return taskStats, indexState, err + } + + var pipelineTaskIDs = map[string][]string{} + var pipelineIndexNames = map[string]string{} + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + pipelineIndexNames[subTask.ID] = indexName + + if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + } + + pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) + for pipelineID, pipelineContext := range pipelineContexts { + // add indexDocs of running tasks + indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count") + taskStats.IndexDocs += indexDocs + indexName := pipelineIndexNames[pipelineID] + st := indexState[indexName] + st.IndexDocs += indexDocs + indexState[indexName] = st + } + return taskStats, indexState, nil +} diff --git a/plugin/migration/model/common.go b/plugin/task_manager/model/common.go similarity index 100% rename from plugin/migration/model/common.go rename to plugin/task_manager/model/common.go diff --git a/plugin/migration/model/comparison.go b/plugin/task_manager/model/comparison.go similarity index 100% rename from plugin/migration/model/comparison.go rename to plugin/task_manager/model/comparison.go diff --git a/plugin/migration/model/migration.go b/plugin/task_manager/model/migration.go similarity index 100% rename from plugin/migration/model/migration.go rename to plugin/task_manager/model/migration.go diff --git a/plugin/migration/model/pipeline.go b/plugin/task_manager/model/pipeline.go similarity index 100% rename from plugin/migration/model/pipeline.go rename to plugin/task_manager/model/pipeline.go diff --git a/plugin/migration/model/processor.go b/plugin/task_manager/model/processor.go similarity index 100% rename from plugin/migration/model/processor.go rename to plugin/task_manager/model/processor.go diff --git a/plugin/migration/model/scheduler.go b/plugin/task_manager/model/scheduler.go similarity index 100% rename from plugin/migration/model/scheduler.go rename to plugin/task_manager/model/scheduler.go diff --git a/plugin/migration/module.go b/plugin/task_manager/module.go similarity index 96% rename from plugin/migration/module.go rename to plugin/task_manager/module.go index 38a853ed..ae8d0ca3 100644 --- a/plugin/migration/module.go +++ b/plugin/task_manager/module.go @@ -2,7 +2,7 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package migration +package task_manager import ( log "github.com/cihub/seelog" diff --git a/plugin/migration/pipeline.go b/plugin/task_manager/pipeline.go similarity index 71% rename from plugin/migration/pipeline.go rename to plugin/task_manager/pipeline.go index f2780809..e5652eac 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/task_manager/pipeline.go @@ -2,7 +2,7 @@ * Web: https://infinilabs.com * Email: hello#infini.ltd */ -package migration +package task_manager import ( "errors" @@ -11,21 +11,21 @@ import ( log "github.com/cihub/seelog" - "infini.sh/console/plugin/migration/cluster_comparison" - "infini.sh/console/plugin/migration/cluster_migration" - "infini.sh/console/plugin/migration/index_comparison" - "infini.sh/console/plugin/migration/index_migration" - migration_model "infini.sh/console/plugin/migration/model" - "infini.sh/console/plugin/migration/pipeline_task" - "infini.sh/console/plugin/migration/scheduler" - migration_util "infini.sh/console/plugin/migration/util" + "infini.sh/console/plugin/task_manager/cluster_comparison" + "infini.sh/console/plugin/task_manager/cluster_migration" + "infini.sh/console/plugin/task_manager/index_comparison" + "infini.sh/console/plugin/task_manager/index_migration" + migration_model "infini.sh/console/plugin/task_manager/model" + "infini.sh/console/plugin/task_manager/pipeline_task" + "infini.sh/console/plugin/task_manager/scheduler" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/config" "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" "infini.sh/framework/core/global" "infini.sh/framework/core/pipeline" - task2 "infini.sh/framework/core/task" + "infini.sh/framework/core/task" "infini.sh/framework/core/util" "infini.sh/framework/modules/elastic/common" ) @@ -115,27 +115,27 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { p.handleRepeatingTasks(ctx, "cluster_migration") // handle pipeline task - p.handleTasks(ctx, "pipeline", []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, p.pipelineTaskProcessor.Process) + p.handleTasks(ctx, "pipeline", []string{task.StatusReady, task.StatusRunning, task.StatusPendingStop}, p.pipelineTaskProcessor.Process) // handle comparison tasks - p.handleTasks(ctx, "cluster_comparison", []string{task2.StatusPendingStop}, p.clusterComparisonTaskProcessor.Process) - p.handleTasks(ctx, "index_comparison", []string{task2.StatusPendingStop}, p.indexComparisonTaskProcessor.Process) - p.handleTasks(ctx, "index_comparison", []string{task2.StatusRunning}, p.indexComparisonTaskProcessor.Process) - p.handleTasks(ctx, "index_comparison", []string{task2.StatusReady}, p.indexComparisonTaskProcessor.Process) - p.handleTasks(ctx, "cluster_comparison", []string{task2.StatusRunning}, p.clusterComparisonTaskProcessor.Process) - p.handleTasks(ctx, "cluster_comparison", []string{task2.StatusReady}, p.clusterComparisonTaskProcessor.Process) + p.handleTasks(ctx, "cluster_comparison", []string{task.StatusPendingStop}, p.clusterComparisonTaskProcessor.Process) + p.handleTasks(ctx, "index_comparison", []string{task.StatusPendingStop}, p.indexComparisonTaskProcessor.Process) + p.handleTasks(ctx, "index_comparison", []string{task.StatusRunning}, p.indexComparisonTaskProcessor.Process) + p.handleTasks(ctx, "index_comparison", []string{task.StatusReady}, p.indexComparisonTaskProcessor.Process) + p.handleTasks(ctx, "cluster_comparison", []string{task.StatusRunning}, p.clusterComparisonTaskProcessor.Process) + p.handleTasks(ctx, "cluster_comparison", []string{task.StatusReady}, p.clusterComparisonTaskProcessor.Process) // handle migration tasks - p.handleTasks(ctx, "cluster_migration", []string{task2.StatusPendingStop}, p.clusterMigrationTaskProcessor.Process) - p.handleTasks(ctx, "index_migration", []string{task2.StatusPendingStop}, p.indexMigrationTaskProcessor.Process) - p.handleTasks(ctx, "index_migration", []string{task2.StatusRunning}, p.indexMigrationTaskProcessor.Process) - p.handleTasks(ctx, "index_migration", []string{task2.StatusReady}, p.indexMigrationTaskProcessor.Process) - p.handleTasks(ctx, "cluster_migration", []string{task2.StatusRunning}, p.clusterMigrationTaskProcessor.Process) - p.handleTasks(ctx, "cluster_migration", []string{task2.StatusReady}, p.clusterMigrationTaskProcessor.Process) + p.handleTasks(ctx, "cluster_migration", []string{task.StatusPendingStop}, p.clusterMigrationTaskProcessor.Process) + p.handleTasks(ctx, "index_migration", []string{task.StatusPendingStop}, p.indexMigrationTaskProcessor.Process) + p.handleTasks(ctx, "index_migration", []string{task.StatusRunning}, p.indexMigrationTaskProcessor.Process) + p.handleTasks(ctx, "index_migration", []string{task.StatusReady}, p.indexMigrationTaskProcessor.Process) + p.handleTasks(ctx, "cluster_migration", []string{task.StatusRunning}, p.clusterMigrationTaskProcessor.Process) + p.handleTasks(ctx, "cluster_migration", []string{task.StatusReady}, p.clusterMigrationTaskProcessor.Process) return nil } -func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task2.Task) error) { +func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task.Task) error) { tasks, err := p.getMigrationTasks(taskType, taskStatus, p.config.TaskBatchSize) if err != nil { log.Errorf("failed to get [%s] with status %s, err: %v", taskType, taskStatus, err) @@ -158,10 +158,10 @@ func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string if err != nil { log.Errorf("failed to handle task [%s]: [%v]", taskItem.ID, err) - taskItem.Status = task2.StatusError + taskItem.Status = task.StatusError tn := time.Now() taskItem.CompletedTime = &tn - p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: false, Error: err.Error(), }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID)) @@ -170,7 +170,7 @@ func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string return } -func (p *DispatcherProcessor) handleTask(taskItem *task2.Task, taskHandler func(taskItem *task2.Task) error) error { +func (p *DispatcherProcessor) handleTask(taskItem *task.Task, taskHandler func(taskItem *task.Task) error) error { if taskItem.Metadata.Labels == nil { log.Errorf("got migration task [%s] with empty labels, skip handling", taskItem.ID) return errors.New("missing labels") @@ -178,7 +178,7 @@ func (p *DispatcherProcessor) handleTask(taskItem *task2.Task, taskHandler func( return taskHandler(taskItem) } -func (p *DispatcherProcessor) getMigrationTasks(taskType string, taskStatus []string, size int) ([]task2.Task, error) { +func (p *DispatcherProcessor) getMigrationTasks(taskType string, taskStatus []string, size int) ([]task.Task, error) { queryDsl := util.MapStr{ "size": size, "sort": []util.MapStr{ @@ -210,7 +210,7 @@ func (p *DispatcherProcessor) getMigrationTasks(taskType string, taskStatus []st return migration_util.GetTasks(queryDsl) } -func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) { +func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) { esClient := elastic.GetClient(p.config.Elasticsearch) _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, "") if err != nil { diff --git a/plugin/migration/pipeline_task/bulk_indexing.go b/plugin/task_manager/pipeline_task/bulk_indexing.go similarity index 98% rename from plugin/migration/pipeline_task/bulk_indexing.go rename to plugin/task_manager/pipeline_task/bulk_indexing.go index a2029a94..2f8cee83 100644 --- a/plugin/migration/pipeline_task/bulk_indexing.go +++ b/plugin/task_manager/pipeline_task/bulk_indexing.go @@ -6,7 +6,7 @@ import ( "time" log "github.com/cihub/seelog" - migration_util "infini.sh/console/plugin/migration/util" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/task" ) diff --git a/plugin/migration/pipeline_task/dump_hash.go b/plugin/task_manager/pipeline_task/dump_hash.go similarity index 97% rename from plugin/migration/pipeline_task/dump_hash.go rename to plugin/task_manager/pipeline_task/dump_hash.go index b001bb87..09e49024 100644 --- a/plugin/migration/pipeline_task/dump_hash.go +++ b/plugin/task_manager/pipeline_task/dump_hash.go @@ -6,7 +6,7 @@ import ( "time" log "github.com/cihub/seelog" - migration_util "infini.sh/console/plugin/migration/util" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/task" ) diff --git a/plugin/migration/pipeline_task/es_scroll.go b/plugin/task_manager/pipeline_task/es_scroll.go similarity index 97% rename from plugin/migration/pipeline_task/es_scroll.go rename to plugin/task_manager/pipeline_task/es_scroll.go index 2f41ddae..be682cfb 100644 --- a/plugin/migration/pipeline_task/es_scroll.go +++ b/plugin/task_manager/pipeline_task/es_scroll.go @@ -6,7 +6,7 @@ import ( "time" log "github.com/cihub/seelog" - migration_util "infini.sh/console/plugin/migration/util" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/task" ) diff --git a/plugin/migration/pipeline_task/index_diff.go b/plugin/task_manager/pipeline_task/index_diff.go similarity index 98% rename from plugin/migration/pipeline_task/index_diff.go rename to plugin/task_manager/pipeline_task/index_diff.go index 6f969643..9ebdab6e 100644 --- a/plugin/migration/pipeline_task/index_diff.go +++ b/plugin/task_manager/pipeline_task/index_diff.go @@ -6,7 +6,7 @@ import ( "time" log "github.com/cihub/seelog" - migration_util "infini.sh/console/plugin/migration/util" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/task" ) diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/task_manager/pipeline_task/pipeline_task.go similarity index 98% rename from plugin/migration/pipeline_task/pipeline_task.go rename to plugin/task_manager/pipeline_task/pipeline_task.go index 9106cc6a..0c8e0133 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/task_manager/pipeline_task/pipeline_task.go @@ -10,8 +10,8 @@ import ( log "github.com/cihub/seelog" "infini.sh/console/model" - migration_model "infini.sh/console/plugin/migration/model" - migration_util "infini.sh/console/plugin/migration/util" + migration_model "infini.sh/console/plugin/task_manager/model" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/elastic" "infini.sh/framework/core/task" diff --git a/plugin/migration/repeat.go b/plugin/task_manager/repeat.go similarity index 94% rename from plugin/migration/repeat.go rename to plugin/task_manager/repeat.go index 420009e1..aa8c23aa 100644 --- a/plugin/migration/repeat.go +++ b/plugin/task_manager/repeat.go @@ -1,4 +1,4 @@ -package migration +package task_manager import ( "errors" @@ -6,9 +6,9 @@ import ( "time" log "github.com/cihub/seelog" - "infini.sh/console/plugin/migration/cluster_comparison" - "infini.sh/console/plugin/migration/cluster_migration" - migration_util "infini.sh/console/plugin/migration/util" + "infini.sh/console/plugin/task_manager/cluster_comparison" + "infini.sh/console/plugin/task_manager/cluster_migration" + migration_util "infini.sh/console/plugin/task_manager/util" "infini.sh/framework/core/pipeline" "infini.sh/framework/core/task" "infini.sh/framework/core/util" diff --git a/plugin/migration/scheduler/scheduler.go b/plugin/task_manager/scheduler/scheduler.go similarity index 99% rename from plugin/migration/scheduler/scheduler.go rename to plugin/task_manager/scheduler/scheduler.go index 23b6e675..1d7413c4 100644 --- a/plugin/migration/scheduler/scheduler.go +++ b/plugin/task_manager/scheduler/scheduler.go @@ -11,7 +11,7 @@ import ( log "github.com/cihub/seelog" "infini.sh/console/model" - migration_model "infini.sh/console/plugin/migration/model" + migration_model "infini.sh/console/plugin/task_manager/model" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" diff --git a/plugin/migration/util/orm.go b/plugin/task_manager/util/orm.go similarity index 100% rename from plugin/migration/util/orm.go rename to plugin/task_manager/util/orm.go diff --git a/plugin/migration/util/pipeline.go b/plugin/task_manager/util/pipeline.go similarity index 94% rename from plugin/migration/util/pipeline.go rename to plugin/task_manager/util/pipeline.go index b977bee2..8c42e83d 100644 --- a/plugin/migration/util/pipeline.go +++ b/plugin/task_manager/util/pipeline.go @@ -1,7 +1,7 @@ package util import ( - migration_model "infini.sh/console/plugin/migration/model" + migration_model "infini.sh/console/plugin/task_manager/model" "infini.sh/framework/core/task" ) diff --git a/plugin/migration/util/repeat.go b/plugin/task_manager/util/repeat.go similarity index 96% rename from plugin/migration/util/repeat.go rename to plugin/task_manager/util/repeat.go index 708c62f0..eafa632d 100644 --- a/plugin/migration/util/repeat.go +++ b/plugin/task_manager/util/repeat.go @@ -3,7 +3,7 @@ package util import ( "time" - migration_model "infini.sh/console/plugin/migration/model" + migration_model "infini.sh/console/plugin/task_manager/model" "infini.sh/framework/core/util" ) diff --git a/plugin/migration/util/util.go b/plugin/task_manager/util/util.go similarity index 100% rename from plugin/migration/util/util.go rename to plugin/task_manager/util/util.go