[plugin][migration] add more logs (#46)

[plugin][migration] add more logs

Co-authored-by: Kassian Sun <kassiansun@outlook.com>
This commit is contained in:
sunjiacheng 2023-03-30 18:24:26 +08:00 committed by medcl
parent 060d53d5ca
commit e75a566f03
1 changed files with 8 additions and 8 deletions

View File

@ -118,7 +118,7 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
return nil return nil
} }
if t.Metadata.Labels == nil { if t.Metadata.Labels == nil {
log.Error("got migration task with empty labels, skip handling: %v", t) log.Errorf("got migration task with empty labels, skip handling: %v", t)
continue continue
} }
if t.Metadata.Labels["business_id"] == "cluster_migration" { if t.Metadata.Labels["business_id"] == "cluster_migration" {
@ -320,7 +320,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
for _, pipelineID := range state.PipelineIds { for _, pipelineID := range state.PipelineIds {
err = inst.DeletePipeline(pipelineID) err = inst.DeletePipeline(pipelineID)
if err != nil { if err != nil {
log.Error(err) log.Errorf("delete pipeline failed, err: %v", err)
continue continue
} }
selector := util.MapStr{ selector := util.MapStr{
@ -331,7 +331,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
//clear queue //clear queue
err = inst.DeleteQueueBySelector(selector) err = inst.DeleteQueueBySelector(selector)
if err != nil { if err != nil {
log.Error(err) log.Errorf("delete queue failed, err: %v", err)
} }
} }
} }
@ -472,7 +472,7 @@ MainLoop:
hasStopped = false hasStopped = false
break break
} }
log.Error(err) log.Errorf("failed to stop pipeline, err: %v", err)
} }
} }
if hasStopped { if hasStopped {
@ -495,7 +495,7 @@ MainLoop:
for _, pipelineID := range taskIDs { for _, pipelineID := range taskIDs {
err = inst.DeletePipeline(pipelineID) err = inst.DeletePipeline(pipelineID)
if err != nil { if err != nil {
log.Error(err) log.Errorf("failed to delete pipeline, err: %v", err)
continue continue
} }
selector := util.MapStr{ selector := util.MapStr{
@ -506,7 +506,7 @@ MainLoop:
//clear queue //clear queue
err = inst.DeleteQueueBySelector(selector) err = inst.DeleteQueueBySelector(selector)
if err != nil { if err != nil {
log.Error(err) log.Errorf("failed to delete queue, err: %v", err)
} }
} }
if st, ok := p.state[instanceID]; ok { if st, ok := p.state[instanceID]; ok {
@ -821,7 +821,7 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc
tempInst.ID = node.ID tempInst.ID = node.ID
_, err = orm.Get(&tempInst) _, err = orm.Get(&tempInst)
if err != nil { if err != nil {
log.Error(err) log.Errorf("failed to get instance, err: %v", err)
continue continue
} }
err = tempInst.TryConnectWithTimeout(time.Second) err = tempInst.TryConnectWithTimeout(time.Second)
@ -902,7 +902,7 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem
esClient := elastic.GetClient(p.config.Elasticsearch) esClient := elastic.GetClient(p.config.Elasticsearch)
_, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh) _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh)
if err != nil { if err != nil {
log.Error(err) log.Errorf("failed to update task, err: %v", err)
} }
if logItem != nil { if logItem != nil {
event.SaveLog(event.Event{ event.SaveLog(event.Event{