From 57e1fb0844afecc435490805213f7c990aa92087 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Wed, 19 Apr 2023 15:18:26 +0800 Subject: [PATCH] [migration] optimize major task progress calculation --- plugin/migration/api.go | 13 ++++--------- plugin/migration/model/model.go | 5 ++--- plugin/migration/pipeline.go | 2 +- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/plugin/migration/api.go b/plugin/migration/api.go index 09b3fd7b..f14f76ce 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -1124,15 +1124,14 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod log.Error(err) continue } + taskLabels := util.MapStr(subTask.Metadata.Labels) if subTask.Metadata.Labels != nil { //add indexDocs of already complete if subTask.Metadata.Type == "index_migration" { - if v, ok := subTask.Metadata.Labels["index_docs"].(float64); ok { - taskStats.IndexDocs += v - } + taskStats.IndexDocs += migration_util.GetMapIntValue(taskLabels, "index_docs") continue } - if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok { + if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) } } @@ -1152,11 +1151,7 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod } for _, status := range pipelines { - if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil { - if vv, ok := v.(float64); ok { - taskStats.IndexDocs += vv - } - } + taskStats.IndexDocs += migration_util.GetMapIntValue(status.Context, "bulk_indexing.success.count") } } return taskStats, nil diff --git a/plugin/migration/model/model.go b/plugin/migration/model/model.go index 3d2c36c5..d5d4d9e2 100644 --- a/plugin/migration/model/model.go +++ b/plugin/migration/model/model.go @@ -96,9 +96,8 @@ type ClusterInfo struct { } type MajorTaskState struct { - ScrolledDocs float64 - IndexDocs float64 - Status string + IndexDocs int64 + Status string } type IndexStateInfo struct { diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 8db3d47e..4801dcb1 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -1032,7 +1032,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat log.Errorf("search es failed, err: %v", err) return taskState, nil } - if v, ok := res.Aggregations["total_docs"].Value.(float64); ok { + if v, err := util.ExtractInt(res.Aggregations["total_docs"].Value); err == nil { taskState.IndexDocs = v } var (