Merge pull request '[comparison] add info APIs' (#92) from feature/comparison into master

sunjiacheng 2023-05-19 09:22:22 +08:00
commit cc2fd546cf
14 changed files with 709 additions and 645 deletions

View File

@ -5,7 +5,6 @@
package migration package migration
import ( import (
"context"
"fmt" "fmt"
"net/http" "net/http"
"strings" "strings"
@ -30,22 +29,23 @@ import (
func InitAPI() { func InitAPI() {
handler := APIHandler{} handler := APIHandler{}
api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchTask("cluster_migration"), enum.PermissionTaskRead)) api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchTask("cluster_migration"), enum.PermissionMigrationTaskRead))
api.HandleAPIMethod(api.POST, "/migration/data", handler.RequirePermission(handler.createDataMigrationTask, enum.PermissionTaskWrite)) api.HandleAPIMethod(api.POST, "/migration/data", handler.RequirePermission(handler.createDataMigrationTask, enum.PermissionMigrationTaskWrite))
api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionTaskWrite)) api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionMigrationTaskWrite))
api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionMigrationTaskWrite))
api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite))
api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead))
api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead))
api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead))
api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite))
api.HandleAPIMethod(api.DELETE, "/comparison/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionComparisonTaskWrite))
api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info", handler.RequirePermission(handler.getDataComparisonTaskInfo, enum.PermissionComparisonTaskRead))
api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info/:index", handler.RequirePermission(handler.getDataComparisonTaskOfIndex, enum.PermissionComparisonTaskRead))
api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionComparisonTaskWrite))
api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionComparisonTaskWrite))
api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration)) api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration))
api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionTaskWrite))
api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionTaskWrite))
api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionTaskRead))
api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionTaskRead))
api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequirePermission(handler.updateDataMigrationTaskStatus, enum.PermissionTaskRead))
api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionTaskRead))
api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionTaskWrite))
api.HandleAPIMethod(api.DELETE, "/comparison/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionTaskWrite))
api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionTaskWrite))
api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionTaskWrite))
api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo)
api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_count", handler.countDocuments) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_count", handler.countDocuments)
api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_refresh", handler.refreshIndex) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_refresh", handler.refreshIndex)
@ -138,82 +138,6 @@ func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Requ
h.WriteJSON(w, partitions, http.StatusOK) h.WriteJSON(w, partitions, http.StatusOK)
} }
func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API) (map[string]string, error) {
const step = 50
var (
length = len(indexNames)
end int
)
refreshIntervals := map[string]string{}
for i := 0; i < length; i += step {
end = i + step
if end > length-1 {
end = length
}
tempNames := indexNames[i:end]
strNames := strings.Join(tempNames, ",")
resultM, err := targetESClient.GetIndexSettings(strNames)
if err != nil {
return refreshIntervals, nil
}
for indexName, v := range *resultM {
if m, ok := v.(map[string]interface{}); ok {
refreshInterval, _ := util.GetMapValueByKeys([]string{"settings", "index", "refresh_interval"}, m)
if ri, ok := refreshInterval.(string); ok {
refreshIntervals[indexName] = ri
continue
}
refreshInterval, _ = util.GetMapValueByKeys([]string{"defaults", "index", "refresh_interval"}, m)
if ri, ok := refreshInterval.(string); ok {
refreshIntervals[indexName] = ri
continue
}
}
}
}
return refreshIntervals, nil
}
func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id")
obj := task2.Task{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
}, http.StatusNotFound)
return
}
taskConfig := &migration_model.ClusterMigrationTaskConfig{}
err = migration_util.GetTaskConfig(&obj, taskConfig)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
var indexNames []string
for _, index := range taskConfig.Indices {
indexNames = append(indexNames, index.Target.Name)
}
targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id)
if targetESClient == nil {
h.WriteJSON(w, util.MapStr{}, http.StatusOK)
}
vals, err := getIndexRefreshInterval(indexNames, targetESClient)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteJSON(w, vals, http.StatusOK)
}
func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id") id := ps.MustGetParameter("task_id")
@ -236,15 +160,9 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
return return
} }
indexState, err := getMajorTaskInfoByIndex(id) _, indexState, err := h.getMigrationMajorTaskInfo(id)
if err != nil { if err != nil {
log.Error(err) log.Errorf("failed to get major task info, err: %v", err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
realtimeIndexState, err := getMajorTaskByIndexFromES(id)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
return return
} }
@ -252,15 +170,15 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
var completedIndices int var completedIndices int
for i, index := range taskConfig.Indices { for i, index := range taskConfig.Indices {
indexName := index.Source.GetUniqueIndexName() indexName := index.Source.GetUniqueIndexName()
count := indexState[indexName].IndexDocs + realtimeIndexState[indexName].IndexDocs count := indexState[indexName].IndexDocs
percent := count * 100 / float64(index.Source.Docs) percent := float64(count) / float64(index.Source.Docs) * 100
if percent > 100 { if percent > 100 {
percent = 100 percent = 100
} }
taskConfig.Indices[i].Target.Docs = int64(count) taskConfig.Indices[i].Target.Docs = count
taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
if int64(count) == index.Source.Docs { if count == index.Source.Docs {
completedIndices++ completedIndices++
} }
} }
@ -279,139 +197,15 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
h.WriteJSON(w, obj, http.StatusOK) h.WriteJSON(w, obj, http.StatusOK)
} }
func getMajorTaskInfoByIndex(taskID string) (map[string]migration_model.IndexStateInfo, error) { type TaskInfoResponse struct {
query := util.MapStr{ TaskID string `json:"task_id"`
"size": 0, Step interface{} `json:"step"`
"aggs": util.MapStr{ StartTime int64 `json:"start_time"`
"group_by_task": util.MapStr{ CompletedTime int64 `json:"completed_time"`
"terms": util.MapStr{ Duration int64 `json:"duration"`
"field": "metadata.labels.unique_index_name", DataPartition int `json:"data_partition"`
"size": 100, CompletedPartitions int `json:"completed_partitions"`
}, Partitions []util.MapStr `json:"partitions"`
"aggs": util.MapStr{
"group_by_status": util.MapStr{
"terms": util.MapStr{
"field": "status",
"size": 100,
},
},
"total_docs": util.MapStr{
"sum": util.MapStr{
"field": "metadata.labels.index_docs",
},
},
},
},
},
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"metadata.type": util.MapStr{
"value": "index_migration",
},
},
},
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": taskID,
},
},
},
},
},
},
}
q := &orm.Query{
RawQuery: util.MustToJSONBytes(query),
}
err, result := orm.Search(task2.Task{}, q)
if err != nil {
return nil, err
}
searchRes := &elastic.SearchResponse{}
err = util.FromJSONBytes(result.Raw, searchRes)
if err != nil {
return nil, err
}
resBody := map[string]migration_model.IndexStateInfo{}
if taskAgg, ok := searchRes.Aggregations["group_by_task"]; ok {
for _, bk := range taskAgg.Buckets {
if key, ok := bk["key"].(string); ok {
//resBody[key] = int(bk["doc_count"].(float64))
resBody[key] = migration_model.IndexStateInfo{}
if statusAgg, ok := bk["group_by_status"].(map[string]interface{}); ok {
if sbks, ok := statusAgg["buckets"].([]interface{}); ok {
for _, sbk := range sbks {
if sbkM, ok := sbk.(map[string]interface{}); ok {
if sbkM["key"] == task2.StatusError {
if v, ok := sbkM["doc_count"].(float64); ok {
st := resBody[key]
st.ErrorPartitions = int(v)
resBody[key] = st
}
}
}
}
}
}
if indexDocsAgg, ok := bk["total_docs"].(map[string]interface{}); ok {
if v, ok := indexDocsAgg["value"].(float64); ok {
st := resBody[key]
st.IndexDocs = v
resBody[key] = st
}
}
}
}
}
return resBody, nil
}
func getIndexTaskDocCount(ctx context.Context, index *migration_model.ClusterMigrationIndexConfig, targetESClient elastic.API) (int64, error) {
targetIndexName := index.Target.Name
if targetIndexName == "" {
if v, ok := index.IndexRename[index.Source.Name].(string); ok {
targetIndexName = v
}
}
var body []byte
var must []interface{}
if index.Target.DocType != "" && targetESClient.GetMajorVersion() < 8 {
must = append(must, util.MapStr{
"terms": util.MapStr{
"_type": []string{index.Target.DocType},
},
})
}
if index.RawFilter != nil {
must = append(must, index.RawFilter)
}
if len(must) > 0 {
query := util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": must,
},
},
}
body = util.MustToJSONBytes(query)
}
countRes, err := targetESClient.Count(ctx, targetIndexName, body)
if err != nil {
return 0, err
}
if countRes.StatusCode != http.StatusOK && countRes.RawResult != nil {
return 0, fmt.Errorf(string(countRes.RawResult.Body))
}
return countRes.Count, nil
} }
func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -425,122 +219,37 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
return return
} }
var completedTime int64 taskInfo := &TaskInfoResponse{
TaskID: id,
StartTime: majorTask.StartTimeInMillis,
}
taskInfo := util.MapStr{ subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName)
"task_id": id,
"start_time": majorTask.StartTimeInMillis, taskInfo.DataPartition = len(subTasks)
} if len(subTasks) == 0 {
partitionTaskQuery := util.MapStr{ h.WriteJSON(w, taskInfo, http.StatusOK)
"size": 500,
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "asc",
},
},
},
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": id,
},
},
},
{
"term": util.MapStr{
"metadata.labels.unique_index_name": util.MapStr{
"value": uniqueIndexName,
},
},
},
},
},
},
}
q := &orm.Query{
RawQuery: util.MustToJSONBytes(partitionTaskQuery),
}
err, result := orm.Search(task2.Task{}, q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return return
} }
var subTasks []task2.Task
var pipelineTaskIDs = map[string][]string{}
pipelineSubParentIDs := map[string]string{}
subTaskStatus := map[string]string{}
parentIDPipelineTasks := map[string][]task2.Task{}
for _, row := range result.Result {
buf := util.MustToJSONBytes(row)
subTask := task2.Task{}
err = util.FromJSONBytes(buf, &subTask)
if err != nil {
log.Error(err)
continue
}
if subTask.Metadata.Labels != nil {
if subTask.Metadata.Type == "index_migration" {
subTasks = append(subTasks, subTask)
subTaskStatus[subTask.ID] = subTask.Status
continue
}
if subTask.Metadata.Labels["pipeline_id"] == "es_scroll" || subTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok {
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
if pl := len(subTask.ParentId); pl > 0 {
parentID := subTask.ParentId[pl-1]
if subTaskStatus[parentID] == task2.StatusRunning {
pipelineSubParentIDs[subTask.ID] = parentID
}
parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask)
}
}
}
}
}
taskInfo["data_partition"] = len(subTasks)
var scrollStats = map[string]int64{} var scrollStats = map[string]int64{}
var bulkStats = map[string]int64{} var bulkStats = map[string]int64{}
for instID, taskIDs := range pipelineTaskIDs { pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
inst := &model.Instance{} for pipelineID, pipelineContext := range pipelineContexts {
inst.ID = instID if pid, ok := pipelineSubParentIDs[pipelineID]; ok {
_, err = orm.Get(inst) if vv := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs"); vv > 0 {
if err != nil { scrollStats[pid] = vv
log.Error(err) }
continue if vv := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count"); vv > 0 {
} bulkStats[pid] = vv
pipelines, err := inst.GetPipelinesByIDs(taskIDs)
if err != nil {
log.Error(err)
continue
}
for pipelineID, status := range pipelines {
if pid, ok := pipelineSubParentIDs[pipelineID]; ok {
if vv := migration_util.GetMapIntValue(status.Context, "es_scroll.scrolled_docs"); vv > 0 {
scrollStats[pid] = vv
}
if vv := migration_util.GetMapIntValue(status.Context, "bulk_indexing.success.count"); vv > 0 {
bulkStats[pid] = vv
}
} }
} }
} }
var ( startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks)
partitionTaskInfos []util.MapStr
completedPartitions int var partitionTaskInfos []util.MapStr
startTime int64
)
if len(subTasks) > 0 {
startTime = subTasks[0].StartTimeInMillis
}
for i, ptask := range subTasks { for i, ptask := range subTasks {
cfg := migration_model.IndexMigrationTaskConfig{} cfg := migration_model.IndexMigrationTaskConfig{}
err := migration_util.GetTaskConfig(&ptask, &cfg) err := migration_util.GetTaskConfig(&ptask, &cfg)
@ -548,20 +257,20 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
log.Errorf("failed to get task config, err: %v", err) log.Errorf("failed to get task config, err: %v", err)
continue continue
} }
start := cfg.Source.Start
end := cfg.Source.End
if i == 0 { if i == 0 {
step := cfg.Source.Step taskInfo.Step = cfg.Source.Step
taskInfo["step"] = step
} }
var durationInMS int64 = 0
var durationInMS int64
var subCompletedTime int64
if ptask.StartTimeInMillis > 0 { if ptask.StartTimeInMillis > 0 {
if ptask.StartTimeInMillis < startTime { if migration_util.IsPendingState(ptask.Status) {
startTime = ptask.StartTimeInMillis durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis
continue
} }
durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis if ptask.CompletedTime != nil {
if ptask.CompletedTime != nil && (ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError) { subCompletedTime = ptask.CompletedTime.UnixMilli()
durationInMS = ptask.CompletedTime.UnixMilli() - ptask.StartTimeInMillis durationInMS = subCompletedTime - ptask.StartTimeInMillis
} }
} }
var ( var (
@ -580,54 +289,42 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs") indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs")
} }
var subCompletedTime int64
if ptask.CompletedTime != nil {
subCompletedTime = ptask.CompletedTime.UnixMilli()
if subCompletedTime > completedTime {
completedTime = subCompletedTime
}
}
partitionTotalDocs := cfg.Source.DocCount partitionTotalDocs := cfg.Source.DocCount
partitionTaskInfo := util.MapStr{ partitionTaskInfo := util.MapStr{
"task_id": ptask.ID, "task_id": ptask.ID,
"status": ptask.Status, "status": ptask.Status,
"start_time": ptask.StartTimeInMillis, "start_time": ptask.StartTimeInMillis,
"completed_time": subCompletedTime, "completed_time": subCompletedTime,
"start": start, "start": cfg.Source.Start,
"end": end, "end": cfg.Source.End,
"duration": durationInMS, "duration": durationInMS,
"scroll_docs": scrollDocs, "scroll_docs": scrollDocs,
"index_docs": indexDocs, "index_docs": indexDocs,
"total_docs": partitionTotalDocs, "total_docs": partitionTotalDocs,
} }
for _, pipelineTask := range parentIDPipelineTasks[ptask.ID] { scrollTask, bulkTask := migration_util.SplitIndexMigrationTasks(parentIDPipelineTasks[ptask.ID])
if pipelineTask.Metadata.Labels["pipeline_id"] == "es_scroll" { if scrollTask != nil {
partitionTaskInfo["scroll_task"] = util.MapStr{ partitionTaskInfo["scroll_task"] = util.MapStr{
"id": pipelineTask.ID, "id": scrollTask.ID,
"status": pipelineTask.Status, "status": scrollTask.Status,
} }
} else if pipelineTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { }
partitionTaskInfo["bulk_task"] = util.MapStr{ if bulkTask != nil {
"id": pipelineTask.ID, partitionTaskInfo["bulk_task"] = util.MapStr{
"status": pipelineTask.Status, "id": bulkTask.ID,
} "status": bulkTask.Status,
} }
} }
partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo) partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo)
if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError {
completedPartitions++
}
} }
if len(subTasks) == completedPartitions { taskInfo.CompletedTime = completedTime
taskInfo["completed_time"] = completedTime taskInfo.Duration = duration
taskInfo["duration"] = completedTime - startTime // NOTE: overwrite major task start time with the first started sub task
} else { if taskInfo.StartTime == 0 {
taskInfo["duration"] = time.Now().UnixMilli() - startTime taskInfo.StartTime = startTime
} }
taskInfo["start_time"] = startTime taskInfo.Partitions = partitionTaskInfos
taskInfo["partitions"] = partitionTaskInfos taskInfo.CompletedPartitions = completedPartitions
taskInfo["completed_partitions"] = completedPartitions
h.WriteJSON(w, taskInfo, http.StatusOK) h.WriteJSON(w, taskInfo, http.StatusOK)
} }
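To make the refactored response shape concrete: getDataMigrationTaskOfIndex now serializes a TaskInfoResponse instead of an ad-hoc util.MapStr. The standalone sketch below mirrors the struct's JSON tags and fills it with invented values, purely to show roughly what the endpoint returns (partition entries keep the same keys as partitionTaskInfo above).

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the JSON tags of the TaskInfoResponse struct introduced in this PR;
// all values below are invented for illustration.
type taskInfoExample struct {
	TaskID              string                   `json:"task_id"`
	Step                interface{}              `json:"step"`
	StartTime           int64                    `json:"start_time"`
	CompletedTime       int64                    `json:"completed_time"`
	Duration            int64                    `json:"duration"`
	DataPartition       int                      `json:"data_partition"`
	CompletedPartitions int                      `json:"completed_partitions"`
	Partitions          []map[string]interface{} `json:"partitions"`
}

func main() {
	info := taskInfoExample{
		TaskID:              "example-task-id",
		Step:                "1d",
		StartTime:           1684460542000,
		CompletedTime:       1684460600000,
		Duration:            58000,
		DataPartition:       1,
		CompletedPartitions: 1,
		Partitions: []map[string]interface{}{
			{"task_id": "example-partition-id", "status": "complete", "index_docs": 1000, "total_docs": 1000},
		},
	}
	out, _ := json.MarshalIndent(info, "", "  ")
	fmt.Println(string(out))
}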
@ -664,42 +361,6 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps
h.WriteJSON(w, countRes, http.StatusOK) h.WriteJSON(w, countRes, http.StatusOK)
} }
func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id")
obj := task2.Task{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
}, http.StatusNotFound)
return
}
reqBody := struct {
Status string `json:"status"`
}{}
err = h.DecodeJSON(req, &reqBody)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
obj.Status = reqBody.Status
err = orm.Update(nil, obj)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteJSON(w, util.MapStr{
"success": true,
}, 200)
}
func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
typ := h.GetParameter(req, "type") typ := h.GetParameter(req, "type")
switch typ { switch typ {
@ -828,48 +489,30 @@ func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps h
}, 200) }, 200)
} }
func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_model.MajorTaskState, err error) { func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexName string) (subTasks []task2.Task, pipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) {
taskQuery := util.MapStr{ queryDsl := util.MapStr{
"size": 500, "size": 9999,
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "asc",
},
},
},
"query": util.MapStr{ "query": util.MapStr{
"bool": util.MapStr{ "bool": util.MapStr{
"must": []util.MapStr{ "must": []util.MapStr{
{ {
"term": util.MapStr{ "term": util.MapStr{
"parent_id": util.MapStr{ "parent_id": util.MapStr{
"value": majorTaskID, "value": taskItem.ID,
}, },
}, },
}, },
{ {
"bool": util.MapStr{ "term": util.MapStr{
"minimum_should_match": 1, "metadata.labels.unique_index_name": util.MapStr{
"should": []util.MapStr{ "value": uniqueIndexName,
{
"term": util.MapStr{
"metadata.labels.pipeline_id": util.MapStr{
"value": "bulk_indexing",
},
},
},
{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"metadata.type": util.MapStr{
"value": "index_migration",
},
},
},
{
"terms": util.MapStr{
"status": []string{task2.StatusComplete, task2.StatusError},
},
},
},
},
},
}, },
}, },
}, },
@ -878,13 +521,17 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod
}, },
} }
q := &orm.Query{ q := &orm.Query{
RawQuery: util.MustToJSONBytes(taskQuery), RawQuery: util.MustToJSONBytes(queryDsl),
} }
err, result := orm.Search(task2.Task{}, q) err, result := orm.Search(task2.Task{}, q)
if err != nil { if err != nil {
return taskStats, err return
} }
var pipelineTaskIDs = map[string][]string{}
pipelineTaskIDs = map[string][]string{}
pipelineSubParentIDs = map[string]string{}
parentIDPipelineTasks = map[string][]task2.Task{}
for _, row := range result.Result { for _, row := range result.Result {
buf := util.MustToJSONBytes(row) buf := util.MustToJSONBytes(row)
subTask := task2.Task{} subTask := task2.Task{}
@ -893,114 +540,90 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod
log.Error(err) log.Error(err)
continue continue
} }
taskLabels := util.MapStr(subTask.Metadata.Labels)
if subTask.Metadata.Labels != nil { if subTask.Metadata.Type != "pipeline" {
//add indexDocs of already complete subTasks = append(subTasks, subTask)
if subTask.Metadata.Type == "index_migration" {
taskStats.IndexDocs += migration_util.GetMapIntValue(taskLabels, "index_docs")
continue
}
if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
}
}
}
for instID, taskIDs := range pipelineTaskIDs {
inst := &model.Instance{}
inst.ID = instID
_, err = orm.Get(inst)
if err != nil {
log.Error(err)
continue continue
} }
pipelines, err := inst.GetPipelinesByIDs(taskIDs) if subTask.Status != task2.StatusRunning {
if err != nil {
log.Error(err)
continue continue
} }
for _, status := range pipelines { // TODO: use more robust logic
taskStats.IndexDocs += migration_util.GetMapIntValue(status.Context, "bulk_indexing.success.count") if pl := len(subTask.ParentId); pl != 2 {
continue
} }
parentID := subTask.ParentId[1]
pipelineSubParentIDs[subTask.ID] = parentID
instID := migration_util.GetMapStringValue(util.MapStr(subTask.Metadata.Labels), "execution_instance_id")
if instID == "" {
continue
}
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask)
} }
return taskStats, nil
return
} }
func getMajorTaskByIndexFromES(majorTaskID string) (map[string]migration_model.IndexStateInfo, error) { func (h *APIHandler) getChildPipelineInfosFromGateway(pipelineTaskIDs map[string][]string) (pipelineContexts map[string]util.MapStr) {
taskQuery := util.MapStr{ pipelineContexts = map[string]util.MapStr{}
"size": 500, var err error
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": majorTaskID,
},
},
},
{
"term": util.MapStr{
"metadata.labels.pipeline_id": util.MapStr{
"value": "bulk_indexing",
},
},
},
},
},
},
}
q := &orm.Query{
RawQuery: util.MustToJSONBytes(taskQuery),
}
err, result := orm.Search(task2.Task{}, q)
if err != nil {
return nil, err
}
var pipelineTaskIDs = map[string][]string{}
var pipelineIndexNames = map[string]string{}
for _, row := range result.Result {
buf := util.MustToJSONBytes(row)
subTask := task2.Task{}
err = util.FromJSONBytes(buf, &subTask)
if err != nil {
log.Error(err)
continue
}
if subTask.Metadata.Labels != nil {
if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok {
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
}
if indexName, ok := subTask.Metadata.Labels["unique_index_name"].(string); ok {
pipelineIndexNames[subTask.ID] = indexName
}
}
}
state := map[string]migration_model.IndexStateInfo{}
for instID, taskIDs := range pipelineTaskIDs { for instID, taskIDs := range pipelineTaskIDs {
inst := &model.Instance{} inst := &model.Instance{}
inst.ID = instID inst.ID = instID
_, err = orm.Get(inst) _, err = orm.Get(inst)
if err != nil { if err != nil {
log.Error(err) log.Error("failed to get instance info, err: %v", err)
continue continue
} }
pipelines, err := inst.GetPipelinesByIDs(taskIDs) pipelines, err := inst.GetPipelinesByIDs(taskIDs)
if err != nil { if err != nil {
log.Error(err) log.Errorf("failed to get pipelines info, err: %v", err)
continue continue
} }
for pipelineID, status := range pipelines { for pipelineID, status := range pipelines {
indexName := pipelineIndexNames[pipelineID] pipelineContexts[pipelineID] = status.Context
if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil {
if vv, ok := v.(float64); ok && indexName != "" {
st := state[indexName]
st.IndexDocs += vv
state[indexName] = st
}
}
} }
} }
return state, nil
return
}
func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task) (startTime int64, completedTime int64, duration int64, completedPartitions int) {
if len(subTasks) == 0 {
return
}
for _, subTask := range subTasks {
if subTask.StartTimeInMillis > 0 {
if startTime == 0 {
startTime = subTask.StartTimeInMillis
}
if subTask.StartTimeInMillis < startTime {
startTime = subTask.StartTimeInMillis
}
}
if subTask.CompletedTime != nil {
subCompletedTime := subTask.CompletedTime.UnixMilli()
if subCompletedTime > completedTime {
completedTime = subCompletedTime
}
}
if subTask.Status == task2.StatusComplete || subTask.Status == task2.StatusError {
completedPartitions++
}
}
if len(subTasks) != completedPartitions {
completedTime = 0
duration = time.Now().UnixMilli() - startTime
} else {
duration = completedTime - startTime
}
return
} }
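As a quick sanity check on the aggregation rule calcMajorTaskInfo implements (earliest start wins, latest completion wins, duration only spans start-to-completion once every partition is done), here is a self-contained toy that applies the same logic to plain numbers instead of task.Task values; the timestamps are made up.

package main

import "fmt"

type fakeSubTask struct {
	start     int64 // ms since epoch, 0 = not started
	completed int64 // ms since epoch, 0 = not completed
	done      bool  // complete or error
}

func main() {
	subs := []fakeSubTask{
		{start: 2_000, completed: 9_000, done: true},
		{start: 1_000, completed: 5_000, done: true},
	}
	var startTime, completedTime int64
	completedPartitions := 0
	for _, s := range subs {
		if s.start > 0 && (startTime == 0 || s.start < startTime) {
			startTime = s.start
		}
		if s.completed > completedTime {
			completedTime = s.completed
		}
		if s.done {
			completedPartitions++
		}
	}
	// All partitions finished, so duration is completion minus start.
	duration := completedTime - startTime
	fmt.Println(startTime, completedTime, duration, completedPartitions) // 1000 9000 8000 2
}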

View File

@ -55,6 +55,9 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error {
return nil return nil
} }
taskItem.RetryTimes++ taskItem.RetryTimes++
if taskItem.StartTimeInMillis == 0 {
taskItem.StartTimeInMillis = time.Now().UnixMilli()
}
taskItem.Status = task.StatusRunning taskItem.Status = task.StatusRunning
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
Success: true, Success: true,
@ -102,6 +105,9 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
} }
} }
// TODO: partition twice for source & target, then merge
// if there's a partition missing from source but present in target
// ideally we can capture it in docs count, but this won't always work
if index.Partition != nil { if index.Partition != nil {
partitionQ := &elastic.PartitionQuery{ partitionQ := &elastic.PartitionQuery{
IndexName: index.Source.Name, IndexName: index.Source.Name,
@ -119,18 +125,20 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
if partitions == nil || len(partitions) == 0 { if partitions == nil || len(partitions) == 0 {
return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter)) return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter))
} }
var (
partitionID int
)
for _, partition := range partitions { for _, partition := range partitions {
//skip empty partition partitionID++
if partition.Docs <= 0 {
continue
}
partitionSourceDump := sourceDump partitionSourceDump := sourceDump
partitionSourceDump.QueryDSL = partition.Filter partitionSourceDump.Start = partition.Start
partitionSourceDump.End = partition.End
partitionSourceDump.DocCount = partition.Docs partitionSourceDump.DocCount = partition.Docs
partitionSourceDump.Step = index.Partition.Step
partitionSourceDump.PartitionId = partitionID
partitionSourceDump.QueryDSL = partition.Filter
partitionSourceDump.QueryString = "" partitionSourceDump.QueryString = ""
// TODO: if there's a partition missing from source but present in target
// ideally we can capture it in docs count, but this won't always work
partitionTargetDump := partitionSourceDump partitionTargetDump := partitionSourceDump
partitionTargetDump.Indices = index.Target.Name partitionTargetDump.Indices = index.Target.Name
@ -166,6 +174,7 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
sourceDump.DocCount = index.Source.Docs sourceDump.DocCount = index.Source.Docs
targetDump := sourceDump targetDump := sourceDump
targetDump.Indices = index.Target.Name targetDump.Indices = index.Target.Name
targetDump.DocCount = index.Target.Docs
indexComparisonTask := task.Task{ indexComparisonTask := task.Task{
ParentId: []string{taskItem.ID}, ParentId: []string{taskItem.ID},

View File

@ -57,6 +57,9 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error {
return nil return nil
} }
taskItem.RetryTimes++ taskItem.RetryTimes++
if taskItem.StartTimeInMillis == 0 {
taskItem.StartTimeInMillis = time.Now().UnixMilli()
}
taskItem.Status = task.StatusRunning taskItem.Status = task.StatusRunning
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
Success: true, Success: true,
@ -284,7 +287,7 @@ func (p *processor) handleRunningMajorTask(taskItem *task.Task) error {
return nil return nil
} }
func (p *processor) getMajorTaskState(majorTask *task.Task) (taskState migration_model.MajorTaskState, err error) { func (p *processor) getMajorTaskState(majorTask *task.Task) (taskState migration_model.ClusterMigrationTaskState, err error) {
query := util.MapStr{ query := util.MapStr{
"size": 0, "size": 0,
"aggs": util.MapStr{ "aggs": util.MapStr{

View File

@ -8,7 +8,6 @@ import (
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
migration_model "infini.sh/console/plugin/migration/model"
migration_util "infini.sh/console/plugin/migration/util" migration_util "infini.sh/console/plugin/migration/util"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
@ -88,29 +87,40 @@ func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req
} }
for _, hit := range searchRes.Hits.Hits { for _, hit := range searchRes.Hits.Hits {
sourceM := util.MapStr(hit.Source) sourceM := util.MapStr(hit.Source)
buf := util.MustToJSONBytes(sourceM["config"]) h.populateMajorTaskInfo(hit.ID, sourceM)
dataConfig := migration_model.ClusterMigrationTaskConfig{}
err = util.FromJSONBytes(buf, &dataConfig)
if err != nil {
log.Error(err)
continue
}
//var targetTotalDocs int64
if hit.Source["status"] == task.StatusRunning {
ts, err := getMajorTaskStatsFromInstances(hit.ID)
if err != nil {
log.Warnf("fetch progress info of task error: %v", err)
continue
}
sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs)
}
} }
h.WriteJSON(w, searchRes, http.StatusOK) h.WriteJSON(w, searchRes, http.StatusOK)
} }
} }
func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
buf := util.MustToJSONBytes(sourceM)
majorTask := task.Task{}
err := util.FromJSONBytes(buf, &majorTask)
if err != nil {
log.Errorf("failed to unmarshal major task info, err: %v", err)
return
}
switch majorTask.Metadata.Type {
case "cluster_migration":
ts, _, err := h.getMigrationMajorTaskInfo(taskID)
if err != nil {
log.Warnf("fetch progress info of task error: %v", err)
return
}
sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs)
case "cluster_comparison":
ts, _, err := h.getComparisonMajorTaskInfo(taskID)
if err != nil {
log.Warnf("fetch progress info of task error: %v", err)
return
}
sourceM.Put("metadata.labels.source_scroll_docs", ts.SourceScrollDocs)
sourceM.Put("metadata.labels.target_scroll_docs", ts.TargetScrollDocs)
}
}
func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
taskID := ps.MustGetParameter("task_id") taskID := ps.MustGetParameter("task_id")
obj := task.Task{} obj := task.Task{}

View File

@ -1,15 +1,19 @@
package migration package migration
import ( import (
"fmt"
"net/http" "net/http"
"time"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
migration_model "infini.sh/console/plugin/migration/model" migration_model "infini.sh/console/plugin/migration/model"
migration_util "infini.sh/console/plugin/migration/util"
"infini.sh/framework/core/api/rbac" "infini.sh/framework/core/api/rbac"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic" "infini.sh/framework/core/elastic"
"infini.sh/framework/core/global"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/task" "infini.sh/framework/core/task"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
@ -38,9 +42,11 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R
clusterTaskConfig.Creator.Id = user.UserId clusterTaskConfig.Creator.Id = user.UserId
} }
var totalDocs int64 var sourceTotalDocs int64
var targetTotalDocs int64
for _, index := range clusterTaskConfig.Indices { for _, index := range clusterTaskConfig.Indices {
totalDocs += index.Source.Docs sourceTotalDocs += index.Source.Docs
targetTotalDocs += index.Target.Docs
} }
srcClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Source.Id) srcClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Source.Id)
@ -54,7 +60,8 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R
"business_id": "cluster_comparison", "business_id": "cluster_comparison",
"source_cluster_id": clusterTaskConfig.Cluster.Source.Id, "source_cluster_id": clusterTaskConfig.Cluster.Source.Id,
"target_cluster_id": clusterTaskConfig.Cluster.Target.Id, "target_cluster_id": clusterTaskConfig.Cluster.Target.Id,
"source_total_docs": totalDocs, "source_total_docs": sourceTotalDocs,
"target_total_docs": targetTotalDocs,
}, },
}, },
Cancellable: true, Cancellable: true,
@ -71,3 +78,275 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R
} }
h.WriteCreatedOKJSON(w, t.ID) h.WriteCreatedOKJSON(w, t.ID)
} }
func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id")
obj := task.Task{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
}, http.StatusNotFound)
return
}
taskConfig := &migration_model.ClusterComparisonTaskConfig{}
err = migration_util.GetTaskConfig(&obj, taskConfig)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
_, indexState, err := h.getComparisonMajorTaskInfo(id)
if err != nil {
log.Errorf("failed to get major task info, err: %v", err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
var completedIndices int
for i, index := range taskConfig.Indices {
indexName := index.Source.GetUniqueIndexName()
count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs
percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100
if percent > 100 {
percent = 100
}
taskConfig.Indices[i].Target.Docs = count
taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2)
taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
if count == index.Source.Docs+index.Target.Docs {
completedIndices++
}
}
cfg := global.MustLookup("cluster_migration_config")
if migrationConfig, ok := cfg.(*DispatcherConfig); ok {
if obj.Metadata.Labels == nil {
obj.Metadata.Labels = util.MapStr{}
}
obj.Metadata.Labels["log_info"] = util.MapStr{
"cluster_id": migrationConfig.Elasticsearch,
"index_name": migrationConfig.LogIndexName,
}
}
obj.ConfigString = util.MustToJSON(taskConfig)
obj.Metadata.Labels["completed_indices"] = completedIndices
h.WriteJSON(w, obj, http.StatusOK)
}
type ComparisonIndexStateInfo struct {
ErrorPartitions int
SourceScrollDocs int64
TargetScrollDocs int64
}
// TODO: calc realtime info from instance
func (h *APIHandler) getComparisonMajorTaskInfo(majorTaskID string) (taskStats migration_model.ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) {
taskQuery := util.MapStr{
"size": 500,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": majorTaskID,
},
},
},
{
"bool": util.MapStr{
"minimum_should_match": 1,
"should": []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.pipeline_id": util.MapStr{
"value": "dump_hash",
},
},
},
{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"metadata.type": util.MapStr{
"value": "index_comparison",
},
},
},
{
"terms": util.MapStr{
"status": []string{task.StatusComplete, task.StatusError},
},
},
},
},
},
},
},
},
},
},
},
}
q := &orm.Query{
RawQuery: util.MustToJSONBytes(taskQuery),
}
err, result := orm.Search(task.Task{}, q)
if err != nil {
return taskStats, indexState, err
}
var pipelineIndexNames = map[string]string{}
indexState = map[string]ComparisonIndexStateInfo{}
for _, row := range result.Result {
buf := util.MustToJSONBytes(row)
subTask := task.Task{}
err := util.FromJSONBytes(buf, &subTask)
if err != nil {
log.Errorf("failed to unmarshal task, err: %v", err)
continue
}
if subTask.Metadata.Labels == nil {
continue
}
taskLabels := util.MapStr(subTask.Metadata.Labels)
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
if indexName == "" {
continue
}
// add indexDocs of already complete/error
if subTask.Metadata.Type == "index_comparison" {
sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled")
targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled")
taskStats.SourceScrollDocs += sourceDocs
taskStats.TargetScrollDocs += targetDocs
st := indexState[indexName]
st.SourceScrollDocs += sourceDocs
st.TargetScrollDocs += targetDocs
if subTask.Status == task.StatusError {
st.ErrorPartitions += 1
}
indexState[indexName] = st
continue
}
pipelineIndexNames[subTask.ID] = indexName
}
return taskStats, indexState, nil
}
func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id")
uniqueIndexName := ps.MustGetParameter("index")
majorTask := task.Task{}
majorTask.ID = id
exists, err := orm.Get(&majorTask)
if !exists || err != nil {
h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError)
return
}
taskInfo := &TaskInfoResponse{
TaskID: id,
StartTime: majorTask.StartTimeInMillis,
}
subTasks, pipelineTaskIDs, _, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName)
taskInfo.DataPartition = len(subTasks)
if len(subTasks) == 0 {
h.WriteJSON(w, taskInfo, http.StatusOK)
return
}
pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks)
var partitionTaskInfos []util.MapStr
for i, subTask := range subTasks {
cfg := migration_model.IndexComparisonTaskConfig{}
err := migration_util.GetTaskConfig(&subTask, &cfg)
if err != nil {
log.Errorf("failed to get task config, err: %v", err)
continue
}
if i == 0 {
taskInfo.Step = cfg.Source.Step
}
var durationInMS int64
var subCompletedTime int64
if subTask.StartTimeInMillis > 0 {
if migration_util.IsPendingState(subTask.Status) {
durationInMS = time.Now().UnixMilli() - subTask.StartTimeInMillis
}
if subTask.CompletedTime != nil {
subCompletedTime = subTask.CompletedTime.UnixMilli()
durationInMS = subCompletedTime - subTask.StartTimeInMillis
}
}
subTaskLabels := util.MapStr(subTask.Metadata.Labels)
sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled")
targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled")
partitionTaskInfo := util.MapStr{
"task_id": subTask.ID,
"status": subTask.Status,
"start_time": subTask.StartTimeInMillis,
"completed_time": subCompletedTime,
"start": cfg.Source.Start,
"end": cfg.Source.End,
"duration": durationInMS,
"source_total_docs": cfg.Source.DocCount,
"target_total_docs": cfg.Target.DocCount,
}
sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg)
if sourceDumpTask != nil {
partitionTaskInfo["source_scroll_task"] = util.MapStr{
"id": sourceDumpTask.ID,
"status": sourceDumpTask.Status,
}
pipelineID := sourceDumpTask.ID
pipelineContext, ok := pipelineContexts[pipelineID]
if ok {
if vv := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs"); vv > 0 {
sourceScrollDocs = vv
}
}
}
if targetDumpTask != nil {
partitionTaskInfo["target_scroll_task"] = util.MapStr{
"id": targetDumpTask.ID,
"status": targetDumpTask.Status,
}
pipelineID := targetDumpTask.ID
pipelineContext, ok := pipelineContexts[pipelineID]
if ok {
if vv := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs"); vv > 0 {
targetScrollDocs = vv
}
}
}
partitionTaskInfo["source_scroll_docs"] = sourceScrollDocs
partitionTaskInfo["target_scroll_docs"] = targetScrollDocs
partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo)
}
taskInfo.CompletedTime = completedTime
taskInfo.Duration = duration
// NOTE: overwrite major task start time with the first started sub task
if taskInfo.StartTime == 0 {
taskInfo.StartTime = startTime
}
taskInfo.Partitions = partitionTaskInfos
taskInfo.CompletedPartitions = completedPartitions
h.WriteJSON(w, taskInfo, http.StatusOK)
}
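A worked instance of the scroll-progress arithmetic used in getDataComparisonTaskInfo above (scrolled docs from both sides divided by the combined source and target doc counts); all numbers are made up.

package main

import "fmt"

func main() {
	// Made-up sizes: 1,000,000 docs in the source index, 990,000 in the target.
	sourceDocs, targetDocs := int64(1_000_000), int64(990_000)
	// So far the comparison pipelines have scrolled 600,000 docs on each side.
	sourceScrolled, targetScrolled := int64(600_000), int64(600_000)

	count := sourceScrolled + targetScrolled
	percent := float64(count) / float64(sourceDocs+targetDocs) * 100
	if percent > 100 {
		percent = 100
	}
	fmt.Printf("scroll_percent: %.2f%%\n", percent) // scroll_percent: 60.30%
}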

View File

@ -293,6 +293,7 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error {
// update sub migration task status to running and save task log // update sub migration task status to running and save task log
taskItem.Metadata.Labels["execution_instance_id"] = instanceID taskItem.Metadata.Labels["execution_instance_id"] = instanceID
p.clearTaskState(taskItem)
taskItem.Status = task.StatusRunning taskItem.Status = task.StatusRunning
taskItem.StartTimeInMillis = time.Now().UnixMilli() taskItem.StartTimeInMillis = time.Now().UnixMilli()
@ -330,6 +331,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
if sourceDumpTask.Status == task.StatusComplete && targetDumpTask.Status == task.StatusComplete { if sourceDumpTask.Status == task.StatusComplete && targetDumpTask.Status == task.StatusComplete {
sourceDocs := migration_util.GetMapIntValue(util.MapStr(sourceDumpTask.Metadata.Labels), "scrolled_docs") sourceDocs := migration_util.GetMapIntValue(util.MapStr(sourceDumpTask.Metadata.Labels), "scrolled_docs")
targetDocs := migration_util.GetMapIntValue(util.MapStr(targetDumpTask.Metadata.Labels), "scrolled_docs") targetDocs := migration_util.GetMapIntValue(util.MapStr(targetDumpTask.Metadata.Labels), "scrolled_docs")
taskItem.Metadata.Labels["source_scrolled"] = sourceDocs
taskItem.Metadata.Labels["target_scrolled"] = targetDocs
if sourceDocs != targetDocs { if sourceDocs != targetDocs {
now := time.Now() now := time.Now()
taskItem.CompletedTime = &now taskItem.CompletedTime = &now
@ -424,19 +428,7 @@ func (p *processor) getPipelineTasks(taskItem *task.Task, cfg *migration_model.I
err = fmt.Errorf("invalid pipeline task count: %d", len(ptasks)) err = fmt.Errorf("invalid pipeline task count: %d", len(ptasks))
return return
} }
for i, ptask := range ptasks { sourceDumpTask, targetDumpTask, diffTask = migration_util.SplitIndexComparisonTasks(ptasks, cfg)
if ptask.Metadata.Labels["pipeline_id"] == "dump_hash" {
// TODO: we can't handle when compare the same cluster & same index
// catch it earlier when creating the task
if ptask.Metadata.Labels["cluster_id"] == cfg.Source.ClusterId && ptask.Metadata.Labels["index_name"] == cfg.Source.Indices {
sourceDumpTask = &ptasks[i]
} else {
targetDumpTask = &ptasks[i]
}
} else if ptask.Metadata.Labels["pipeline_id"] == "index_diff" {
diffTask = &ptasks[i]
}
}
return return
} }
@ -478,3 +470,8 @@ func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.Ta
migration_util.WriteLog(taskItem, taskResult, message) migration_util.WriteLog(taskItem, taskResult, message)
} }
} }
func (p *processor) clearTaskState(taskItem *task.Task) {
delete(taskItem.Metadata.Labels, "source_scrolled")
delete(taskItem.Metadata.Labels, "target_scrolled")
}

View File

@ -306,8 +306,7 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error {
// update sub migration task status to running and save task log // update sub migration task status to running and save task log
taskItem.Metadata.Labels["execution_instance_id"] = instanceID taskItem.Metadata.Labels["execution_instance_id"] = instanceID
taskItem.Metadata.Labels["index_docs"] = 0 p.clearTaskState(taskItem)
taskItem.Metadata.Labels["scrolled_docs"] = 0
taskItem.Status = task.StatusRunning taskItem.Status = task.StatusRunning
taskItem.StartTimeInMillis = time.Now().UnixMilli() taskItem.StartTimeInMillis = time.Now().UnixMilli()
@ -331,8 +330,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
if totalDocs == 0 { if totalDocs == 0 {
taskItem.Status = task.StatusComplete taskItem.Status = task.StatusComplete
taskItem.Metadata.Labels["scrolled_docs"] = 0 p.clearTaskState(taskItem)
taskItem.Metadata.Labels["index_docs"] = 0
now := time.Now() now := time.Now()
taskItem.CompletedTime = &now taskItem.CompletedTime = &now
@ -498,13 +496,7 @@ func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask
err = fmt.Errorf("invalid pipeline task count: %d", len(ptasks)) err = fmt.Errorf("invalid pipeline task count: %d", len(ptasks))
return return
} }
for i, ptask := range ptasks { scrollTask, bulkTask = migration_util.SplitIndexMigrationTasks(ptasks)
if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
bulkTask = &ptasks[i]
} else if ptask.Metadata.Labels["pipeline_id"] == "es_scroll" {
scrollTask = &ptasks[i]
}
}
return return
} }
@ -571,3 +563,8 @@ func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.Ta
migration_util.WriteLog(taskItem, taskResult, message) migration_util.WriteLog(taskItem, taskResult, message)
} }
} }
func (p *processor) clearTaskState(taskItem *task.Task) {
delete(taskItem.Metadata.Labels, "index_docs")
delete(taskItem.Metadata.Labels, "scrolled_docs")
}

View File

@ -0,0 +1,132 @@
package migration
import (
log "github.com/cihub/seelog"
migration_model "infini.sh/console/plugin/migration/model"
migration_util "infini.sh/console/plugin/migration/util"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/task"
"infini.sh/framework/core/util"
)
type MigrationIndexStateInfo struct {
ErrorPartitions int
IndexDocs int64
}
/*
We count indexed docs from two sources:
- the index_docs label of index_migration subtasks that already reached complete/error status
- plus the realtime bulk_indexing.success.count fetched from the executing instances
- realtime counters are only available for index_migration subtasks that are still running
*/
func (h *APIHandler) getMigrationMajorTaskInfo(majorTaskID string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) {
taskQuery := util.MapStr{
"size": 500,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": majorTaskID,
},
},
},
{
"bool": util.MapStr{
"minimum_should_match": 1,
"should": []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.pipeline_id": util.MapStr{
"value": "bulk_indexing",
},
},
},
{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"metadata.type": util.MapStr{
"value": "index_migration",
},
},
},
{
"terms": util.MapStr{
"status": []string{task.StatusComplete, task.StatusError},
},
},
},
},
},
},
},
},
},
},
},
}
q := &orm.Query{
RawQuery: util.MustToJSONBytes(taskQuery),
}
err, result := orm.Search(task.Task{}, q)
if err != nil {
return taskStats, indexState, err
}
var pipelineTaskIDs = map[string][]string{}
var pipelineIndexNames = map[string]string{}
indexState = map[string]MigrationIndexStateInfo{}
for _, row := range result.Result {
buf := util.MustToJSONBytes(row)
subTask := task.Task{}
err := util.FromJSONBytes(buf, &subTask)
if err != nil {
log.Errorf("failed to unmarshal task, err: %v", err)
continue
}
if subTask.Metadata.Labels == nil {
continue
}
taskLabels := util.MapStr(subTask.Metadata.Labels)
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
if indexName == "" {
continue
}
// add indexDocs of already complete/error
if subTask.Metadata.Type == "index_migration" {
indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
taskStats.IndexDocs += indexDocs
st := indexState[indexName]
st.IndexDocs += indexDocs
if subTask.Status == task.StatusError {
st.ErrorPartitions += 1
}
indexState[indexName] = st
continue
}
pipelineIndexNames[subTask.ID] = indexName
if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
}
}
pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
for pipelineID, pipelineContext := range pipelineContexts {
// add indexDocs of running tasks
indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count")
taskStats.IndexDocs += indexDocs
indexName := pipelineIndexNames[pipelineID]
st := indexState[indexName]
st.IndexDocs += indexDocs
indexState[indexName] = st
}
return taskStats, indexState, nil
}
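A toy illustration of the two-source counting described in the comment at the top of this new file: per-index doc counts from finished index_migration subtasks are added to the realtime counters of still-running pipelines. Index names and numbers are invented.

package main

import "fmt"

func main() {
	// Docs recorded in the index_docs label of subtasks that already completed or errored.
	completed := map[string]int64{"index-a": 1_000_000, "index-b": 250_000}
	// Docs reported by bulk_indexing pipelines that are still running.
	realtime := map[string]int64{"index-a": 120_000}

	total := map[string]int64{}
	for name, docs := range completed {
		total[name] += docs
	}
	for name, docs := range realtime {
		total[name] += docs
	}
	fmt.Println(total) // map[index-a:1120000 index-b:250000]
}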

View File

@ -26,10 +26,11 @@ type IndexPartition struct {
} }
type IndexInfo struct { type IndexInfo struct {
Name string `json:"name"` Name string `json:"name"`
DocType string `json:"doc_type"` DocType string `json:"doc_type"`
Docs int64 `json:"docs"` // NOTE: == 0 for migration target index
StoreSizeInBytes int `json:"store_size_in_bytes"` Docs int64 `json:"docs"`
StoreSizeInBytes int `json:"store_size_in_bytes"`
} }
func (ii *IndexInfo) GetUniqueIndexName() string { func (ii *IndexInfo) GetUniqueIndexName() string {

View File

@ -26,10 +26,16 @@ type ClusterComparisonIndexConfig struct {
Partition *IndexPartition `json:"partition,omitempty"` Partition *IndexPartition `json:"partition,omitempty"`
// only used in API // only used in API
Percent float64 `json:"percent,omitempty"` ScrollPercent float64 `json:"scroll_percent,omitempty"`
ErrorPartitions int `json:"error_partitions,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"`
} }
type ClusterComparisonTaskState struct {
SourceScrollDocs int64
TargetScrollDocs int64
Status string
}
type IndexComparisonTaskConfig struct { type IndexComparisonTaskConfig struct {
Source IndexComparisonDumpConfig `json:"source"` Source IndexComparisonDumpConfig `json:"source"`
Target IndexComparisonDumpConfig `json:"target"` Target IndexComparisonDumpConfig `json:"target"`
@ -47,8 +53,13 @@ type IndexComparisonDumpConfig struct {
ScrollTime string `json:"scroll_time"` ScrollTime string `json:"scroll_time"`
QueryString string `json:"query_string,omitempty"` QueryString string `json:"query_string,omitempty"`
QueryDSL util.MapStr `json:"query_dsl,omitempty"` QueryDSL util.MapStr `json:"query_dsl,omitempty"`
DocCount int64 `json:"doc_count"`
DocCount int64 `json:"doc_count"` // Only populated for partitioned tasks
Start float64 `json:"start"`
End float64 `json:"end"`
Step interface{} `json:"step"`
PartitionId int `json:"partition_id"`
} }
type IndexComparisonDiffConfig struct { type IndexComparisonDiffConfig struct {

View File

@ -38,16 +38,11 @@ type ClusterMigrationIndexConfig struct {
ErrorPartitions int `json:"error_partitions,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"`
} }
type MajorTaskState struct { type ClusterMigrationTaskState struct {
IndexDocs int64 IndexDocs int64
Status string Status string
} }
type IndexStateInfo struct {
ErrorPartitions int
IndexDocs float64
}
const ( const (
IndexMigrationV0 = 0 IndexMigrationV0 = 0
IndexMigrationV1 = 1 IndexMigrationV1 = 1
@ -70,11 +65,11 @@ type IndexMigrationSourceConfig struct {
TypeRename util.MapStr `json:"type_rename,omitempty"` TypeRename util.MapStr `json:"type_rename,omitempty"`
QueryString string `json:"query_string,omitempty"` QueryString string `json:"query_string,omitempty"`
QueryDSL util.MapStr `json:"query_dsl,omitempty"` QueryDSL util.MapStr `json:"query_dsl,omitempty"`
DocCount int64 `json:"doc_count"`
// Partition configs // Partition configs
Start float64 `json:"start"` Start float64 `json:"start"`
End float64 `json:"end"` End float64 `json:"end"`
DocCount int64 `json:"doc_count"`
Step interface{} `json:"step"` Step interface{} `json:"step"`
PartitionId int `json:"partition_id"` PartitionId int `json:"partition_id"`
} }

View File

@ -1,37 +0,0 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package migration
import (
log "github.com/cihub/seelog"
"infini.sh/framework/core/env"
"infini.sh/framework/core/module"
)
func (module *Module) Name() string {
return "migration"
}
func (module *Module) Setup() {
exists, err := env.ParseConfig("migration", module)
if exists && err != nil {
log.Error(err)
}
InitAPI()
}
func (module *Module) Start() error {
return nil
}
func (module *Module) Stop() error {
return nil
}
type Module struct {
}
func init() {
module.RegisterUserPlugin(&Module{})
}

View File

@ -0,0 +1,38 @@
package util
import (
migration_model "infini.sh/console/plugin/migration/model"
"infini.sh/framework/core/task"
)
/*
These functions could return nil tasks
*/
func SplitIndexMigrationTasks(ptasks []task.Task) (scrollTask *task.Task, bulkTask *task.Task) {
for i, ptask := range ptasks {
if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
bulkTask = &ptasks[i]
} else if ptask.Metadata.Labels["pipeline_id"] == "es_scroll" {
scrollTask = &ptasks[i]
}
}
return
}
func SplitIndexComparisonTasks(ptasks []task.Task, cfg *migration_model.IndexComparisonTaskConfig) (sourceDumpTask *task.Task, targetDumpTask *task.Task, diffTask *task.Task) {
for i, ptask := range ptasks {
if ptask.Metadata.Labels["pipeline_id"] == "dump_hash" {
// TODO: we can't handle when compare the same cluster & same index
// catch it earlier when creating the task
if ptask.Metadata.Labels["cluster_id"] == cfg.Source.ClusterId && ptask.Metadata.Labels["index_name"] == cfg.Source.Indices {
sourceDumpTask = &ptasks[i]
} else {
targetDumpTask = &ptasks[i]
}
} else if ptask.Metadata.Labels["pipeline_id"] == "index_diff" {
diffTask = &ptasks[i]
}
}
return
}

View File

@ -41,6 +41,12 @@ func IsRunningState(status string) bool {
return util.StringInArray(runningTaskStatus, status) return util.StringInArray(runningTaskStatus, status)
} }
var pendingTaskStatus = []string{task.StatusRunning, task.StatusReady, task.StatusPendingStop}
func IsPendingState(status string) bool {
return util.StringInArray(pendingTaskStatus, status)
}
func GetTaskConfig(task *task.Task, config interface{}) error { func GetTaskConfig(task *task.Task, config interface{}) error {
if task.Config_ == nil { if task.Config_ == nil {
return util.FromJSONBytes([]byte(task.ConfigString), config) return util.FromJSONBytes([]byte(task.ConfigString), config)