diff --git a/plugin/migration/api.go b/plugin/migration/api.go index f14f76ce..ac4ef5ab 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -659,6 +659,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt var pipelineTaskIDs = map[string][]string{} pipelineSubParentIDs := map[string]string{} subTaskStatus := map[string]string{} + parentIDPipelineTasks := map[string][]task2.Task{} for _, row := range result.Result { buf := util.MustToJSONBytes(row) subTask := task2.Task{} @@ -677,9 +678,11 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok { pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) if pl := len(subTask.ParentId); pl > 0 { - if subTaskStatus[subTask.ParentId[pl-1]] == task2.StatusRunning { - pipelineSubParentIDs[subTask.ID] = subTask.ParentId[pl-1] + parentID := subTask.ParentId[pl-1] + if subTaskStatus[parentID] == task2.StatusRunning { + pipelineSubParentIDs[subTask.ID] = parentID } + parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask) } } } @@ -770,7 +773,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt } partitionTotalDocs := cfg.Source.DocCount - partitionTaskInfos = append(partitionTaskInfos, util.MapStr{ + partitionTaskInfo := util.MapStr{ "task_id": ptask.ID, "status": ptask.Status, "start_time": ptask.StartTimeInMillis, @@ -781,7 +784,21 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt "scroll_docs": scrollDocs, "index_docs": indexDocs, "total_docs": partitionTotalDocs, - }) + } + for _, pipelineTask := range parentIDPipelineTasks[ptask.ID] { + if pipelineTask.Metadata.Labels["pipeline_id"] == "es_scroll" { + partitionTaskInfo["scroll_task"] = util.MapStr{ + "id": pipelineTask.ID, + "status": pipelineTask.Status, + } + }else if pipelineTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { + partitionTaskInfo["bulk_task"] = util.MapStr{ + "id": pipelineTask.ID, + "status": pipelineTask.Status, + } + } + } + partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo) if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError { completedPartitions++ }