From 556a3d49d871e9d6d9baf59567347f951489d457 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Wed, 17 May 2023 16:44:27 +0800 Subject: [PATCH] [migration] extract UpdateStoppedChildTasksToReady --- .../cluster_comparison/cluster_comparison.go | 37 +---------------- .../cluster_migration/cluster_migration.go | 37 +---------------- plugin/migration/util/orm.go | 41 +++++++++++++++++++ 3 files changed, 45 insertions(+), 70 deletions(-) diff --git a/plugin/migration/cluster_comparison/cluster_comparison.go b/plugin/migration/cluster_comparison/cluster_comparison.go index 908622ef..ad61d5b5 100644 --- a/plugin/migration/cluster_comparison/cluster_comparison.go +++ b/plugin/migration/cluster_comparison/cluster_comparison.go @@ -48,41 +48,8 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok { return p.splitMajorTask(taskItem) } - //update status of subtask to ready - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskItem.ID, - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task.StatusError, task.StatusStopped}, - }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_comparison", - }, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusReady), - }, - } - - esClient := elastic.GetClient(p.Elasticsearch) - _, err := esClient.UpdateByQuery(p.IndexName, util.MustToJSONBytes(queryDsl)) + // update status of subtask to ready + err := migration_util.UpdateStoppedChildTasksToReady(taskItem, "index_comparison") if err != nil { log.Errorf("failed to update sub task status, err: %v", err) return nil diff --git a/plugin/migration/cluster_migration/cluster_migration.go b/plugin/migration/cluster_migration/cluster_migration.go index 43ecf5d9..1b4f2870 100644 --- a/plugin/migration/cluster_migration/cluster_migration.go +++ b/plugin/migration/cluster_migration/cluster_migration.go @@ -50,41 +50,8 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok { return p.splitMajorMigrationTask(taskItem) } - //update status of subtask to ready - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskItem.ID, - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task.StatusError, task.StatusStopped}, - }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_migration", - }, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusReady), - }, - } - - esClient := elastic.GetClient(p.Elasticsearch) - _, err := esClient.UpdateByQuery(p.IndexName, util.MustToJSONBytes(queryDsl)) + // update status of subtask to ready + err := migration_util.UpdateStoppedChildTasksToReady(taskItem, "index_migration") if err != nil { log.Errorf("failed to update sub task status, err: %v", err) return nil diff --git a/plugin/migration/util/orm.go b/plugin/migration/util/orm.go index b1eb7e44..2613b31f 100644 --- a/plugin/migration/util/orm.go +++ b/plugin/migration/util/orm.go @@ -115,3 +115,44 @@ func UpdatePendingChildTasksToPendingStop(taskItem *task.Task, taskType string) } return nil } + +// update status of subtask to ready +func UpdateStoppedChildTasksToReady(taskItem *task.Task, taskType string) error { + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskItem.ID, + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task.StatusError, task.StatusStopped}, + }, + }, + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": taskType, + }, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusReady), + }, + } + + err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl)) + if err != nil { + return err + } + return nil +}