diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index cd31abd0..b76b0564 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -291,8 +291,8 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { if totalDocs == 0 { taskItem.Status = task2.StatusComplete - taskItem.Metadata.Labels["index_docs"] = 0 taskItem.Metadata.Labels["scrolled_docs"] = 0 + taskItem.Metadata.Labels["index_docs"] = 0 now := time.Now() taskItem.CompletedTime = &now @@ -308,29 +308,82 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { log.Errorf("failed to get pipeline tasks, err: %v", err) return nil } - if scrollTask == nil || bulkTask == nil { return errors.New("scroll/bulk pipeline task missing") } + + scrolled, scrolledDocs, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs) + if !scrolled { + return nil + } + if err != nil { + now := time.Now() + taskItem.CompletedTime = &now + taskItem.Status = task2.StatusError + p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + Success: false, + Error: err.Error(), + }, "index scroll failed") + p.decrInstanceJobs(instanceID) + return nil + } + + if migration_util.GetMapIntValue(util.MapStr(taskItem.Metadata.Labels), "scrolled_docs") == 0 { + taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs + p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "") + } + + bulked, successDocs, err := p.checkBulkPipelineTaskStatus(bulkTask, totalDocs) + if !bulked { + return nil + } + now := time.Now() + taskItem.CompletedTime = &now + taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs + taskItem.Metadata.Labels["index_docs"] = successDocs + if err != nil { + taskItem.Status = task2.StatusError + p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + Success: false, + Error: err.Error(), + }, "index bulk failed") + } else { + taskItem.Status = task2.StatusComplete + p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + Success: true, + }, "index migration completed") + } + p.decrInstanceJobs(instanceID) + return nil + +} + +func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Task, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) { // NOTE: old-version pipeline tasks has empty status if scrollTask.Status == task2.StatusError || scrollTask.Status == "" { - return fmt.Errorf("scroll pipeline failed") - } - if bulkTask.Status == task2.StatusError || bulkTask.Status == "" { - return errors.New("bulk pipeline failed") + return false, 0, errors.New("scroll pipeline failed") } // scroll not finished yet if scrollTask.Status != task2.StatusComplete { - return nil + return false, 0, nil } var ( scrollLabels = util.MapStr(scrollTask.Metadata.Labels) - scrolledDocs = migration_util.GetMapIntValue(scrollLabels, "scrolled_docs") ) + scrolledDocs = migration_util.GetMapIntValue(scrollLabels, "scrolled_docs") + if scrolledDocs != totalDocs { - return fmt.Errorf("scroll complete but docs count unmatch: %d / %d", scrolledDocs, totalDocs) + return true, scrolledDocs, fmt.Errorf("scroll complete but docs count unmatch: %d / %d", scrolledDocs, totalDocs) + } + + return true, scrolledDocs, nil +} + +func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task, totalDocs int64) (bulked bool, successDocs int64, err error) { + if bulkTask.Status == task2.StatusError || bulkTask.Status == "" { + return false, 0, errors.New("bulk pipeline failed") } // start bulk as needed @@ -339,37 +392,28 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { p.saveTaskAndWriteLog(bulkTask, "", &task2.TaskResult{ Success: true, }, fmt.Sprintf("scroll completed, bulk pipeline started")) - return nil + return false, 0, nil } + // bulk not finished yet if bulkTask.Status != task2.StatusComplete { - return nil + return false, 0, nil } var ( bulkLabels = util.MapStr(bulkTask.Metadata.Labels) - successDocs = migration_util.GetMapIntValue(bulkLabels, "success_docs") invalidDocs = migration_util.GetMapStringValue(bulkLabels, "invalid_docs") invalidReasons = migration_util.GetMapStringValue(bulkLabels, "invalid_reasons") failureDocs = migration_util.GetMapStringValue(bulkLabels, "failure_docs") failureReasons = migration_util.GetMapStringValue(bulkLabels, "failure_reasons") ) + successDocs = migration_util.GetMapIntValue(bulkLabels, "success_docs") + if successDocs != totalDocs { - return fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons) + return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons) } - taskItem.Status = task2.StatusComplete - taskItem.Metadata.Labels["index_docs"] = successDocs - taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs - now := time.Now() - taskItem.CompletedTime = &now - - p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ - Success: true, - }, "index migration completed") - p.decrInstanceJobs(instanceID) - - return nil + return true, successDocs, nil } func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) error { diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index 2f54beb5..f00948da 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -83,6 +83,15 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error { return err } + // TODO: find a better way to handle this + taskItem.Metadata.Labels["index_docs"] = 0 + taskItem.Metadata.Labels["success_docs"] = 0 + taskItem.Metadata.Labels["invalid_docs"] = "" + taskItem.Metadata.Labels["invalid_reasons"] = "" + taskItem.Metadata.Labels["failure_docs"] = "" + taskItem.Metadata.Labels["failure_reasons"] = "" + taskItem.Metadata.Labels["scrolled_docs"] = 0 + taskItem.Status = task.StatusRunning taskItem.StartTimeInMillis = time.Now().UnixMilli() p.saveTaskAndWriteLog(taskItem, "wait_for", &task.TaskResult{