set pipeline to finish state when there is no tasks handled

This commit is contained in:
liugq 2023-07-20 18:48:28 +08:00
parent 503b201eb2
commit 4801650cd5
3 changed files with 57 additions and 32 deletions

View File

@ -2,6 +2,7 @@ package model
import ( import (
"fmt" "fmt"
"infini.sh/framework/core/task"
"time" "time"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
@ -82,3 +83,9 @@ func (incremental *IndexIncremental) BuildFilter(current int64, step time.Durati
}, },
}, nil }, nil
} }
type QueryTask struct {
Type string
Status []string
TaskHandler func(taskItem *task.Task) error
}

View File

@ -40,6 +40,7 @@ type DispatcherProcessor struct {
indexMigrationTaskProcessor migration_model.Processor indexMigrationTaskProcessor migration_model.Processor
clusterComparisonTaskProcessor migration_model.Processor clusterComparisonTaskProcessor migration_model.Processor
indexComparisonTaskProcessor migration_model.Processor indexComparisonTaskProcessor migration_model.Processor
queryTasks []migration_model.QueryTask
} }
type DispatcherConfig struct { type DispatcherConfig struct {
@ -101,6 +102,25 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
processor.clusterMigrationTaskProcessor = cluster_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) processor.clusterMigrationTaskProcessor = cluster_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
processor.indexComparisonTaskProcessor = index_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) processor.indexComparisonTaskProcessor = index_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
processor.clusterComparisonTaskProcessor = cluster_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) processor.clusterComparisonTaskProcessor = cluster_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
processor.queryTasks = []migration_model.QueryTask{
// handle pipeline task
{"pipeline", []string{task.StatusReady, task.StatusRunning, task.StatusPendingStop}, processor.pipelineTaskProcessor.Process},
// handle comparison tasks
{"cluster_comparison", []string{task.StatusPendingStop}, processor.clusterComparisonTaskProcessor.Process},
{"index_comparison", []string{task.StatusPendingStop}, processor.indexComparisonTaskProcessor.Process},
{"index_comparison", []string{task.StatusPendingStop}, processor.indexComparisonTaskProcessor.Process},
{"index_comparison", []string{task.StatusRunning}, processor.indexComparisonTaskProcessor.Process},
{"index_comparison", []string{task.StatusReady}, processor.indexComparisonTaskProcessor.Process},
{"cluster_comparison", []string{task.StatusRunning}, processor.clusterComparisonTaskProcessor.Process},
{"cluster_comparison", []string{task.StatusReady}, processor.clusterComparisonTaskProcessor.Process},
// handle migration tasks
{"cluster_migration", []string{task.StatusPendingStop}, processor.clusterMigrationTaskProcessor.Process},
{"index_migration", []string{task.StatusPendingStop}, processor.indexMigrationTaskProcessor.Process},
{"index_migration", []string{task.StatusRunning}, processor.indexMigrationTaskProcessor.Process},
{"index_migration", []string{task.StatusReady}, processor.indexMigrationTaskProcessor.Process},
{"cluster_migration", []string{task.StatusRunning}, processor.clusterMigrationTaskProcessor.Process},
{"cluster_migration", []string{task.StatusReady}, processor.clusterMigrationTaskProcessor.Process},
}
return &processor, nil return &processor, nil
} }
@ -109,40 +129,38 @@ func (p *DispatcherProcessor) Name() string {
return "migration_dispatcher" return "migration_dispatcher"
} }
func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { var (
// handle repeating tasks repeatingTaskTypes = []string{"cluster_comparison", "cluster_migration"}
p.handleRepeatingTasks(ctx, "cluster_comparison") )
p.handleRepeatingTasks(ctx, "cluster_migration")
// handle pipeline task func (p *DispatcherProcessor) getTasks() error {
p.handleTasks(ctx, "pipeline", []string{task.StatusReady, task.StatusRunning, task.StatusPendingStop}, p.pipelineTaskProcessor.Process)
// handle comparison tasks
p.handleTasks(ctx, "cluster_comparison", []string{task.StatusPendingStop}, p.clusterComparisonTaskProcessor.Process)
p.handleTasks(ctx, "index_comparison", []string{task.StatusPendingStop}, p.indexComparisonTaskProcessor.Process)
p.handleTasks(ctx, "index_comparison", []string{task.StatusRunning}, p.indexComparisonTaskProcessor.Process)
p.handleTasks(ctx, "index_comparison", []string{task.StatusReady}, p.indexComparisonTaskProcessor.Process)
p.handleTasks(ctx, "cluster_comparison", []string{task.StatusRunning}, p.clusterComparisonTaskProcessor.Process)
p.handleTasks(ctx, "cluster_comparison", []string{task.StatusReady}, p.clusterComparisonTaskProcessor.Process)
// handle migration tasks
p.handleTasks(ctx, "cluster_migration", []string{task.StatusPendingStop}, p.clusterMigrationTaskProcessor.Process)
p.handleTasks(ctx, "index_migration", []string{task.StatusPendingStop}, p.indexMigrationTaskProcessor.Process)
p.handleTasks(ctx, "index_migration", []string{task.StatusRunning}, p.indexMigrationTaskProcessor.Process)
p.handleTasks(ctx, "index_migration", []string{task.StatusReady}, p.indexMigrationTaskProcessor.Process)
p.handleTasks(ctx, "cluster_migration", []string{task.StatusRunning}, p.clusterMigrationTaskProcessor.Process)
p.handleTasks(ctx, "cluster_migration", []string{task.StatusReady}, p.clusterMigrationTaskProcessor.Process)
return nil return nil
} }
func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task.Task) error) { func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
var handledTaskNum int
// handle repeating tasks
for _, taskType := range repeatingTaskTypes {
handledTaskNum += p.handleRepeatingTasks(ctx, taskType)
}
for _, tsk := range p.queryTasks {
handledTaskNum += p.handleTasks(ctx, tsk.Type, tsk.Status, tsk.TaskHandler)
}
if handledTaskNum == 0 {
ctx.Finished()
}
return nil
}
func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task.Task) error) int {
tasks, err := p.getMigrationTasks(taskType, taskStatus, p.config.TaskBatchSize) tasks, err := p.getMigrationTasks(taskType, taskStatus, p.config.TaskBatchSize)
if err != nil { if err != nil {
log.Errorf("failed to get [%s] with status %s, err: %v", taskType, taskStatus, err) log.Errorf("failed to get [%s] with status %s, err: %v", taskType, taskStatus, err)
return return 0
} }
if len(tasks) == 0 { if len(tasks) == 0 {
return return 0
} }
log.Debugf("handling [%s] with status [%s], count: %d", taskType, taskStatus, len(tasks)) log.Debugf("handling [%s] with status [%s], count: %d", taskType, taskStatus, len(tasks))
// refresh index after each batch // refresh index after each batch
@ -151,7 +169,7 @@ func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string
}() }()
for i := range tasks { for i := range tasks {
if ctx.IsCanceled() { if ctx.IsCanceled() {
return return 0
} }
taskItem := &tasks[i] taskItem := &tasks[i]
err := p.handleTask(taskItem, taskHandler) err := p.handleTask(taskItem, taskHandler)
@ -167,7 +185,7 @@ func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string
}, fmt.Sprintf("failed to handle task [%s]", taskItem.ID)) }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID))
} }
} }
return return len(tasks)
} }
func (p *DispatcherProcessor) handleTask(taskItem *task.Task, taskHandler func(taskItem *task.Task) error) error { func (p *DispatcherProcessor) handleTask(taskItem *task.Task, taskHandler func(taskItem *task.Task) error) error {

View File

@ -14,14 +14,14 @@ import (
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
) )
func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskType string) { func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskType string) int {
tasks, err := p.getPendingExecutionTasks(taskType, p.config.TaskBatchSize) tasks, err := p.getPendingExecutionTasks(taskType, p.config.TaskBatchSize)
if err != nil { if err != nil {
log.Errorf("failed to get pending [%s] tasks, err: %v", taskType, err) log.Errorf("failed to get pending [%s] tasks, err: %v", taskType, err)
return return 0
} }
if len(tasks) == 0 { if len(tasks) == 0 {
return return 0
} }
log.Debugf("handling pending [%s] tasks, count: %d", taskType, len(tasks)) log.Debugf("handling pending [%s] tasks, count: %d", taskType, len(tasks))
// refresh index after each batch // refresh index after each batch
@ -30,7 +30,7 @@ func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskTy
}() }()
for i := range tasks { for i := range tasks {
if ctx.IsCanceled() { if ctx.IsCanceled() {
return return 0
} }
taskItem := &tasks[i] taskItem := &tasks[i]
err := p.handleTask(taskItem, p.handleRepeatingTask) err := p.handleTask(taskItem, p.handleRepeatingTask)
@ -46,7 +46,7 @@ func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskTy
}, fmt.Sprintf("failed to handle task [%s]", taskItem.ID)) }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID))
} }
} }
return return len(tasks)
} }
func (p *DispatcherProcessor) getPendingExecutionTasks(taskType string, size int) ([]task.Task, error) { func (p *DispatcherProcessor) getPendingExecutionTasks(taskType string, size int) ([]task.Task, error) {