From 97731564c1a9350676a7cda161ec1a1e171bbae3 Mon Sep 17 00:00:00 2001 From: liugq Date: Sun, 8 Oct 2023 11:00:04 +0800 Subject: [PATCH 1/6] add name and tags config --- plugin/task_manager/cluster_comparison/orm.go | 4 ++++ plugin/task_manager/model/comparison.go | 3 +++ 2 files changed, 7 insertions(+) diff --git a/plugin/task_manager/cluster_comparison/orm.go b/plugin/task_manager/cluster_comparison/orm.go index f3d37c57..5682a61d 100644 --- a/plugin/task_manager/cluster_comparison/orm.go +++ b/plugin/task_manager/cluster_comparison/orm.go @@ -105,6 +105,7 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba "source_total_docs": sourceTotalDocs, "target_total_docs": targetTotalDocs, "permit_nodes": config.Settings.Execution.Nodes.Permit, + "name": config.Name, }, }, Cancellable: true, @@ -112,6 +113,9 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba Status: task.StatusInit, ConfigString: util.MustToJSON(config), } + if len(config.Tags) > 0 { + t.Metadata.Labels["tags"] = config.Tags + } t.ID = util.GetUUID() return &t, nil } diff --git a/plugin/task_manager/model/comparison.go b/plugin/task_manager/model/comparison.go index ba9d046a..07a4085a 100644 --- a/plugin/task_manager/model/comparison.go +++ b/plugin/task_manager/model/comparison.go @@ -5,6 +5,8 @@ import ( ) type ClusterComparisonTaskConfig struct { + Name string `json:"name"` + Tags []string `json:"tags"` Cluster struct { Source ClusterInfo `json:"source"` Target ClusterInfo `json:"target"` @@ -31,6 +33,7 @@ type ClusterComparisonIndexConfig struct { // only used in API ScrollPercent float64 `json:"scroll_percent,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` + RunningChildren int `json:"running_children,omitempty"` } type IndexComparisonTaskConfig struct { From 2f457924801c234493eaa6bb24649be775ed11ff Mon Sep 17 00:00:00 2001 From: liugq Date: Sun, 8 Oct 2023 11:02:39 +0800 Subject: [PATCH 2/6] add diff result info and reset total_diff_docs after task restarted --- .../cluster_comparison/cluster_comparison.go | 4 ++++ plugin/task_manager/comparison_api.go | 12 ++++++++++++ .../index_comparison/index_comparison.go | 4 ++++ 3 files changed, 20 insertions(+) diff --git a/plugin/task_manager/cluster_comparison/cluster_comparison.go b/plugin/task_manager/cluster_comparison/cluster_comparison.go index ef7ac7ea..9dcaf070 100644 --- a/plugin/task_manager/cluster_comparison/cluster_comparison.go +++ b/plugin/task_manager/cluster_comparison/cluster_comparison.go @@ -60,6 +60,10 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { taskItem.StartTimeInMillis = time.Now().UnixMilli() } taskItem.Status = task.StatusRunning + taskItem.Metadata.Labels["total_diff_docs"] = 0 + taskItem.Metadata.Labels["only_in_source"] = 0 + taskItem.Metadata.Labels["only_in_target"] = 0 + taskItem.Metadata.Labels["diff_both"] = 0 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID)) diff --git a/plugin/task_manager/comparison_api.go b/plugin/task_manager/comparison_api.go index 439aa0c6..05ad2a41 100644 --- a/plugin/task_manager/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -83,6 +83,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren if count == index.Source.Docs+index.Target.Docs { completedIndices++ } @@ -115,6 +116,7 @@ type ClusterComparisonTaskState struct { TargetTotalDocs int64 TargetScrollDocs int64 TotalDiffDocs int64 + RunningChildren int } type ComparisonIndexStateInfo struct { @@ -124,6 +126,7 @@ type ComparisonIndexStateInfo struct { TargetTotalDocs int64 TargetScrollDocs int64 TotalDiffDocs int64 + RunningChildren int } // TODO: calc realtime info from instance @@ -188,6 +191,9 @@ func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats Cluste if subTask.Status == task.StatusError { st.ErrorPartitions += 1 } + if subTask.Status == task.StatusRunning { + st.RunningChildren++ + } indexState[indexName] = st } @@ -256,6 +262,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht subTaskLabels := util.MapStr(subTask.Metadata.Labels) sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled") targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled") + onlyInSource := migration_util.GetMapIntValue(subTaskLabels, "only_in_source") + onlyInTarget := migration_util.GetMapIntValue(subTaskLabels, "only_in_target") + diffBoth := migration_util.GetMapIntValue(subTaskLabels, "diff_both") partitionTaskInfo := util.MapStr{ "task_id": subTask.ID, @@ -267,6 +276,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht "duration": durationInMS, "source_total_docs": cfg.Source.DocCount, "target_total_docs": cfg.Target.DocCount, + "only_in_source": onlyInSource, + "only_in_target": onlyInTarget, + "diff_both": diffBoth, } sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg) if sourceDumpTask != nil { diff --git a/plugin/task_manager/index_comparison/index_comparison.go b/plugin/task_manager/index_comparison/index_comparison.go index c14d902a..cdcb1bc5 100644 --- a/plugin/task_manager/index_comparison/index_comparison.go +++ b/plugin/task_manager/index_comparison/index_comparison.go @@ -362,6 +362,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 { now := time.Now() taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth + taskItem.Metadata.Labels["only_in_source"] = onlyInSource + taskItem.Metadata.Labels["only_in_target"] = onlyInTarget + taskItem.Metadata.Labels["diff_both"] = diffBoth taskItem.CompletedTime = &now taskItem.Status = task.StatusError p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ @@ -390,6 +393,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { now := time.Now() taskItem.CompletedTime = &now taskItem.Status = task.StatusComplete + taskItem.Metadata.Labels["total_diff_docs"] = 0 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, "index comparison completed") From 71ba08d266a1f6b84426cb4f2992b855ac7f2814 Mon Sep 17 00:00:00 2001 From: liugq Date: Sun, 8 Oct 2023 17:46:55 +0800 Subject: [PATCH 3/6] refactor getComparisonMajorTaskInfo --- plugin/task_manager/comparison_api.go | 170 ++++++++++++++++-------- plugin/task_manager/model/comparison.go | 3 +- 2 files changed, 119 insertions(+), 54 deletions(-) diff --git a/plugin/task_manager/comparison_api.go b/plugin/task_manager/comparison_api.go index 05ad2a41..bca13d5d 100644 --- a/plugin/task_manager/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -74,7 +74,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. var completedIndices int for i, index := range taskConfig.Indices { indexName := index.Source.GetUniqueIndexName() - count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs + count := indexState[indexName].TotalScrollDocs percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100 if percent > 100 { percent = 100 @@ -82,6 +82,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) + taskConfig.Indices[i].TotalScrollDocs = count taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren if count == index.Source.Docs+index.Target.Docs { @@ -127,76 +128,139 @@ type ComparisonIndexStateInfo struct { TargetScrollDocs int64 TotalDiffDocs int64 RunningChildren int + TotalScrollDocs int64 } -// TODO: calc realtime info from instance func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) { + var pipelineTaskIDs = map[string][]string{} + var pipelineIndexNames = map[string]string{} indexState = map[string]ComparisonIndexStateInfo{} - - taskQuery := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskID, + const size = 500 + var ( + from = -size + hasMore = true + ) + for hasMore { + from += size + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskID, + }, }, }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_comparison", + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_comparison", + }, }, }, }, }, }, - }, - } - subTasks, err := migration_util.GetTasks(taskQuery) - if err != nil { - return taskStats, indexState, err - } - - for _, subTask := range subTasks { - taskLabels := util.MapStr(subTask.Metadata.Labels) - indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") - if indexName == "" { - continue } - - cfg := migration_model.IndexComparisonTaskConfig{} - err = migration_util.GetTaskConfig(&subTask, &cfg) + subTasks, err := migration_util.GetTasks(taskQuery) if err != nil { - log.Errorf("failed to get task config, err: %v", err) + return taskStats, indexState, err + } + if len(subTasks) < size { + hasMore = false + } + + var indexMigrationTaskIDs []string + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + cfg := migration_model.IndexComparisonTaskConfig{} + err = migration_util.GetTaskConfig(&subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + continue + } + totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") + taskStats.SourceTotalDocs += cfg.Source.DocCount + taskStats.TargetTotalDocs += cfg.Target.DocCount + taskStats.TotalDiffDocs += totalDiffDocs + st := indexState[indexName] + st.SourceTotalDocs += cfg.Source.DocCount + st.TargetTotalDocs += cfg.Target.DocCount + st.TotalDiffDocs += totalDiffDocs + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 + } + if subTask.Status == task.StatusRunning { + st.RunningChildren++ + indexState[indexName] = st + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) + continue + } + sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") + targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") + st.SourceScrollDocs += sourceDocs + st.TargetScrollDocs += targetDocs + st.TotalScrollDocs += sourceDocs + targetDocs + taskStats.TargetScrollDocs += targetDocs + taskStats.SourceScrollDocs += sourceDocs + indexState[indexName] = st + } + + if len(indexMigrationTaskIDs) == 0 { continue } - sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") - targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") - totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") - taskStats.SourceTotalDocs += cfg.Source.DocCount - taskStats.SourceScrollDocs += sourceDocs - taskStats.TargetTotalDocs += cfg.Target.DocCount - taskStats.TargetScrollDocs += targetDocs - taskStats.TotalDiffDocs += totalDiffDocs + + taskQuery = util.MapStr{ + "size": len(indexMigrationTaskIDs) * 2, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "parent_id": indexMigrationTaskIDs, + }, + }, + }, + }, + }, + } + subTasks, err = migration_util.GetTasks(taskQuery) + if err != nil { + return taskStats, indexState, err + } + + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + pipelineIndexNames[subTask.ID] = indexName + + if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + } + } + + pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) + for pipelineID, pipelineContext := range pipelineContexts { + // add scrolledDocs of running tasks + scrollDocs := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs") + indexName := pipelineIndexNames[pipelineID] st := indexState[indexName] - st.SourceTotalDocs += cfg.Source.DocCount - st.SourceScrollDocs += sourceDocs - st.TargetTotalDocs += cfg.Target.DocCount - st.TargetScrollDocs += targetDocs - st.TotalDiffDocs += totalDiffDocs - if subTask.Status == task.StatusError { - st.ErrorPartitions += 1 - } - if subTask.Status == task.StatusRunning { - st.RunningChildren++ - } + st.TotalScrollDocs += scrollDocs indexState[indexName] = st } - return taskStats, indexState, nil } diff --git a/plugin/task_manager/model/comparison.go b/plugin/task_manager/model/comparison.go index 07a4085a..b64d61ce 100644 --- a/plugin/task_manager/model/comparison.go +++ b/plugin/task_manager/model/comparison.go @@ -31,7 +31,8 @@ type ClusterComparisonIndexConfig struct { Partition *IndexPartition `json:"partition,omitempty"` // only used in API - ScrollPercent float64 `json:"scroll_percent,omitempty"` + ScrollPercent float64 `json:"scroll_percent,omitempty"` + TotalScrollDocs int64 `json:"total_scroll_docs,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` RunningChildren int `json:"running_children,omitempty"` } From f3569c2701ce5fa0ea4dff8d7c000084f3ee0b37 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 9 Oct 2023 09:36:50 +0800 Subject: [PATCH 4/6] support restart cluster comparison task --- plugin/task_manager/common_api.go | 2 +- plugin/task_manager/util/orm.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go index 4400cf7b..6bed3af8 100644 --- a/plugin/task_manager/common_api.go +++ b/plugin/task_manager/common_api.go @@ -189,7 +189,7 @@ func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps http h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) return } - if obj.Metadata.Type != "pipeline" && obj.Status == task.StatusComplete { + if obj.Metadata.Type != "pipeline" && (obj.Status == task.StatusComplete && obj.Metadata.Type != "cluster_comparison") { h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError) return } diff --git a/plugin/task_manager/util/orm.go b/plugin/task_manager/util/orm.go index 99506721..faa65d00 100644 --- a/plugin/task_manager/util/orm.go +++ b/plugin/task_manager/util/orm.go @@ -242,7 +242,7 @@ func UpdateStoppedChildTasksToReady(taskItem *task.Task, taskType string) error }, { "terms": util.MapStr{ - "status": []string{task.StatusError, task.StatusStopped}, + "status": []string{task.StatusError, task.StatusStopped, task.StatusComplete}, }, }, { From 9f60c4685fdfa7601f06bef0e35207a8ad5ab92d Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 9 Oct 2023 10:56:22 +0800 Subject: [PATCH 5/6] return workers info of index level --- plugin/task_manager/comparison_api.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/plugin/task_manager/comparison_api.go b/plugin/task_manager/comparison_api.go index bca13d5d..a4382f0f 100644 --- a/plugin/task_manager/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -301,6 +301,7 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating) var partitionTaskInfos []util.MapStr + var workers = map[string]struct{}{} for i, subTask := range subTasks { cfg := migration_model.IndexComparisonTaskConfig{} @@ -312,6 +313,10 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht if i == 0 { taskInfo.Step = cfg.Source.Step } + instID := migration_util.GetMapStringValue(subTask.Metadata.Labels, "execution_instance_id") + if instID != "" { + workers[instID] = struct{}{} + } var durationInMS int64 var subCompletedTime int64 @@ -381,6 +386,14 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht if taskInfo.StartTime == 0 { taskInfo.StartTime = startTime } + for _, node := range taskConfig.Settings.Execution.Nodes.Permit { + if _, ok := workers[node.ID]; ok { + taskInfo.Workers = append(taskInfo.Workers, util.MapStr{ + "id": node.ID, + "name": node.Name, + }) + } + } taskInfo.Partitions = partitionTaskInfos taskInfo.CompletedPartitions = completedPartitions h.WriteJSON(w, taskInfo, http.StatusOK) From 12d4a8eef0e9366f1940ec2cbc78fa63fa3477fd Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 10 Oct 2023 08:59:44 +0800 Subject: [PATCH 6/6] return last running task info --- .../task_manager/cluster_comparison/cluster_comparison.go | 4 +--- plugin/task_manager/common_api.go | 6 +++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/plugin/task_manager/cluster_comparison/cluster_comparison.go b/plugin/task_manager/cluster_comparison/cluster_comparison.go index 9dcaf070..982419f2 100644 --- a/plugin/task_manager/cluster_comparison/cluster_comparison.go +++ b/plugin/task_manager/cluster_comparison/cluster_comparison.go @@ -56,9 +56,7 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { return nil } taskItem.RetryTimes++ - if taskItem.StartTimeInMillis == 0 { - taskItem.StartTimeInMillis = time.Now().UnixMilli() - } + taskItem.StartTimeInMillis = time.Now().UnixMilli() taskItem.Status = task.StatusRunning taskItem.Metadata.Labels["total_diff_docs"] = 0 taskItem.Metadata.Labels["only_in_source"] = 0 diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go index 6bed3af8..e828a4a5 100644 --- a/plugin/task_manager/common_api.go +++ b/plugin/task_manager/common_api.go @@ -160,7 +160,11 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs) } case "cluster_comparison": - ts, _, err := h.getComparisonMajorTaskInfo(taskID) + targetTaskId := taskID + if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" { + targetTaskId = repeatStatus.LastRunChildTaskID + } + ts, _, err := h.getComparisonMajorTaskInfo(targetTaskId) if err != nil { log.Warnf("fetch progress info of task error: %v", err) return