[migration] cleanup unused code

This commit is contained in:
Kassian Sun 2023-05-18 22:22:33 +08:00
parent b95b1ff703
commit 7620a76ef7
2 changed files with 0 additions and 192 deletions

View File

@ -5,7 +5,6 @@
package migration package migration
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
"strings" "strings"
@ -37,7 +36,6 @@ func InitAPI() {
api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite)) api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite))
api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead))
api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead))
api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequirePermission(handler.updateDataMigrationTaskStatus, enum.PermissionMigrationTaskRead))
api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead)) api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead))
api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite)) api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite))
@ -140,82 +138,6 @@ func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Requ
h.WriteJSON(w, partitions, http.StatusOK) h.WriteJSON(w, partitions, http.StatusOK)
} }
func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API) (map[string]string, error) {
const step = 50
var (
length = len(indexNames)
end int
)
refreshIntervals := map[string]string{}
for i := 0; i < length; i += step {
end = i + step
if end > length-1 {
end = length
}
tempNames := indexNames[i:end]
strNames := strings.Join(tempNames, ",")
resultM, err := targetESClient.GetIndexSettings(strNames)
if err != nil {
return refreshIntervals, nil
}
for indexName, v := range *resultM {
if m, ok := v.(map[string]interface{}); ok {
refreshInterval, _ := util.GetMapValueByKeys([]string{"settings", "index", "refresh_interval"}, m)
if ri, ok := refreshInterval.(string); ok {
refreshIntervals[indexName] = ri
continue
}
refreshInterval, _ = util.GetMapValueByKeys([]string{"defaults", "index", "refresh_interval"}, m)
if ri, ok := refreshInterval.(string); ok {
refreshIntervals[indexName] = ri
continue
}
}
}
}
return refreshIntervals, nil
}
func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id")
obj := task2.Task{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
}, http.StatusNotFound)
return
}
taskConfig := &migration_model.ClusterMigrationTaskConfig{}
err = migration_util.GetTaskConfig(&obj, taskConfig)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
var indexNames []string
for _, index := range taskConfig.Indices {
indexNames = append(indexNames, index.Target.Name)
}
targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id)
if targetESClient == nil {
h.WriteJSON(w, util.MapStr{}, http.StatusOK)
}
vals, err := getIndexRefreshInterval(indexNames, targetESClient)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteJSON(w, vals, http.StatusOK)
}
func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id") id := ps.MustGetParameter("task_id")
@ -275,47 +197,6 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
h.WriteJSON(w, obj, http.StatusOK) h.WriteJSON(w, obj, http.StatusOK)
} }
func getIndexTaskDocCount(ctx context.Context, index *migration_model.ClusterMigrationIndexConfig, targetESClient elastic.API) (int64, error) {
targetIndexName := index.Target.Name
if targetIndexName == "" {
if v, ok := index.IndexRename[index.Source.Name].(string); ok {
targetIndexName = v
}
}
var body []byte
var must []interface{}
if index.Target.DocType != "" && targetESClient.GetMajorVersion() < 8 {
must = append(must, util.MapStr{
"terms": util.MapStr{
"_type": []string{index.Target.DocType},
},
})
}
if index.RawFilter != nil {
must = append(must, index.RawFilter)
}
if len(must) > 0 {
query := util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": must,
},
},
}
body = util.MustToJSONBytes(query)
}
countRes, err := targetESClient.Count(ctx, targetIndexName, body)
if err != nil {
return 0, err
}
if countRes.StatusCode != http.StatusOK && countRes.RawResult != nil {
return 0, fmt.Errorf(string(countRes.RawResult.Body))
}
return countRes.Count, nil
}
type TaskInfoResponse struct { type TaskInfoResponse struct {
TaskID string `json:"task_id"` TaskID string `json:"task_id"`
Step interface{} `json:"step"` Step interface{} `json:"step"`
@ -480,42 +361,6 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps
h.WriteJSON(w, countRes, http.StatusOK) h.WriteJSON(w, countRes, http.StatusOK)
} }
func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id")
obj := task2.Task{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
}, http.StatusNotFound)
return
}
reqBody := struct {
Status string `json:"status"`
}{}
err = h.DecodeJSON(req, &reqBody)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
obj.Status = reqBody.Status
err = orm.Update(nil, obj)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteJSON(w, util.MapStr{
"success": true,
}, 200)
}
func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
typ := h.GetParameter(req, "type") typ := h.GetParameter(req, "type")
switch typ { switch typ {

View File

@ -1,37 +0,0 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package migration
import (
log "github.com/cihub/seelog"
"infini.sh/framework/core/env"
"infini.sh/framework/core/module"
)
func (module *Module) Name() string {
return "migration"
}
func (module *Module) Setup() {
exists, err := env.ParseConfig("migration", module)
if exists && err != nil {
log.Error(err)
}
InitAPI()
}
func (module *Module) Start() error {
return nil
}
func (module *Module) Stop() error {
return nil
}
type Module struct {
}
func init() {
module.RegisterUserPlugin(&Module{})
}