[migration] batch task by task type

This commit is contained in:
Kassian Sun 2023-04-21 18:21:01 +08:00
parent 492bd7dd66
commit 651c86feff
1 changed files with 67 additions and 58 deletions

View File

@ -111,68 +111,70 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
if ctx.IsCanceled() {
return nil
}
tasks, err := p.getMigrationTasks(p.config.TaskBatchSize)
if err != nil {
log.Errorf("failed to get migration tasks, err: %v", err)
return err
}
if len(tasks) == 0 {
return nil
}
for _, t := range tasks {
if ctx.IsCanceled() {
for _, taskType := range []string{"cluster_migration", "index_migration", "pipeline"} {
tasks, err := p.getMigrationTasks(taskType, p.config.TaskBatchSize)
if err != nil {
log.Errorf("failed to get migration tasks, err: %v", err)
return err
}
if len(tasks) == 0 {
return nil
}
if t.Metadata.Labels == nil {
log.Errorf("got migration task [%s] with empty labels, skip handling", t.ID)
continue
}
log.Debugf("start handling task [%s] (type: %s, status: %s)", t.ID, t.Metadata.Type, t.Status)
switch t.Metadata.Type {
case "cluster_migration":
// handle major task
switch t.Status {
case task2.StatusReady:
err = p.handleReadyMajorTask(&t)
case task2.StatusRunning:
err = p.handleRunningMajorTask(&t)
case task2.StatusPendingStop:
err = p.handlePendingStopMajorTask(&t)
for _, t := range tasks {
if ctx.IsCanceled() {
return nil
}
if t.Metadata.Labels == nil {
log.Errorf("got migration task [%s] with empty labels, skip handling", t.ID)
continue
}
log.Debugf("start handling task [%s] (type: %s, status: %s)", t.ID, t.Metadata.Type, t.Status)
switch t.Metadata.Type {
case "cluster_migration":
// handle major task
switch t.Status {
case task2.StatusReady:
err = p.handleReadyMajorTask(&t)
case task2.StatusRunning:
err = p.handleRunningMajorTask(&t)
case task2.StatusPendingStop:
err = p.handlePendingStopMajorTask(&t)
}
if err != nil {
log.Errorf("failed to handling major task [%s]: [%v]", t.ID, err)
}
case "index_migration":
// handle sub migration task
switch t.Status {
case task2.StatusReady:
// split sub task
err = p.handleReadySubTask(&t)
case task2.StatusRunning:
// check pipeline tasks status
err = p.handleRunningSubTask(&t)
case task2.StatusPendingStop:
// mark pipeline tasks as pending_stop
err = p.handlePendingStopSubTask(&t)
}
if err != nil {
log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err)
}
case "pipeline":
// handle pipeline task
err = p.pipelineTaskProcessor.Process(&t)
if err != nil {
log.Errorf("failed to handling pipeline task [%s]: [%v]", t.ID, err)
}
}
if err != nil {
log.Errorf("failed to handling major task [%s]: [%v]", t.ID, err)
t.Status = task2.StatusError
tn := time.Now()
t.CompletedTime = &tn
p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{
Success: false,
Error: err.Error(),
}, fmt.Sprintf("error handling task [%s]", t.ID))
}
case "index_migration":
// handle sub migration task
switch t.Status {
case task2.StatusReady:
// split sub task
err = p.handleReadySubTask(&t)
case task2.StatusRunning:
// check pipeline tasks status
err = p.handleRunningSubTask(&t)
case task2.StatusPendingStop:
// mark pipeline tasks as pending_stop
err = p.handlePendingStopSubTask(&t)
}
if err != nil {
log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err)
}
case "pipeline":
// handle pipeline task
err = p.pipelineTaskProcessor.Process(&t)
if err != nil {
log.Errorf("failed to handling pipeline task [%s]: [%v]", t.ID, err)
}
}
if err != nil {
t.Status = task2.StatusError
tn := time.Now()
t.CompletedTime = &tn
p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{
Success: false,
Error: err.Error(),
}, fmt.Sprintf("error handling task [%s]", t.ID))
}
}
//es index refresh
@ -741,7 +743,7 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc
return
}
func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) {
func (p *DispatcherProcessor) getMigrationTasks(taskType string, size int) ([]task2.Task, error) {
queryDsl := util.MapStr{
"size": size,
"sort": []util.MapStr{
@ -759,6 +761,13 @@ func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error)
"status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop},
},
},
{
"term": util.MapStr{
"metadata.type": util.MapStr{
"value": taskType,
},
},
},
},
},
},