From 651c86feff34a87e7cac5579428038f501451ae2 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Fri, 21 Apr 2023 18:21:01 +0800 Subject: [PATCH] [migration] batch task by task type --- plugin/migration/pipeline.go | 125 +++++++++++++++++++---------------- 1 file changed, 67 insertions(+), 58 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index d5495230..fcf44d09 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -111,68 +111,70 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { if ctx.IsCanceled() { return nil } - tasks, err := p.getMigrationTasks(p.config.TaskBatchSize) - if err != nil { - log.Errorf("failed to get migration tasks, err: %v", err) - return err - } - if len(tasks) == 0 { - return nil - } - for _, t := range tasks { - if ctx.IsCanceled() { + for _, taskType := range []string{"cluster_migration", "index_migration", "pipeline"} { + tasks, err := p.getMigrationTasks(taskType, p.config.TaskBatchSize) + if err != nil { + log.Errorf("failed to get migration tasks, err: %v", err) + return err + } + if len(tasks) == 0 { return nil } - if t.Metadata.Labels == nil { - log.Errorf("got migration task [%s] with empty labels, skip handling", t.ID) - continue - } - log.Debugf("start handling task [%s] (type: %s, status: %s)", t.ID, t.Metadata.Type, t.Status) - switch t.Metadata.Type { - case "cluster_migration": - // handle major task - switch t.Status { - case task2.StatusReady: - err = p.handleReadyMajorTask(&t) - case task2.StatusRunning: - err = p.handleRunningMajorTask(&t) - case task2.StatusPendingStop: - err = p.handlePendingStopMajorTask(&t) + for _, t := range tasks { + if ctx.IsCanceled() { + return nil + } + if t.Metadata.Labels == nil { + log.Errorf("got migration task [%s] with empty labels, skip handling", t.ID) + continue + } + log.Debugf("start handling task [%s] (type: %s, status: %s)", t.ID, t.Metadata.Type, t.Status) + switch t.Metadata.Type { + case "cluster_migration": + // handle major task + switch t.Status { + case task2.StatusReady: + err = p.handleReadyMajorTask(&t) + case task2.StatusRunning: + err = p.handleRunningMajorTask(&t) + case task2.StatusPendingStop: + err = p.handlePendingStopMajorTask(&t) + } + if err != nil { + log.Errorf("failed to handling major task [%s]: [%v]", t.ID, err) + } + case "index_migration": + // handle sub migration task + switch t.Status { + case task2.StatusReady: + // split sub task + err = p.handleReadySubTask(&t) + case task2.StatusRunning: + // check pipeline tasks status + err = p.handleRunningSubTask(&t) + case task2.StatusPendingStop: + // mark pipeline tasks as pending_stop + err = p.handlePendingStopSubTask(&t) + } + if err != nil { + log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err) + } + case "pipeline": + // handle pipeline task + err = p.pipelineTaskProcessor.Process(&t) + if err != nil { + log.Errorf("failed to handling pipeline task [%s]: [%v]", t.ID, err) + } } if err != nil { - log.Errorf("failed to handling major task [%s]: [%v]", t.ID, err) + t.Status = task2.StatusError + tn := time.Now() + t.CompletedTime = &tn + p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{ + Success: false, + Error: err.Error(), + }, fmt.Sprintf("error handling task [%s]", t.ID)) } - case "index_migration": - // handle sub migration task - switch t.Status { - case task2.StatusReady: - // split sub task - err = p.handleReadySubTask(&t) - case task2.StatusRunning: - // check pipeline tasks status - err = p.handleRunningSubTask(&t) - case task2.StatusPendingStop: - // mark pipeline tasks as pending_stop - err = p.handlePendingStopSubTask(&t) - } - if err != nil { - log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err) - } - case "pipeline": - // handle pipeline task - err = p.pipelineTaskProcessor.Process(&t) - if err != nil { - log.Errorf("failed to handling pipeline task [%s]: [%v]", t.ID, err) - } - } - if err != nil { - t.Status = task2.StatusError - tn := time.Now() - t.CompletedTime = &tn - p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{ - Success: false, - Error: err.Error(), - }, fmt.Sprintf("error handling task [%s]", t.ID)) } } //es index refresh @@ -741,7 +743,7 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc return } -func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) { +func (p *DispatcherProcessor) getMigrationTasks(taskType string, size int) ([]task2.Task, error) { queryDsl := util.MapStr{ "size": size, "sort": []util.MapStr{ @@ -759,6 +761,13 @@ func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) "status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, }, }, + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": taskType, + }, + }, + }, }, }, },