Merge pull request 'set pipeline to finish state when there is no tasks handled' (#133) from fix_migration into master

This commit is contained in:
silenceqi 2023-07-21 10:50:03 +08:00
commit 99a211cde8
3 changed files with 57 additions and 32 deletions
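
In short, the change makes the dispatcher's Process count how many tasks its handlers actually picked up in a cycle and call ctx.Finished() when that count is zero, so that an idle migration pipeline can reach the finish state instead of staying active. Below is a minimal, self-contained sketch of that pattern; it is not code from this commit. The finisher interface and fakeContext are stand-ins for the framework's pipeline.Context, and the handler closures are hypothetical; only the handledTaskNum / ctx.Finished() logic mirrors the diff.

package main

import "fmt"

// finisher stands in for the single pipeline.Context method this change
// relies on; the real type lives in infini.sh/framework/core/pipeline.
type finisher interface {
    Finished()
}

// fakeContext records whether Finished was called.
type fakeContext struct{ finished bool }

func (c *fakeContext) Finished() { c.finished = true }

// dispatchOnce mirrors the reworked Process: every handler reports how many
// tasks it handled, and the context is finished only when the total is zero.
func dispatchOnce(ctx finisher, handlers []func() int) {
    var handledTaskNum int
    for _, handle := range handlers {
        handledTaskNum += handle()
    }
    if handledTaskNum == 0 {
        ctx.Finished()
    }
}

func main() {
    ctx := &fakeContext{}
    // Hypothetical handlers standing in for handleRepeatingTasks/handleTasks;
    // none of them finds work, so the pipeline is marked finished.
    dispatchOnce(ctx, []func() int{
        func() int { return 0 }, // e.g. no pending cluster_migration tasks
        func() int { return 0 }, // e.g. no pending index_migration tasks
    })
    fmt.Println("pipeline finished:", ctx.finished) // prints: pipeline finished: true
}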


@@ -2,6 +2,7 @@ package model
import (
    "fmt"
    "infini.sh/framework/core/task"
    "time"
    "infini.sh/framework/core/util"
@@ -82,3 +83,9 @@ func (incremental *IndexIncremental) BuildFilter(current int64, step time.Durati
        },
    }, nil
}
// QueryTask pairs a task type and a set of task statuses with the handler
// used to process the matching tasks.
type QueryTask struct {
    Type        string
    Status      []string
    TaskHandler func(taskItem *task.Task) error
}
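
The point of the new type is that TaskHandler has the same shape as the Process method each task processor already exposes, which is what lets those methods be stored in a slice of QueryTask values and looped over (see the dispatcher changes below). A small illustration of that idea follows, using a stub Task type and a hypothetical processor in place of infini.sh/framework/core/task.Task and the real processors; none of it is part of the commit.

package main

import "fmt"

// Task is a stub for infini.sh/framework/core/task.Task.
type Task struct{ ID string }

// QueryTask mirrors the new struct: a task type, the statuses to query,
// and the handler to run on each matching task.
type QueryTask struct {
    Type        string
    Status      []string
    TaskHandler func(taskItem *Task) error
}

// indexMigrationProcessor is a hypothetical processor whose Process method
// already matches the TaskHandler signature.
type indexMigrationProcessor struct{}

func (p indexMigrationProcessor) Process(taskItem *Task) error {
    fmt.Println("handling", taskItem.ID)
    return nil
}

func main() {
    proc := indexMigrationProcessor{}
    table := []QueryTask{
        {Type: "index_migration", Status: []string{"ready", "running"}, TaskHandler: proc.Process},
    }
    for _, qt := range table {
        _ = qt.TaskHandler(&Task{ID: "task-1"})
    }
}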


@@ -40,6 +40,7 @@ type DispatcherProcessor struct {
    indexMigrationTaskProcessor    migration_model.Processor
    clusterComparisonTaskProcessor migration_model.Processor
    indexComparisonTaskProcessor   migration_model.Processor
    queryTasks                     []migration_model.QueryTask
}
type DispatcherConfig struct {
@@ -101,6 +102,25 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
    processor.clusterMigrationTaskProcessor = cluster_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
    processor.indexComparisonTaskProcessor = index_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
    processor.clusterComparisonTaskProcessor = cluster_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
    processor.queryTasks = []migration_model.QueryTask{
        // handle pipeline task
        {"pipeline", []string{task.StatusReady, task.StatusRunning, task.StatusPendingStop}, processor.pipelineTaskProcessor.Process},
        // handle comparison tasks
        {"cluster_comparison", []string{task.StatusPendingStop}, processor.clusterComparisonTaskProcessor.Process},
        {"index_comparison", []string{task.StatusPendingStop}, processor.indexComparisonTaskProcessor.Process},
{"index_comparison", []string{task.StatusPendingStop}, processor.indexComparisonTaskProcessor.Process},
{"index_comparison", []string{task.StatusRunning}, processor.indexComparisonTaskProcessor.Process},
{"index_comparison", []string{task.StatusReady}, processor.indexComparisonTaskProcessor.Process},
{"cluster_comparison", []string{task.StatusRunning}, processor.clusterComparisonTaskProcessor.Process},
{"cluster_comparison", []string{task.StatusReady}, processor.clusterComparisonTaskProcessor.Process},
// handle migration tasks
{"cluster_migration", []string{task.StatusPendingStop}, processor.clusterMigrationTaskProcessor.Process},
{"index_migration", []string{task.StatusPendingStop}, processor.indexMigrationTaskProcessor.Process},
{"index_migration", []string{task.StatusRunning}, processor.indexMigrationTaskProcessor.Process},
{"index_migration", []string{task.StatusReady}, processor.indexMigrationTaskProcessor.Process},
{"cluster_migration", []string{task.StatusRunning}, processor.clusterMigrationTaskProcessor.Process},
{"cluster_migration", []string{task.StatusReady}, processor.clusterMigrationTaskProcessor.Process},
}
return &processor, nil
}
@@ -109,40 +129,38 @@ func (p *DispatcherProcessor) Name() string {
    return "migration_dispatcher"
}
func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
    // handle repeating tasks
    p.handleRepeatingTasks(ctx, "cluster_comparison")
    p.handleRepeatingTasks(ctx, "cluster_migration")
var (
    repeatingTaskTypes = []string{"cluster_comparison", "cluster_migration"}
)
    // handle pipeline task
    p.handleTasks(ctx, "pipeline", []string{task.StatusReady, task.StatusRunning, task.StatusPendingStop}, p.pipelineTaskProcessor.Process)
    // handle comparison tasks
    p.handleTasks(ctx, "cluster_comparison", []string{task.StatusPendingStop}, p.clusterComparisonTaskProcessor.Process)
    p.handleTasks(ctx, "index_comparison", []string{task.StatusPendingStop}, p.indexComparisonTaskProcessor.Process)
    p.handleTasks(ctx, "index_comparison", []string{task.StatusRunning}, p.indexComparisonTaskProcessor.Process)
    p.handleTasks(ctx, "index_comparison", []string{task.StatusReady}, p.indexComparisonTaskProcessor.Process)
    p.handleTasks(ctx, "cluster_comparison", []string{task.StatusRunning}, p.clusterComparisonTaskProcessor.Process)
    p.handleTasks(ctx, "cluster_comparison", []string{task.StatusReady}, p.clusterComparisonTaskProcessor.Process)
    // handle migration tasks
    p.handleTasks(ctx, "cluster_migration", []string{task.StatusPendingStop}, p.clusterMigrationTaskProcessor.Process)
    p.handleTasks(ctx, "index_migration", []string{task.StatusPendingStop}, p.indexMigrationTaskProcessor.Process)
    p.handleTasks(ctx, "index_migration", []string{task.StatusRunning}, p.indexMigrationTaskProcessor.Process)
    p.handleTasks(ctx, "index_migration", []string{task.StatusReady}, p.indexMigrationTaskProcessor.Process)
    p.handleTasks(ctx, "cluster_migration", []string{task.StatusRunning}, p.clusterMigrationTaskProcessor.Process)
    p.handleTasks(ctx, "cluster_migration", []string{task.StatusReady}, p.clusterMigrationTaskProcessor.Process)
func (p *DispatcherProcessor) getTasks() error {
    return nil
}
func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task.Task) error) {
func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
    var handledTaskNum int
    // handle repeating tasks
    for _, taskType := range repeatingTaskTypes {
        handledTaskNum += p.handleRepeatingTasks(ctx, taskType)
    }
    for _, tsk := range p.queryTasks {
        handledTaskNum += p.handleTasks(ctx, tsk.Type, tsk.Status, tsk.TaskHandler)
    }
    if handledTaskNum == 0 {
        ctx.Finished()
    }
    return nil
}
func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task.Task) error) int {
    tasks, err := p.getMigrationTasks(taskType, taskStatus, p.config.TaskBatchSize)
    if err != nil {
        log.Errorf("failed to get [%s] with status %s, err: %v", taskType, taskStatus, err)
        return
        return 0
    }
    if len(tasks) == 0 {
        return
        return 0
    }
    log.Debugf("handling [%s] with status [%s], count: %d", taskType, taskStatus, len(tasks))
    // refresh index after each batch
@@ -151,7 +169,7 @@ func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string
    }()
    for i := range tasks {
        if ctx.IsCanceled() {
            return
            return 0
        }
        taskItem := &tasks[i]
        err := p.handleTask(taskItem, taskHandler)
@@ -167,7 +185,7 @@ func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string
            }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID))
        }
    }
    return
    return len(tasks)
}
func (p *DispatcherProcessor) handleTask(taskItem *task.Task, taskHandler func(taskItem *task.Task) error) error {
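
Taken together, the rework replaces the long list of per-type handleTasks calls with two loops: one over repeatingTaskTypes and one over the queryTasks table built in newMigrationDispatcherProcessor. Because handleTasks and handleRepeatingTasks now return the number of tasks they fetched instead of nothing (see the third file below), Process can accumulate handledTaskNum and call ctx.Finished() exactly when a cycle found no work, which is the behavior named in the merge title.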


@@ -14,14 +14,14 @@ import (
    "infini.sh/framework/core/util"
)
func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskType string) {
func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskType string) int {
    tasks, err := p.getPendingExecutionTasks(taskType, p.config.TaskBatchSize)
    if err != nil {
        log.Errorf("failed to get pending [%s] tasks, err: %v", taskType, err)
        return
        return 0
    }
    if len(tasks) == 0 {
        return
        return 0
    }
    log.Debugf("handling pending [%s] tasks, count: %d", taskType, len(tasks))
    // refresh index after each batch
@@ -30,7 +30,7 @@ func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskTy
    }()
    for i := range tasks {
        if ctx.IsCanceled() {
            return
            return 0
        }
        taskItem := &tasks[i]
        err := p.handleTask(taskItem, p.handleRepeatingTask)
@@ -46,7 +46,7 @@ func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskTy
            }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID))
        }
    }
    return
    return len(tasks)
}
func (p *DispatcherProcessor) getPendingExecutionTasks(taskType string, size int) ([]task.Task, error) {