[migration] fail cluster migration if docs count unmatch

This commit is contained in:
Kassian Sun 2023-04-20 15:30:53 +08:00 committed by Gitea
parent 57e1fb0844
commit ae9134afd5
1 changed files with 28 additions and 7 deletions

View File

@ -268,14 +268,35 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error
if err != nil { if err != nil {
return err return err
} }
if ts.Status == task2.StatusComplete || ts.Status == task2.StatusError { if !(ts.Status == task2.StatusComplete || ts.Status == task2.StatusError) {
taskItem.Metadata.Labels["target_total_docs"] = ts.IndexDocs return nil
taskItem.Status = ts.Status
tn := time.Now()
taskItem.CompletedTime = &tn
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] finished with status [%s]", taskItem.ID, ts.Status))
} }
totalDocs := migration_util.GetMapIntValue(util.MapStr(taskItem.Metadata.Labels), "source_total_docs")
var errMsg string
if ts.Status == task2.StatusError {
errMsg = "index migration(s) failed"
}
if errMsg == "" {
if totalDocs != ts.IndexDocs {
errMsg = fmt.Sprintf("cluster migration completed but docs count unmatch: %d / %d", ts.IndexDocs, totalDocs)
}
}
if errMsg == "" {
taskItem.Status = task2.StatusComplete
} else {
taskItem.Status = task2.StatusError
}
taskItem.Metadata.Labels["target_total_docs"] = ts.IndexDocs
tn := time.Now()
taskItem.CompletedTime = &tn
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
Success: errMsg == "",
Error: errMsg,
}, fmt.Sprintf("major task [%s] finished with status [%s]", taskItem.ID, taskItem.Status))
return nil return nil
} }