diff --git a/plugin/migration/api.go b/plugin/migration/api.go index 0b15d771..09b3fd7b 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -686,10 +686,8 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt } } taskInfo["data_partition"] = len(subTasks) - var taskStats = map[string]struct { - ScrolledDocs float64 - IndexDocs float64 - }{} + var scrollStats = map[string]int64{} + var bulkStats = map[string]int64{} for instID, taskIDs := range pipelineTaskIDs { inst := &model.Instance{} inst.ID = instID @@ -706,19 +704,11 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt for pipelineID, status := range pipelines { if pid, ok := pipelineSubParentIDs[pipelineID]; ok { - if v, err := status.Context.GetValue("es_scroll.scrolled_docs"); err == nil { - if vv, ok := v.(float64); ok { - stat := taskStats[pid] - stat.ScrolledDocs = vv - taskStats[pid] = stat - } + if vv := migration_util.GetMapIntValue(status.Context, "es_scroll.scrolled_docs"); vv > 0 { + scrollStats[pid] = vv } - if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil { - if vv, ok := v.(float64); ok { - stat := taskStats[pid] - stat.IndexDocs = vv - taskStats[pid] = stat - } + if vv := migration_util.GetMapIntValue(status.Context, "bulk_indexing.success.count"); vv > 0 { + bulkStats[pid] = vv } } } @@ -756,23 +746,19 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt } } var ( - scrollDocs float64 - indexDocs float64 + scrollDocs int64 + indexDocs int64 ) - if stat, ok := taskStats[ptask.ID]; ok { - scrollDocs = stat.ScrolledDocs - indexDocs = stat.IndexDocs + ptaskLabels := util.MapStr(ptask.Metadata.Labels) + if vv, ok := scrollStats[ptask.ID]; ok { + scrollDocs = vv } else { - if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError { - if ptask.Metadata.Labels != nil { - if v, ok := ptask.Metadata.Labels["scrolled_docs"].(float64); ok { - scrollDocs = v - } - if v, ok := ptask.Metadata.Labels["index_docs"].(float64); ok { - indexDocs = v - } - } - } + scrollDocs = migration_util.GetMapIntValue(ptaskLabels, "scrolled_docs") + } + if vv, ok := bulkStats[ptask.ID]; ok { + indexDocs = vv + } else { + indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs") } var subCompletedTime int64 diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index b76b0564..09069ad4 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -361,7 +361,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Task, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) { // NOTE: old-version pipeline tasks has empty status if scrollTask.Status == task2.StatusError || scrollTask.Status == "" { - return false, 0, errors.New("scroll pipeline failed") + return true, 0, errors.New("scroll pipeline failed") } // scroll not finished yet @@ -383,7 +383,7 @@ func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Ta func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task, totalDocs int64) (bulked bool, successDocs int64, err error) { if bulkTask.Status == task2.StatusError || bulkTask.Status == "" { - return false, 0, errors.New("bulk pipeline failed") + return true, 0, errors.New("bulk pipeline failed") } // start bulk as needed