From 06554054af30715c86837aff64ebb6dabf3082ef Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Mon, 24 Apr 2023 12:27:45 +0800 Subject: [PATCH] [migration] remove default sleep, schedule by task type & status --- plugin/migration/pipeline.go | 204 ++++++++---------- .../migration/pipeline_task/pipeline_task.go | 12 +- 2 files changed, 100 insertions(+), 116 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 9ce2c9c5..ce765b80 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -107,91 +107,69 @@ func (p *DispatcherProcessor) Name() string { } func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { - for { - if ctx.IsCanceled() { - return nil - } - for _, taskType := range []string{"cluster_migration", "index_migration", "pipeline"} { - tasks, err := p.getMigrationTasks(taskType, p.config.TaskBatchSize) - if err != nil { - log.Errorf("failed to get migration tasks, err: %v", err) - return err - } - if len(tasks) == 0 { - return nil - } - for _, t := range tasks { - if ctx.IsCanceled() { - return nil - } - if t.Metadata.Labels == nil { - log.Errorf("got migration task [%s] with empty labels, skip handling", t.ID) - continue - } - log.Debugf("start handling task [%s] (type: %s, status: %s)", t.ID, t.Metadata.Type, t.Status) - switch t.Metadata.Type { - case "cluster_migration": - // handle major task - switch t.Status { - case task2.StatusReady: - err = p.handleReadyMajorTask(&t) - case task2.StatusRunning: - err = p.handleRunningMajorTask(&t) - case task2.StatusPendingStop: - err = p.handlePendingStopMajorTask(&t) - } - if err != nil { - log.Errorf("failed to handling major task [%s]: [%v]", t.ID, err) - } - case "index_migration": - // handle sub migration task - switch t.Status { - case task2.StatusReady: - // split sub task - err = p.handleReadySubTask(&t) - case task2.StatusRunning: - // check pipeline tasks status - err = p.handleRunningSubTask(&t) - case task2.StatusPendingStop: - // mark pipeline tasks as pending_stop - err = p.handlePendingStopSubTask(&t) - } - if err != nil { - log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err) - } - case "pipeline": - // handle pipeline task - err = p.pipelineTaskProcessor.Process(&t) - if err != nil { - log.Errorf("failed to handling pipeline task [%s]: [%v]", t.ID, err) - } - } - if err != nil { - t.Status = task2.StatusError - tn := time.Now() - t.CompletedTime = &tn - p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{ - Success: false, - Error: err.Error(), - }, fmt.Sprintf("error handling task [%s]", t.ID)) - } - } - } - //es index refresh - time.Sleep(time.Millisecond * 1200) - } + // handle pipeline task + p.handleTasks(ctx, "pipeline", []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, p.pipelineTaskProcessor.Process) + // mark index_migrations as pending_stop + p.handleTasks(ctx, "cluster_migration", []string{task2.StatusPendingStop}, p.handlePendingStopMajorTask) + // mark pipeline tasks as pending_stop + p.handleTasks(ctx, "index_migration", []string{task2.StatusPendingStop}, p.handlePendingStopSubTask) + // check pipeline tasks status + p.handleTasks(ctx, "index_migration", []string{task2.StatusRunning}, p.handleRunningSubTask) + // split & schedule pipline tasks + p.handleTasks(ctx, "index_migration", []string{task2.StatusReady}, p.handleReadySubTask) + // check index_migration tasks status + p.handleTasks(ctx, "cluster_migration", []string{task2.StatusRunning}, p.handleRunningMajorTask) + // split & schedule index_migration tasks + p.handleTasks(ctx, "cluster_migration", []string{task2.StatusReady}, p.handleReadyMajorTask) return nil } -func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { - if taskItem.Metadata.Labels["is_split"] != true { - err := p.splitMajorMigrationTask(taskItem) - if err != nil { - return err +func (p *DispatcherProcessor) handleTasks(ctx *pipeline.Context, taskType string, taskStatus []string, taskHandler func(taskItem *task2.Task) error) { + tasks, err := p.getMigrationTasks(taskType, taskStatus, p.config.TaskBatchSize) + if err != nil { + log.Errorf("failed to get [%s] with status %s, err: %v", taskType, taskStatus, err) + return + } + if len(tasks) == 0 { + return + } + log.Debugf("handling [%s] with status [%s], count: %d", taskType, taskStatus, len(tasks)) + // refresh index after each batch + defer func() { + p.refreshTask() + }() + for i := range tasks { + if ctx.IsCanceled() { + return } - taskItem.Metadata.Labels["is_split"] = true - } else { - taskItem.RetryTimes++ + taskItem := &tasks[i] + err := p.handleTask(taskItem, taskHandler) + if err != nil { + log.Errorf("failed to handle task [%s]: [%v]", taskItem.ID, err) + + taskItem.Status = task2.StatusError + tn := time.Now() + taskItem.CompletedTime = &tn + p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ + Success: false, + Error: err.Error(), + }, fmt.Sprintf("failed to handle task [%s]", taskItem.ID)) + } + } + return +} + +func (p *DispatcherProcessor) handleTask(taskItem *task2.Task, taskHandler func(taskItem *task2.Task) error) error { + if taskItem.Metadata.Labels == nil { + log.Errorf("got migration task [%s] with empty labels, skip handling", taskItem.ID) + return errors.New("missing labels") + } + return taskHandler(taskItem) +} + +func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { + if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok { + return p.splitMajorMigrationTask(taskItem) } //update status of subtask to ready query := util.MapStr{ @@ -226,21 +204,18 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { }, } - // saved is_split if the following steps failed - defer func() { - p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ - Success: true, - }, fmt.Sprintf("task [%s] started", taskItem.ID)) - }() - esClient := elastic.GetClient(p.config.Elasticsearch) _, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl)) if err != nil { log.Errorf("failed to update sub task status, err: %v", err) return nil } + taskItem.RetryTimes++ taskItem.Status = task2.StatusRunning + p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ + Success: true, + }, fmt.Sprintf("major task [%s] started", taskItem.ID)) + p.sendMajorTaskNotification(taskItem) return nil } @@ -261,12 +236,13 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e if len(tasks) == 0 { taskItem.Status = task2.StatusStopped p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, "wait_for", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) + p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) // NOTE: we don't know how many running index_migration's stopped, so do a refresh from ES p.refreshInstanceJobsFromES() } return nil } + func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error { ts, err := p.getMajorTaskState(taskItem) if err != nil { @@ -297,7 +273,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error tn := time.Now() taskItem.CompletedTime = &tn p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ Success: errMsg == "", Error: errMsg, }, fmt.Sprintf("major task [%s] finished with status [%s]", taskItem.ID, taskItem.Status)) @@ -321,7 +297,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { now := time.Now() taskItem.CompletedTime = &now - p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ Success: true, }, "empty index migration completed") p.cleanGatewayQueue(taskItem) @@ -346,7 +322,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { now := time.Now() taskItem.CompletedTime = &now taskItem.Status = task2.StatusError - p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ Success: false, Error: err.Error(), }, "index scroll failed") @@ -358,7 +334,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { if migration_util.GetMapIntValue(util.MapStr(taskItem.Metadata.Labels), "scrolled_docs") == 0 { taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs - p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "") + p.saveTaskAndWriteLog(taskItem, nil, "") } bulked, successDocs, err := p.checkBulkPipelineTaskStatus(bulkTask, totalDocs) @@ -371,13 +347,13 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { taskItem.Metadata.Labels["index_docs"] = successDocs if err != nil { taskItem.Status = task2.StatusError - p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ Success: false, Error: err.Error(), }, "index bulk failed") } else { taskItem.Status = task2.StatusComplete - p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ Success: true, }, "index migration completed") } @@ -426,7 +402,7 @@ func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task, // start bulk as needed if bulkTask.Status == task2.StatusInit { bulkTask.Status = task2.StatusReady - p.saveTaskAndWriteLog(bulkTask, "", &task2.TaskResult{ + p.saveTaskAndWriteLog(bulkTask, &task2.TaskResult{ Success: true, }, fmt.Sprintf("scroll completed, bulk pipeline started")) return false, 0, nil @@ -469,7 +445,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err // all subtask stopped or error or complete if len(tasks) == 0 { taskItem.Status = task2.StatusStopped - p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("index migration task [%s] stopped", taskItem.ID)) + p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("index migration task [%s] stopped", taskItem.ID)) // clean disk queue if manually stopped p.cleanGatewayQueue(taskItem) } @@ -477,11 +453,11 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err } func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { - if taskItem.Metadata.Labels["is_split"] == true { - return p.handleScheduleSubTask(taskItem) + if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok { + return p.handleSplitSubTask(taskItem) } - return p.handleSplitSubTask(taskItem) + return p.handleScheduleSubTask(taskItem) } func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error { @@ -636,7 +612,7 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error { taskItem.Metadata.Labels["is_split"] = true taskItem.Status = task2.StatusReady - p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ Success: true, }, fmt.Sprintf("task [%s] splitted", taskItem.ID)) return nil @@ -690,7 +666,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error taskItem.Status = task2.StatusRunning taskItem.StartTimeInMillis = time.Now().UnixMilli() - p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ Success: true, }, fmt.Sprintf("task [%s] started", taskItem.ID)) // update dispatcher state @@ -746,7 +722,7 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc return } -func (p *DispatcherProcessor) getMigrationTasks(taskType string, size int) ([]task2.Task, error) { +func (p *DispatcherProcessor) getMigrationTasks(taskType string, taskStatus []string, size int) ([]task2.Task, error) { queryDsl := util.MapStr{ "size": size, "sort": []util.MapStr{ @@ -761,7 +737,7 @@ func (p *DispatcherProcessor) getMigrationTasks(taskType string, size int) ([]ta "must": []util.MapStr{ { "terms": util.MapStr{ - "status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, + "status": taskStatus, }, }, { @@ -778,9 +754,9 @@ func (p *DispatcherProcessor) getMigrationTasks(taskType string, size int) ([]ta return p.getTasks(queryDsl) } -func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh string, taskResult *task2.TaskResult, message string) { +func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) { esClient := elastic.GetClient(p.config.Elasticsearch) - _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh) + _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, "") if err != nil { log.Errorf("failed to update task, err: %v", err) } @@ -789,11 +765,15 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh } } -func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error { - if taskItem.Metadata.Labels["is_split"] == true { - return nil +func (p *DispatcherProcessor) refreshTask() { + esClient := elastic.GetClient(p.config.Elasticsearch) + err := esClient.Refresh(p.config.IndexName) + if err != nil { + log.Errorf("failed to refresh state, err: %v", err) } +} +func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error { clusterMigrationTask := migration_model.ClusterMigrationTaskConfig{} err := migration_util.GetTaskConfig(taskItem, &clusterMigrationTask) if err != nil { @@ -981,6 +961,10 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro } } } + taskItem.Metadata.Labels["is_split"] = true + p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ + Success: true, + }, fmt.Sprintf("major task [%s] splitted", taskItem.ID)) return nil } diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index 8e579bff..ffa1c3ea 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -90,7 +90,7 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error { taskItem.Status = task.StatusRunning taskItem.StartTimeInMillis = time.Now().UnixMilli() - p.saveTaskAndWriteLog(taskItem, "wait_for", &task.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, fmt.Sprintf("pipeline task [%s] started", taskItem.ID)) @@ -135,7 +135,7 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error taskItem.Status = task.StatusComplete } - p.saveTaskAndWriteLog(taskItem, "", &task.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: errMsg == "", Error: errMsg, }, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID)) @@ -168,7 +168,7 @@ func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) e taskItem.Status = task.StatusComplete } - p.saveTaskAndWriteLog(taskItem, "", &task.TaskResult{ + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: errMsg == "", Error: errMsg, }, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID)) @@ -194,7 +194,7 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error { if stopped { taskItem.Status = task.StatusStopped - p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) + p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) p.cleanGatewayPipeline(taskItem) return nil } @@ -407,9 +407,9 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]ela return res.Hits.Hits, nil } -func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, refresh string, taskResult *task.TaskResult, message string) { +func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) { esClient := elastic.GetClient(p.Elasticsearch) - _, err := esClient.Index(p.IndexName, "", taskItem.ID, taskItem, refresh) + _, err := esClient.Index(p.IndexName, "", taskItem.ID, taskItem, "") if err != nil { log.Errorf("failed to update task, err: %v", err) }