[plugin][migration] add error loggings
This commit is contained in:
parent
09f93d5190
commit
6de2e14235
|
@ -53,7 +53,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
|
|||
|
||||
cfg := DispatcherConfig{}
|
||||
if err := c.Unpack(&cfg); err != nil {
|
||||
log.Error(err)
|
||||
log.Errorf("failed to unpack config, err: %v", err)
|
||||
return nil, fmt.Errorf("failed to unpack the configuration of migration dispatcher processor: %s", err)
|
||||
}
|
||||
if cfg.IndexName == "" || cfg.LogIndexName == "" {
|
||||
|
@ -68,7 +68,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
|
|||
}
|
||||
} else {
|
||||
err = fmt.Errorf("parse config elastic.orm error: %w", err)
|
||||
log.Error(err)
|
||||
log.Errorf("failed to parse elastic.orm, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
@ -88,7 +88,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
|
|||
}
|
||||
state, err := processor.getInstanceTaskState()
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
log.Errorf("failed to get instance task state, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
processor.state = state
|
||||
|
@ -131,6 +131,9 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
|
|||
case task2.StatusPendingStop:
|
||||
err = p.handlePendingStopMajorTask(&t)
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("failed to handling major task [%s]: [%v]", t.ID, err)
|
||||
}
|
||||
} else if t.Metadata.Labels["business_id"] == "index_migration" {
|
||||
//handle sub migration task
|
||||
switch t.Status {
|
||||
|
@ -140,6 +143,9 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
|
|||
err = p.handleRunningSubTask(&t)
|
||||
case task2.StatusPendingStop:
|
||||
err = p.handlePendingStopSubTask(&t)
|
||||
if err != nil {
|
||||
log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
|
@ -532,6 +538,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
|
|||
//query split pipeline task
|
||||
ptasks, err := p.getPipelineTasks(taskItem.ID)
|
||||
if err != nil {
|
||||
log.Errorf("getPipelineTasks failed, err: %+v", err)
|
||||
return err
|
||||
}
|
||||
for i, t := range ptasks {
|
||||
|
@ -727,6 +734,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
|
|||
//call instance api to create pipeline task
|
||||
err = instance.CreatePipeline(util.MustToJSONBytes(scrollTask.Config))
|
||||
if err != nil {
|
||||
log.Errorf("create scroll pipeline failed, err: %+v", err)
|
||||
return err
|
||||
}
|
||||
//err = instance.CreatePipeline(util.MustToJSONBytes(bulkTask.Config))
|
||||
|
@ -1179,6 +1187,7 @@ func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error)
|
|||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
|
||||
if err != nil {
|
||||
log.Errorf("query tasks from es failed, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
if res.GetTotal() == 0 {
|
||||
|
@ -1188,11 +1197,13 @@ func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error)
|
|||
for _, hit := range res.Hits.Hits {
|
||||
buf, err := util.ToJSONBytes(hit.Source)
|
||||
if err != nil {
|
||||
log.Errorf("marshal task json failed, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
tk := task2.Task{}
|
||||
err = util.FromJSONBytes(buf, &tk)
|
||||
if err != nil {
|
||||
log.Errorf("unmarshal task json failed, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
migrationTasks = append(migrationTasks, tk)
|
||||
|
@ -1203,6 +1214,7 @@ func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error)
|
|||
func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCompleteState, error) {
|
||||
ptasks, err := p.getPipelineTasks(subTask.ID)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get pipeline tasks, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
var pids []string
|
||||
|
@ -1251,6 +1263,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo
|
|||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||
res, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(query))
|
||||
if err != nil {
|
||||
log.Errorf("search task log from es failed, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
|
@ -1262,6 +1275,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo
|
|||
}
|
||||
totalDocs, err := util.MapStr(cfg).GetValue("source.doc_count")
|
||||
if err != nil {
|
||||
log.Errorf("failed to get source.doc_count, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -1325,6 +1339,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo
|
|||
inst.ID = instID
|
||||
_, err = orm.Get(&inst)
|
||||
if err != nil {
|
||||
log.Errorf("get instance failed, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
err = inst.TryConnectWithTimeout(time.Second * 3)
|
||||
|
@ -1376,6 +1391,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat
|
|||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
|
||||
if err != nil {
|
||||
log.Errorf("search es failed, err: %v", err)
|
||||
return taskState, err
|
||||
}
|
||||
if v, ok := res.Aggregations["total_docs"].Value.(float64); ok {
|
||||
|
@ -1436,6 +1452,7 @@ func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState
|
|||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
|
||||
if err != nil {
|
||||
log.Errorf("search es failed, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
state := map[string]DispatcherState{}
|
||||
|
|
Loading…
Reference in New Issue