diff --git a/plugin/task_manager/model/common.go b/plugin/task_manager/model/common.go index b5d2a517..4bc8512e 100644 --- a/plugin/task_manager/model/common.go +++ b/plugin/task_manager/model/common.go @@ -2,6 +2,7 @@ package model import ( "fmt" + "infini.sh/framework/core/task" "time" "infini.sh/framework/core/util" @@ -82,3 +83,9 @@ func (incremental *IndexIncremental) BuildFilter(current int64, step time.Durati }, }, nil } + +type QueryTask struct { + Type string + Status []string + TaskHandler func(taskItem *task.Task) error +} \ No newline at end of file diff --git a/plugin/task_manager/pipeline.go b/plugin/task_manager/pipeline.go index e5652eac..afd4a65d 100644 --- a/plugin/task_manager/pipeline.go +++ b/plugin/task_manager/pipeline.go @@ -40,6 +40,7 @@ type DispatcherProcessor struct { indexMigrationTaskProcessor migration_model.Processor clusterComparisonTaskProcessor migration_model.Processor indexComparisonTaskProcessor migration_model.Processor + queryTasks []migration_model.QueryTask } type DispatcherConfig struct { @@ -101,6 +102,25 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro processor.clusterMigrationTaskProcessor = cluster_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) processor.indexComparisonTaskProcessor = index_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) processor.clusterComparisonTaskProcessor = cluster_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) + processor.queryTasks = []migration_model.QueryTask{ + // handle pipeline task + {"pipeline", []string{task.StatusReady, task.StatusRunning, task.StatusPendingStop}, processor.pipelineTaskProcessor.Process}, + // handle comparison tasks + {"cluster_comparison", []string{task.StatusPendingStop}, processor.clusterComparisonTaskProcessor.Process}, + {"index_comparison", []string{task.StatusPendingStop}, processor.indexComparisonTaskProcessor.Process}, + {"index_comparison", []string{task.StatusPendingStop}, processor.indexComparisonTaskProcessor.Process}, + {"index_comparison", []string{task.StatusRunning}, processor.indexComparisonTaskProcessor.Process}, + {"index_comparison", []string{task.StatusReady}, processor.indexComparisonTaskProcessor.Process}, + {"cluster_comparison", []string{task.StatusRunning}, processor.clusterComparisonTaskProcessor.Process}, + {"cluster_comparison", []string{task.StatusReady}, processor.clusterComparisonTaskProcessor.Process}, + // handle migration tasks + {"cluster_migration", []string{task.StatusPendingStop}, processor.clusterMigrationTaskProcessor.Process}, + {"index_migration", []string{task.StatusPendingStop}, processor.indexMigrationTaskProcessor.Process}, + {"index_migration", []string{task.StatusRunning}, processor.indexMigrationTaskProcessor.Process}, + {"index_migration", []string{task.StatusReady}, processor.indexMigrationTaskProcessor.Process}, + {"cluster_migration", []string{task.StatusRunning}, processor.clusterMigrationTaskProcessor.Process}, + {"cluster_migration", []string{task.StatusReady}, processor.clusterMigrationTaskProcessor.Process}, + } return &processor, nil } @@ -109,40 +129,38 @@ func (p *DispatcherProcessor) Name() string { return "migration_dispatcher" } -func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { - // handle repeating tasks - p.handleRepeatingTasks(ctx, "cluster_comparison") - p.handleRepeatingTasks(ctx, "cluster_migration") +var ( + repeatingTaskTypes = []string{"cluster_comparison", "cluster_migration"} +) - // handle pipeline task - p.handleTasks(ctx, "pipeline", []string{task.StatusReady, task.StatusRunning, task.StatusPendingStop}, p.pipelineTaskProcessor.Process) - - // handle comparison tasks - p.handleTasks(ctx, "cluster_comparison", []string{task.StatusPendingStop}, p.clusterComparisonTaskProcessor.Process) - p.handleTasks(ctx, "index_comparison", []string{task.StatusPendingStop}, p.indexComparisonTaskProcessor.Process) - p.handleTasks(ctx, "index_comparison", []string{task.StatusRunning}, p.indexComparisonTaskProcessor.Process) - p.handleTasks(ctx, "index_comparison", []string{task.StatusReady}, p.indexComparisonTaskProcessor.Process) - p.handleTasks(ctx, "cluster_comparison", []string{task.StatusRunning}, p.clusterComparisonTaskProcessor.Process) - p.handleTasks(ctx, "cluster_comparison", []string{task.StatusReady}, p.clusterComparisonTaskProcessor.Process) - - // handle migration tasks - p.handleTasks(ctx, "cluster_migration", []string{task.StatusPendingStop}, p.clusterMigrationTaskProcessor.Process) - p.handleTasks(ctx, "index_migration", []string{task.StatusPendingStop}, p.indexMigrationTaskProcessor.Process) - p.handleTasks(ctx, "index_migration", []string{task.StatusRunning}, p.indexMigrationTaskProcessor.Process) - p.handleTasks(ctx, "index_migration", []string{task.StatusReady}, p.indexMigrationTaskProcessor.Process) - p.handleTasks(ctx, "cluster_migration", []string{task.StatusRunning}, p.clusterMigrationTaskProcessor.Process) - p.handleTasks(ctx, "cluster_migration", []string{task.StatusReady}, p.clusterMigrationTaskProcessor.Process) +func (p *DispatcherProcessor) getTasks() error { return nil } -func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task.Task) error) { +func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { + var handledTaskNum int + // handle repeating tasks + for _, taskType := range repeatingTaskTypes { + handledTaskNum += p.handleRepeatingTasks(ctx, taskType) + + } + for _, tsk := range p.queryTasks { + handledTaskNum += p.handleTasks(ctx, tsk.Type, tsk.Status, tsk.TaskHandler) + } + if handledTaskNum == 0 { + ctx.Finished() + } + return nil +} + +func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task.Task) error) int { tasks, err := p.getMigrationTasks(taskType, taskStatus, p.config.TaskBatchSize) if err != nil { log.Errorf("failed to get [%s] with status %s, err: %v", taskType, taskStatus, err) - return + return 0 } if len(tasks) == 0 { - return + return 0 } log.Debugf("handling [%s] with status [%s], count: %d", taskType, taskStatus, len(tasks)) // refresh index after each batch @@ -151,7 +169,7 @@ func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string }() for i := range tasks { if ctx.IsCanceled() { - return + return 0 } taskItem := &tasks[i] err := p.handleTask(taskItem, taskHandler) @@ -167,7 +185,7 @@ func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID)) } } - return + return len(tasks) } func (p *DispatcherProcessor) handleTask(taskItem *task.Task, taskHandler func(taskItem *task.Task) error) error { diff --git a/plugin/task_manager/repeat.go b/plugin/task_manager/repeat.go index aa8c23aa..66931cb3 100644 --- a/plugin/task_manager/repeat.go +++ b/plugin/task_manager/repeat.go @@ -14,14 +14,14 @@ import ( "infini.sh/framework/core/util" ) -func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskType string) { +func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskType string) int { tasks, err := p.getPendingExecutionTasks(taskType, p.config.TaskBatchSize) if err != nil { log.Errorf("failed to get pending [%s] tasks, err: %v", taskType, err) - return + return 0 } if len(tasks) == 0 { - return + return 0 } log.Debugf("handling pending [%s] tasks, count: %d", taskType, len(tasks)) // refresh index after each batch @@ -30,7 +30,7 @@ func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskTy }() for i := range tasks { if ctx.IsCanceled() { - return + return 0 } taskItem := &tasks[i] err := p.handleTask(taskItem, p.handleRepeatingTask) @@ -46,7 +46,7 @@ func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskTy }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID)) } } - return + return len(tasks) } func (p *DispatcherProcessor) getPendingExecutionTasks(taskType string, size int) ([]task.Task, error) {