[migration] extract UpdateStoppedChildTasksToReady

This commit is contained in:
Kassian Sun 2023-05-17 16:44:27 +08:00
parent 7e79c509b2
commit 556a3d49d8
3 changed files with 45 additions and 70 deletions

View File

@ -48,41 +48,8 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error {
if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok {
return p.splitMajorTask(taskItem)
}
//update status of subtask to ready
query := util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": taskItem.ID,
},
},
},
{
"terms": util.MapStr{
"status": []string{task.StatusError, task.StatusStopped},
},
},
{
"term": util.MapStr{
"metadata.type": util.MapStr{
"value": "index_comparison",
},
},
},
},
},
}
queryDsl := util.MapStr{
"query": query,
"script": util.MapStr{
"source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusReady),
},
}
esClient := elastic.GetClient(p.Elasticsearch)
_, err := esClient.UpdateByQuery(p.IndexName, util.MustToJSONBytes(queryDsl))
// update status of subtask to ready
err := migration_util.UpdateStoppedChildTasksToReady(taskItem, "index_comparison")
if err != nil {
log.Errorf("failed to update sub task status, err: %v", err)
return nil

View File

@ -50,41 +50,8 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error {
if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok {
return p.splitMajorMigrationTask(taskItem)
}
//update status of subtask to ready
query := util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": taskItem.ID,
},
},
},
{
"terms": util.MapStr{
"status": []string{task.StatusError, task.StatusStopped},
},
},
{
"term": util.MapStr{
"metadata.type": util.MapStr{
"value": "index_migration",
},
},
},
},
},
}
queryDsl := util.MapStr{
"query": query,
"script": util.MapStr{
"source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusReady),
},
}
esClient := elastic.GetClient(p.Elasticsearch)
_, err := esClient.UpdateByQuery(p.IndexName, util.MustToJSONBytes(queryDsl))
// update status of subtask to ready
err := migration_util.UpdateStoppedChildTasksToReady(taskItem, "index_migration")
if err != nil {
log.Errorf("failed to update sub task status, err: %v", err)
return nil

View File

@ -115,3 +115,44 @@ func UpdatePendingChildTasksToPendingStop(taskItem *task.Task, taskType string)
}
return nil
}
// update status of subtask to ready
func UpdateStoppedChildTasksToReady(taskItem *task.Task, taskType string) error {
query := util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": taskItem.ID,
},
},
},
{
"terms": util.MapStr{
"status": []string{task.StatusError, task.StatusStopped},
},
},
{
"term": util.MapStr{
"metadata.type": util.MapStr{
"value": taskType,
},
},
},
},
},
}
queryDsl := util.MapStr{
"query": query,
"script": util.MapStr{
"source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusReady),
},
}
err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl))
if err != nil {
return err
}
return nil
}