From e75a566f03a3b7f41fd46b54995e9dfd41fca72e Mon Sep 17 00:00:00 2001 From: sunjiacheng Date: Thu, 30 Mar 2023 18:24:26 +0800 Subject: [PATCH] [plugin][migration] add more logs (#46) [plugin][migration] add more logs Co-authored-by: Kassian Sun --- plugin/migration/pipeline.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index ce77c6cd..d2ad25ce 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -118,7 +118,7 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { return nil } if t.Metadata.Labels == nil { - log.Error("got migration task with empty labels, skip handling: %v", t) + log.Errorf("got migration task with empty labels, skip handling: %v", t) continue } if t.Metadata.Labels["business_id"] == "cluster_migration" { @@ -320,7 +320,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { for _, pipelineID := range state.PipelineIds { err = inst.DeletePipeline(pipelineID) if err != nil { - log.Error(err) + log.Errorf("delete pipeline failed, err: %v", err) continue } selector := util.MapStr{ @@ -331,7 +331,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { //clear queue err = inst.DeleteQueueBySelector(selector) if err != nil { - log.Error(err) + log.Errorf("delete queue failed, err: %v", err) } } } @@ -472,7 +472,7 @@ MainLoop: hasStopped = false break } - log.Error(err) + log.Errorf("failed to stop pipeline, err: %v", err) } } if hasStopped { @@ -495,7 +495,7 @@ MainLoop: for _, pipelineID := range taskIDs { err = inst.DeletePipeline(pipelineID) if err != nil { - log.Error(err) + log.Errorf("failed to delete pipeline, err: %v", err) continue } selector := util.MapStr{ @@ -506,7 +506,7 @@ MainLoop: //clear queue err = inst.DeleteQueueBySelector(selector) if err != nil { - log.Error(err) + log.Errorf("failed to delete queue, err: %v", err) } } if st, ok := p.state[instanceID]; ok { @@ -821,7 +821,7 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc tempInst.ID = node.ID _, err = orm.Get(&tempInst) if err != nil { - log.Error(err) + log.Errorf("failed to get instance, err: %v", err) continue } err = tempInst.TryConnectWithTimeout(time.Second) @@ -902,7 +902,7 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem esClient := elastic.GetClient(p.config.Elasticsearch) _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh) if err != nil { - log.Error(err) + log.Errorf("failed to update task, err: %v", err) } if logItem != nil { event.SaveLog(event.Event{