[migration] fix progress when index task is running

This commit is contained in:
Kassian Sun 2023-04-19 14:22:34 +08:00 committed by Gitea
parent 7d675ba3d1
commit 1bca4335f1
2 changed files with 19 additions and 33 deletions

View File

@ -686,10 +686,8 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
}
}
taskInfo["data_partition"] = len(subTasks)
var taskStats = map[string]struct {
ScrolledDocs float64
IndexDocs float64
}{}
var scrollStats = map[string]int64{}
var bulkStats = map[string]int64{}
for instID, taskIDs := range pipelineTaskIDs {
inst := &model.Instance{}
inst.ID = instID
@ -706,19 +704,11 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
for pipelineID, status := range pipelines {
if pid, ok := pipelineSubParentIDs[pipelineID]; ok {
if v, err := status.Context.GetValue("es_scroll.scrolled_docs"); err == nil {
if vv, ok := v.(float64); ok {
stat := taskStats[pid]
stat.ScrolledDocs = vv
taskStats[pid] = stat
}
if vv := migration_util.GetMapIntValue(status.Context, "es_scroll.scrolled_docs"); vv > 0 {
scrollStats[pid] = vv
}
if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil {
if vv, ok := v.(float64); ok {
stat := taskStats[pid]
stat.IndexDocs = vv
taskStats[pid] = stat
}
if vv := migration_util.GetMapIntValue(status.Context, "bulk_indexing.success.count"); vv > 0 {
bulkStats[pid] = vv
}
}
}
@ -756,23 +746,19 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
}
}
var (
scrollDocs float64
indexDocs float64
scrollDocs int64
indexDocs int64
)
if stat, ok := taskStats[ptask.ID]; ok {
scrollDocs = stat.ScrolledDocs
indexDocs = stat.IndexDocs
ptaskLabels := util.MapStr(ptask.Metadata.Labels)
if vv, ok := scrollStats[ptask.ID]; ok {
scrollDocs = vv
} else {
if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError {
if ptask.Metadata.Labels != nil {
if v, ok := ptask.Metadata.Labels["scrolled_docs"].(float64); ok {
scrollDocs = v
}
if v, ok := ptask.Metadata.Labels["index_docs"].(float64); ok {
indexDocs = v
}
}
}
scrollDocs = migration_util.GetMapIntValue(ptaskLabels, "scrolled_docs")
}
if vv, ok := bulkStats[ptask.ID]; ok {
indexDocs = vv
} else {
indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs")
}
var subCompletedTime int64

View File

@ -361,7 +361,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Task, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) {
// NOTE: old-version pipeline tasks has empty status
if scrollTask.Status == task2.StatusError || scrollTask.Status == "" {
return false, 0, errors.New("scroll pipeline failed")
return true, 0, errors.New("scroll pipeline failed")
}
// scroll not finished yet
@ -383,7 +383,7 @@ func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Ta
func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task, totalDocs int64) (bulked bool, successDocs int64, err error) {
if bulkTask.Status == task2.StatusError || bulkTask.Status == "" {
return false, 0, errors.New("bulk pipeline failed")
return true, 0, errors.New("bulk pipeline failed")
}
// start bulk as needed