diff --git a/plugin/task_manager/cluster_comparison/cluster_comparison.go b/plugin/task_manager/cluster_comparison/cluster_comparison.go index ef7ac7ea..9dcaf070 100644 --- a/plugin/task_manager/cluster_comparison/cluster_comparison.go +++ b/plugin/task_manager/cluster_comparison/cluster_comparison.go @@ -60,6 +60,10 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { taskItem.StartTimeInMillis = time.Now().UnixMilli() } taskItem.Status = task.StatusRunning + taskItem.Metadata.Labels["total_diff_docs"] = 0 + taskItem.Metadata.Labels["only_in_source"] = 0 + taskItem.Metadata.Labels["only_in_target"] = 0 + taskItem.Metadata.Labels["diff_both"] = 0 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID)) diff --git a/plugin/task_manager/comparison_api.go b/plugin/task_manager/comparison_api.go index 439aa0c6..05ad2a41 100644 --- a/plugin/task_manager/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -83,6 +83,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren if count == index.Source.Docs+index.Target.Docs { completedIndices++ } @@ -115,6 +116,7 @@ type ClusterComparisonTaskState struct { TargetTotalDocs int64 TargetScrollDocs int64 TotalDiffDocs int64 + RunningChildren int } type ComparisonIndexStateInfo struct { @@ -124,6 +126,7 @@ type ComparisonIndexStateInfo struct { TargetTotalDocs int64 TargetScrollDocs int64 TotalDiffDocs int64 + RunningChildren int } // TODO: calc realtime info from instance @@ -188,6 +191,9 @@ func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats Cluste if subTask.Status == task.StatusError { st.ErrorPartitions += 1 } + if subTask.Status == task.StatusRunning { + st.RunningChildren++ + } indexState[indexName] = st } @@ -256,6 +262,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht subTaskLabels := util.MapStr(subTask.Metadata.Labels) sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled") targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled") + onlyInSource := migration_util.GetMapIntValue(subTaskLabels, "only_in_source") + onlyInTarget := migration_util.GetMapIntValue(subTaskLabels, "only_in_target") + diffBoth := migration_util.GetMapIntValue(subTaskLabels, "diff_both") partitionTaskInfo := util.MapStr{ "task_id": subTask.ID, @@ -267,6 +276,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht "duration": durationInMS, "source_total_docs": cfg.Source.DocCount, "target_total_docs": cfg.Target.DocCount, + "only_in_source": onlyInSource, + "only_in_target": onlyInTarget, + "diff_both": diffBoth, } sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg) if sourceDumpTask != nil { diff --git a/plugin/task_manager/index_comparison/index_comparison.go b/plugin/task_manager/index_comparison/index_comparison.go index c14d902a..cdcb1bc5 100644 --- a/plugin/task_manager/index_comparison/index_comparison.go +++ b/plugin/task_manager/index_comparison/index_comparison.go @@ -362,6 +362,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 { now := time.Now() taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth + taskItem.Metadata.Labels["only_in_source"] = onlyInSource + taskItem.Metadata.Labels["only_in_target"] = onlyInTarget + taskItem.Metadata.Labels["diff_both"] = diffBoth taskItem.CompletedTime = &now taskItem.Status = task.StatusError p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ @@ -390,6 +393,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { now := time.Now() taskItem.CompletedTime = &now taskItem.Status = task.StatusComplete + taskItem.Metadata.Labels["total_diff_docs"] = 0 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, "index comparison completed")