From 527c3c9e461af4fafbf9b6d789d9a55e92d2e1ef Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Wed, 19 Apr 2023 11:22:02 +0800 Subject: [PATCH] [migration] delete ready1 state --- plugin/migration/pipeline.go | 23 ++++++++++------------- plugin/migration/util/util.go | 2 +- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index bf69eb26..464c3491 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -148,9 +148,6 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { case task2.StatusReady: // split sub task err = p.handleReadySubTask(&t) - case task2.StatusReady1: - // update pipeline tasks to ready - err = p.handleReady1SubTask(&t) case task2.StatusRunning: // check pipeline tasks status err = p.handleRunningSubTask(&t) @@ -399,13 +396,13 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { if taskItem.Metadata.Labels["is_split"] == true { - taskItem.Status = task2.StatusReady1 - p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ - Success: true, - }, fmt.Sprintf("sub task [%s] splitted, skip to ready1", taskItem.ID)) - return nil + return p.handleScheduleSubTask(taskItem) } + return p.handleSplitSubTask(taskItem) +} + +func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error { //split task to scroll/bulk_indexing pipeline and then persistent var pids []string pids = append(pids, taskItem.ParentId...) @@ -555,7 +552,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { } taskItem.Metadata.Labels["is_split"] = true - taskItem.Status = task2.StatusReady1 + taskItem.Status = task2.StatusReady p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ Success: true, @@ -563,7 +560,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { return nil } -func (p *DispatcherProcessor) handleReady1SubTask(taskItem *task2.Task) error { +func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error { scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem) if err != nil { log.Errorf("failed to get pipeline tasks, err: %v", err) @@ -678,7 +675,7 @@ func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) "must": []util.MapStr{ { "terms": util.MapStr{ - "status": []string{task2.StatusReady, task2.StatusReady1, task2.StatusRunning, task2.StatusPendingStop}, + "status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, }, }, }, @@ -1124,7 +1121,7 @@ func (p *DispatcherProcessor) updatePendingChildTasksToPendingStop(taskItem *tas }, { "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusReady, task2.StatusReady1}, + "status": []string{task2.StatusRunning, task2.StatusReady}, }, }, { @@ -1173,7 +1170,7 @@ func (p *DispatcherProcessor) getPendingChildTasks(taskItem *task2.Task, taskTyp }, { "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady, task2.StatusReady1}, + "status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady}, }, }, }, diff --git a/plugin/migration/util/util.go b/plugin/migration/util/util.go index aa984ebb..46c34811 100644 --- a/plugin/migration/util/util.go +++ b/plugin/migration/util/util.go @@ -35,7 +35,7 @@ func WriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) }) } -var runningTaskStatus = []string{task.StatusRunning, task.StatusReady, task.StatusReady1} +var runningTaskStatus = []string{task.StatusRunning, task.StatusReady} func IsRunningState(status string) bool { return util.StringInArray(runningTaskStatus, status)