refactor getComparisonMajorTaskInfo

This commit is contained in:
liugq 2023-10-08 17:46:55 +08:00
parent 2f45792480
commit 71ba08d266
2 changed files with 119 additions and 54 deletions

View File

@ -74,7 +74,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.
var completedIndices int var completedIndices int
for i, index := range taskConfig.Indices { for i, index := range taskConfig.Indices {
indexName := index.Source.GetUniqueIndexName() indexName := index.Source.GetUniqueIndexName()
count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs count := indexState[indexName].TotalScrollDocs
percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100 percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100
if percent > 100 { if percent > 100 {
percent = 100 percent = 100
@ -82,6 +82,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.
taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs
taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs
taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2)
taskConfig.Indices[i].TotalScrollDocs = count
taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren
if count == index.Source.Docs+index.Target.Docs { if count == index.Source.Docs+index.Target.Docs {
@ -127,76 +128,139 @@ type ComparisonIndexStateInfo struct {
TargetScrollDocs int64 TargetScrollDocs int64
TotalDiffDocs int64 TotalDiffDocs int64
RunningChildren int RunningChildren int
TotalScrollDocs int64
} }
// TODO: calc realtime info from instance
func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) { func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) {
var pipelineTaskIDs = map[string][]string{}
var pipelineIndexNames = map[string]string{}
indexState = map[string]ComparisonIndexStateInfo{} indexState = map[string]ComparisonIndexStateInfo{}
const size = 500
taskQuery := util.MapStr{ var (
"size": 500, from = -size
"query": util.MapStr{ hasMore = true
"bool": util.MapStr{ )
"must": []util.MapStr{ for hasMore {
{ from += size
"term": util.MapStr{ taskQuery := util.MapStr{
"parent_id": util.MapStr{ "size": 500,
"value": taskID, "query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": taskID,
},
}, },
}, },
}, {
{ "term": util.MapStr{
"term": util.MapStr{ "metadata.type": util.MapStr{
"metadata.type": util.MapStr{ "value": "index_comparison",
"value": "index_comparison", },
}, },
}, },
}, },
}, },
}, },
},
}
subTasks, err := migration_util.GetTasks(taskQuery)
if err != nil {
return taskStats, indexState, err
}
for _, subTask := range subTasks {
taskLabels := util.MapStr(subTask.Metadata.Labels)
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
if indexName == "" {
continue
} }
subTasks, err := migration_util.GetTasks(taskQuery)
cfg := migration_model.IndexComparisonTaskConfig{}
err = migration_util.GetTaskConfig(&subTask, &cfg)
if err != nil { if err != nil {
log.Errorf("failed to get task config, err: %v", err) return taskStats, indexState, err
}
if len(subTasks) < size {
hasMore = false
}
var indexMigrationTaskIDs []string
for _, subTask := range subTasks {
taskLabels := util.MapStr(subTask.Metadata.Labels)
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
if indexName == "" {
continue
}
cfg := migration_model.IndexComparisonTaskConfig{}
err = migration_util.GetTaskConfig(&subTask, &cfg)
if err != nil {
log.Errorf("failed to get task config, err: %v", err)
continue
}
totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs")
taskStats.SourceTotalDocs += cfg.Source.DocCount
taskStats.TargetTotalDocs += cfg.Target.DocCount
taskStats.TotalDiffDocs += totalDiffDocs
st := indexState[indexName]
st.SourceTotalDocs += cfg.Source.DocCount
st.TargetTotalDocs += cfg.Target.DocCount
st.TotalDiffDocs += totalDiffDocs
if subTask.Status == task.StatusError {
st.ErrorPartitions += 1
}
if subTask.Status == task.StatusRunning {
st.RunningChildren++
indexState[indexName] = st
indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
continue
}
sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled")
targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled")
st.SourceScrollDocs += sourceDocs
st.TargetScrollDocs += targetDocs
st.TotalScrollDocs += sourceDocs + targetDocs
taskStats.TargetScrollDocs += targetDocs
taskStats.SourceScrollDocs += sourceDocs
indexState[indexName] = st
}
if len(indexMigrationTaskIDs) == 0 {
continue continue
} }
sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled")
targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") taskQuery = util.MapStr{
totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") "size": len(indexMigrationTaskIDs) * 2,
taskStats.SourceTotalDocs += cfg.Source.DocCount "query": util.MapStr{
taskStats.SourceScrollDocs += sourceDocs "bool": util.MapStr{
taskStats.TargetTotalDocs += cfg.Target.DocCount "must": []util.MapStr{
taskStats.TargetScrollDocs += targetDocs {
taskStats.TotalDiffDocs += totalDiffDocs "terms": util.MapStr{
"parent_id": indexMigrationTaskIDs,
},
},
},
},
},
}
subTasks, err = migration_util.GetTasks(taskQuery)
if err != nil {
return taskStats, indexState, err
}
for _, subTask := range subTasks {
taskLabels := util.MapStr(subTask.Metadata.Labels)
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
if indexName == "" {
continue
}
pipelineIndexNames[subTask.ID] = indexName
if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
}
}
}
pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
for pipelineID, pipelineContext := range pipelineContexts {
// add scrolledDocs of running tasks
scrollDocs := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs")
indexName := pipelineIndexNames[pipelineID]
st := indexState[indexName] st := indexState[indexName]
st.SourceTotalDocs += cfg.Source.DocCount st.TotalScrollDocs += scrollDocs
st.SourceScrollDocs += sourceDocs
st.TargetTotalDocs += cfg.Target.DocCount
st.TargetScrollDocs += targetDocs
st.TotalDiffDocs += totalDiffDocs
if subTask.Status == task.StatusError {
st.ErrorPartitions += 1
}
if subTask.Status == task.StatusRunning {
st.RunningChildren++
}
indexState[indexName] = st indexState[indexName] = st
} }
return taskStats, indexState, nil return taskStats, indexState, nil
} }

View File

@ -31,7 +31,8 @@ type ClusterComparisonIndexConfig struct {
Partition *IndexPartition `json:"partition,omitempty"` Partition *IndexPartition `json:"partition,omitempty"`
// only used in API // only used in API
ScrollPercent float64 `json:"scroll_percent,omitempty"` ScrollPercent float64 `json:"scroll_percent,omitempty"`
TotalScrollDocs int64 `json:"total_scroll_docs,omitempty"`
ErrorPartitions int `json:"error_partitions,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"`
RunningChildren int `json:"running_children,omitempty"` RunningChildren int `json:"running_children,omitempty"`
} }