From c89c8616b16d46615dc2ab59ecb557fec83c4552 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Wed, 19 Apr 2023 11:58:30 +0800 Subject: [PATCH] [migration] clean all business_id checks --- plugin/migration/api.go | 12 ++++++------ plugin/migration/pipeline.go | 8 ++------ 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/plugin/migration/api.go b/plugin/migration/api.go index 5fce6440..0b15d771 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -121,7 +121,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re ) mustQ = append(mustQ, util.MapStr{ "term": util.MapStr{ - "metadata.labels.business_id": util.MapStr{ + "metadata.type": util.MapStr{ "value": "cluster_migration", }, }, @@ -253,7 +253,7 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request Success: true, }, "task status manually set to ready") - if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "index_migration" && len(obj.ParentId) > 0 { + if obj.Metadata.Labels != nil && obj.Metadata.Type == "index_migration" && len(obj.ParentId) > 0 { //update status of major task to running query := util.MapStr{ "bool": util.MapStr{ @@ -492,7 +492,7 @@ func getMajorTaskInfoByIndex(taskID string) (map[string]migration_model.IndexSta "must": []util.MapStr{ { "term": util.MapStr{ - "metadata.labels.business_id": util.MapStr{ + "metadata.type": util.MapStr{ "value": "index_migration", }, }, @@ -668,7 +668,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt continue } if subTask.Metadata.Labels != nil { - if subTask.Metadata.Labels["business_id"] == "index_migration" { + if subTask.Metadata.Type == "index_migration" { subTasks = append(subTasks, subTask) subTaskStatus[subTask.ID] = subTask.Status continue @@ -1102,7 +1102,7 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod "must": []util.MapStr{ { "term": util.MapStr{ - "metadata.labels.business_id": util.MapStr{ + "metadata.type": util.MapStr{ "value": "index_migration", }, }, @@ -1140,7 +1140,7 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod } if subTask.Metadata.Labels != nil { //add indexDocs of already complete - if subTask.Metadata.Labels["business_id"] == "index_migration" { + if subTask.Metadata.Type == "index_migration" { if v, ok := subTask.Metadata.Labels["index_docs"].(float64); ok { taskStats.IndexDocs += v } diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 464c3491..cd31abd0 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -209,7 +209,7 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { }, { "term": util.MapStr{ - "metadata.labels.business_id": util.MapStr{ + "metadata.type": util.MapStr{ "value": "index_migration", }, }, @@ -700,10 +700,6 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro if taskItem.Metadata.Labels["is_split"] == true { return nil } - if taskItem.Metadata.Labels["business_id"] != "cluster_migration" { - log.Tracef("got unexpect task type of %s with task id [%s] in cluster migration processor", taskItem.Metadata.Type, taskItem.ID) - return nil - } clusterMigrationTask := migration_model.ClusterMigrationTaskConfig{} err := migration_util.GetTaskConfig(taskItem, &clusterMigrationTask) @@ -1026,7 +1022,7 @@ func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState "must": []util.MapStr{ { "term": util.MapStr{ - "metadata.labels.business_id": util.MapStr{ + "metadata.type": util.MapStr{ "value": "index_migration", }, },