From 155cb4cfbe27f1b2b104d69d6dabf4a3a581a6fb Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Thu, 1 Jun 2023 06:08:46 +0800 Subject: [PATCH] [migration] clear child tasks before split task --- .../cluster_comparison/cluster_comparison.go | 6 +++++ .../cluster_migration/cluster_migration.go | 6 +++++ .../index_comparison/index_comparison.go | 6 +++++ .../index_migration/index_migration.go | 7 ++++++ plugin/migration/util/orm.go | 23 +++++++++++++++++++ 5 files changed, 48 insertions(+) diff --git a/plugin/migration/cluster_comparison/cluster_comparison.go b/plugin/migration/cluster_comparison/cluster_comparison.go index 5d6531da..e2f39eb3 100644 --- a/plugin/migration/cluster_comparison/cluster_comparison.go +++ b/plugin/migration/cluster_comparison/cluster_comparison.go @@ -76,6 +76,12 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error { esSourceClient := elastic.GetClient(clusterComparisonTask.Cluster.Source.Id) esTargetClient := elastic.GetClient(clusterComparisonTask.Cluster.Target.Id) + err = migration_util.DeleteChildTasks(taskItem.ID) + if err != nil { + log.Warnf("failed to clear child tasks, err: %v", err) + return nil + } + for _, index := range clusterComparisonTask.Indices { sourceDump := migration_model.IndexComparisonDumpConfig{ ClusterId: clusterComparisonTask.Cluster.Source.Id, diff --git a/plugin/migration/cluster_migration/cluster_migration.go b/plugin/migration/cluster_migration/cluster_migration.go index fead588c..95269115 100644 --- a/plugin/migration/cluster_migration/cluster_migration.go +++ b/plugin/migration/cluster_migration/cluster_migration.go @@ -78,6 +78,12 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id) targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id) + err = migration_util.DeleteChildTasks(taskItem.ID) + if err != nil { + log.Warnf("failed to clear child tasks, err: %v", err) + return nil + } + for _, index := range clusterMigrationTask.Indices { source := migration_model.IndexMigrationSourceConfig{ ClusterId: clusterMigrationTask.Cluster.Source.Id, diff --git a/plugin/migration/index_comparison/index_comparison.go b/plugin/migration/index_comparison/index_comparison.go index 8e598fe3..1875e32b 100644 --- a/plugin/migration/index_comparison/index_comparison.go +++ b/plugin/migration/index_comparison/index_comparison.go @@ -63,6 +63,12 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error { return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) } + err = migration_util.DeleteChildTasks(taskItem.ID) + if err != nil { + log.Warnf("failed to clear child tasks, err: %v", err) + return nil + } + var pids []string pids = append(pids, taskItem.ParentId...) pids = append(pids, taskItem.ID) diff --git a/plugin/migration/index_migration/index_migration.go b/plugin/migration/index_migration/index_migration.go index b82062c5..1783e277 100644 --- a/plugin/migration/index_migration/index_migration.go +++ b/plugin/migration/index_migration/index_migration.go @@ -92,6 +92,13 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error { if len(taskItem.ParentId) == 0 { return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) } + + err = migration_util.DeleteChildTasks(taskItem.ID) + if err != nil { + log.Warnf("failed to clear child tasks, err: %v", err) + return nil + } + indexName := cfg.Source.Indices scrollTask := &task.Task{ ParentId: pids, diff --git a/plugin/migration/util/orm.go b/plugin/migration/util/orm.go index bf569b5f..f9a02141 100644 --- a/plugin/migration/util/orm.go +++ b/plugin/migration/util/orm.go @@ -10,6 +10,29 @@ import ( "infini.sh/framework/core/util" ) +func DeleteChildTasks(taskID string) error { + q := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskID, + }, + }, + }, + }, + }, + }, + } + err := orm.DeleteBy(&task.Task{}, util.MustToJSONBytes(q)) + if err != nil { + return err + } + return nil +} + func GetPendingChildTasks(elasticsearch, indexName string, taskID string, taskType string) ([]task.Task, error) { return GetChildTasks(elasticsearch, indexName, taskID, taskType, []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady}) }