[migration] move disk queue cleanup to index_migration level

This commit is contained in:
Kassian Sun 2023-04-20 16:36:18 +08:00 committed by Gitea
parent ae9134afd5
commit c691bbc607
2 changed files with 44 additions and 32 deletions

View File

@ -320,6 +320,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
Success: true,
}, "empty index migration completed")
p.cleanGatewayQueue(taskItem)
p.decrInstanceJobs(instanceID)
return nil
}
@ -346,6 +347,8 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
Error: err.Error(),
}, "index scroll failed")
p.decrInstanceJobs(instanceID)
// clean disk queue if scroll failed
p.cleanGatewayQueue(taskItem)
return nil
}
@ -374,6 +377,8 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
Success: true,
}, "index migration completed")
}
// clean disk queue if bulk failed/completed
p.cleanGatewayQueue(taskItem)
p.decrInstanceJobs(instanceID)
return nil
@ -455,6 +460,8 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
taskItem.Status = task2.StatusStopped
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
// clean disk queue if manually stopped
p.cleanGatewayQueue(taskItem)
}
return nil
}
@ -646,6 +653,9 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
return nil
}
// try to clear disk queue before running es_scroll
p.cleanGatewayQueue(taskItem)
// update scroll task to ready
scrollTask.Metadata.Labels["execution_instance_id"] = instance.ID
scrollTask.Status = task2.StatusReady
@ -1275,3 +1285,25 @@ func (p *DispatcherProcessor) incrInstanceJobs(instanceID string) {
instanceState.Total = instanceState.Total + 1
p.state[instanceID] = instanceState
}
// cleanGatewayQueue deletes the gateway queues that belong to the given task.
//
// The gateway instance is looked up by the task's "execution_instance_id"
// label, and the queues to delete are selected by the label
// "migration_task_id" == taskItem.ID.
//
// Cleanup is best-effort: nothing is returned to the caller, failures are only
// logged. A task that carries no execution instance ID is skipped, because
// there is no instance to look up.
func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) {
	instance := model.Instance{}
	// The second return value is deliberately ignored: whatever it reports,
	// an empty ID is rejected right below.
	instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
	if instance.ID == "" {
		// Avoid querying the store with an empty ID and logging a misleading
		// "failed to get instance" error.
		log.Debugf("task [%s] has no execution_instance_id, skip cleaning gateway queue", taskItem.ID)
		return
	}

	if _, err := orm.Get(&instance); err != nil {
		log.Errorf("failed to get instance, err: %v", err)
		return
	}

	// Select only the queues labelled with this task's ID.
	selector := util.MapStr{
		"labels": util.MapStr{
			"migration_task_id": taskItem.ID,
		},
	}
	if err := instance.DeleteQueueBySelector(selector); err != nil {
		log.Errorf("failed to delete queue, err: %v", err)
	}
}

View File

@ -48,18 +48,14 @@ func (p *processor) Process(t *task.Task) (err error) {
}
func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
cleanPipeline, cleanQueue := true, false
switch taskItem.Metadata.Labels["pipeline_id"] {
case "es_scroll":
// try to clear queue before running es_scroll
cleanQueue = true
case "bulk_indexing":
default:
return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"])
}
instance, err := p.cleanGatewayPipeline(taskItem, cleanPipeline, cleanQueue)
instance, err := p.cleanGatewayPipeline(taskItem)
if err != nil {
log.Errorf("failed to prepare instance before running pipeline, err: %v", err)
return nil
@ -143,8 +139,7 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error
Success: errMsg == "",
Error: errMsg,
}, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID))
// clean queue if scroll failed
p.cleanGatewayPipeline(taskItem, true, taskItem.Status == task.StatusError)
p.cleanGatewayPipeline(taskItem)
return nil
}
@ -177,8 +172,7 @@ func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) e
Success: errMsg == "",
Error: errMsg,
}, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID))
// clean queue if bulk completed
p.cleanGatewayPipeline(taskItem, true, taskItem.Status == task.StatusComplete)
p.cleanGatewayPipeline(taskItem)
return nil
}
@ -201,12 +195,11 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {
if stopped {
taskItem.Status = task.StatusStopped
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
// clean all stuffs if manually stopped
p.cleanGatewayPipeline(taskItem, true, true)
p.cleanGatewayPipeline(taskItem)
return nil
}
_, instance, err := p.getPipelineExecutionInstance(taskItem)
instance, err := p.getPipelineExecutionInstance(taskItem)
if err != nil {
log.Errorf("failed to get execution instance for task [%s], err: %v", taskItem.ID, err)
return nil
@ -219,34 +212,21 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {
return nil
}
func (p *processor) cleanGatewayPipeline(taskItem *task.Task, pipeline, queue bool) (instance model.Instance, err error) {
parentTask, instance, err := p.getPipelineExecutionInstance(taskItem)
func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.Instance, err error) {
instance, err = p.getPipelineExecutionInstance(taskItem)
if err != nil {
return
}
if pipeline {
err = instance.DeletePipeline(taskItem.ID)
if err != nil {
log.Errorf("delete pipeline failed, err: %v", err)
}
err = instance.DeletePipeline(taskItem.ID)
if err != nil {
log.Errorf("delete pipeline failed, err: %v", err)
}
if queue {
selector := util.MapStr{
"labels": util.MapStr{
"migration_task_id": parentTask.ID,
},
}
err = instance.DeleteQueueBySelector(selector)
if err != nil {
log.Errorf("failed to delete queue, err: %v", err)
}
}
return instance, nil
}
func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (parentTask *task.Task, instance model.Instance, err error) {
parentTask, err = p.getParentTask(taskItem, "index_migration")
func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance model.Instance, err error) {
parentTask, err := p.getParentTask(taskItem, "index_migration")
if err != nil {
return
}