diff --git a/plugin/migration/api.go b/plugin/migration/api.go index ae3a5c9f..5a25e018 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -5,7 +5,6 @@ package migration import ( - "context" "fmt" "net/http" "strings" @@ -30,22 +29,23 @@ import ( func InitAPI() { handler := APIHandler{} - api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchTask("cluster_migration"), enum.PermissionTaskRead)) - api.HandleAPIMethod(api.POST, "/migration/data", handler.RequirePermission(handler.createDataMigrationTask, enum.PermissionTaskWrite)) - api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionTaskWrite)) + api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchTask("cluster_migration"), enum.PermissionMigrationTaskRead)) + api.HandleAPIMethod(api.POST, "/migration/data", handler.RequirePermission(handler.createDataMigrationTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite)) + api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead)) + api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead)) + + api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead)) + api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.DELETE, "/comparison/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info", handler.RequirePermission(handler.getDataComparisonTaskInfo, enum.PermissionComparisonTaskRead)) + api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info/:index", handler.RequirePermission(handler.getDataComparisonTaskOfIndex, enum.PermissionComparisonTaskRead)) + api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionComparisonTaskWrite)) + api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration)) - api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionTaskWrite)) - api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionTaskWrite)) - api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionTaskRead)) - api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", 
handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionTaskRead)) - api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequirePermission(handler.updateDataMigrationTaskStatus, enum.PermissionTaskRead)) - - api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionTaskRead)) - api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionTaskWrite)) - api.HandleAPIMethod(api.DELETE, "/comparison/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionTaskWrite)) - api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionTaskWrite)) - api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionTaskWrite)) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_count", handler.countDocuments) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_refresh", handler.refreshIndex) @@ -138,82 +138,6 @@ func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Requ h.WriteJSON(w, partitions, http.StatusOK) } -func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API) (map[string]string, error) { - const step = 50 - var ( - length = len(indexNames) - end int - ) - refreshIntervals := map[string]string{} - for i := 0; i < length; i += step { - end = i + step - if end > length-1 { - end = length - } - tempNames := indexNames[i:end] - strNames := strings.Join(tempNames, ",") - resultM, err := targetESClient.GetIndexSettings(strNames) - if err != nil { - return refreshIntervals, nil - } - for indexName, v := range *resultM { - if m, ok := v.(map[string]interface{}); ok { - refreshInterval, _ := util.GetMapValueByKeys([]string{"settings", "index", "refresh_interval"}, m) - if ri, ok := refreshInterval.(string); ok { - refreshIntervals[indexName] = ri - continue - } - refreshInterval, _ = util.GetMapValueByKeys([]string{"defaults", "index", "refresh_interval"}, m) - if ri, ok := refreshInterval.(string); ok { - refreshIntervals[indexName] = ri - continue - } - } - - } - } - return refreshIntervals, nil - -} - -func (h *APIHandler) getIndexRefreshIntervals(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - - obj := task2.Task{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - taskConfig := &migration_model.ClusterMigrationTaskConfig{} - err = migration_util.GetTaskConfig(&obj, taskConfig) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - var indexNames []string - for _, index := range taskConfig.Indices { - indexNames = append(indexNames, index.Target.Name) - } - targetESClient := elastic.GetClientNoPanic(taskConfig.Cluster.Target.Id) - if targetESClient == nil { - h.WriteJSON(w, util.MapStr{}, http.StatusOK) - } - vals, err := getIndexRefreshInterval(indexNames, targetESClient) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - h.WriteJSON(w, vals, http.StatusOK) -} - func (h *APIHandler) 
getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("task_id") @@ -236,15 +160,9 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R return } - indexState, err := getMajorTaskInfoByIndex(id) + _, indexState, err := h.getMigrationMajorTaskInfo(id) if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - realtimeIndexState, err := getMajorTaskByIndexFromES(id) - if err != nil { - log.Error(err) + log.Errorf("failed to get major task info, err: %v", err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return } @@ -252,15 +170,15 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R var completedIndices int for i, index := range taskConfig.Indices { indexName := index.Source.GetUniqueIndexName() - count := indexState[indexName].IndexDocs + realtimeIndexState[indexName].IndexDocs - percent := count * 100 / float64(index.Source.Docs) + count := indexState[indexName].IndexDocs + percent := float64(count) / float64(index.Source.Docs) * 100 if percent > 100 { percent = 100 } - taskConfig.Indices[i].Target.Docs = int64(count) + taskConfig.Indices[i].Target.Docs = count taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions - if int64(count) == index.Source.Docs { + if count == index.Source.Docs { completedIndices++ } } @@ -279,139 +197,15 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R h.WriteJSON(w, obj, http.StatusOK) } -func getMajorTaskInfoByIndex(taskID string) (map[string]migration_model.IndexStateInfo, error) { - query := util.MapStr{ - "size": 0, - "aggs": util.MapStr{ - "group_by_task": util.MapStr{ - "terms": util.MapStr{ - "field": "metadata.labels.unique_index_name", - "size": 100, - }, - "aggs": util.MapStr{ - "group_by_status": util.MapStr{ - "terms": util.MapStr{ - "field": "status", - "size": 100, - }, - }, - "total_docs": util.MapStr{ - "sum": util.MapStr{ - "field": "metadata.labels.index_docs", - }, - }, - }, - }, - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskID, - }, - }, - }, - }, - }, - }, - } - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(task2.Task{}, q) - if err != nil { - return nil, err - } - - searchRes := &elastic.SearchResponse{} - err = util.FromJSONBytes(result.Raw, searchRes) - if err != nil { - return nil, err - } - resBody := map[string]migration_model.IndexStateInfo{} - - if taskAgg, ok := searchRes.Aggregations["group_by_task"]; ok { - for _, bk := range taskAgg.Buckets { - if key, ok := bk["key"].(string); ok { - //resBody[key] = int(bk["doc_count"].(float64)) - resBody[key] = migration_model.IndexStateInfo{} - if statusAgg, ok := bk["group_by_status"].(map[string]interface{}); ok { - if sbks, ok := statusAgg["buckets"].([]interface{}); ok { - for _, sbk := range sbks { - if sbkM, ok := sbk.(map[string]interface{}); ok { - if sbkM["key"] == task2.StatusError { - if v, ok := sbkM["doc_count"].(float64); ok { - st := resBody[key] - st.ErrorPartitions = int(v) - resBody[key] = st - } - } - } - } - } - } - if indexDocsAgg, ok := bk["total_docs"].(map[string]interface{}); ok { - if v, ok := 
indexDocsAgg["value"].(float64); ok { - st := resBody[key] - st.IndexDocs = v - resBody[key] = st - } - - } - } - } - } - return resBody, nil -} - -func getIndexTaskDocCount(ctx context.Context, index *migration_model.ClusterMigrationIndexConfig, targetESClient elastic.API) (int64, error) { - targetIndexName := index.Target.Name - if targetIndexName == "" { - if v, ok := index.IndexRename[index.Source.Name].(string); ok { - targetIndexName = v - } - } - - var body []byte - var must []interface{} - if index.Target.DocType != "" && targetESClient.GetMajorVersion() < 8 { - must = append(must, util.MapStr{ - "terms": util.MapStr{ - "_type": []string{index.Target.DocType}, - }, - }) - } - if index.RawFilter != nil { - must = append(must, index.RawFilter) - } - if len(must) > 0 { - query := util.MapStr{ - "query": util.MapStr{ - "bool": util.MapStr{ - "must": must, - }, - }, - } - body = util.MustToJSONBytes(query) - } - - countRes, err := targetESClient.Count(ctx, targetIndexName, body) - if err != nil { - return 0, err - } - if countRes.StatusCode != http.StatusOK && countRes.RawResult != nil { - return 0, fmt.Errorf(string(countRes.RawResult.Body)) - } - return countRes.Count, nil +type TaskInfoResponse struct { + TaskID string `json:"task_id"` + Step interface{} `json:"step"` + StartTime int64 `json:"start_time"` + CompletedTime int64 `json:"completed_time"` + Duration int64 `json:"duration"` + DataPartition int `json:"data_partition"` + CompletedPartitions int `json:"completed_partitions"` + Partitions []util.MapStr `json:"partitions"` } func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -425,122 +219,37 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt return } - var completedTime int64 + taskInfo := &TaskInfoResponse{ + TaskID: id, + StartTime: majorTask.StartTimeInMillis, + } - taskInfo := util.MapStr{ - "task_id": id, - "start_time": majorTask.StartTimeInMillis, - } - partitionTaskQuery := util.MapStr{ - "size": 500, - "sort": []util.MapStr{ - { - "created": util.MapStr{ - "order": "asc", - }, - }, - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.unique_index_name": util.MapStr{ - "value": uniqueIndexName, - }, - }, - }, - }, - }, - }, - } - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(partitionTaskQuery), - } - err, result := orm.Search(task2.Task{}, q) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) + subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName) + + taskInfo.DataPartition = len(subTasks) + if len(subTasks) == 0 { + h.WriteJSON(w, taskInfo, http.StatusOK) return } - var subTasks []task2.Task - var pipelineTaskIDs = map[string][]string{} - pipelineSubParentIDs := map[string]string{} - subTaskStatus := map[string]string{} - parentIDPipelineTasks := map[string][]task2.Task{} - for _, row := range result.Result { - buf := util.MustToJSONBytes(row) - subTask := task2.Task{} - err = util.FromJSONBytes(buf, &subTask) - if err != nil { - log.Error(err) - continue - } - if subTask.Metadata.Labels != nil { - if subTask.Metadata.Type == "index_migration" { - subTasks = append(subTasks, subTask) - subTaskStatus[subTask.ID] = subTask.Status - continue - } - if subTask.Metadata.Labels["pipeline_id"] == 
"es_scroll" || subTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { - if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok { - pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) - if pl := len(subTask.ParentId); pl > 0 { - parentID := subTask.ParentId[pl-1] - if subTaskStatus[parentID] == task2.StatusRunning { - pipelineSubParentIDs[subTask.ID] = parentID - } - parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask) - } - } - } - } - } - taskInfo["data_partition"] = len(subTasks) var scrollStats = map[string]int64{} var bulkStats = map[string]int64{} - for instID, taskIDs := range pipelineTaskIDs { - inst := &model.Instance{} - inst.ID = instID - _, err = orm.Get(inst) - if err != nil { - log.Error(err) - continue - } - pipelines, err := inst.GetPipelinesByIDs(taskIDs) - if err != nil { - log.Error(err) - continue - } - - for pipelineID, status := range pipelines { - if pid, ok := pipelineSubParentIDs[pipelineID]; ok { - if vv := migration_util.GetMapIntValue(status.Context, "es_scroll.scrolled_docs"); vv > 0 { - scrollStats[pid] = vv - } - if vv := migration_util.GetMapIntValue(status.Context, "bulk_indexing.success.count"); vv > 0 { - bulkStats[pid] = vv - } + pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) + for pipelineID, pipelineContext := range pipelineContexts { + if pid, ok := pipelineSubParentIDs[pipelineID]; ok { + if vv := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs"); vv > 0 { + scrollStats[pid] = vv + } + if vv := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count"); vv > 0 { + bulkStats[pid] = vv } } } - var ( - partitionTaskInfos []util.MapStr - completedPartitions int - startTime int64 - ) - if len(subTasks) > 0 { - startTime = subTasks[0].StartTimeInMillis - } + startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks) + + var partitionTaskInfos []util.MapStr + for i, ptask := range subTasks { cfg := migration_model.IndexMigrationTaskConfig{} err := migration_util.GetTaskConfig(&ptask, &cfg) @@ -548,20 +257,20 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt log.Errorf("failed to get task config, err: %v", err) continue } - start := cfg.Source.Start - end := cfg.Source.End if i == 0 { - step := cfg.Source.Step - taskInfo["step"] = step + taskInfo.Step = cfg.Source.Step } - var durationInMS int64 = 0 + + var durationInMS int64 + var subCompletedTime int64 if ptask.StartTimeInMillis > 0 { - if ptask.StartTimeInMillis < startTime { - startTime = ptask.StartTimeInMillis + if migration_util.IsPendingState(ptask.Status) { + durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis + continue } - durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis - if ptask.CompletedTime != nil && (ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError) { - durationInMS = ptask.CompletedTime.UnixMilli() - ptask.StartTimeInMillis + if ptask.CompletedTime != nil { + subCompletedTime = ptask.CompletedTime.UnixMilli() + durationInMS = subCompletedTime - ptask.StartTimeInMillis } } var ( @@ -580,54 +289,42 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs") } - var subCompletedTime int64 - if ptask.CompletedTime != nil { - subCompletedTime = ptask.CompletedTime.UnixMilli() - if subCompletedTime > completedTime { - completedTime = subCompletedTime 
- } - } - partitionTotalDocs := cfg.Source.DocCount partitionTaskInfo := util.MapStr{ "task_id": ptask.ID, "status": ptask.Status, "start_time": ptask.StartTimeInMillis, "completed_time": subCompletedTime, - "start": start, - "end": end, + "start": cfg.Source.Start, + "end": cfg.Source.End, "duration": durationInMS, "scroll_docs": scrollDocs, "index_docs": indexDocs, "total_docs": partitionTotalDocs, } - for _, pipelineTask := range parentIDPipelineTasks[ptask.ID] { - if pipelineTask.Metadata.Labels["pipeline_id"] == "es_scroll" { - partitionTaskInfo["scroll_task"] = util.MapStr{ - "id": pipelineTask.ID, - "status": pipelineTask.Status, - } - } else if pipelineTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { - partitionTaskInfo["bulk_task"] = util.MapStr{ - "id": pipelineTask.ID, - "status": pipelineTask.Status, - } + scrollTask, bulkTask := migration_util.SplitIndexMigrationTasks(parentIDPipelineTasks[ptask.ID]) + if scrollTask != nil { + partitionTaskInfo["scroll_task"] = util.MapStr{ + "id": scrollTask.ID, + "status": scrollTask.Status, + } + } + if bulkTask != nil { + partitionTaskInfo["bulk_task"] = util.MapStr{ + "id": bulkTask.ID, + "status": bulkTask.Status, } } partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo) - if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError { - completedPartitions++ - } } - if len(subTasks) == completedPartitions { - taskInfo["completed_time"] = completedTime - taskInfo["duration"] = completedTime - startTime - } else { - taskInfo["duration"] = time.Now().UnixMilli() - startTime + taskInfo.CompletedTime = completedTime + taskInfo.Duration = duration + // NOTE: overwrite major task start time with the first started sub task + if taskInfo.StartTime == 0 { + taskInfo.StartTime = startTime } - taskInfo["start_time"] = startTime - taskInfo["partitions"] = partitionTaskInfos - taskInfo["completed_partitions"] = completedPartitions + taskInfo.Partitions = partitionTaskInfos + taskInfo.CompletedPartitions = completedPartitions h.WriteJSON(w, taskInfo, http.StatusOK) } @@ -664,42 +361,6 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps h.WriteJSON(w, countRes, http.StatusOK) } -func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - - obj := task2.Task{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - reqBody := struct { - Status string `json:"status"` - }{} - err = h.DecodeJSON(req, &reqBody) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - obj.Status = reqBody.Status - err = orm.Update(nil, obj) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - h.WriteJSON(w, util.MapStr{ - "success": true, - }, 200) -} - func (h *APIHandler) validateDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { typ := h.GetParameter(req, "type") switch typ { @@ -828,48 +489,30 @@ func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps h }, 200) } -func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_model.MajorTaskState, err error) { - taskQuery := util.MapStr{ - "size": 500, +func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexName string) (subTasks 
[]task2.Task, pipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) { + queryDsl := util.MapStr{ + "size": 9999, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "asc", + }, + }, + }, "query": util.MapStr{ "bool": util.MapStr{ "must": []util.MapStr{ { "term": util.MapStr{ "parent_id": util.MapStr{ - "value": majorTaskID, + "value": taskItem.ID, }, }, }, { - "bool": util.MapStr{ - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "bulk_indexing", - }, - }, - }, - { - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusComplete, task2.StatusError}, - }, - }, - }, - }, - }, + "term": util.MapStr{ + "metadata.labels.unique_index_name": util.MapStr{ + "value": uniqueIndexName, }, }, }, @@ -878,13 +521,17 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod }, } q := &orm.Query{ - RawQuery: util.MustToJSONBytes(taskQuery), + RawQuery: util.MustToJSONBytes(queryDsl), } err, result := orm.Search(task2.Task{}, q) if err != nil { - return taskStats, err + return } - var pipelineTaskIDs = map[string][]string{} + + pipelineTaskIDs = map[string][]string{} + pipelineSubParentIDs = map[string]string{} + parentIDPipelineTasks = map[string][]task2.Task{} + for _, row := range result.Result { buf := util.MustToJSONBytes(row) subTask := task2.Task{} @@ -893,114 +540,90 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod log.Error(err) continue } - taskLabels := util.MapStr(subTask.Metadata.Labels) - if subTask.Metadata.Labels != nil { - //add indexDocs of already complete - if subTask.Metadata.Type == "index_migration" { - taskStats.IndexDocs += migration_util.GetMapIntValue(taskLabels, "index_docs") - continue - } - if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { - pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) - } - } - } - for instID, taskIDs := range pipelineTaskIDs { - inst := &model.Instance{} - inst.ID = instID - _, err = orm.Get(inst) - if err != nil { - log.Error(err) + + if subTask.Metadata.Type != "pipeline" { + subTasks = append(subTasks, subTask) continue } - pipelines, err := inst.GetPipelinesByIDs(taskIDs) - if err != nil { - log.Error(err) + if subTask.Status != task2.StatusRunning { continue } - for _, status := range pipelines { - taskStats.IndexDocs += migration_util.GetMapIntValue(status.Context, "bulk_indexing.success.count") + // TODO: use more robust logic + if pl := len(subTask.ParentId); pl != 2 { + continue } + parentID := subTask.ParentId[1] + + pipelineSubParentIDs[subTask.ID] = parentID + instID := migration_util.GetMapStringValue(util.MapStr(subTask.Metadata.Labels), "execution_instance_id") + if instID == "" { + continue + } + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask) } - return taskStats, nil + + return } -func getMajorTaskByIndexFromES(majorTaskID string) (map[string]migration_model.IndexStateInfo, error) { - taskQuery := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": 
majorTaskID, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "bulk_indexing", - }, - }, - }, - }, - }, - }, - } - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(taskQuery), - } - err, result := orm.Search(task2.Task{}, q) - if err != nil { - return nil, err - } - var pipelineTaskIDs = map[string][]string{} - var pipelineIndexNames = map[string]string{} - for _, row := range result.Result { - buf := util.MustToJSONBytes(row) - subTask := task2.Task{} - err = util.FromJSONBytes(buf, &subTask) - if err != nil { - log.Error(err) - continue - } - if subTask.Metadata.Labels != nil { - if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok { - pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) - } - if indexName, ok := subTask.Metadata.Labels["unique_index_name"].(string); ok { - pipelineIndexNames[subTask.ID] = indexName - } - } - } - state := map[string]migration_model.IndexStateInfo{} +func (h *APIHandler) getChildPipelineInfosFromGateway(pipelineTaskIDs map[string][]string) (pipelineContexts map[string]util.MapStr) { + pipelineContexts = map[string]util.MapStr{} + var err error + for instID, taskIDs := range pipelineTaskIDs { inst := &model.Instance{} inst.ID = instID _, err = orm.Get(inst) if err != nil { - log.Error(err) + log.Errorf("failed to get instance info, err: %v", err) continue } pipelines, err := inst.GetPipelinesByIDs(taskIDs) if err != nil { - log.Error(err) + log.Errorf("failed to get pipelines info, err: %v", err) continue } for pipelineID, status := range pipelines { - indexName := pipelineIndexNames[pipelineID] - if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil { - if vv, ok := v.(float64); ok && indexName != "" { - st := state[indexName] - st.IndexDocs += vv - state[indexName] = st - } - } + pipelineContexts[pipelineID] = status.Context } } - return state, nil + + return +} + +func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task) (startTime int64, completedTime int64, duration int64, completedPartitions int) { + if len(subTasks) == 0 { + return + } + + for _, subTask := range subTasks { + if subTask.StartTimeInMillis > 0 { + if startTime == 0 { + startTime = subTask.StartTimeInMillis + } + if subTask.StartTimeInMillis < startTime { + startTime = subTask.StartTimeInMillis + } + } + if subTask.CompletedTime != nil { + subCompletedTime := subTask.CompletedTime.UnixMilli() + if subCompletedTime > completedTime { + completedTime = subCompletedTime + } + } + + if subTask.Status == task2.StatusComplete || subTask.Status == task2.StatusError { + completedPartitions++ + } + } + if len(subTasks) != completedPartitions { + completedTime = 0 + duration = time.Now().UnixMilli() - startTime + } else { + duration = completedTime - startTime + } + + return }
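The new `calcMajorTaskInfo` helper above centralizes the start/completed/duration math that both the migration and comparison info handlers previously inlined. A minimal, runnable sketch of its aggregation rules; the `subTask` struct and status constants here are hypothetical stand-ins for `infini.sh/framework/core/task.Task`, reduced to the three fields the helper actually reads:

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-in for infini.sh/framework/core/task.Task,
// reduced to the fields calcMajorTaskInfo reads.
type subTask struct {
	Status            string
	StartTimeInMillis int64
	CompletedTime     *time.Time
}

// Assumed status values; the real constants live in the framework.
const (
	statusComplete = "complete"
	statusError    = "error"
)

// Same aggregation rules as the handler's calcMajorTaskInfo: earliest start
// wins, latest completion wins, and duration keeps growing ("now - start")
// until every partition is complete or errored.
func calcMajorTaskInfo(subTasks []subTask) (startTime, completedTime, duration int64, completedPartitions int) {
	if len(subTasks) == 0 {
		return
	}
	for _, st := range subTasks {
		if st.StartTimeInMillis > 0 && (startTime == 0 || st.StartTimeInMillis < startTime) {
			startTime = st.StartTimeInMillis
		}
		if st.CompletedTime != nil {
			if ct := st.CompletedTime.UnixMilli(); ct > completedTime {
				completedTime = ct
			}
		}
		if st.Status == statusComplete || st.Status == statusError {
			completedPartitions++
		}
	}
	if completedPartitions != len(subTasks) {
		// still running: no completion time yet
		completedTime = 0
		duration = time.Now().UnixMilli() - startTime
	} else {
		duration = completedTime - startTime
	}
	return
}

func main() {
	done := time.UnixMilli(2000)
	start, completed, dur, n := calcMajorTaskInfo([]subTask{
		{Status: statusComplete, StartTimeInMillis: 1000, CompletedTime: &done},
	})
	fmt.Println(start, completed, dur, n) // 1000 2000 1000 1
}
```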
diff --git a/plugin/migration/cluster_comparison/cluster_comparison.go index ad61d5b5..29cf0d09 100644 --- a/plugin/migration/cluster_comparison/cluster_comparison.go +++ b/plugin/migration/cluster_comparison/cluster_comparison.go @@ -55,6 +55,9 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { return nil } taskItem.RetryTimes++ + if taskItem.StartTimeInMillis == 0 { + taskItem.StartTimeInMillis = time.Now().UnixMilli() + } taskItem.Status = task.StatusRunning p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, @@ -102,6 +105,9 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { } } + // TODO: partition twice for source & target, then merge + // if there's a partition missing from source but present in target + // ideally we can capture it in docs count, but this won't always work if index.Partition != nil { partitionQ := &elastic.PartitionQuery{ IndexName: index.Source.Name, @@ -119,18 +125,20 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { if partitions == nil || len(partitions) == 0 { return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter)) } + var ( + partitionID int + ) for _, partition := range partitions { - //skip empty partition - if partition.Docs <= 0 { - continue - } + partitionID++ partitionSourceDump := sourceDump - partitionSourceDump.QueryDSL = partition.Filter + partitionSourceDump.Start = partition.Start + partitionSourceDump.End = partition.End partitionSourceDump.DocCount = partition.Docs + partitionSourceDump.Step = index.Partition.Step + partitionSourceDump.PartitionId = partitionID + partitionSourceDump.QueryDSL = partition.Filter partitionSourceDump.QueryString = "" - // TODO: if there's a partition missing from source but present in target - // ideally we can capture it in docs count, but this won't always work partitionTargetDump := partitionSourceDump partitionTargetDump.Indices = index.Target.Name @@ -166,6 +174,7 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { sourceDump.DocCount = index.Source.Docs targetDump := sourceDump targetDump.Indices = index.Target.Name + targetDump.DocCount = index.Target.Docs indexComparisonTask := task.Task{ ParentId: []string{taskItem.ID}, diff --git a/plugin/migration/cluster_migration/cluster_migration.go index 1b4f2870..7bd79d5c 100644 --- a/plugin/migration/cluster_migration/cluster_migration.go +++ b/plugin/migration/cluster_migration/cluster_migration.go @@ -57,6 +57,9 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { return nil } taskItem.RetryTimes++ + if taskItem.StartTimeInMillis == 0 { + taskItem.StartTimeInMillis = time.Now().UnixMilli() + } taskItem.Status = task.StatusRunning p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, @@ -284,7 +287,7 @@ func (p *processor) handleRunningMajorTask(taskItem *task.Task) error { return nil } -func (p *processor) getMajorTaskState(majorTask *task.Task) (taskState migration_model.MajorTaskState, err error) { +func (p *processor) getMajorTaskState(majorTask *task.Task) (taskState migration_model.ClusterMigrationTaskState, err error) { query := util.MapStr{ "size": 0, "aggs": util.MapStr{ diff --git a/plugin/migration/common_api.go b/plugin/migration/common_api.go index e99d3904..f46caf41 100644 --- a/plugin/migration/common_api.go +++ b/plugin/migration/common_api.go @@ -8,7 +8,6 @@ import ( log "github.com/cihub/seelog" - migration_model "infini.sh/console/plugin/migration/model" migration_util "infini.sh/console/plugin/migration/util" httprouter "infini.sh/framework/core/api/router" @@ -88,29 +87,40 @@ func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req } for _, hit := range searchRes.Hits.Hits { sourceM := util.MapStr(hit.Source) - buf := util.MustToJSONBytes(sourceM["config"]) - dataConfig := migration_model.ClusterMigrationTaskConfig{} - err = util.FromJSONBytes(buf, &dataConfig) - if err != nil { - log.Error(err) - continue - } - //var targetTotalDocs int64 - if hit.Source["status"] == task.StatusRunning { - ts, err := getMajorTaskStatsFromInstances(hit.ID) - if err != nil { - 
log.Warnf("fetch progress info of task error: %v", err) - continue - } - sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs) - } - + h.populateMajorTaskInfo(hit.ID, sourceM) } h.WriteJSON(w, searchRes, http.StatusOK) } } +func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { + buf := util.MustToJSONBytes(sourceM) + majorTask := task.Task{} + err := util.FromJSONBytes(buf, &majorTask) + if err != nil { + log.Errorf("failed to unmarshal major task info, err: %v", err) + return + } + switch majorTask.Metadata.Type { + case "cluster_migration": + ts, _, err := h.getMigrationMajorTaskInfo(taskID) + if err != nil { + log.Warnf("fetch progress info of task error: %v", err) + return + } + sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs) + case "cluster_comparison": + ts, _, err := h.getComparisonMajorTaskInfo(taskID) + if err != nil { + log.Warnf("fetch progress info of task error: %v", err) + return + } + sourceM.Put("metadata.labels.source_scroll_docs", ts.SourceScrollDocs) + sourceM.Put("metadata.labels.target_scroll_docs", ts.TargetScrollDocs) + } +} + func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { taskID := ps.MustGetParameter("task_id") obj := task.Task{} diff --git a/plugin/migration/comparison_api.go b/plugin/migration/comparison_api.go index 6eb56152..186a6b56 100644 --- a/plugin/migration/comparison_api.go +++ b/plugin/migration/comparison_api.go @@ -1,15 +1,19 @@ package migration import ( + "fmt" "net/http" + "time" log "github.com/cihub/seelog" migration_model "infini.sh/console/plugin/migration/model" + migration_util "infini.sh/console/plugin/migration/util" "infini.sh/framework/core/api/rbac" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/task" "infini.sh/framework/core/util" @@ -38,9 +42,11 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R clusterTaskConfig.Creator.Id = user.UserId } - var totalDocs int64 + var sourceTotalDocs int64 + var targetTotalDocs int64 for _, index := range clusterTaskConfig.Indices { - totalDocs += index.Source.Docs + sourceTotalDocs += index.Source.Docs + targetTotalDocs += index.Target.Docs } srcClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Source.Id) @@ -54,7 +60,8 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R "business_id": "cluster_comparison", "source_cluster_id": clusterTaskConfig.Cluster.Source.Id, "target_cluster_id": clusterTaskConfig.Cluster.Target.Id, - "source_total_docs": totalDocs, + "source_total_docs": sourceTotalDocs, + "target_total_docs": targetTotalDocs, }, }, Cancellable: true, @@ -71,3 +78,275 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R } h.WriteCreatedOKJSON(w, t.ID) } + +func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + + obj := task.Task{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + taskConfig := &migration_model.ClusterComparisonTaskConfig{} + err = migration_util.GetTaskConfig(&obj, taskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + _, indexState, err := 
h.getComparisonMajorTaskInfo(id) + if err != nil { + log.Errorf("failed to get major task info, err: %v", err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + var completedIndices int + for i, index := range taskConfig.Indices { + indexName := index.Source.GetUniqueIndexName() + count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs + percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100 + if percent > 100 { + percent = 100 + } + taskConfig.Indices[i].Target.Docs = count + taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) + taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + if count == index.Source.Docs+index.Target.Docs { + completedIndices++ + } + } + cfg := global.MustLookup("cluster_migration_config") + if migrationConfig, ok := cfg.(*DispatcherConfig); ok { + if obj.Metadata.Labels == nil { + obj.Metadata.Labels = util.MapStr{} + } + obj.Metadata.Labels["log_info"] = util.MapStr{ + "cluster_id": migrationConfig.Elasticsearch, + "index_name": migrationConfig.LogIndexName, + } + } + obj.ConfigString = util.MustToJSON(taskConfig) + obj.Metadata.Labels["completed_indices"] = completedIndices + h.WriteJSON(w, obj, http.StatusOK) +} + +type ComparisonIndexStateInfo struct { + ErrorPartitions int + SourceScrollDocs int64 + TargetScrollDocs int64 +} + +// TODO: calc realtime info from instance +func (h *APIHandler) getComparisonMajorTaskInfo(majorTaskID string) (taskStats migration_model.ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) { + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": majorTaskID, + }, + }, + }, + { + "bool": util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "dump_hash", + }, + }, + }, + { + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_comparison", + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task.StatusComplete, task.StatusError}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(taskQuery), + } + err, result := orm.Search(task.Task{}, q) + if err != nil { + return taskStats, indexState, err + } + + var pipelineIndexNames = map[string]string{} + indexState = map[string]ComparisonIndexStateInfo{} + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + subTask := task.Task{} + err := util.FromJSONBytes(buf, &subTask) + if err != nil { + log.Errorf("failed to unmarshal task, err: %v", err) + continue + } + if subTask.Metadata.Labels == nil { + continue + } + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + // add indexDocs of already complete/error + if subTask.Metadata.Type == "index_comparison" { + sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") + targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") + taskStats.SourceScrollDocs += sourceDocs + taskStats.TargetScrollDocs += targetDocs + st := indexState[indexName] + st.SourceScrollDocs += sourceDocs + st.TargetScrollDocs += targetDocs + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 
+ } + indexState[indexName] = st + continue + } + pipelineIndexNames[subTask.ID] = indexName + } + + return taskStats, indexState, nil +} + +func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + uniqueIndexName := ps.MustGetParameter("index") + majorTask := task.Task{} + majorTask.ID = id + exists, err := orm.Get(&majorTask) + if !exists || err != nil { + h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError) + return + } + + taskInfo := &TaskInfoResponse{ + TaskID: id, + StartTime: majorTask.StartTimeInMillis, + } + + subTasks, pipelineTaskIDs, _, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName) + if err != nil { + log.Errorf("failed to get child task infos, err: %v", err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + taskInfo.DataPartition = len(subTasks) + if len(subTasks) == 0 { + h.WriteJSON(w, taskInfo, http.StatusOK) + return + } + + pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) + startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks) + + var partitionTaskInfos []util.MapStr + + for i, subTask := range subTasks { + cfg := migration_model.IndexComparisonTaskConfig{} + err := migration_util.GetTaskConfig(&subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + continue + } + if i == 0 { + taskInfo.Step = cfg.Source.Step + } + + var durationInMS int64 + var subCompletedTime int64 + if subTask.StartTimeInMillis > 0 { + if migration_util.IsPendingState(subTask.Status) { + durationInMS = time.Now().UnixMilli() - subTask.StartTimeInMillis + } + if subTask.CompletedTime != nil { + subCompletedTime = subTask.CompletedTime.UnixMilli() + durationInMS = subCompletedTime - subTask.StartTimeInMillis + } + } + subTaskLabels := util.MapStr(subTask.Metadata.Labels) + sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled") + targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled") + + partitionTaskInfo := util.MapStr{ + "task_id": subTask.ID, + "status": subTask.Status, + "start_time": subTask.StartTimeInMillis, + "completed_time": subCompletedTime, + "start": cfg.Source.Start, + "end": cfg.Source.End, + "duration": durationInMS, + "source_total_docs": cfg.Source.DocCount, + "target_total_docs": cfg.Target.DocCount, + } + sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg) + if sourceDumpTask != nil { + partitionTaskInfo["source_scroll_task"] = util.MapStr{ + "id": sourceDumpTask.ID, + "status": sourceDumpTask.Status, + } + pipelineID := sourceDumpTask.ID + pipelineContext, ok := pipelineContexts[pipelineID] + if ok { + if vv := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs"); vv > 0 { + sourceScrollDocs = vv + } + } + } + if targetDumpTask != nil { + partitionTaskInfo["target_scroll_task"] = util.MapStr{ + "id": targetDumpTask.ID, + "status": targetDumpTask.Status, + } + pipelineID := targetDumpTask.ID + pipelineContext, ok := pipelineContexts[pipelineID] + if ok { + if vv := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs"); vv > 0 { + targetScrollDocs = vv + } + } + } + partitionTaskInfo["source_scroll_docs"] = sourceScrollDocs + partitionTaskInfo["target_scroll_docs"] = targetScrollDocs + partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo) + } + taskInfo.CompletedTime = completedTime + taskInfo.Duration = duration + // NOTE: overwrite major task start time 
with the first started sub task + if taskInfo.StartTime == 0 { + taskInfo.StartTime = startTime + } + taskInfo.Partitions = partitionTaskInfos + taskInfo.CompletedPartitions = completedPartitions + h.WriteJSON(w, taskInfo, http.StatusOK) +} diff --git a/plugin/migration/index_comparison/index_comparison.go b/plugin/migration/index_comparison/index_comparison.go index 73aabd7f..1e5cff57 100644 --- a/plugin/migration/index_comparison/index_comparison.go +++ b/plugin/migration/index_comparison/index_comparison.go @@ -293,6 +293,7 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error { // update sub migration task status to running and save task log taskItem.Metadata.Labels["execution_instance_id"] = instanceID + p.clearTaskState(taskItem) taskItem.Status = task.StatusRunning taskItem.StartTimeInMillis = time.Now().UnixMilli() @@ -330,6 +331,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { if sourceDumpTask.Status == task.StatusComplete && targetDumpTask.Status == task.StatusComplete { sourceDocs := migration_util.GetMapIntValue(util.MapStr(sourceDumpTask.Metadata.Labels), "scrolled_docs") targetDocs := migration_util.GetMapIntValue(util.MapStr(targetDumpTask.Metadata.Labels), "scrolled_docs") + + taskItem.Metadata.Labels["source_scrolled"] = sourceDocs + taskItem.Metadata.Labels["target_scrolled"] = targetDocs if sourceDocs != targetDocs { now := time.Now() taskItem.CompletedTime = &now @@ -424,19 +428,7 @@ func (p *processor) getPipelineTasks(taskItem *task.Task, cfg *migration_model.I err = fmt.Errorf("invalid pipeline task count: %d", len(ptasks)) return } - for i, ptask := range ptasks { - if ptask.Metadata.Labels["pipeline_id"] == "dump_hash" { - // TODO: we can't handle when compare the same cluster & same index - // catch it earlier when creating the task - if ptask.Metadata.Labels["cluster_id"] == cfg.Source.ClusterId && ptask.Metadata.Labels["index_name"] == cfg.Source.Indices { - sourceDumpTask = &ptasks[i] - } else { - targetDumpTask = &ptasks[i] - } - } else if ptask.Metadata.Labels["pipeline_id"] == "index_diff" { - diffTask = &ptasks[i] - } - } + sourceDumpTask, targetDumpTask, diffTask = migration_util.SplitIndexComparisonTasks(ptasks, cfg) return } @@ -478,3 +470,8 @@ func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.Ta migration_util.WriteLog(taskItem, taskResult, message) } } + +func (p *processor) clearTaskState(taskItem *task.Task) { + delete(taskItem.Metadata.Labels, "source_scrolled") + delete(taskItem.Metadata.Labels, "target_scrolled") +} diff --git a/plugin/migration/index_migration/index_migration.go b/plugin/migration/index_migration/index_migration.go index 60cec9a0..954874c0 100644 --- a/plugin/migration/index_migration/index_migration.go +++ b/plugin/migration/index_migration/index_migration.go @@ -306,8 +306,7 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error { // update sub migration task status to running and save task log taskItem.Metadata.Labels["execution_instance_id"] = instanceID - taskItem.Metadata.Labels["index_docs"] = 0 - taskItem.Metadata.Labels["scrolled_docs"] = 0 + p.clearTaskState(taskItem) taskItem.Status = task.StatusRunning taskItem.StartTimeInMillis = time.Now().UnixMilli() @@ -331,8 +330,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { if totalDocs == 0 { taskItem.Status = task.StatusComplete - taskItem.Metadata.Labels["scrolled_docs"] = 0 - taskItem.Metadata.Labels["index_docs"] = 0 + p.clearTaskState(taskItem) now := 
time.Now() taskItem.CompletedTime = &now @@ -498,13 +496,7 @@ func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask err = fmt.Errorf("invalid pipeline task count: %d", len(ptasks)) return } - for i, ptask := range ptasks { - if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { - bulkTask = &ptasks[i] - } else if ptask.Metadata.Labels["pipeline_id"] == "es_scroll" { - scrollTask = &ptasks[i] - } - } + scrollTask, bulkTask = migration_util.SplitIndexMigrationTasks(ptasks) return } @@ -571,3 +563,8 @@ func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.Ta migration_util.WriteLog(taskItem, taskResult, message) } } + +func (p *processor) clearTaskState(taskItem *task.Task) { + delete(taskItem.Metadata.Labels, "index_docs") + delete(taskItem.Metadata.Labels, "scrolled_docs") +} diff --git a/plugin/migration/migration_api.go b/plugin/migration/migration_api.go new file mode 100644 index 00000000..9698c4c0 --- /dev/null +++ b/plugin/migration/migration_api.go @@ -0,0 +1,132 @@ +package migration + +import ( + log "github.com/cihub/seelog" + + migration_model "infini.sh/console/plugin/migration/model" + migration_util "infini.sh/console/plugin/migration/util" + + "infini.sh/framework/core/orm" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" +) + +type MigrationIndexStateInfo struct { + ErrorPartitions int + IndexDocs int64 +} + +/* +We count data from two sources: + - index_migrations with complete/error status + - plus index_migration.index_docs with realtime bulk indexing info + - realtime bulk indexing info is only available for running index_migrations +*/ +func (h *APIHandler) getMigrationMajorTaskInfo(majorTaskID string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) { + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": majorTaskID, + }, + }, + }, + { + "bool": util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "bulk_indexing", + }, + }, + }, + { + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_migration", + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task.StatusComplete, task.StatusError}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(taskQuery), + } + err, result := orm.Search(task.Task{}, q) + if err != nil { + return taskStats, indexState, err + } + + var pipelineTaskIDs = map[string][]string{} + var pipelineIndexNames = map[string]string{} + indexState = map[string]MigrationIndexStateInfo{} + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + subTask := task.Task{} + err := util.FromJSONBytes(buf, &subTask) + if err != nil { + log.Errorf("failed to unmarshal task, err: %v", err) + continue + } + if subTask.Metadata.Labels == nil { + continue + } + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + // add indexDocs of already complete/error + if subTask.Metadata.Type == "index_migration" { + indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs") + taskStats.IndexDocs += indexDocs + st := 
indexState[indexName] + st.IndexDocs += indexDocs + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 + } + indexState[indexName] = st + continue + } + pipelineIndexNames[subTask.ID] = indexName + + if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + } + + pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) + for pipelineID, pipelineContext := range pipelineContexts { + // add indexDocs of running tasks + indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count") + taskStats.IndexDocs += indexDocs + indexName := pipelineIndexNames[pipelineID] + st := indexState[indexName] + st.IndexDocs += indexDocs + indexState[indexName] = st + } + return taskStats, indexState, nil +} diff --git a/plugin/migration/model/common.go b/plugin/migration/model/common.go index b4ae0e97..e6d47180 100644 --- a/plugin/migration/model/common.go +++ b/plugin/migration/model/common.go @@ -26,10 +26,11 @@ type IndexPartition struct { } type IndexInfo struct { - Name string `json:"name"` - DocType string `json:"doc_type"` - Docs int64 `json:"docs"` - StoreSizeInBytes int `json:"store_size_in_bytes"` + Name string `json:"name"` + DocType string `json:"doc_type"` + // NOTE: == 0 for migration target index + Docs int64 `json:"docs"` + StoreSizeInBytes int `json:"store_size_in_bytes"` } func (ii *IndexInfo) GetUniqueIndexName() string { diff --git a/plugin/migration/model/comparison.go b/plugin/migration/model/comparison.go index 18b868b7..c595b5b6 100644 --- a/plugin/migration/model/comparison.go +++ b/plugin/migration/model/comparison.go @@ -26,10 +26,16 @@ type ClusterComparisonIndexConfig struct { Partition *IndexPartition `json:"partition,omitempty"` // only used in API - Percent float64 `json:"percent,omitempty"` + ScrollPercent float64 `json:"scroll_percent,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` } +type ClusterComparisonTaskState struct { + SourceScrollDocs int64 + TargetScrollDocs int64 + Status string +} + type IndexComparisonTaskConfig struct { Source IndexComparisonDumpConfig `json:"source"` Target IndexComparisonDumpConfig `json:"target"` @@ -47,8 +53,13 @@ type IndexComparisonDumpConfig struct { ScrollTime string `json:"scroll_time"` QueryString string `json:"query_string,omitempty"` QueryDSL util.MapStr `json:"query_dsl,omitempty"` + DocCount int64 `json:"doc_count"` - DocCount int64 `json:"doc_count"` + // Only populated for partitioned tasks + Start float64 `json:"start"` + End float64 `json:"end"` + Step interface{} `json:"step"` + PartitionId int `json:"partition_id"` } type IndexComparisonDiffConfig struct {
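The `Start`/`End`/`Step`/`PartitionId` fields added to `IndexComparisonDumpConfig` above are what `splitMajorTask` in cluster_comparison.go fills in per partition. A sketch of the resulting wire shape, using a trimmed, hypothetical copy of the struct; the field set and sample values are illustrative only, not the full config:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed, hypothetical copy of IndexComparisonDumpConfig, keeping only the
// partition-related fields; tags mirror the ones in the diff.
type dumpConfig struct {
	Indices     string      `json:"indices"`
	DocCount    int64       `json:"doc_count"`
	Start       float64     `json:"start"`
	End         float64     `json:"end"`
	Step        interface{} `json:"step"`
	PartitionId int         `json:"partition_id"`
}

func main() {
	// e.g. the first partition of a date-partitioned index
	cfg := dumpConfig{
		Indices:     "test-index",
		DocCount:    1000,
		Start:       0,
		End:         86400000,
		Step:        "24h",
		PartitionId: 1,
	}
	out, _ := json.MarshalIndent(cfg, "", "  ")
	fmt.Println(string(out))
}
```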
`json:"query_string,omitempty"` QueryDSL util.MapStr `json:"query_dsl,omitempty"` + DocCount int64 `json:"doc_count"` // Parition configs Start float64 `json:"start"` End float64 `json:"end"` - DocCount int64 `json:"doc_count"` Step interface{} `json:"step"` PartitionId int `json:"partition_id"` } diff --git a/plugin/migration/module.go b/plugin/migration/module.go deleted file mode 100644 index de2d4b73..00000000 --- a/plugin/migration/module.go +++ /dev/null @@ -1,37 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package migration - -import ( - log "github.com/cihub/seelog" - "infini.sh/framework/core/env" - "infini.sh/framework/core/module" -) - -func (module *Module) Name() string { - return "migration" -} - -func (module *Module) Setup() { - exists, err := env.ParseConfig("migration", module) - if exists && err != nil { - log.Error(err) - } - InitAPI() -} -func (module *Module) Start() error { - return nil -} - -func (module *Module) Stop() error { - return nil -} - -type Module struct { -} - -func init() { - module.RegisterUserPlugin(&Module{}) -} \ No newline at end of file diff --git a/plugin/migration/util/pipeline.go b/plugin/migration/util/pipeline.go new file mode 100644 index 00000000..b977bee2 --- /dev/null +++ b/plugin/migration/util/pipeline.go @@ -0,0 +1,38 @@ +package util + +import ( + migration_model "infini.sh/console/plugin/migration/model" + "infini.sh/framework/core/task" +) + +/* +These functions could return nil tasks +*/ + +func SplitIndexMigrationTasks(ptasks []task.Task) (scrollTask *task.Task, bulkTask *task.Task) { + for i, ptask := range ptasks { + if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { + bulkTask = &ptasks[i] + } else if ptask.Metadata.Labels["pipeline_id"] == "es_scroll" { + scrollTask = &ptasks[i] + } + } + return +} + +func SplitIndexComparisonTasks(ptasks []task.Task, cfg *migration_model.IndexComparisonTaskConfig) (sourceDumpTask *task.Task, targetDumpTask *task.Task, diffTask *task.Task) { + for i, ptask := range ptasks { + if ptask.Metadata.Labels["pipeline_id"] == "dump_hash" { + // TODO: we can't handle when compare the same cluster & same index + // catch it earlier when creating the task + if ptask.Metadata.Labels["cluster_id"] == cfg.Source.ClusterId && ptask.Metadata.Labels["index_name"] == cfg.Source.Indices { + sourceDumpTask = &ptasks[i] + } else { + targetDumpTask = &ptasks[i] + } + } else if ptask.Metadata.Labels["pipeline_id"] == "index_diff" { + diffTask = &ptasks[i] + } + } + return +} diff --git a/plugin/migration/util/util.go b/plugin/migration/util/util.go index 216e797b..19719674 100644 --- a/plugin/migration/util/util.go +++ b/plugin/migration/util/util.go @@ -41,6 +41,12 @@ func IsRunningState(status string) bool { return util.StringInArray(runningTaskStatus, status) } +var pendingTaskStatus = []string{task.StatusRunning, task.StatusReady, task.StatusPendingStop} + +func IsPendingState(status string) bool { + return util.StringInArray(pendingTaskStatus, status) +} + func GetTaskConfig(task *task.Task, config interface{}) error { if task.Config_ == nil { return util.FromJSONBytes([]byte(task.ConfigString), config)