[migration] remove default sleep, schedule by task type & status

This commit is contained in:
Kassian Sun 2023-04-24 12:27:45 +08:00
parent 5f8ab9859b
commit 06554054af
2 changed files with 100 additions and 116 deletions

View File

@ -107,91 +107,69 @@ func (p *DispatcherProcessor) Name() string {
}
func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
for {
if ctx.IsCanceled() {
return nil
}
for _, taskType := range []string{"cluster_migration", "index_migration", "pipeline"} {
tasks, err := p.getMigrationTasks(taskType, p.config.TaskBatchSize)
if err != nil {
log.Errorf("failed to get migration tasks, err: %v", err)
return err
}
if len(tasks) == 0 {
return nil
}
for _, t := range tasks {
if ctx.IsCanceled() {
return nil
}
if t.Metadata.Labels == nil {
log.Errorf("got migration task [%s] with empty labels, skip handling", t.ID)
continue
}
log.Debugf("start handling task [%s] (type: %s, status: %s)", t.ID, t.Metadata.Type, t.Status)
switch t.Metadata.Type {
case "cluster_migration":
// handle major task
switch t.Status {
case task2.StatusReady:
err = p.handleReadyMajorTask(&t)
case task2.StatusRunning:
err = p.handleRunningMajorTask(&t)
case task2.StatusPendingStop:
err = p.handlePendingStopMajorTask(&t)
}
if err != nil {
log.Errorf("failed to handling major task [%s]: [%v]", t.ID, err)
}
case "index_migration":
// handle sub migration task
switch t.Status {
case task2.StatusReady:
// split sub task
err = p.handleReadySubTask(&t)
case task2.StatusRunning:
// check pipeline tasks status
err = p.handleRunningSubTask(&t)
case task2.StatusPendingStop:
// mark pipeline tasks as pending_stop
err = p.handlePendingStopSubTask(&t)
}
if err != nil {
log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err)
}
case "pipeline":
// handle pipeline task
err = p.pipelineTaskProcessor.Process(&t)
if err != nil {
log.Errorf("failed to handling pipeline task [%s]: [%v]", t.ID, err)
}
}
if err != nil {
t.Status = task2.StatusError
tn := time.Now()
t.CompletedTime = &tn
p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{
Success: false,
Error: err.Error(),
}, fmt.Sprintf("error handling task [%s]", t.ID))
}
}
}
//es index refresh
time.Sleep(time.Millisecond * 1200)
}
// handle pipeline task
p.handleTasks(ctx, "pipeline", []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, p.pipelineTaskProcessor.Process)
// mark index_migrations as pending_stop
p.handleTasks(ctx, "cluster_migration", []string{task2.StatusPendingStop}, p.handlePendingStopMajorTask)
// mark pipeline tasks as pending_stop
p.handleTasks(ctx, "index_migration", []string{task2.StatusPendingStop}, p.handlePendingStopSubTask)
// check pipeline tasks status
p.handleTasks(ctx, "index_migration", []string{task2.StatusRunning}, p.handleRunningSubTask)
// split & schedule pipline tasks
p.handleTasks(ctx, "index_migration", []string{task2.StatusReady}, p.handleReadySubTask)
// check index_migration tasks status
p.handleTasks(ctx, "cluster_migration", []string{task2.StatusRunning}, p.handleRunningMajorTask)
// split & schedule index_migration tasks
p.handleTasks(ctx, "cluster_migration", []string{task2.StatusReady}, p.handleReadyMajorTask)
return nil
}
func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error {
if taskItem.Metadata.Labels["is_split"] != true {
err := p.splitMajorMigrationTask(taskItem)
if err != nil {
return err
func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task2.Task) error) {
tasks, err := p.getMigrationTasks(taskType, taskStatus, p.config.TaskBatchSize)
if err != nil {
log.Errorf("failed to get [%s] with status %s, err: %v", taskType, taskStatus, err)
return
}
if len(tasks) == 0 {
return
}
log.Debugf("handling [%s] with status [%s], count: %d", taskType, taskStatus, len(tasks))
// refresh index after each batch
defer func() {
p.refreshTask()
}()
for i := range tasks {
if ctx.IsCanceled() {
return
}
taskItem.Metadata.Labels["is_split"] = true
} else {
taskItem.RetryTimes++
taskItem := &tasks[i]
err := p.handleTask(taskItem, taskHandler)
if err != nil {
log.Errorf("failed to handle task [%s]: [%v]", taskItem.ID, err)
taskItem.Status = task2.StatusError
tn := time.Now()
taskItem.CompletedTime = &tn
p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
Success: false,
Error: err.Error(),
}, fmt.Sprintf("failed to handle task [%s]", taskItem.ID))
}
}
return
}
func (p *DispatcherProcessor) handleTask(taskItem *task2.Task, taskHandler func(taskItem *task2.Task) error) error {
if taskItem.Metadata.Labels == nil {
log.Errorf("got migration task [%s] with empty labels, skip handling", taskItem.ID)
return errors.New("missing labels")
}
return taskHandler(taskItem)
}
func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error {
if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok {
return p.splitMajorMigrationTask(taskItem)
}
//update status of subtask to ready
query := util.MapStr{
@ -226,21 +204,18 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error {
},
}
// saved is_split if the following steps failed
defer func() {
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
Success: true,
}, fmt.Sprintf("task [%s] started", taskItem.ID))
}()
esClient := elastic.GetClient(p.config.Elasticsearch)
_, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl))
if err != nil {
log.Errorf("failed to update sub task status, err: %v", err)
return nil
}
taskItem.RetryTimes++
taskItem.Status = task2.StatusRunning
p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
Success: true,
}, fmt.Sprintf("major task [%s] started", taskItem.ID))
p.sendMajorTaskNotification(taskItem)
return nil
}
@ -261,12 +236,13 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e
if len(tasks) == 0 {
taskItem.Status = task2.StatusStopped
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, "wait_for", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
// NOTE: we don't know how many running index_migration's stopped, so do a refresh from ES
p.refreshInstanceJobsFromES()
}
return nil
}
func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error {
ts, err := p.getMajorTaskState(taskItem)
if err != nil {
@ -297,7 +273,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error
tn := time.Now()
taskItem.CompletedTime = &tn
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
Success: errMsg == "",
Error: errMsg,
}, fmt.Sprintf("major task [%s] finished with status [%s]", taskItem.ID, taskItem.Status))
@ -321,7 +297,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
now := time.Now()
taskItem.CompletedTime = &now
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
Success: true,
}, "empty index migration completed")
p.cleanGatewayQueue(taskItem)
@ -346,7 +322,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
now := time.Now()
taskItem.CompletedTime = &now
taskItem.Status = task2.StatusError
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
Success: false,
Error: err.Error(),
}, "index scroll failed")
@ -358,7 +334,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
if migration_util.GetMapIntValue(util.MapStr(taskItem.Metadata.Labels), "scrolled_docs") == 0 {
taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs
p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "")
p.saveTaskAndWriteLog(taskItem, nil, "")
}
bulked, successDocs, err := p.checkBulkPipelineTaskStatus(bulkTask, totalDocs)
@ -371,13 +347,13 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
taskItem.Metadata.Labels["index_docs"] = successDocs
if err != nil {
taskItem.Status = task2.StatusError
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
Success: false,
Error: err.Error(),
}, "index bulk failed")
} else {
taskItem.Status = task2.StatusComplete
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
Success: true,
}, "index migration completed")
}
@ -426,7 +402,7 @@ func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task,
// start bulk as needed
if bulkTask.Status == task2.StatusInit {
bulkTask.Status = task2.StatusReady
p.saveTaskAndWriteLog(bulkTask, "", &task2.TaskResult{
p.saveTaskAndWriteLog(bulkTask, &task2.TaskResult{
Success: true,
}, fmt.Sprintf("scroll completed, bulk pipeline started"))
return false, 0, nil
@ -469,7 +445,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
// all subtask stopped or error or complete
if len(tasks) == 0 {
taskItem.Status = task2.StatusStopped
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("index migration task [%s] stopped", taskItem.ID))
p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("index migration task [%s] stopped", taskItem.ID))
// clean disk queue if manually stopped
p.cleanGatewayQueue(taskItem)
}
@ -477,11 +453,11 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
}
func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
if taskItem.Metadata.Labels["is_split"] == true {
return p.handleScheduleSubTask(taskItem)
if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok {
return p.handleSplitSubTask(taskItem)
}
return p.handleSplitSubTask(taskItem)
return p.handleScheduleSubTask(taskItem)
}
func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
@ -636,7 +612,7 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
taskItem.Metadata.Labels["is_split"] = true
taskItem.Status = task2.StatusReady
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
Success: true,
}, fmt.Sprintf("task [%s] splitted", taskItem.ID))
return nil
@ -690,7 +666,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
taskItem.Status = task2.StatusRunning
taskItem.StartTimeInMillis = time.Now().UnixMilli()
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
Success: true,
}, fmt.Sprintf("task [%s] started", taskItem.ID))
// update dispatcher state
@ -746,7 +722,7 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc
return
}
func (p *DispatcherProcessor) getMigrationTasks(taskType string, size int) ([]task2.Task, error) {
func (p *DispatcherProcessor) getMigrationTasks(taskType string, taskStatus []string, size int) ([]task2.Task, error) {
queryDsl := util.MapStr{
"size": size,
"sort": []util.MapStr{
@ -761,7 +737,7 @@ func (p *DispatcherProcessor) getMigrationTasks(taskType string, size int) ([]ta
"must": []util.MapStr{
{
"terms": util.MapStr{
"status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop},
"status": taskStatus,
},
},
{
@ -778,9 +754,9 @@ func (p *DispatcherProcessor) getMigrationTasks(taskType string, size int) ([]ta
return p.getTasks(queryDsl)
}
func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh string, taskResult *task2.TaskResult, message string) {
func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) {
esClient := elastic.GetClient(p.config.Elasticsearch)
_, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh)
_, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, "")
if err != nil {
log.Errorf("failed to update task, err: %v", err)
}
@ -789,11 +765,15 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh
}
}
func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error {
if taskItem.Metadata.Labels["is_split"] == true {
return nil
func (p *DispatcherProcessor) refreshTask() {
esClient := elastic.GetClient(p.config.Elasticsearch)
err := esClient.Refresh(p.config.IndexName)
if err != nil {
log.Errorf("failed to refresh state, err: %v", err)
}
}
func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error {
clusterMigrationTask := migration_model.ClusterMigrationTaskConfig{}
err := migration_util.GetTaskConfig(taskItem, &clusterMigrationTask)
if err != nil {
@ -981,6 +961,10 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
}
}
}
taskItem.Metadata.Labels["is_split"] = true
p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
Success: true,
}, fmt.Sprintf("major task [%s] splitted", taskItem.ID))
return nil
}

View File

@ -90,7 +90,7 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
taskItem.Status = task.StatusRunning
taskItem.StartTimeInMillis = time.Now().UnixMilli()
p.saveTaskAndWriteLog(taskItem, "wait_for", &task.TaskResult{
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
Success: true,
}, fmt.Sprintf("pipeline task [%s] started", taskItem.ID))
@ -135,7 +135,7 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error
taskItem.Status = task.StatusComplete
}
p.saveTaskAndWriteLog(taskItem, "", &task.TaskResult{
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
Success: errMsg == "",
Error: errMsg,
}, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID))
@ -168,7 +168,7 @@ func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) e
taskItem.Status = task.StatusComplete
}
p.saveTaskAndWriteLog(taskItem, "", &task.TaskResult{
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
Success: errMsg == "",
Error: errMsg,
}, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID))
@ -194,7 +194,7 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {
if stopped {
taskItem.Status = task.StatusStopped
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
p.cleanGatewayPipeline(taskItem)
return nil
}
@ -407,9 +407,9 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]ela
return res.Hits.Hits, nil
}
func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, refresh string, taskResult *task.TaskResult, message string) {
func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) {
esClient := elastic.GetClient(p.Elasticsearch)
_, err := esClient.Index(p.IndexName, "", taskItem.ID, taskItem, refresh)
_, err := esClient.Index(p.IndexName, "", taskItem.ID, taskItem, "")
if err != nil {
log.Errorf("failed to update task, err: %v", err)
}