From f0532827543579604794e2bdc66c185e8a2bb6db Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 19 Dec 2023 17:42:24 +0800 Subject: [PATCH] add api to restart all failed sub tasks --- plugin/task_manager/api.go | 1 + plugin/task_manager/migration_api.go | 60 ++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/plugin/task_manager/api.go b/plugin/task_manager/api.go index 2279250a..c1db839d 100644 --- a/plugin/task_manager/api.go +++ b/plugin/task_manager/api.go @@ -22,6 +22,7 @@ func InitAPI() { api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/logging/:index", handler.RequirePermission(handler.searchIndexLevelTaskLogging, enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/migration/data/_search_values", handler.RequirePermission(handler.searchTaskFieldValues("cluster_migration"), enum.PermissionMigrationTaskRead)) + api.HandleAPIMethod(api.POST, "/migration/data/partition/_restart", handler.RequirePermission(handler.restartAllFailedPartitions, enum.PermissionMigrationTaskWrite)) api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead)) api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite)) diff --git a/plugin/task_manager/migration_api.go b/plugin/task_manager/migration_api.go index cfe63465..06b423c8 100644 --- a/plugin/task_manager/migration_api.go +++ b/plugin/task_manager/migration_api.go @@ -443,3 +443,63 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m } return taskStats, indexState, nil } + +func (h *APIHandler) restartAllFailedPartitions(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + mustQ := []util.MapStr{ + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "cluster_migration", + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": "error", + }, + }, + }, + } + + queryDSL := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": mustQ, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusRunning), + }, + } + + body := util.MustToJSONBytes(queryDSL) + + err := orm.UpdateBy(&task.Task{}, body) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + //update status of sub task + mustQ[0] = util.MapStr{ + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_migration", + }, + }, + } + queryDSL["script"] = util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusReady), + } + body = util.MustToJSONBytes(queryDSL) + err = orm.UpdateBy(&task.Task{}, body) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + h.WriteAckOKJSON(w) +} \ No newline at end of file