return pipeline task info

This commit is contained in:
liugq 2023-04-24 14:37:57 +08:00
parent 63a5f4674f
commit 0ac40751cd
1 changed files with 21 additions and 4 deletions

View File

@ -659,6 +659,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
var pipelineTaskIDs = map[string][]string{}
pipelineSubParentIDs := map[string]string{}
subTaskStatus := map[string]string{}
parentIDPipelineTasks := map[string][]task2.Task{}
for _, row := range result.Result {
buf := util.MustToJSONBytes(row)
subTask := task2.Task{}
@ -677,9 +678,11 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok {
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
if pl := len(subTask.ParentId); pl > 0 {
if subTaskStatus[subTask.ParentId[pl-1]] == task2.StatusRunning {
pipelineSubParentIDs[subTask.ID] = subTask.ParentId[pl-1]
parentID := subTask.ParentId[pl-1]
if subTaskStatus[parentID] == task2.StatusRunning {
pipelineSubParentIDs[subTask.ID] = parentID
}
parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask)
}
}
}
@ -770,7 +773,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
}
partitionTotalDocs := cfg.Source.DocCount
partitionTaskInfos = append(partitionTaskInfos, util.MapStr{
partitionTaskInfo := util.MapStr{
"task_id": ptask.ID,
"status": ptask.Status,
"start_time": ptask.StartTimeInMillis,
@ -781,7 +784,21 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
"scroll_docs": scrollDocs,
"index_docs": indexDocs,
"total_docs": partitionTotalDocs,
})
}
for _, pipelineTask := range parentIDPipelineTasks[ptask.ID] {
if pipelineTask.Metadata.Labels["pipeline_id"] == "es_scroll" {
partitionTaskInfo["scroll_task"] = util.MapStr{
"id": pipelineTask.ID,
"status": pipelineTask.Status,
}
}else if pipelineTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
partitionTaskInfo["bulk_task"] = util.MapStr{
"id": pipelineTask.ID,
"status": pipelineTask.Status,
}
}
}
partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo)
if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError {
completedPartitions++
}