diff --git a/model/dict.go b/model/dict.go deleted file mode 100644 index 753b5bf0..00000000 --- a/model/dict.go +++ /dev/null @@ -1,66 +0,0 @@ -package model - -import ( - "fmt" - "infini.sh/framework/core/elastic" - "strings" - "time" - - "infini.sh/framework/core/orm" -) - -//Dict model -type Dict struct { - ID string `json:"id" elastic_meta:"_id"` - Name string `json:"name,omitempty" elastic_mapping:"name:{type:text}"` - Tags []string `json:"tags" elastic_mapping:"tags:{type:text}"` - Content []byte `json:"content" elastic_mapping:"content:{type:binary}"` - CreatedAt time.Time `json:"created_at" elastic_mapping:"created_at:{type:date}"` - UpdatedAt time.Time `json:"updated_at" elastic_mapping:"updated_at:{type:date}"` -} - -func GetDictList(from, size int, name string, tags []string, esName string) (*elastic.SearchResponse, error) { - //sort := []orm.Sort{} - //sort = append(sort, orm.Sort{Field: "created_at", SortType: orm.DESC}) - var ( - sort = `[{ - "created_at": { - "order": "desc" - }}]` - query = `{ - "bool": { - "must": [ - %s - ], - "should": [ - %s - ], - "minimum_should_match": %d - } - }` - should = "" - must = "" - minShould = 0 - ) - if name = strings.Trim(name, " "); name != "" { - must = fmt.Sprintf(`{"match":{"name": "%s"}}`, name) - } - for i, tag := range tags { - if tag == "" { - continue - } - should += fmt.Sprintf(`{"match":{"tags":"%s"}}`, tag) - if i != len(tags)-1 { - should += "," - } - minShould = 1 - } - query = fmt.Sprintf(query, must, should, minShould) - rq := fmt.Sprintf(`{"from":%d, "size":%d, "sort": %s, "query": %s}`, from, size, sort, query) - //q := &orm.Query{ - // RawQuery: []byte(rq), - //} - //err, sr := orm.Search(Dict{}, nil, q) - client := elastic.GetClient(esName) - return client.SearchWithRawQueryDSL(orm.GetIndexName(Dict{}), []byte(rq)) -} diff --git a/model/reindex.go b/model/reindex.go deleted file mode 100644 index 6ac86cba..00000000 --- a/model/reindex.go +++ /dev/null @@ -1,62 +0,0 @@ -package model - -import ( - "fmt" - "infini.sh/framework/core/orm" - "strings" - "time" - - "infini.sh/framework/core/elastic" -) - -type ReindexStatus string - -const ( - ReindexStatusRunning ReindexStatus = "RUNNING" - ReindexStatusSuccess ReindexStatus = "SUCCEED" - ReindexStatusFailed ReindexStatus = "FAILED" -) - -type Reindex struct { - ID string `json:"id" elastic_meta:"_id"` - Name string `json:"name" elastic_mapping:"name:{type:text}"` - Desc string `json:"desc" elastic_mapping:"desc:{type:text}"` - TaskId string `json:"task_id" elastic_mapping:"task_id:{type:keyword}"` - Source struct { - Index string `json:"index"` - //Size int `json:"size"` - Query map[string]interface{} `json:"query"` - Source []string `json:"_source"` - } `json:"source" elastic_mapping:"source:{type:object}"` - Dest struct { - Index string `json:"index"` - Pipeline string `json:"pipeline"` - } `json:"dest" elastic_mapping:"dest:{type:object}"` - - CreatedAt time.Time `json:"created_at" elastic_mapping:"created_at:{type:date}"` - Status ReindexStatus `json:"status" elastic_mapping:"status:{type:keyword}"` -} - -func GetRebuildList(esName string, from, size int, name string) (*elastic.SearchResponse, error) { - var ( - sort = `[{ - "created_at": { - "order": "desc" - }}]` - query = `{ - "bool": { - "must": [ - %s - ] - } - }` - must = "" - ) - if name = strings.Trim(name, " "); name != "" { - must = fmt.Sprintf(`{"match":{"name": "%s"}}`, name) - } - query = fmt.Sprintf(query, must) - rq := fmt.Sprintf(`{"from":%d, "size":%d, "sort": %s, "query": %s}`, from, size, sort, query) - client := elastic.GetClient(esName) - return client.SearchWithRawQueryDSL(orm.GetIndexName(Reindex{}), []byte(rq)) -} diff --git a/plugin/api/index_management/index.go b/plugin/api/index_management/index.go index e3fcd7b1..0cda5969 100644 --- a/plugin/api/index_management/index.go +++ b/plugin/api/index_management/index.go @@ -1,19 +1,8 @@ package index_management import ( - "infini.sh/console/core" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/global" - "net/http" - "strconv" - "strings" - "time" - "infini.sh/console/config" - model2 "infini.sh/console/model" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" + "infini.sh/console/core" ) type APIHandler struct { @@ -21,100 +10,6 @@ type APIHandler struct { core.Handler } -func (handler APIHandler) GetDictListAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - fromStr = handler.GetParameterOrDefault(req, "from", "0") - sizeStr = handler.GetParameterOrDefault(req, "size", "6") - tag = handler.GetParameterOrDefault(req, "tags", "") - name = handler.GetParameterOrDefault(req, "name", "") - from, _ = strconv.Atoi(fromStr) - size, _ = strconv.Atoi(sizeStr) - tags = strings.Split(tag, ",") - resp = newResponseBody() - ) - if len(tags) > 3 { - tags = tags[0:3] - } - rel, err := model2.GetDictList(from, size, name, tags, global.MustLookupString(elastic.GlobalSystemElasticsearchID)) - if err != nil { - resp["error"] = err - resp["status"] = false - handler.WriteJSON(w, resp, http.StatusOK) - return - } - resp["payload"] = rel - handler.WriteJSON(w, resp, http.StatusOK) -} - -func (handler APIHandler) CreateDictItemAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - if strings.Trim(id, "/") == "" { - id = util.GetUUID() - } - createdAt := time.Now() - - resp := newResponseBody() - dict := model2.Dict{ - ID: id, - CreatedAt: createdAt, - UpdatedAt: createdAt, - } - err := handler.DecodeJSON(req, &dict) - if err != nil { - resp["status"] = false - resp["error"] = err - handler.WriteJSON(w, resp, http.StatusOK) - return - } - - err = orm.Create(nil, &dict) - if err != nil { - resp["status"] = false - resp["error"] = err - handler.WriteJSON(w, resp, http.StatusOK) - return - } - resp["payload"] = dict - handler.WriteJSON(w, resp, http.StatusOK) -} - -func (handler APIHandler) DeleteDictItemAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - dict := model2.Dict{} - dict.ID = id - resp := newResponseBody() - - err := orm.Delete(nil, dict) - if err != nil { - resp["status"] = false - resp["error"] = err - handler.WriteJSON(w, resp, http.StatusOK) - return - } - handler.WriteJSON(w, resp, http.StatusOK) -} - -func (handler APIHandler) UpdateDictItemAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - dict := model2.Dict{} - err := handler.DecodeJSON(req, &dict) - resp := newResponseBody() - if err != nil { - resp["status"] = false - resp["error"] = err - handler.WriteJSON(w, resp, http.StatusOK) - return - - } - dict.UpdatedAt = time.Now() - - err = orm.Update(nil, dict) - if err != nil { - resp["status"] = false - resp["error"] = err - handler.WriteJSON(w, resp, http.StatusOK) - return - } - resp["payload"] = dict - handler.WriteJSON(w, resp, http.StatusOK) - +func newResponseBody() map[string]interface{} { + return map[string]interface{}{} } diff --git a/plugin/api/index_management/rebuild.go b/plugin/api/index_management/rebuild.go deleted file mode 100644 index 2853cd35..00000000 --- a/plugin/api/index_management/rebuild.go +++ /dev/null @@ -1,205 +0,0 @@ -package index_management - -import ( - "encoding/json" - "fmt" - log "github.com/cihub/seelog" - "infini.sh/framework/core/global" - "infini.sh/framework/core/orm" - "net/http" - "strings" - "time" - - "infini.sh/console/model" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/util" -) - -func (handler APIHandler) HandleReindexAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - reindexItem := &model.Reindex{} - id := ps.ByName("id") - if strings.Trim(id, "/") != "" { - reindexItem.ID = id - } - resResult := newResponseBody() - - err := handler.DecodeJSON(req, reindexItem) - if err != nil { - log.Error(err) - resResult["error"] = err - handler.WriteJSON(w, resResult, http.StatusOK) - return - } - - //fmt.Println(reindexItem) - typ := handler.GetParameter(req, "_type") - ID, err := reindex(global.MustLookupString(elastic.GlobalSystemElasticsearchID), reindexItem, typ) - if err != nil { - log.Error(err) - resResult["error"] = err - handler.WriteJSON(w, resResult, http.StatusOK) - return - } - resResult["payload"] = ID - handler.WriteJSON(w, resResult, http.StatusOK) -} - -func reindex(esName string, body *model.Reindex, typ string) (string, error) { - client := elastic.GetClient(esName) - source := map[string]interface{}{ - "index": body.Source.Index, - } - if body.Source.Query != nil { - source["query"] = body.Source.Query - } - if len(body.Source.Source) > 0 { - source["_source"] = body.Source.Source - } - dest := map[string]string{ - "index": body.Dest.Index, - } - if body.Dest.Pipeline != "" { - dest["pipeline"] = body.Dest.Pipeline - } - esBody := map[string]interface{}{ - "source": source, - "dest": dest, - } - buf, _ := json.Marshal(esBody) - //fmt.Println(string(buf)) - reindexResp, err := client.Reindex(buf) - if err != nil { - return "", err - } - if body.ID == "" { - body.ID = util.GetUUID() - } - body.TaskId = reindexResp.Task - body.Status = model.ReindexStatusRunning - body.CreatedAt = time.Now() - - _, err = client.Index(orm.GetIndexName(body), typ, body.ID, body, "wait_for") - if err != nil { - return "", err - } - return body.ID, nil -} - -func newResponseBody() map[string]interface{} { - return map[string]interface{}{ - } -} - -func (handler APIHandler) HandleDeleteRebuildAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - var ids = []string{id} - resBody := newResponseBody() - err := deleteTasksByIds(global.MustLookupString(elastic.GlobalSystemElasticsearchID), ids) - if err != nil { - log.Error(err) - resBody["error"] = err - handler.WriteJSON(w, resBody, http.StatusOK) - return - } - resBody["payload"] = true - handler.WriteJSON(w, resBody, http.StatusOK) -} - -func (handler APIHandler) HandleGetRebuildListAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - from = handler.GetIntOrDefault(req, "from", 0) - size = handler.GetIntOrDefault(req, "size", 10) - name = handler.GetParameter(req, "name") - resBody = newResponseBody() - esName = global.MustLookupString(elastic.GlobalSystemElasticsearchID) - ) - esResp, err := model.GetRebuildList(esName, from, size, name) - if err != nil { - log.Error(err) - resBody["error"] = err.Error() - handler.WriteJSON(w, resBody, http.StatusOK) - return - } - err = SyncRebuildResult(esName) - if err != nil { - log.Error(err) - resBody["error"] = err - handler.WriteJSON(w, resBody, http.StatusOK) - return - } - - resBody["payload"] = esResp - handler.WriteJSON(w, resBody, http.StatusOK) -} - -func SyncRebuildResult(esName string) error { - client := elastic.GetClient(esName) - esBody := fmt.Sprintf(`{"query":{"match":{"status": "%s"}}}`, model.ReindexStatusRunning) - esRes, err := client.SearchWithRawQueryDSL(orm.GetIndexName(model.Reindex{}), []byte(esBody)) - if err != nil { - return err - } - var ids = []string{} - idMap := map[string]int{} - for idx, doc := range esRes.Hits.Hits { - taskId := doc.Source["task_id"].(string) - ids = append(ids, taskId) - idMap[taskId] = idx - } - if len(ids) == 0 { - return nil - } - taskResp, err := client.SearchTasksByIds(ids) - if err != nil { - return err - } - var ( - status model.ReindexStatus - ) - for _, doc := range taskResp.Hits.Hits { - status = model.ReindexStatusRunning - source := esRes.Hits.Hits[idMap[doc.ID]].Source - if _, ok := doc.Source["error"]; ok { - status = model.ReindexStatusFailed - } else { - status = model.ReindexStatusSuccess - } - source["status"] = status - source["task_source"] = doc.Source - _, err := client.Index(orm.GetIndexName(model.Reindex{}), "", esRes.Hits.Hits[idMap[doc.ID]].ID, source, "") - return err - } - return nil -} - -func buildTermsQuery(fieldName string, terms []string) string { - esBody := `{ - "query":{ - "terms": { - "%s": [ - %s - ] - } - } -}` - strTerms := "" - for _, term := range terms { - strTerms += fmt.Sprintf(`"%s",`, term) - } - esBody = fmt.Sprintf(esBody, fieldName, strTerms[0:len(strTerms)-1]) - return esBody -} - -func deleteTasksByIds(esName string, terms []string) error { - client := elastic.GetClient(esName) - esBody := buildTermsQuery("_id", terms) - deleteRes, err := client.DeleteByQuery(orm.GetIndexName(model.Reindex{}), []byte(esBody)) - if err != nil { - return err - } - if deleteRes.Deleted != deleteRes.Total { - return fmt.Errorf("total: %d, deleted: %d", deleteRes.Total, deleteRes.Deleted) - } - return nil -} diff --git a/plugin/api/init.go b/plugin/api/init.go index c06134af..4a96d40e 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -24,13 +24,6 @@ func Init(cfg *config.AppConfig) { var pathPrefix = "/_search-center/" var esPrefix = "/elasticsearch/:id/" api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/overview"), handler.RequirePermission(handler.ElasticsearchOverviewAction, enum.PermissionElasticsearchMetricRead)) - //api.HandleAPIMethod(api.POST, "/api/get_indices",index_management.API1) - - api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "dict/_search"), handler.GetDictListAction) - api.HandleAPIMethod(api.POST, path.Join(pathPrefix, "dict/*id"), handler.CreateDictItemAction) - //api.HandleAPIMethod(api.GET, "/api/dict/:id",handler.GetDictItemAction) - api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "dict/:id"), handler.DeleteDictItemAction) - api.HandleAPIMethod(api.PUT, path.Join(pathPrefix, "dict/:id"), handler.UpdateDictItemAction) api.HandleAPIMethod(api.POST, path.Join(esPrefix, "doc/:index/_search"), handler.RequireLogin(handler.HandleSearchDocumentAction)) api.HandleAPIMethod(api.POST, path.Join(esPrefix, "doc/:index"), handler.IndexRequired(handler.HandleAddDocumentAction, "doc.create")) @@ -38,10 +31,6 @@ func Init(cfg *config.AppConfig) { api.HandleAPIMethod(api.DELETE, path.Join(esPrefix, "doc/:index/:docId"), handler.IndexRequired(handler.HandleDeleteDocumentAction, "doc.delete")) api.HandleAPIMethod(api.GET, path.Join(esPrefix, "doc/_validate"), handler.ValidateDocIDAction) - api.HandleAPIMethod(api.POST, path.Join(pathPrefix, "rebuild/*id"), handler.HandleReindexAction) - api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "rebuild/_search"), handler.HandleGetRebuildListAction) - api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "rebuild/:id"), handler.HandleDeleteRebuildAction) - api.HandleAPIMethod(api.GET, path.Join(esPrefix, "_cat/indices"), handler.RequireLogin(handler.HandleCatIndicesAction)) api.HandleAPIMethod(api.GET, path.Join(esPrefix, "index/:index/_mappings"), handler.IndexRequired(handler.HandleGetMappingsAction, "indices.get_mapping")) api.HandleAPIMethod(api.GET, path.Join(esPrefix, "index/:index/_settings"), handler.IndexRequired(handler.HandleGetSettingsAction, "indices.get_settings")) @@ -56,15 +45,6 @@ func Init(cfg *config.AppConfig) { api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "elasticsearch/command/:cid"), handler.RequirePermission(handler.HandleDeleteCommonCommandAction, enum.PermissionCommandWrite)) api.HandleAPIMethod(api.GET, "/elasticsearch/overview/status", handler.RequireLogin(handler.ElasticsearchStatusSummaryAction)) api.HandleAPIMethod(api.GET, "/elasticsearch/:id/overview/treemap", handler.RequireClusterPermission(handler.RequirePermission(handler.ClusterOverTreeMap, enum.PermissionElasticsearchMetricRead))) - //task.RegisterScheduleTask(task.ScheduleTask{ - // Description: "sync reindex task result", - // Task: func() { - // err := index_management.SyncRebuildResult(cfg.Elasticsearch) - // if err != nil { - // log.Error(err) - // } - // }, - //}) alertAPI := alerting.AlertAPI{}