[migration] optimize major task progress calculation

This commit is contained in:
Kassian Sun 2023-04-19 15:18:26 +08:00 committed by Gitea
parent 1f95d4af48
commit 57e1fb0844
3 changed files with 7 additions and 13 deletions

View File

@ -1124,15 +1124,14 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod
log.Error(err) log.Error(err)
continue continue
} }
taskLabels := util.MapStr(subTask.Metadata.Labels)
if subTask.Metadata.Labels != nil { if subTask.Metadata.Labels != nil {
//add indexDocs of already complete //add indexDocs of already complete
if subTask.Metadata.Type == "index_migration" { if subTask.Metadata.Type == "index_migration" {
if v, ok := subTask.Metadata.Labels["index_docs"].(float64); ok { taskStats.IndexDocs += migration_util.GetMapIntValue(taskLabels, "index_docs")
taskStats.IndexDocs += v
}
continue continue
} }
if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok { if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
} }
} }
@ -1152,11 +1151,7 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod
} }
for _, status := range pipelines { for _, status := range pipelines {
if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil { taskStats.IndexDocs += migration_util.GetMapIntValue(status.Context, "bulk_indexing.success.count")
if vv, ok := v.(float64); ok {
taskStats.IndexDocs += vv
}
}
} }
} }
return taskStats, nil return taskStats, nil

View File

@ -96,9 +96,8 @@ type ClusterInfo struct {
} }
type MajorTaskState struct { type MajorTaskState struct {
ScrolledDocs float64 IndexDocs int64
IndexDocs float64 Status string
Status string
} }
type IndexStateInfo struct { type IndexStateInfo struct {

View File

@ -1032,7 +1032,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat
log.Errorf("search es failed, err: %v", err) log.Errorf("search es failed, err: %v", err)
return taskState, nil return taskState, nil
} }
if v, ok := res.Aggregations["total_docs"].Value.(float64); ok { if v, err := util.ExtractInt(res.Aggregations["total_docs"].Value); err == nil {
taskState.IndexDocs = v taskState.IndexDocs = v
} }
var ( var (