From 71ba08d266a1f6b84426cb4f2992b855ac7f2814 Mon Sep 17 00:00:00 2001 From: liugq Date: Sun, 8 Oct 2023 17:46:55 +0800 Subject: [PATCH] refactor getComparisonMajorTaskInfo --- plugin/task_manager/comparison_api.go | 170 ++++++++++++++++-------- plugin/task_manager/model/comparison.go | 3 +- 2 files changed, 119 insertions(+), 54 deletions(-) diff --git a/plugin/task_manager/comparison_api.go b/plugin/task_manager/comparison_api.go index 05ad2a41..bca13d5d 100644 --- a/plugin/task_manager/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -74,7 +74,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. var completedIndices int for i, index := range taskConfig.Indices { indexName := index.Source.GetUniqueIndexName() - count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs + count := indexState[indexName].TotalScrollDocs percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100 if percent > 100 { percent = 100 @@ -82,6 +82,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) + taskConfig.Indices[i].TotalScrollDocs = count taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren if count == index.Source.Docs+index.Target.Docs { @@ -127,76 +128,139 @@ type ComparisonIndexStateInfo struct { TargetScrollDocs int64 TotalDiffDocs int64 RunningChildren int + TotalScrollDocs int64 } -// TODO: calc realtime info from instance func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) { + var pipelineTaskIDs = map[string][]string{} + var pipelineIndexNames = map[string]string{} indexState = map[string]ComparisonIndexStateInfo{} - - taskQuery := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskID, + const size = 500 + var ( + from = -size + hasMore = true + ) + for hasMore { + from += size + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskID, + }, }, }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_comparison", + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_comparison", + }, }, }, }, }, }, - }, - } - subTasks, err := migration_util.GetTasks(taskQuery) - if err != nil { - return taskStats, indexState, err - } - - for _, subTask := range subTasks { - taskLabels := util.MapStr(subTask.Metadata.Labels) - indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") - if indexName == "" { - continue } - - cfg := migration_model.IndexComparisonTaskConfig{} - err = migration_util.GetTaskConfig(&subTask, &cfg) + subTasks, err := migration_util.GetTasks(taskQuery) if err != nil { - log.Errorf("failed to get task config, err: %v", err) + return taskStats, indexState, err + } + if len(subTasks) < size { + hasMore = false + } + + var indexMigrationTaskIDs []string + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + cfg := migration_model.IndexComparisonTaskConfig{} + err = migration_util.GetTaskConfig(&subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + continue + } + totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") + taskStats.SourceTotalDocs += cfg.Source.DocCount + taskStats.TargetTotalDocs += cfg.Target.DocCount + taskStats.TotalDiffDocs += totalDiffDocs + st := indexState[indexName] + st.SourceTotalDocs += cfg.Source.DocCount + st.TargetTotalDocs += cfg.Target.DocCount + st.TotalDiffDocs += totalDiffDocs + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 + } + if subTask.Status == task.StatusRunning { + st.RunningChildren++ + indexState[indexName] = st + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) + continue + } + sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") + targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") + st.SourceScrollDocs += sourceDocs + st.TargetScrollDocs += targetDocs + st.TotalScrollDocs += sourceDocs + targetDocs + taskStats.TargetScrollDocs += targetDocs + taskStats.SourceScrollDocs += sourceDocs + indexState[indexName] = st + } + + if len(indexMigrationTaskIDs) == 0 { continue } - sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") - targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") - totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") - taskStats.SourceTotalDocs += cfg.Source.DocCount - taskStats.SourceScrollDocs += sourceDocs - taskStats.TargetTotalDocs += cfg.Target.DocCount - taskStats.TargetScrollDocs += targetDocs - taskStats.TotalDiffDocs += totalDiffDocs + + taskQuery = util.MapStr{ + "size": len(indexMigrationTaskIDs) * 2, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "parent_id": indexMigrationTaskIDs, + }, + }, + }, + }, + }, + } + subTasks, err = migration_util.GetTasks(taskQuery) + if err != nil { + return taskStats, indexState, err + } + + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + pipelineIndexNames[subTask.ID] = indexName + + if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + } + } + + pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) + for pipelineID, pipelineContext := range pipelineContexts { + // add scrolledDocs of running tasks + scrollDocs := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs") + indexName := pipelineIndexNames[pipelineID] st := indexState[indexName] - st.SourceTotalDocs += cfg.Source.DocCount - st.SourceScrollDocs += sourceDocs - st.TargetTotalDocs += cfg.Target.DocCount - st.TargetScrollDocs += targetDocs - st.TotalDiffDocs += totalDiffDocs - if subTask.Status == task.StatusError { - st.ErrorPartitions += 1 - } - if subTask.Status == task.StatusRunning { - st.RunningChildren++ - } + st.TotalScrollDocs += scrollDocs indexState[indexName] = st } - return taskStats, indexState, nil } diff --git a/plugin/task_manager/model/comparison.go b/plugin/task_manager/model/comparison.go index 07a4085a..b64d61ce 100644 --- a/plugin/task_manager/model/comparison.go +++ b/plugin/task_manager/model/comparison.go @@ -31,7 +31,8 @@ type ClusterComparisonIndexConfig struct { Partition *IndexPartition `json:"partition,omitempty"` // only used in API - ScrollPercent float64 `json:"scroll_percent,omitempty"` + ScrollPercent float64 `json:"scroll_percent,omitempty"` + TotalScrollDocs int64 `json:"total_scroll_docs,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` RunningChildren int `json:"running_children,omitempty"` }