[migration] optimize scroll/bulk error handling and status resetting
This commit is contained in:
parent
c89c8616b1
commit
7d675ba3d1
|
@ -291,8 +291,8 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
|
|||
|
||||
if totalDocs == 0 {
|
||||
taskItem.Status = task2.StatusComplete
|
||||
taskItem.Metadata.Labels["index_docs"] = 0
|
||||
taskItem.Metadata.Labels["scrolled_docs"] = 0
|
||||
taskItem.Metadata.Labels["index_docs"] = 0
|
||||
now := time.Now()
|
||||
taskItem.CompletedTime = &now
|
||||
|
||||
|
@ -308,29 +308,82 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
|
|||
log.Errorf("failed to get pipeline tasks, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if scrollTask == nil || bulkTask == nil {
|
||||
return errors.New("scroll/bulk pipeline task missing")
|
||||
}
|
||||
|
||||
scrolled, scrolledDocs, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs)
|
||||
if !scrolled {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
now := time.Now()
|
||||
taskItem.CompletedTime = &now
|
||||
taskItem.Status = task2.StatusError
|
||||
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
|
||||
Success: false,
|
||||
Error: err.Error(),
|
||||
}, "index scroll failed")
|
||||
p.decrInstanceJobs(instanceID)
|
||||
return nil
|
||||
}
|
||||
|
||||
if migration_util.GetMapIntValue(util.MapStr(taskItem.Metadata.Labels), "scrolled_docs") == 0 {
|
||||
taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs
|
||||
p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "")
|
||||
}
|
||||
|
||||
bulked, successDocs, err := p.checkBulkPipelineTaskStatus(bulkTask, totalDocs)
|
||||
if !bulked {
|
||||
return nil
|
||||
}
|
||||
now := time.Now()
|
||||
taskItem.CompletedTime = &now
|
||||
taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs
|
||||
taskItem.Metadata.Labels["index_docs"] = successDocs
|
||||
if err != nil {
|
||||
taskItem.Status = task2.StatusError
|
||||
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
|
||||
Success: false,
|
||||
Error: err.Error(),
|
||||
}, "index bulk failed")
|
||||
} else {
|
||||
taskItem.Status = task2.StatusComplete
|
||||
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
|
||||
Success: true,
|
||||
}, "index migration completed")
|
||||
}
|
||||
p.decrInstanceJobs(instanceID)
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Task, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) {
|
||||
// NOTE: old-version pipeline tasks has empty status
|
||||
if scrollTask.Status == task2.StatusError || scrollTask.Status == "" {
|
||||
return fmt.Errorf("scroll pipeline failed")
|
||||
}
|
||||
if bulkTask.Status == task2.StatusError || bulkTask.Status == "" {
|
||||
return errors.New("bulk pipeline failed")
|
||||
return false, 0, errors.New("scroll pipeline failed")
|
||||
}
|
||||
|
||||
// scroll not finished yet
|
||||
if scrollTask.Status != task2.StatusComplete {
|
||||
return nil
|
||||
return false, 0, nil
|
||||
}
|
||||
|
||||
var (
|
||||
scrollLabels = util.MapStr(scrollTask.Metadata.Labels)
|
||||
scrolledDocs = migration_util.GetMapIntValue(scrollLabels, "scrolled_docs")
|
||||
)
|
||||
scrolledDocs = migration_util.GetMapIntValue(scrollLabels, "scrolled_docs")
|
||||
|
||||
if scrolledDocs != totalDocs {
|
||||
return fmt.Errorf("scroll complete but docs count unmatch: %d / %d", scrolledDocs, totalDocs)
|
||||
return true, scrolledDocs, fmt.Errorf("scroll complete but docs count unmatch: %d / %d", scrolledDocs, totalDocs)
|
||||
}
|
||||
|
||||
return true, scrolledDocs, nil
|
||||
}
|
||||
|
||||
func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task, totalDocs int64) (bulked bool, successDocs int64, err error) {
|
||||
if bulkTask.Status == task2.StatusError || bulkTask.Status == "" {
|
||||
return false, 0, errors.New("bulk pipeline failed")
|
||||
}
|
||||
|
||||
// start bulk as needed
|
||||
|
@ -339,37 +392,28 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
|
|||
p.saveTaskAndWriteLog(bulkTask, "", &task2.TaskResult{
|
||||
Success: true,
|
||||
}, fmt.Sprintf("scroll completed, bulk pipeline started"))
|
||||
return nil
|
||||
return false, 0, nil
|
||||
}
|
||||
|
||||
// bulk not finished yet
|
||||
if bulkTask.Status != task2.StatusComplete {
|
||||
return nil
|
||||
return false, 0, nil
|
||||
}
|
||||
|
||||
var (
|
||||
bulkLabels = util.MapStr(bulkTask.Metadata.Labels)
|
||||
successDocs = migration_util.GetMapIntValue(bulkLabels, "success_docs")
|
||||
invalidDocs = migration_util.GetMapStringValue(bulkLabels, "invalid_docs")
|
||||
invalidReasons = migration_util.GetMapStringValue(bulkLabels, "invalid_reasons")
|
||||
failureDocs = migration_util.GetMapStringValue(bulkLabels, "failure_docs")
|
||||
failureReasons = migration_util.GetMapStringValue(bulkLabels, "failure_reasons")
|
||||
)
|
||||
successDocs = migration_util.GetMapIntValue(bulkLabels, "success_docs")
|
||||
|
||||
if successDocs != totalDocs {
|
||||
return fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons)
|
||||
return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons)
|
||||
}
|
||||
|
||||
taskItem.Status = task2.StatusComplete
|
||||
taskItem.Metadata.Labels["index_docs"] = successDocs
|
||||
taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs
|
||||
now := time.Now()
|
||||
taskItem.CompletedTime = &now
|
||||
|
||||
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
|
||||
Success: true,
|
||||
}, "index migration completed")
|
||||
p.decrInstanceJobs(instanceID)
|
||||
|
||||
return nil
|
||||
return true, successDocs, nil
|
||||
}
|
||||
|
||||
func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) error {
|
||||
|
|
|
@ -83,6 +83,15 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// TODO: find a better way to handle this
|
||||
taskItem.Metadata.Labels["index_docs"] = 0
|
||||
taskItem.Metadata.Labels["success_docs"] = 0
|
||||
taskItem.Metadata.Labels["invalid_docs"] = ""
|
||||
taskItem.Metadata.Labels["invalid_reasons"] = ""
|
||||
taskItem.Metadata.Labels["failure_docs"] = ""
|
||||
taskItem.Metadata.Labels["failure_reasons"] = ""
|
||||
taskItem.Metadata.Labels["scrolled_docs"] = 0
|
||||
|
||||
taskItem.Status = task.StatusRunning
|
||||
taskItem.StartTimeInMillis = time.Now().UnixMilli()
|
||||
p.saveTaskAndWriteLog(taskItem, "wait_for", &task.TaskResult{
|
||||
|
|
Loading…
Reference in New Issue