Fix migration task progress calculation when there are a large number of sub tasks

This commit is contained in:
liugq 2023-08-14 10:41:17 +08:00
parent c8aab5c566
commit ef36af329f
1 changed files with 123 additions and 104 deletions

View File

@ -76,7 +76,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
for i, index := range taskConfig.Indices {
indexName := index.Source.GetUniqueIndexName()
count := indexState[indexName].IndexDocs
sourceDocs := indexState[indexName].SourceDocs
sourceDocs := index.Source.Docs
var percent float64
if sourceDocs <= 0 {
percent = 100
@ -86,7 +86,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
percent = 100
}
}
taskConfig.Indices[i].Source.Docs = sourceDocs
//taskConfig.Indices[i].Source.Docs = sourceDocs
taskConfig.Indices[i].Target.Docs = count
taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
@ -257,114 +257,133 @@ We count data from two sources:
- realtime bulk indexing info is only available for running index_migrations
*/
func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) {
taskQuery := util.MapStr{
"size": 500,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": id,
},
},
},
{
"term": util.MapStr{
"metadata.type": util.MapStr{
"value": "index_migration",
},
},
},
},
},
},
}
subTasks, err := migration_util.GetTasks(taskQuery)
if err != nil {
return taskStats, indexState, err
}
var indexMigrationTaskIDs []string
indexState = map[string]MigrationIndexStateInfo{}
for _, subTask := range subTasks {
taskLabels := util.MapStr(subTask.Metadata.Labels)
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
if indexName == "" {
continue
}
cfg := migration_model.IndexMigrationTaskConfig{}
err = migration_util.GetTaskConfig(&subTask, &cfg)
if err != nil {
log.Errorf("failed to get task config, err: %v", err)
continue
}
taskStats.SourceDocs += cfg.Source.DocCount
st := indexState[indexName]
st.SourceDocs += cfg.Source.DocCount
indexState[indexName] = st
if subTask.Status == task.StatusRunning {
indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
continue
}
indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
taskStats.IndexDocs += indexDocs
st.IndexDocs += indexDocs
if subTask.Status == task.StatusError {
st.ErrorPartitions += 1
taskStats.ErrorPartitions += 1
}
indexState[indexName] = st
indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
}
if len(indexMigrationTaskIDs) == 0 {
return taskStats, indexState, nil
}
taskQuery = util.MapStr{
"size": 500,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"parent_id": indexMigrationTaskIDs,
},
},
{
"term": util.MapStr{
"metadata.labels.pipeline_id": util.MapStr{
"value": "bulk_indexing",
},
},
},
},
},
},
}
subTasks, err = migration_util.GetTasks(taskQuery)
if err != nil {
return taskStats, indexState, err
}
var pipelineTaskIDs = map[string][]string{}
var pipelineIndexNames = map[string]string{}
for _, subTask := range subTasks {
taskLabels := util.MapStr(subTask.Metadata.Labels)
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
if indexName == "" {
indexState = map[string]MigrationIndexStateInfo{}
const size = 500
var (
from = -size
hasMore = true
)
for hasMore {
from += size
taskQuery := util.MapStr{
"from": from,
"size": size,
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "asc",
},
},
},
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": id,
},
},
},
{
"term": util.MapStr{
"metadata.type": util.MapStr{
"value": "index_migration",
},
},
},
},
},
},
}
subTasks, err := migration_util.GetTasks(taskQuery)
if err != nil {
return taskStats, indexState, err
}
if len(subTasks) < size {
hasMore = false
}
var indexMigrationTaskIDs []string
for _, subTask := range subTasks {
taskLabels := util.MapStr(subTask.Metadata.Labels)
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
if indexName == "" {
continue
}
cfg := migration_model.IndexMigrationTaskConfig{}
err = migration_util.GetTaskConfig(&subTask, &cfg)
if err != nil {
log.Errorf("failed to get task config, err: %v", err)
continue
}
taskStats.SourceDocs += cfg.Source.DocCount
st := indexState[indexName]
st.SourceDocs += cfg.Source.DocCount
indexState[indexName] = st
if subTask.Status == task.StatusRunning {
indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
continue
}
indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
taskStats.IndexDocs += indexDocs
st.IndexDocs += indexDocs
if subTask.Status == task.StatusError {
st.ErrorPartitions += 1
taskStats.ErrorPartitions += 1
}
indexState[indexName] = st
indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
}
if len(indexMigrationTaskIDs) == 0 {
continue
}
pipelineIndexNames[subTask.ID] = indexName
taskQuery = util.MapStr{
"size": len(indexMigrationTaskIDs),
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"parent_id": indexMigrationTaskIDs,
},
},
{
"term": util.MapStr{
"metadata.labels.pipeline_id": util.MapStr{
"value": "bulk_indexing",
},
},
},
},
},
},
}
subTasks, err = migration_util.GetTasks(taskQuery)
if err != nil {
return taskStats, indexState, err
}
if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
for _, subTask := range subTasks {
taskLabels := util.MapStr(subTask.Metadata.Labels)
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
if indexName == "" {
continue
}
pipelineIndexNames[subTask.ID] = indexName
if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
}
}
}