diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 2a56380e..d5495230 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -385,10 +385,13 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { } func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Task, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) { - // NOTE: old-version pipeline tasks has empty status - if scrollTask.Status == task2.StatusError || scrollTask.Status == "" { + if scrollTask.Status == task2.StatusError { return true, 0, errors.New("scroll pipeline failed") } + // NOTE: old-version pipeline tasks has empty status + if scrollTask.Status == "" { + return true, 0, errors.New("task was started by an old-version console, need to manually restart it") + } // scroll not finished yet if scrollTask.Status != task2.StatusComplete { @@ -408,9 +411,13 @@ func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Ta } func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task, totalDocs int64) (bulked bool, successDocs int64, err error) { - if bulkTask.Status == task2.StatusError || bulkTask.Status == "" { + if bulkTask.Status == task2.StatusError { return true, 0, errors.New("bulk pipeline failed") } + // NOTE: old-version pipeline tasks has empty status + if bulkTask.Status == "" { + return true, 0, errors.New("task was started by an old-version console, need to manually restart it") + } // start bulk as needed if bulkTask.Status == task2.StatusInit {