[migration] clear child tasks before split task

This commit is contained in:
Kassian Sun 2023-06-01 06:08:46 +08:00
parent 09ba88c411
commit 155cb4cfbe
5 changed files with 48 additions and 0 deletions

View File

@ -76,6 +76,12 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
esSourceClient := elastic.GetClient(clusterComparisonTask.Cluster.Source.Id)
esTargetClient := elastic.GetClient(clusterComparisonTask.Cluster.Target.Id)
err = migration_util.DeleteChildTasks(taskItem.ID)
if err != nil {
log.Warnf("failed to clear child tasks, err: %v", err)
return nil
}
for _, index := range clusterComparisonTask.Indices {
sourceDump := migration_model.IndexComparisonDumpConfig{
ClusterId: clusterComparisonTask.Cluster.Source.Id,

View File

@ -78,6 +78,12 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id)
targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id)
err = migration_util.DeleteChildTasks(taskItem.ID)
if err != nil {
log.Warnf("failed to clear child tasks, err: %v", err)
return nil
}
for _, index := range clusterMigrationTask.Indices {
source := migration_model.IndexMigrationSourceConfig{
ClusterId: clusterMigrationTask.Cluster.Source.Id,

View File

@ -63,6 +63,12 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
}
err = migration_util.DeleteChildTasks(taskItem.ID)
if err != nil {
log.Warnf("failed to clear child tasks, err: %v", err)
return nil
}
var pids []string
pids = append(pids, taskItem.ParentId...)
pids = append(pids, taskItem.ID)

View File

@ -92,6 +92,13 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
if len(taskItem.ParentId) == 0 {
return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
}
err = migration_util.DeleteChildTasks(taskItem.ID)
if err != nil {
log.Warnf("failed to clear child tasks, err: %v", err)
return nil
}
indexName := cfg.Source.Indices
scrollTask := &task.Task{
ParentId: pids,

View File

@ -10,6 +10,29 @@ import (
"infini.sh/framework/core/util"
)
func DeleteChildTasks(taskID string) error {
q := util.MapStr{
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": taskID,
},
},
},
},
},
},
}
err := orm.DeleteBy(&task.Task{}, util.MustToJSONBytes(q))
if err != nil {
return err
}
return nil
}
func GetPendingChildTasks(elasticsearch, indexName string, taskID string, taskType string) ([]task.Task, error) {
return GetChildTasks(elasticsearch, indexName, taskID, taskType, []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady})
}