[migration] delete ready1 state

This commit is contained in:
Kassian Sun 2023-04-19 11:22:02 +08:00 committed by Gitea
parent 7472052d90
commit 527c3c9e46
2 changed files with 11 additions and 14 deletions

View File

@ -148,9 +148,6 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
case task2.StatusReady:
// split sub task
err = p.handleReadySubTask(&t)
case task2.StatusReady1:
// update pipeline tasks to ready
err = p.handleReady1SubTask(&t)
case task2.StatusRunning:
// check pipeline tasks status
err = p.handleRunningSubTask(&t)
@ -399,13 +396,13 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
if taskItem.Metadata.Labels["is_split"] == true {
taskItem.Status = task2.StatusReady1
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
Success: true,
}, fmt.Sprintf("sub task [%s] splitted, skip to ready1", taskItem.ID))
return nil
return p.handleScheduleSubTask(taskItem)
}
return p.handleSplitSubTask(taskItem)
}
func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
//split task to scroll/bulk_indexing pipeline and then persistent
var pids []string
pids = append(pids, taskItem.ParentId...)
@ -555,7 +552,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
}
taskItem.Metadata.Labels["is_split"] = true
taskItem.Status = task2.StatusReady1
taskItem.Status = task2.StatusReady
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
Success: true,
@ -563,7 +560,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
return nil
}
func (p *DispatcherProcessor) handleReady1SubTask(taskItem *task2.Task) error {
func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error {
scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem)
if err != nil {
log.Errorf("failed to get pipeline tasks, err: %v", err)
@ -678,7 +675,7 @@ func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error)
"must": []util.MapStr{
{
"terms": util.MapStr{
"status": []string{task2.StatusReady, task2.StatusReady1, task2.StatusRunning, task2.StatusPendingStop},
"status": []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop},
},
},
},
@ -1124,7 +1121,7 @@ func (p *DispatcherProcessor) updatePendingChildTasksToPendingStop(taskItem *tas
},
{
"terms": util.MapStr{
"status": []string{task2.StatusRunning, task2.StatusReady, task2.StatusReady1},
"status": []string{task2.StatusRunning, task2.StatusReady},
},
},
{
@ -1173,7 +1170,7 @@ func (p *DispatcherProcessor) getPendingChildTasks(taskItem *task2.Task, taskTyp
},
{
"terms": util.MapStr{
"status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady, task2.StatusReady1},
"status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady},
},
},
},

View File

@ -35,7 +35,7 @@ func WriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string)
})
}
var runningTaskStatus = []string{task.StatusRunning, task.StatusReady, task.StatusReady1}
var runningTaskStatus = []string{task.StatusRunning, task.StatusReady}
func IsRunningState(status string) bool {
return util.StringInArray(runningTaskStatus, status)