diff --git a/plugin/migration/api.go b/plugin/migration/api.go index b1cb7251..5a25e018 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -5,7 +5,6 @@ package migration import ( - "context" "fmt" "net/http" "strings" @@ -37,7 +36,6 @@ func InitAPI() { api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead)) - api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequirePermission(handler.updateDataMigrationTaskStatus, enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead)) api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite)) @@ -140,82 +138,6 @@ func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Requ h.WriteJSON(w, partitions, http.StatusOK) } -func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API) (map[string]string, error) { - const step = 50 - var ( - length = len(indexNames) - end int - ) - refreshIntervals := map[string]string{} - for i := 0; i < length; i += step { - end = i + step - if end > length-1 { - end = length - } - tempNames := indexNames[i:end] - strNames := strings.Join(tempNames, ",") - resultM, err := targetESClient.GetIndexSettings(strNames) - if err != nil { - return refreshIntervals, nil - } - for indexName, v := range *resultM { - if m, ok := v.(map[string]interface{}); ok { - refreshInterval, _ := util.GetMapValueByKeys([]string{"settings", "index", "refresh_interval"}, m) - if ri, ok := refreshInterval.(string); ok { - refreshIntervals[indexName] = ri - continue - } - refreshInterval, _ = util.GetMapValueByKeys([]string{"defaults", "index", "refresh_interval"}, m) - if ri, ok := refreshInterval.(string); ok { - refreshIntervals[indexName] = ri - continue - } - } - - } - } - return refreshIntervals, nil - -} - -func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - - obj := task2.Task{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - taskConfig := &migration_model.ClusterMigrationTaskConfig{} - err = migration_util.GetTaskConfig(&obj, taskConfig) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - var indexNames []string - for _, index := range taskConfig.Indices { - indexNames = append(indexNames, index.Target.Name) - } - targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id) - if targetESClient == nil { - h.WriteJSON(w, util.MapStr{}, http.StatusOK) - } - vals, err := getIndexRefreshInterval(indexNames, targetESClient) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.WriteJSON(w, vals, http.StatusOK) -} - func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("task_id") @@ -275,47 +197,6 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R h.WriteJSON(w, obj, http.StatusOK) } -func getIndexTaskDocCount(ctx context.Context, index *migration_model.ClusterMigrationIndexConfig, targetESClient elastic.API) (int64, error) { - targetIndexName := index.Target.Name - if targetIndexName == "" { - if v, ok := index.IndexRename[index.Source.Name].(string); ok { - targetIndexName = v - } - } - - var body []byte - var must []interface{} - if index.Target.DocType != "" && targetESClient.GetMajorVersion() < 8 { - must = append(must, util.MapStr{ - "terms": util.MapStr{ - "_type": []string{index.Target.DocType}, - }, - }) - } - if index.RawFilter != nil { - must = append(must, index.RawFilter) - } - if len(must) > 0 { - query := util.MapStr{ - "query": util.MapStr{ - "bool": util.MapStr{ - "must": must, - }, - }, - } - body = util.MustToJSONBytes(query) - } - - countRes, err := targetESClient.Count(ctx, targetIndexName, body) - if err != nil { - return 0, err - } - if countRes.StatusCode != http.StatusOK && countRes.RawResult != nil { - return 0, fmt.Errorf(string(countRes.RawResult.Body)) - } - return countRes.Count, nil -} - type TaskInfoResponse struct { TaskID string `json:"task_id"` Step interface{} `json:"step"` @@ -480,42 +361,6 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps h.WriteJSON(w, countRes, http.StatusOK) } -func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - - obj := task2.Task{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - reqBody := struct { - Status string `json:"status"` - }{} - err = h.DecodeJSON(req, &reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - obj.Status = reqBody.Status - err = orm.Update(nil, obj) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - h.WriteJSON(w, util.MapStr{ - "success": true, - }, 200) -} - func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { typ := h.GetParameter(req, "type") switch typ { diff --git a/plugin/migration/module.go b/plugin/migration/module.go deleted file mode 100644 index de2d4b73..00000000 --- a/plugin/migration/module.go +++ /dev/null @@ -1,37 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package migration - -import ( - log "github.com/cihub/seelog" - "infini.sh/framework/core/env" - "infini.sh/framework/core/module" -) - -func (module *Module) Name() string { - return "migration" -} - -func (module *Module) Setup() { - exists, err := env.ParseConfig("migration", module) - if exists && err != nil { - log.Error(err) - } - InitAPI() -} -func (module *Module) Start() error { - return nil -} - -func (module *Module) Stop() error { - return nil -} - -type Module struct { -} - -func init() { - module.RegisterUserPlugin(&Module{}) -} \ No newline at end of file