add diff result info and reset total_diff_docs after task restarted

This commit is contained in:
liugq 2023-10-08 11:02:39 +08:00
parent 97731564c1
commit 2f45792480
3 changed files with 20 additions and 0 deletions

View File

@ -60,6 +60,10 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error {
taskItem.StartTimeInMillis = time.Now().UnixMilli() taskItem.StartTimeInMillis = time.Now().UnixMilli()
} }
taskItem.Status = task.StatusRunning taskItem.Status = task.StatusRunning
taskItem.Metadata.Labels["total_diff_docs"] = 0
taskItem.Metadata.Labels["only_in_source"] = 0
taskItem.Metadata.Labels["only_in_target"] = 0
taskItem.Metadata.Labels["diff_both"] = 0
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
Success: true, Success: true,
}, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID)) }, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID))

View File

@ -83,6 +83,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.
taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs
taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2)
taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren
if count == index.Source.Docs+index.Target.Docs { if count == index.Source.Docs+index.Target.Docs {
completedIndices++ completedIndices++
} }
@ -115,6 +116,7 @@ type ClusterComparisonTaskState struct {
TargetTotalDocs int64 TargetTotalDocs int64
TargetScrollDocs int64 TargetScrollDocs int64
TotalDiffDocs int64 TotalDiffDocs int64
RunningChildren int
} }
type ComparisonIndexStateInfo struct { type ComparisonIndexStateInfo struct {
@ -124,6 +126,7 @@ type ComparisonIndexStateInfo struct {
TargetTotalDocs int64 TargetTotalDocs int64
TargetScrollDocs int64 TargetScrollDocs int64
TotalDiffDocs int64 TotalDiffDocs int64
RunningChildren int
} }
// TODO: calc realtime info from instance // TODO: calc realtime info from instance
@ -188,6 +191,9 @@ func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats Cluste
if subTask.Status == task.StatusError { if subTask.Status == task.StatusError {
st.ErrorPartitions += 1 st.ErrorPartitions += 1
} }
if subTask.Status == task.StatusRunning {
st.RunningChildren++
}
indexState[indexName] = st indexState[indexName] = st
} }
@ -256,6 +262,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
subTaskLabels := util.MapStr(subTask.Metadata.Labels) subTaskLabels := util.MapStr(subTask.Metadata.Labels)
sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled") sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled")
targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled") targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled")
onlyInSource := migration_util.GetMapIntValue(subTaskLabels, "only_in_source")
onlyInTarget := migration_util.GetMapIntValue(subTaskLabels, "only_in_target")
diffBoth := migration_util.GetMapIntValue(subTaskLabels, "diff_both")
partitionTaskInfo := util.MapStr{ partitionTaskInfo := util.MapStr{
"task_id": subTask.ID, "task_id": subTask.ID,
@ -267,6 +276,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
"duration": durationInMS, "duration": durationInMS,
"source_total_docs": cfg.Source.DocCount, "source_total_docs": cfg.Source.DocCount,
"target_total_docs": cfg.Target.DocCount, "target_total_docs": cfg.Target.DocCount,
"only_in_source": onlyInSource,
"only_in_target": onlyInTarget,
"diff_both": diffBoth,
} }
sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg) sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg)
if sourceDumpTask != nil { if sourceDumpTask != nil {

View File

@ -362,6 +362,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 { if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 {
now := time.Now() now := time.Now()
taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth
taskItem.Metadata.Labels["only_in_source"] = onlyInSource
taskItem.Metadata.Labels["only_in_target"] = onlyInTarget
taskItem.Metadata.Labels["diff_both"] = diffBoth
taskItem.CompletedTime = &now taskItem.CompletedTime = &now
taskItem.Status = task.StatusError taskItem.Status = task.StatusError
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
@ -390,6 +393,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
now := time.Now() now := time.Now()
taskItem.CompletedTime = &now taskItem.CompletedTime = &now
taskItem.Status = task.StatusComplete taskItem.Status = task.StatusComplete
taskItem.Metadata.Labels["total_diff_docs"] = 0
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
Success: true, Success: true,
}, "index comparison completed") }, "index comparison completed")