console/plugin/migration/api.go

/* Copyright © INFINI Ltd. All rights reserved.
 * Web: https://infinilabs.com
 * Email: hello#infini.ltd */

package migration

import (
	"fmt"
	"net/http"
	"strings"
	"time"

	log "github.com/cihub/seelog"

	"infini.sh/console/model"
	migration_model "infini.sh/console/plugin/migration/model"
	migration_util "infini.sh/console/plugin/migration/util"
	"infini.sh/framework/core/api"
	"infini.sh/framework/core/api/rbac"
	"infini.sh/framework/core/api/rbac/enum"
	httprouter "infini.sh/framework/core/api/router"
	"infini.sh/framework/core/elastic"
	"infini.sh/framework/core/global"
	"infini.sh/framework/core/orm"
	task2 "infini.sh/framework/core/task"
	"infini.sh/framework/core/util"
)
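
// InitAPI registers the data migration and comparison task endpoints, the
// migration config validator, and the per-index helper endpoints
// (_partition, _count, _refresh, _init).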
func InitAPI() {
	handler := APIHandler{}
	api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchTask("cluster_migration"), enum.PermissionMigrationTaskRead))
	api.HandleAPIMethod(api.POST, "/migration/data", handler.RequirePermission(handler.createDataMigrationTask, enum.PermissionMigrationTaskWrite))
	api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionMigrationTaskWrite))
	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionMigrationTaskWrite))
	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite))
	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead))
	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead))
	api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead))
	api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite))
	api.HandleAPIMethod(api.DELETE, "/comparison/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionComparisonTaskWrite))
	api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info", handler.RequirePermission(handler.getDataComparisonTaskInfo, enum.PermissionComparisonTaskRead))
	api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info/:index", handler.RequirePermission(handler.getDataComparisonTaskOfIndex, enum.PermissionComparisonTaskRead))
	api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionComparisonTaskWrite))
	api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionComparisonTaskWrite))
	api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration))
	api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo)
	api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_count", handler.countDocuments)
	api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_refresh", handler.refreshIndex)
	api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_init", handler.initIndex)
}
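
// APIHandler serves the HTTP endpoints registered in InitAPI.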
type APIHandler struct {
	api.Handler
}
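
// createDataMigrationTask handles POST /migration/data: it validates the
// submitted cluster migration config, records the creator and the source/target
// cluster distributions, and persists a new cluster_migration task in init state.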
func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	clusterTaskConfig := &migration_model.ClusterMigrationTaskConfig{}
	err := h.DecodeJSON(req, clusterTaskConfig)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if len(clusterTaskConfig.Indices) == 0 {
		h.WriteError(w, "indices must not be empty", http.StatusInternalServerError)
		return
	}
	user, err := rbac.FromUserContext(req.Context())
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if user != nil {
		clusterTaskConfig.Creator.Name = user.Username
		clusterTaskConfig.Creator.Id = user.UserId
	}
	var totalDocs int64
	for _, index := range clusterTaskConfig.Indices {
		totalDocs += index.Source.Docs
	}
	srcClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Source.Id)
	clusterTaskConfig.Cluster.Source.Distribution = srcClusterCfg.Distribution
	dstClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Target.Id)
	clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution
	t := task2.Task{
		Metadata: task2.Metadata{
			Type: "cluster_migration",
			Labels: util.MapStr{
				"business_id":       "cluster_migration",
				"source_cluster_id": clusterTaskConfig.Cluster.Source.Id,
				"target_cluster_id": clusterTaskConfig.Cluster.Target.Id,
				"source_total_docs": totalDocs,
			},
		},
		Cancellable:  true,
		Runnable:     false,
		Status:       task2.StatusInit,
		ConfigString: util.MustToJSON(clusterTaskConfig),
	}
	t.ID = util.GetUUID()
	err = orm.Create(nil, &t)
	if err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		log.Error(err)
		return
	}
	h.WriteCreatedOKJSON(w, t.ID)
}
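
// getIndexPartitionInfo handles POST /elasticsearch/:id/index/:index/_partition:
// it decodes an elastic.PartitionQuery from the request body and returns the
// resulting partitions of the target index.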
func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	var (
		index     = ps.MustGetParameter("index")
		clusterID = ps.MustGetParameter("id")
	)
	client := elastic.GetClient(clusterID)
	pq := &elastic.PartitionQuery{
		IndexName: index,
	}
	err := h.DecodeJSON(req, pq)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	partitions, err := elastic.GetPartitions(pq, client)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	h.WriteJSON(w, partitions, http.StatusOK)
}
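
// getDataMigrationTaskInfo handles GET /migration/data/:task_id/info: it loads
// the major migration task, enriches each index entry with migrated doc counts,
// progress percent and error partitions, and attaches log-index metadata.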
func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	id := ps.MustGetParameter("task_id")
	obj := task2.Task{}
	obj.ID = id
	exists, err := orm.Get(&obj)
	if !exists || err != nil {
		h.WriteJSON(w, util.MapStr{
			"_id":   id,
			"found": false,
		}, http.StatusNotFound)
		return
	}
	taskConfig := &migration_model.ClusterMigrationTaskConfig{}
	err = migration_util.GetTaskConfig(&obj, taskConfig)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	_, indexState, err := h.getMigrationMajorTaskInfo(id)
	if err != nil {
		log.Errorf("failed to get major task info, err: %v", err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	var completedIndices int
	for i, index := range taskConfig.Indices {
		indexName := index.Source.GetUniqueIndexName()
		count := indexState[indexName].IndexDocs
		percent := float64(count) / float64(index.Source.Docs) * 100
		if percent > 100 {
			percent = 100
		}
		taskConfig.Indices[i].Target.Docs = count
		taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
		taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
		if count == index.Source.Docs {
			completedIndices++
		}
	}
	// ensure the labels map exists before any of the writes below; assigning
	// to a nil map would panic
	if obj.Metadata.Labels == nil {
		obj.Metadata.Labels = util.MapStr{}
	}
	cfg := global.MustLookup("cluster_migration_config")
	if migrationConfig, ok := cfg.(*DispatcherConfig); ok {
		obj.Metadata.Labels["log_info"] = util.MapStr{
			"cluster_id": migrationConfig.Elasticsearch,
			"index_name": migrationConfig.LogIndexName,
		}
	}
	obj.ConfigString = util.MustToJSON(taskConfig)
	obj.Metadata.Labels["completed_indices"] = completedIndices
	h.WriteJSON(w, obj, http.StatusOK)
}
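
// TaskInfoResponse is the response body of the per-index task info endpoints.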
type TaskInfoResponse struct {
	TaskID              string        `json:"task_id"`
	Step                interface{}   `json:"step"`
	StartTime           int64         `json:"start_time"`
	CompletedTime       int64         `json:"completed_time"`
	Duration            int64         `json:"duration"`
	DataPartition       int           `json:"data_partition"`
	CompletedPartitions int           `json:"completed_partitions"`
	Partitions          []util.MapStr `json:"partitions"`
}
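
// getDataMigrationTaskOfIndex handles GET /migration/data/:task_id/info/:index:
// it aggregates the partition sub tasks of one index, merging live pipeline
// stats from the gateway with the last persisted task labels.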
func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	id := ps.MustGetParameter("task_id")
	uniqueIndexName := ps.MustGetParameter("index")
	majorTask := task2.Task{}
	majorTask.ID = id
	exists, err := orm.Get(&majorTask)
	if !exists || err != nil {
		h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusNotFound)
		return
	}
	taskInfo := &TaskInfoResponse{
		TaskID:    id,
		StartTime: majorTask.StartTimeInMillis,
	}
	subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName)
	if err != nil {
		log.Errorf("failed to get child task infos, err: %v", err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	taskInfo.DataPartition = len(subTasks)
	if len(subTasks) == 0 {
		h.WriteJSON(w, taskInfo, http.StatusOK)
		return
	}
	var scrollStats = map[string]int64{}
	var bulkStats = map[string]int64{}
	pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
	for pipelineID, pipelineContext := range pipelineContexts {
		if pid, ok := pipelineSubParentIDs[pipelineID]; ok {
			if vv := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs"); vv > 0 {
				scrollStats[pid] = vv
			}
			if vv := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count"); vv > 0 {
				bulkStats[pid] = vv
			}
		}
	}
	startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks)
	var partitionTaskInfos []util.MapStr
	for i, ptask := range subTasks {
		cfg := migration_model.IndexMigrationTaskConfig{}
		err := migration_util.GetTaskConfig(&ptask, &cfg)
		if err != nil {
			log.Errorf("failed to get task config, err: %v", err)
			continue
		}
		if i == 0 {
			taskInfo.Step = cfg.Source.Step
		}
		var durationInMS int64
		var subCompletedTime int64
		if ptask.StartTimeInMillis > 0 {
			if migration_util.IsPendingState(ptask.Status) {
				durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis
			} else if ptask.CompletedTime != nil {
				subCompletedTime = ptask.CompletedTime.UnixMilli()
				durationInMS = subCompletedTime - ptask.StartTimeInMillis
			}
		}
		var (
			scrollDocs int64
			indexDocs  int64
		)
		ptaskLabels := util.MapStr(ptask.Metadata.Labels)
		// prefer live stats reported by the gateway pipelines, fall back to
		// the last persisted labels
		if vv, ok := scrollStats[ptask.ID]; ok {
			scrollDocs = vv
		} else {
			scrollDocs = migration_util.GetMapIntValue(ptaskLabels, "scrolled_docs")
		}
		if vv, ok := bulkStats[ptask.ID]; ok {
			indexDocs = vv
		} else {
			indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs")
		}
		partitionTotalDocs := cfg.Source.DocCount
		partitionTaskInfo := util.MapStr{
			"task_id":        ptask.ID,
			"status":         ptask.Status,
			"start_time":     ptask.StartTimeInMillis,
			"completed_time": subCompletedTime,
			"start":          cfg.Source.Start,
			"end":            cfg.Source.End,
			"duration":       durationInMS,
			"scroll_docs":    scrollDocs,
			"index_docs":     indexDocs,
			"total_docs":     partitionTotalDocs,
		}
		scrollTask, bulkTask := migration_util.SplitIndexMigrationTasks(parentIDPipelineTasks[ptask.ID])
		if scrollTask != nil {
			partitionTaskInfo["scroll_task"] = util.MapStr{
				"id":     scrollTask.ID,
				"status": scrollTask.Status,
			}
		}
		if bulkTask != nil {
			partitionTaskInfo["bulk_task"] = util.MapStr{
				"id":     bulkTask.ID,
				"status": bulkTask.Status,
			}
		}
		partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo)
	}
	taskInfo.CompletedTime = completedTime
	taskInfo.Duration = duration
	// NOTE: fall back to the earliest sub task start time when the major task
	// has no recorded start time of its own
	if taskInfo.StartTime == 0 {
		taskInfo.StartTime = startTime
	}
	taskInfo.Partitions = partitionTaskInfos
	taskInfo.CompletedPartitions = completedPartitions
	h.WriteJSON(w, taskInfo, http.StatusOK)
}
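
// countDocuments handles POST /elasticsearch/:id/index/:index/_count: it counts
// documents in an index, optionally restricted by a query filter in the body.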
func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	var (
		index     = ps.MustGetParameter("index")
		clusterID = ps.MustGetParameter("id")
	)
	client := elastic.GetClient(clusterID)
	reqBody := struct {
		Filter interface{} `json:"filter"`
	}{}
	err := h.DecodeJSON(req, &reqBody)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	var query []byte
	if reqBody.Filter != nil {
		query = util.MustToJSONBytes(util.MapStr{
			"query": reqBody.Filter,
		})
	}
	ctx := req.Context()
	countRes, err := client.Count(ctx, index, query)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	h.WriteJSON(w, countRes, http.StatusOK)
}
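
// validateDataMigration handles POST /migration/data/_validate and dispatches
// on the "type" query parameter; only "multi_type" validation is supported.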
func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	typ := h.GetParameter(req, "type")
	switch typ {
	case "multi_type":
		h.validateMultiType(w, req, ps)
		return
	}
	h.WriteError(w, "unknown parameter type", http.StatusBadRequest)
}
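
// validateMultiType reports the mapping types of the requested source indices
// so the caller can detect indices with multiple mapping types before migrating.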
func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	var reqBody = struct {
		Cluster struct {
			SourceID string `json:"source_id"`
			TargetID string `json:"target_id"`
		} `json:"cluster"`
		Indices []string `json:"indices"`
	}{}
	err := h.DecodeJSON(req, &reqBody)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	sourceClient := elastic.GetClient(reqBody.Cluster.SourceID)
	// get the mapping types of the source indices
	indexNames := strings.Join(reqBody.Indices, ",")
	typeInfo, err := elastic.GetIndexTypes(sourceClient, indexNames)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	h.WriteJSON(w, util.MapStr{
		"result": typeInfo,
	}, http.StatusOK)
}
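
// InitIndexRequest is the body of the _init endpoint, carrying the optional
// mappings and settings to apply to the target index.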
type InitIndexRequest struct {
	Mappings map[string]interface{} `json:"mappings"`
	Settings map[string]interface{} `json:"settings"`
}
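
// initIndex handles POST /elasticsearch/:id/index/:index/_init: it creates the
// index with the given mappings/settings, or, if the index already exists,
// updates its settings and mapping in place.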
func (h *APIHandler) initIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	targetClusterID := ps.MustGetParameter("id")
	indexName := ps.MustGetParameter("index")
	reqBody := &InitIndexRequest{}
	err := h.DecodeJSON(req, reqBody)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	client := elastic.GetClient(targetClusterID)
	exists, err := client.Exists(indexName)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if exists {
		if len(reqBody.Settings) > 0 {
			err = client.UpdateIndexSettings(indexName, reqBody.Settings)
			if err != nil {
				log.Error(err)
				h.WriteError(w, err.Error(), http.StatusInternalServerError)
				return
			}
		}
		if ml := len(reqBody.Mappings); ml > 0 {
			var (
				docType             = ""
				mapping interface{} = reqBody.Mappings
			)
			// if the mappings object has a single key other than "properties",
			// treat that key as the document type and unwrap its value
			if ml == 1 {
				for key := range reqBody.Mappings {
					if key != "properties" {
						docType = key
						mapping = reqBody.Mappings[key]
					}
				}
			}
			mappingBytes := util.MustToJSONBytes(mapping)
			_, err = client.UpdateMapping(indexName, docType, mappingBytes)
			if err != nil {
				log.Error(err)
				h.WriteError(w, err.Error(), http.StatusInternalServerError)
				return
			}
		}
	} else {
		indexSettings := map[string]interface{}{}
		if len(reqBody.Settings) > 0 {
			indexSettings["settings"] = reqBody.Settings
		}
		if len(reqBody.Mappings) > 0 {
			indexSettings["mappings"] = reqBody.Mappings
		}
		err = client.CreateIndex(indexName, indexSettings)
		if err != nil {
			log.Error(err)
			h.WriteError(w, err.Error(), http.StatusInternalServerError)
			return
		}
	}
	h.WriteJSON(w, util.MapStr{
		"success": true,
	}, http.StatusOK)
}
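
// refreshIndex handles POST /elasticsearch/:id/index/:index/_refresh.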
func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	var (
		index     = ps.MustGetParameter("index")
		clusterID = ps.MustGetParameter("id")
	)
	client := elastic.GetClient(clusterID)
	err := client.Refresh(index)
	if err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		log.Error(err)
		return
	}
	h.WriteJSON(w, util.MapStr{
		"success": true,
	}, http.StatusOK)
}
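
// getChildTaskInfosByIndex fetches all child tasks of a major task for one
// unique index name. Non-pipeline children are returned as subTasks; running
// pipeline children are grouped by execution instance (pipelineTaskIDs), mapped
// back to their parent partition task (pipelineSubParentIDs), and collected per
// parent (parentIDPipelineTasks).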
func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexName string) (subTasks []task2.Task, pipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) {
	queryDsl := util.MapStr{
		"size": 9999,
		"sort": []util.MapStr{
			{
				"created": util.MapStr{
					"order": "asc",
				},
			},
		},
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": []util.MapStr{
					{
						"term": util.MapStr{
							"parent_id": util.MapStr{
								"value": taskItem.ID,
							},
						},
					},
					{
						"term": util.MapStr{
							"metadata.labels.unique_index_name": util.MapStr{
								"value": uniqueIndexName,
							},
						},
					},
				},
			},
		},
	}
	q := &orm.Query{
		RawQuery: util.MustToJSONBytes(queryDsl),
	}
	err, result := orm.Search(task2.Task{}, q)
	if err != nil {
		return
	}
	pipelineTaskIDs = map[string][]string{}
	pipelineSubParentIDs = map[string]string{}
	parentIDPipelineTasks = map[string][]task2.Task{}
	for _, row := range result.Result {
		buf := util.MustToJSONBytes(row)
		subTask := task2.Task{}
		// use a local error here so a bad row doesn't leak into the named
		// return value
		if jsonErr := util.FromJSONBytes(buf, &subTask); jsonErr != nil {
			log.Error(jsonErr)
			continue
		}
		if subTask.Metadata.Type != "pipeline" {
			subTasks = append(subTasks, subTask)
			continue
		}
		if subTask.Status != task2.StatusRunning {
			continue
		}
		// TODO: use more robust logic
		if pl := len(subTask.ParentId); pl != 2 {
			continue
		}
		parentID := subTask.ParentId[1]
		pipelineSubParentIDs[subTask.ID] = parentID
		instID := migration_util.GetMapStringValue(util.MapStr(subTask.Metadata.Labels), "execution_instance_id")
		if instID == "" {
			continue
		}
		pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
		parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask)
	}
	return
}
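
// getChildPipelineInfosFromGateway asks each execution instance for the runtime
// context of its pipeline tasks, keyed by pipeline task ID. Failures are logged
// and skipped so one unreachable instance doesn't hide the others.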
func (h *APIHandler) getChildPipelineInfosFromGateway(pipelineTaskIDs map[string][]string) (pipelineContexts map[string]util.MapStr) {
	pipelineContexts = map[string]util.MapStr{}
	for instID, taskIDs := range pipelineTaskIDs {
		inst := &model.Instance{}
		inst.ID = instID
		_, err := orm.Get(inst)
		if err != nil {
			log.Errorf("failed to get instance info, id: %s, err: %v", instID, err)
			continue
		}
		pipelines, err := inst.GetPipelinesByIDs(taskIDs)
		if err != nil {
			log.Errorf("failed to get pipelines info, err: %v", err)
			continue
		}
		for pipelineID, status := range pipelines {
			pipelineContexts[pipelineID] = status.Context
		}
	}
	return
}
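
// calcMajorTaskInfo derives the overall start time, completed time, duration
// and completed-partition count from a major task's partition sub tasks.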
func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task) (startTime int64, completedTime int64, duration int64, completedPartitions int) {
	if len(subTasks) == 0 {
		return
	}
	for _, subTask := range subTasks {
		if subTask.StartTimeInMillis > 0 {
			if startTime == 0 || subTask.StartTimeInMillis < startTime {
				startTime = subTask.StartTimeInMillis
			}
		}
		if subTask.CompletedTime != nil {
			subCompletedTime := subTask.CompletedTime.UnixMilli()
			if subCompletedTime > completedTime {
				completedTime = subCompletedTime
			}
		}
		if subTask.Status == task2.StatusComplete || subTask.Status == task2.StatusError {
			completedPartitions++
		}
	}
	if len(subTasks) != completedPartitions {
		// still running: no overall completed time yet; measure duration up to
		// now, but only if at least one sub task has actually started
		completedTime = 0
		if startTime > 0 {
			duration = time.Now().UnixMilli() - startTime
		}
	} else {
		duration = completedTime - startTime
	}
	return
}