format log message of migration task

This commit is contained in:
liugq 2023-04-10 18:21:27 +08:00
parent b5218b8516
commit 4eb742d47e
1 changed files with 8 additions and 8 deletions

View File

@ -157,7 +157,7 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{ p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{
Success: false, Success: false,
Error: err.Error(), Error: err.Error(),
}, fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err)) }, fmt.Sprintf("error handling task [%s]", t.ID))
} }
} }
//es index refresh //es index refresh
@ -216,7 +216,7 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error {
p.sendMajorTaskNotification(taskItem) p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
Success: true, Success: true,
}, fmt.Sprintf("success to start task [%s]", taskItem.ID)) }, fmt.Sprintf("task [%s] started", taskItem.ID))
}() }()
esClient := elastic.GetClient(p.config.Elasticsearch) esClient := elastic.GetClient(p.config.Elasticsearch)
@ -304,7 +304,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e
if len(tasks) == 0 { if len(tasks) == 0 {
taskItem.Status = task2.StatusStopped taskItem.Status = task2.StatusStopped
p.sendMajorTaskNotification(taskItem) p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID)) p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
} }
return nil return nil
} }
@ -319,7 +319,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error
tn := time.Now() tn := time.Now()
taskItem.CompletedTime = &tn taskItem.CompletedTime = &tn
p.sendMajorTaskNotification(taskItem) p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is complete", taskItem.ID)) p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] completed", taskItem.ID))
} }
return nil return nil
} }
@ -374,7 +374,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
Success: state.Error == "", Success: state.Error == "",
Error: state.Error, Error: state.Error,
}, fmt.Sprintf("task [%s] is complete", taskItem.ID)) }, fmt.Sprintf("task [%s] completed", taskItem.ID))
} else { } else {
if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) {
ptasks, err := p.getPipelineTasks(taskItem.ID) ptasks, err := p.getPipelineTasks(taskItem.ID)
@ -408,9 +408,9 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
return err return err
} }
taskItem.Metadata.Labels["running_phase"] = 2 taskItem.Metadata.Labels["running_phase"] = 2
p.saveTaskAndWriteLog(taskItem, "wait_for", nil, fmt.Sprintf("task [%s] started phase 2", taskItem.ID))
} }
} }
p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "")
} }
} }
return nil return nil
@ -514,7 +514,7 @@ MainLoop:
p.state[instanceID] = st p.state[instanceID] = st
} }
} }
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID)) p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
return nil return nil
} }
@ -769,7 +769,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
Success: true, Success: true,
}, fmt.Sprintf("dispatch task [%s] to instance [%s]", taskItem.ID, instance.ID)) }, fmt.Sprintf("task [%s] started", taskItem.ID))
return nil return nil
} }