diff --git a/plugin/migration/api.go b/plugin/migration/api.go index 42b1ac79..37caaf62 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -6,7 +6,9 @@ package migration import ( "context" + "errors" "fmt" + elastic2 "infini.sh/framework/modules/elastic" "net/http" "strconv" "strings" @@ -41,6 +43,7 @@ func InitAPI() { api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionTaskRead)) api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequirePermission(handler.updateDataMigrationTaskStatus, enum.PermissionTaskRead)) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_init", handler.initIndex) + api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.deleteDataMigrationTask) } @@ -1002,6 +1005,65 @@ func (h *APIHandler) initIndex(w http.ResponseWriter, req *http.Request, ps http }, http.StatusOK) } +func (h *APIHandler) deleteDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + obj := task2.Task{} + obj.ID = id + + _, err := orm.Get(&obj) + if err != nil { + if errors.Is(err, elastic2.ErrNotFound) { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if util.StringInArray([]string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, obj.Status) { + h.WriteError(w, fmt.Sprintf("can not delete task [%s] with status [%s]", obj.ID, obj.Status), http.StatusInternalServerError) + return + } + + q := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": id, + }, + }, + }, + { + "term": util.MapStr{ + "id": util.MapStr{ + "value": id, + }, + }, + }, + }, + }, + }, + } + err = orm.DeleteBy( &obj, util.MustToJSONBytes(q)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "deleted", + }, 200) +} + func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskState, err error) { taskQuery := util.MapStr{ "size": 500,