diff --git a/plugin/task_manager/cluster_comparison/cluster_comparison.go b/plugin/task_manager/cluster_comparison/cluster_comparison.go index ef7ac7ea..982419f2 100644 --- a/plugin/task_manager/cluster_comparison/cluster_comparison.go +++ b/plugin/task_manager/cluster_comparison/cluster_comparison.go @@ -56,10 +56,12 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { return nil } taskItem.RetryTimes++ - if taskItem.StartTimeInMillis == 0 { - taskItem.StartTimeInMillis = time.Now().UnixMilli() - } + taskItem.StartTimeInMillis = time.Now().UnixMilli() taskItem.Status = task.StatusRunning + taskItem.Metadata.Labels["total_diff_docs"] = 0 + taskItem.Metadata.Labels["only_in_source"] = 0 + taskItem.Metadata.Labels["only_in_target"] = 0 + taskItem.Metadata.Labels["diff_both"] = 0 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID)) diff --git a/plugin/task_manager/cluster_comparison/orm.go b/plugin/task_manager/cluster_comparison/orm.go index f3d37c57..5682a61d 100644 --- a/plugin/task_manager/cluster_comparison/orm.go +++ b/plugin/task_manager/cluster_comparison/orm.go @@ -105,6 +105,7 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba "source_total_docs": sourceTotalDocs, "target_total_docs": targetTotalDocs, "permit_nodes": config.Settings.Execution.Nodes.Permit, + "name": config.Name, }, }, Cancellable: true, @@ -112,6 +113,9 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba Status: task.StatusInit, ConfigString: util.MustToJSON(config), } + if len(config.Tags) > 0 { + t.Metadata.Labels["tags"] = config.Tags + } t.ID = util.GetUUID() return &t, nil } diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go index 4400cf7b..e828a4a5 100644 --- a/plugin/task_manager/common_api.go +++ b/plugin/task_manager/common_api.go @@ -160,7 +160,11 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs) } case "cluster_comparison": - ts, _, err := h.getComparisonMajorTaskInfo(taskID) + targetTaskId := taskID + if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" { + targetTaskId = repeatStatus.LastRunChildTaskID + } + ts, _, err := h.getComparisonMajorTaskInfo(targetTaskId) if err != nil { log.Warnf("fetch progress info of task error: %v", err) return @@ -189,7 +193,7 @@ func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps http h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) return } - if obj.Metadata.Type != "pipeline" && obj.Status == task.StatusComplete { + if obj.Metadata.Type != "pipeline" && (obj.Status == task.StatusComplete && obj.Metadata.Type != "cluster_comparison") { h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError) return } diff --git a/plugin/task_manager/comparison_api.go b/plugin/task_manager/comparison_api.go index 439aa0c6..a4382f0f 100644 --- a/plugin/task_manager/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -74,7 +74,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. var completedIndices int for i, index := range taskConfig.Indices { indexName := index.Source.GetUniqueIndexName() - count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs + count := indexState[indexName].TotalScrollDocs percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100 if percent > 100 { percent = 100 @@ -82,7 +82,9 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) + taskConfig.Indices[i].TotalScrollDocs = count taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren if count == index.Source.Docs+index.Target.Docs { completedIndices++ } @@ -115,6 +117,7 @@ type ClusterComparisonTaskState struct { TargetTotalDocs int64 TargetScrollDocs int64 TotalDiffDocs int64 + RunningChildren int } type ComparisonIndexStateInfo struct { @@ -124,73 +127,140 @@ type ComparisonIndexStateInfo struct { TargetTotalDocs int64 TargetScrollDocs int64 TotalDiffDocs int64 + RunningChildren int + TotalScrollDocs int64 } -// TODO: calc realtime info from instance func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) { + var pipelineTaskIDs = map[string][]string{} + var pipelineIndexNames = map[string]string{} indexState = map[string]ComparisonIndexStateInfo{} - - taskQuery := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskID, + const size = 500 + var ( + from = -size + hasMore = true + ) + for hasMore { + from += size + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskID, + }, }, }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_comparison", + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_comparison", + }, }, }, }, }, }, - }, - } - subTasks, err := migration_util.GetTasks(taskQuery) - if err != nil { - return taskStats, indexState, err - } - - for _, subTask := range subTasks { - taskLabels := util.MapStr(subTask.Metadata.Labels) - indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") - if indexName == "" { - continue } - - cfg := migration_model.IndexComparisonTaskConfig{} - err = migration_util.GetTaskConfig(&subTask, &cfg) + subTasks, err := migration_util.GetTasks(taskQuery) if err != nil { - log.Errorf("failed to get task config, err: %v", err) + return taskStats, indexState, err + } + if len(subTasks) < size { + hasMore = false + } + + var indexMigrationTaskIDs []string + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + cfg := migration_model.IndexComparisonTaskConfig{} + err = migration_util.GetTaskConfig(&subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + continue + } + totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") + taskStats.SourceTotalDocs += cfg.Source.DocCount + taskStats.TargetTotalDocs += cfg.Target.DocCount + taskStats.TotalDiffDocs += totalDiffDocs + st := indexState[indexName] + st.SourceTotalDocs += cfg.Source.DocCount + st.TargetTotalDocs += cfg.Target.DocCount + st.TotalDiffDocs += totalDiffDocs + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 + } + if subTask.Status == task.StatusRunning { + st.RunningChildren++ + indexState[indexName] = st + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) + continue + } + sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") + targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") + st.SourceScrollDocs += sourceDocs + st.TargetScrollDocs += targetDocs + st.TotalScrollDocs += sourceDocs + targetDocs + taskStats.TargetScrollDocs += targetDocs + taskStats.SourceScrollDocs += sourceDocs + indexState[indexName] = st + } + + if len(indexMigrationTaskIDs) == 0 { continue } - sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") - targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") - totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") - taskStats.SourceTotalDocs += cfg.Source.DocCount - taskStats.SourceScrollDocs += sourceDocs - taskStats.TargetTotalDocs += cfg.Target.DocCount - taskStats.TargetScrollDocs += targetDocs - taskStats.TotalDiffDocs += totalDiffDocs - st := indexState[indexName] - st.SourceTotalDocs += cfg.Source.DocCount - st.SourceScrollDocs += sourceDocs - st.TargetTotalDocs += cfg.Target.DocCount - st.TargetScrollDocs += targetDocs - st.TotalDiffDocs += totalDiffDocs - if subTask.Status == task.StatusError { - st.ErrorPartitions += 1 + + taskQuery = util.MapStr{ + "size": len(indexMigrationTaskIDs) * 2, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "parent_id": indexMigrationTaskIDs, + }, + }, + }, + }, + }, } + subTasks, err = migration_util.GetTasks(taskQuery) + if err != nil { + return taskStats, indexState, err + } + + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + pipelineIndexNames[subTask.ID] = indexName + + if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + } + } + + pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) + for pipelineID, pipelineContext := range pipelineContexts { + // add scrolledDocs of running tasks + scrollDocs := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs") + indexName := pipelineIndexNames[pipelineID] + st := indexState[indexName] + st.TotalScrollDocs += scrollDocs indexState[indexName] = st } - return taskStats, indexState, nil } @@ -231,6 +301,7 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating) var partitionTaskInfos []util.MapStr + var workers = map[string]struct{}{} for i, subTask := range subTasks { cfg := migration_model.IndexComparisonTaskConfig{} @@ -242,6 +313,10 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht if i == 0 { taskInfo.Step = cfg.Source.Step } + instID := migration_util.GetMapStringValue(subTask.Metadata.Labels, "execution_instance_id") + if instID != "" { + workers[instID] = struct{}{} + } var durationInMS int64 var subCompletedTime int64 @@ -256,6 +331,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht subTaskLabels := util.MapStr(subTask.Metadata.Labels) sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled") targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled") + onlyInSource := migration_util.GetMapIntValue(subTaskLabels, "only_in_source") + onlyInTarget := migration_util.GetMapIntValue(subTaskLabels, "only_in_target") + diffBoth := migration_util.GetMapIntValue(subTaskLabels, "diff_both") partitionTaskInfo := util.MapStr{ "task_id": subTask.ID, @@ -267,6 +345,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht "duration": durationInMS, "source_total_docs": cfg.Source.DocCount, "target_total_docs": cfg.Target.DocCount, + "only_in_source": onlyInSource, + "only_in_target": onlyInTarget, + "diff_both": diffBoth, } sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg) if sourceDumpTask != nil { @@ -305,6 +386,14 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht if taskInfo.StartTime == 0 { taskInfo.StartTime = startTime } + for _, node := range taskConfig.Settings.Execution.Nodes.Permit { + if _, ok := workers[node.ID]; ok { + taskInfo.Workers = append(taskInfo.Workers, util.MapStr{ + "id": node.ID, + "name": node.Name, + }) + } + } taskInfo.Partitions = partitionTaskInfos taskInfo.CompletedPartitions = completedPartitions h.WriteJSON(w, taskInfo, http.StatusOK) diff --git a/plugin/task_manager/index_comparison/index_comparison.go b/plugin/task_manager/index_comparison/index_comparison.go index c14d902a..cdcb1bc5 100644 --- a/plugin/task_manager/index_comparison/index_comparison.go +++ b/plugin/task_manager/index_comparison/index_comparison.go @@ -362,6 +362,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 { now := time.Now() taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth + taskItem.Metadata.Labels["only_in_source"] = onlyInSource + taskItem.Metadata.Labels["only_in_target"] = onlyInTarget + taskItem.Metadata.Labels["diff_both"] = diffBoth taskItem.CompletedTime = &now taskItem.Status = task.StatusError p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ @@ -390,6 +393,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { now := time.Now() taskItem.CompletedTime = &now taskItem.Status = task.StatusComplete + taskItem.Metadata.Labels["total_diff_docs"] = 0 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, "index comparison completed") diff --git a/plugin/task_manager/model/comparison.go b/plugin/task_manager/model/comparison.go index ba9d046a..b64d61ce 100644 --- a/plugin/task_manager/model/comparison.go +++ b/plugin/task_manager/model/comparison.go @@ -5,6 +5,8 @@ import ( ) type ClusterComparisonTaskConfig struct { + Name string `json:"name"` + Tags []string `json:"tags"` Cluster struct { Source ClusterInfo `json:"source"` Target ClusterInfo `json:"target"` @@ -29,8 +31,10 @@ type ClusterComparisonIndexConfig struct { Partition *IndexPartition `json:"partition,omitempty"` // only used in API - ScrollPercent float64 `json:"scroll_percent,omitempty"` + ScrollPercent float64 `json:"scroll_percent,omitempty"` + TotalScrollDocs int64 `json:"total_scroll_docs,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` + RunningChildren int `json:"running_children,omitempty"` } type IndexComparisonTaskConfig struct { diff --git a/plugin/task_manager/util/orm.go b/plugin/task_manager/util/orm.go index 99506721..faa65d00 100644 --- a/plugin/task_manager/util/orm.go +++ b/plugin/task_manager/util/orm.go @@ -242,7 +242,7 @@ func UpdateStoppedChildTasksToReady(taskItem *task.Task, taskType string) error }, { "terms": util.MapStr{ - "status": []string{task.StatusError, task.StatusStopped}, + "status": []string{task.StatusError, task.StatusStopped, task.StatusComplete}, }, }, {