From ae9134afd5065ea5a319e8ebef767969e6c8df60 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Thu, 20 Apr 2023 15:30:53 +0800 Subject: [PATCH] [migration] fail cluster migration if docs count unmatch --- plugin/migration/pipeline.go | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 4801dcb1..3a88bc84 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -268,14 +268,35 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error if err != nil { return err } - if ts.Status == task2.StatusComplete || ts.Status == task2.StatusError { - taskItem.Metadata.Labels["target_total_docs"] = ts.IndexDocs - taskItem.Status = ts.Status - tn := time.Now() - taskItem.CompletedTime = &tn - p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] finished with status [%s]", taskItem.ID, ts.Status)) + if !(ts.Status == task2.StatusComplete || ts.Status == task2.StatusError) { + return nil } + + totalDocs := migration_util.GetMapIntValue(util.MapStr(taskItem.Metadata.Labels), "source_total_docs") + var errMsg string + if ts.Status == task2.StatusError { + errMsg = "index migration(s) failed" + } + + if errMsg == "" { + if totalDocs != ts.IndexDocs { + errMsg = fmt.Sprintf("cluster migration completed but docs count unmatch: %d / %d", ts.IndexDocs, totalDocs) + } + } + + if errMsg == "" { + taskItem.Status = task2.StatusComplete + } else { + taskItem.Status = task2.StatusError + } + taskItem.Metadata.Labels["target_total_docs"] = ts.IndexDocs + tn := time.Now() + taskItem.CompletedTime = &tn + p.sendMajorTaskNotification(taskItem) + p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ + Success: errMsg == "", + Error: errMsg, + }, fmt.Sprintf("major task [%s] finished with status [%s]", taskItem.ID, taskItem.Status)) return nil }