diff --git a/plugin/task_manager/migration_api.go b/plugin/task_manager/migration_api.go index f2898818..0c702365 100644 --- a/plugin/task_manager/migration_api.go +++ b/plugin/task_manager/migration_api.go @@ -76,7 +76,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R for i, index := range taskConfig.Indices { indexName := index.Source.GetUniqueIndexName() count := indexState[indexName].IndexDocs - sourceDocs := indexState[indexName].SourceDocs + sourceDocs := index.Source.Docs var percent float64 if sourceDocs <= 0 { percent = 100 @@ -86,7 +86,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R percent = 100 } } - taskConfig.Indices[i].Source.Docs = sourceDocs + //taskConfig.Indices[i].Source.Docs = sourceDocs taskConfig.Indices[i].Target.Docs = count taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions @@ -257,114 +257,133 @@ We count data from two sources: - realtime bulk indexing info is only available for running index_migrations */ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) { - taskQuery := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_migration", - }, - }, - }, - }, - }, - }, - } - subTasks, err := migration_util.GetTasks(taskQuery) - if err != nil { - return taskStats, indexState, err - } - - var indexMigrationTaskIDs []string - indexState = map[string]MigrationIndexStateInfo{} - for _, subTask := range subTasks { - taskLabels := util.MapStr(subTask.Metadata.Labels) - indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") - if indexName == "" { - continue - } - - cfg := migration_model.IndexMigrationTaskConfig{} - err = migration_util.GetTaskConfig(&subTask, &cfg) - if err != nil { - log.Errorf("failed to get task config, err: %v", err) - continue - } - - taskStats.SourceDocs += cfg.Source.DocCount - st := indexState[indexName] - st.SourceDocs += cfg.Source.DocCount - indexState[indexName] = st - - if subTask.Status == task.StatusRunning { - indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) - continue - } - - indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs") - taskStats.IndexDocs += indexDocs - st.IndexDocs += indexDocs - if subTask.Status == task.StatusError { - st.ErrorPartitions += 1 - taskStats.ErrorPartitions += 1 - } - indexState[indexName] = st - indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) - } - - if len(indexMigrationTaskIDs) == 0 { - return taskStats, indexState, nil - } - - taskQuery = util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "parent_id": indexMigrationTaskIDs, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "bulk_indexing", - }, - }, - }, - }, - }, - }, - } - subTasks, err = migration_util.GetTasks(taskQuery) - if err != nil { - return taskStats, indexState, err - } - var pipelineTaskIDs = map[string][]string{} var pipelineIndexNames = map[string]string{} - for _, subTask := range subTasks { - taskLabels := util.MapStr(subTask.Metadata.Labels) - indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") - if indexName == "" { + indexState = map[string]MigrationIndexStateInfo{} + const size = 500 + var ( + from = -size + hasMore = true + ) + for hasMore { + from += size + taskQuery := util.MapStr{ + "from": from, + "size": size, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "asc", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": id, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_migration", + }, + }, + }, + }, + }, + }, + } + subTasks, err := migration_util.GetTasks(taskQuery) + if err != nil { + return taskStats, indexState, err + } + if len(subTasks) < size { + hasMore = false + } + + var indexMigrationTaskIDs []string + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + cfg := migration_model.IndexMigrationTaskConfig{} + err = migration_util.GetTaskConfig(&subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + continue + } + + taskStats.SourceDocs += cfg.Source.DocCount + st := indexState[indexName] + st.SourceDocs += cfg.Source.DocCount + indexState[indexName] = st + + if subTask.Status == task.StatusRunning { + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) + continue + } + + indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs") + taskStats.IndexDocs += indexDocs + st.IndexDocs += indexDocs + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 + taskStats.ErrorPartitions += 1 + } + indexState[indexName] = st + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) + } + + if len(indexMigrationTaskIDs) == 0 { continue } - pipelineIndexNames[subTask.ID] = indexName + taskQuery = util.MapStr{ + "size": len(indexMigrationTaskIDs), + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "parent_id": indexMigrationTaskIDs, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.pipeline_id": util.MapStr{ + "value": "bulk_indexing", + }, + }, + }, + }, + }, + }, + } + subTasks, err = migration_util.GetTasks(taskQuery) + if err != nil { + return taskStats, indexState, err + } - if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { - pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + pipelineIndexNames[subTask.ID] = indexName + + if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } } }