diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 22e20d2e..f3b84b69 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -157,7 +157,7 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{ Success: false, Error: err.Error(), - }, fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err)) + }, fmt.Sprintf("error handling task [%s]", t.ID)) } } //es index refresh @@ -216,7 +216,7 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { p.sendMajorTaskNotification(taskItem) p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ Success: true, - }, fmt.Sprintf("success to start task [%s]", taskItem.ID)) + }, fmt.Sprintf("task [%s] started", taskItem.ID)) }() esClient := elastic.GetClient(p.config.Elasticsearch) @@ -304,7 +304,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e if len(tasks) == 0 { taskItem.Status = task2.StatusStopped p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID)) + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) } return nil } @@ -319,7 +319,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error tn := time.Now() taskItem.CompletedTime = &tn p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is complete", taskItem.ID)) + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] completed", taskItem.ID)) } return nil } @@ -374,7 +374,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ Success: state.Error == "", Error: state.Error, - }, fmt.Sprintf("task [%s] is complete", taskItem.ID)) + }, fmt.Sprintf("task [%s] completed", taskItem.ID)) } else { if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { ptasks, err := p.getPipelineTasks(taskItem.ID) @@ -408,9 +408,9 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { return err } taskItem.Metadata.Labels["running_phase"] = 2 + p.saveTaskAndWriteLog(taskItem, "wait_for", nil, fmt.Sprintf("task [%s] started phase 2", taskItem.ID)) } } - p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "") } } return nil @@ -514,7 +514,7 @@ MainLoop: p.state[instanceID] = st } } - p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID)) + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) return nil } @@ -769,7 +769,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ Success: true, - }, fmt.Sprintf("dispatch task [%s] to instance [%s]", taskItem.ID, instance.ID)) + }, fmt.Sprintf("task [%s] started", taskItem.ID)) return nil }