From c691bbc607b788e1541abd699f72eb751b1296ee Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Thu, 20 Apr 2023 16:36:18 +0800 Subject: [PATCH] [migration] move disk queue clean up to index_migration level --- plugin/migration/pipeline.go | 32 ++++++++++++++ .../migration/pipeline_task/pipeline_task.go | 44 +++++-------------- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 3a88bc84..d575d0c0 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -320,6 +320,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ Success: true, }, "empty index migration completed") + p.cleanGatewayQueue(taskItem) p.decrInstanceJobs(instanceID) return nil } @@ -346,6 +347,8 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { Error: err.Error(), }, "index scroll failed") p.decrInstanceJobs(instanceID) + // clean disk queue if scroll failed + p.cleanGatewayQueue(taskItem) return nil } @@ -374,6 +377,8 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { Success: true, }, "index migration completed") } + // clean disk queue if bulk failed/completed + p.cleanGatewayQueue(taskItem) p.decrInstanceJobs(instanceID) return nil @@ -455,6 +460,8 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err taskItem.Status = task2.StatusStopped p.sendMajorTaskNotification(taskItem) p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) + // clean disk queue if manually stopped + p.cleanGatewayQueue(taskItem) } return nil } @@ -646,6 +653,9 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error return nil } + // try to clear disk queue before running es_scroll + p.cleanGatewayQueue(taskItem) + // update scroll task to ready scrollTask.Metadata.Labels["execution_instance_id"] = instance.ID scrollTask.Status = task2.StatusReady @@ -1275,3 +1285,25 @@ func (p *DispatcherProcessor) incrInstanceJobs(instanceID string) { instanceState.Total = instanceState.Total + 1 p.state[instanceID] = instanceState } + +func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) { + var err error + instance := model.Instance{} + instanceID := taskItem.Metadata.Labels["execution_instance_id"] + instance.ID, _ = util.ExtractString(instanceID) + _, err = orm.Get(&instance) + if err != nil { + log.Errorf("failed to get instance, err: %v", err) + return + } + + selector := util.MapStr{ + "labels": util.MapStr{ + "migration_task_id": taskItem.ID, + }, + } + err = instance.DeleteQueueBySelector(selector) + if err != nil { + log.Errorf("failed to delete queue, err: %v", err) + } +} diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index f00948da..c34d084f 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -48,18 +48,14 @@ func (p *processor) Process(t *task.Task) (err error) { } func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error { - cleanPipeline, cleanQueue := true, false - switch taskItem.Metadata.Labels["pipeline_id"] { case "es_scroll": - // try to clear queue before running es_scroll - cleanQueue = true case "bulk_indexing": default: return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"]) } - instance, err := p.cleanGatewayPipeline(taskItem, cleanPipeline, cleanQueue) + instance, err := p.cleanGatewayPipeline(taskItem) if err != nil { log.Errorf("failed to prepare instance before running pipeline, err: %v", err) return nil @@ -143,8 +139,7 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error Success: errMsg == "", Error: errMsg, }, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID)) - // clean queue if scroll failed - p.cleanGatewayPipeline(taskItem, true, taskItem.Status == task.StatusError) + p.cleanGatewayPipeline(taskItem) return nil } @@ -177,8 +172,7 @@ func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) e Success: errMsg == "", Error: errMsg, }, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID)) - // clean queue if bulk completed - p.cleanGatewayPipeline(taskItem, true, taskItem.Status == task.StatusComplete) + p.cleanGatewayPipeline(taskItem) return nil } @@ -201,12 +195,11 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error { if stopped { taskItem.Status = task.StatusStopped p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) - // clean all stuffs if manually stopped - p.cleanGatewayPipeline(taskItem, true, true) + p.cleanGatewayPipeline(taskItem) return nil } - _, instance, err := p.getPipelineExecutionInstance(taskItem) + instance, err := p.getPipelineExecutionInstance(taskItem) if err != nil { log.Errorf("failed to get execution instance for task [%s], err: %v", taskItem.ID, err) return nil @@ -219,34 +212,21 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error { return nil } -func (p *processor) cleanGatewayPipeline(taskItem *task.Task, pipeline, queue bool) (instance model.Instance, err error) { - parentTask, instance, err := p.getPipelineExecutionInstance(taskItem) +func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.Instance, err error) { + instance, err = p.getPipelineExecutionInstance(taskItem) if err != nil { return } - if pipeline { - err = instance.DeletePipeline(taskItem.ID) - if err != nil { - log.Errorf("delete pipeline failed, err: %v", err) - } + err = instance.DeletePipeline(taskItem.ID) + if err != nil { + log.Errorf("delete pipeline failed, err: %v", err) } - if queue { - selector := util.MapStr{ - "labels": util.MapStr{ - "migration_task_id": parentTask.ID, - }, - } - err = instance.DeleteQueueBySelector(selector) - if err != nil { - log.Errorf("failed to delete queue, err: %v", err) - } - } return instance, nil } -func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (parentTask *task.Task, instance model.Instance, err error) { - parentTask, err = p.getParentTask(taskItem, "index_migration") +func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance model.Instance, err error) { + parentTask, err := p.getParentTask(taskItem, "index_migration") if err != nil { return }