From 8f443644416398b9bdbdd3f2ca1e5dc1cde4368c Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 11 Mar 2022 16:05:03 +0800 Subject: [PATCH] change request es using elastic client --- main.go | 2 +- service/alerting/alert.go | 39 ++-- service/alerting/destination.go | 286 +++++++--------------------- service/alerting/elasticsearch.go | 91 ++++----- service/alerting/monitor.go | 304 ++++++++++++++---------------- service/alerting/overview.go | 51 ++--- service/alerting/schedule.go | 110 ++++------- 7 files changed, 312 insertions(+), 571 deletions(-) diff --git a/main.go b/main.go index 3d5a0443..f23e0278 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,6 @@ import ( "infini.sh/framework/core/env" "infini.sh/framework/core/module" "infini.sh/framework/core/orm" - _ "infini.sh/framework/plugins" queue2 "infini.sh/framework/modules/disk_queue" elastic2 "infini.sh/framework/modules/elastic" "infini.sh/framework/modules/filter" @@ -24,6 +23,7 @@ import ( "infini.sh/framework/modules/stats" "infini.sh/framework/modules/task" "infini.sh/framework/modules/ui" + _ "infini.sh/framework/plugins" api2 "infini.sh/gateway/api" _ "infini.sh/gateway/proxy" ) diff --git a/service/alerting/alert.go b/service/alerting/alert.go index b10bc438..d9f6277c 100644 --- a/service/alerting/alert.go +++ b/service/alerting/alert.go @@ -6,9 +6,11 @@ import ( "encoding/json" "errors" "fmt" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/orm" "infini.sh/console/model/alerting" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" "io" "net/http" "net/url" @@ -130,39 +132,22 @@ func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){ indexName := getAlertIndexName(alertType) config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, indexName ) - - res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) - if err != nil { - writeError(w, err) - return - } - var alertRes = IfaceMap{} - err = decodeJSON(res.Body, &alertRes) - defer res.Body.Close() + esClient := elastic.GetClient(config.ID) + res, err := esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(reqBody)) if err != nil { writeError(w, err) return } alerts := []interface{}{} - rawAlerts := queryValue(alertRes, "hits.hits", nil) - if ds, ok := rawAlerts.([]interface{}); ok { - for _, alert := range ds { - if alertItem, ok := alert.(map[string]interface{}); ok { - //alertItem["version"] = queryValue(alertItem, "alert_version", "") - if alertID, ok := queryValue(alertItem, "_source.id", "").(string); ok && alertID == "" { - if source, ok := alertItem["_source"].(map[string]interface{}); ok { - source["id"] = alertItem["_id"] - } - } - alerts = append(alerts, alertItem["_source"]) - } - } + rawAlerts := res.Hits.Hits + for _, alert := range rawAlerts { + alert.Source["id"] = alert.ID + alerts = append(alerts, alert.Source) } writeJSON(w, IfaceMap{ "ok": true, "alerts": alerts, - "totalAlerts": queryValue(alertRes, "hits.total.value", 0), + "totalAlerts": res.GetTotal(), }, http.StatusOK) } @@ -235,7 +220,7 @@ func doRequest(requestUrl string, method string, params map[string]string, body } req, _ = http.NewRequest(method, requestUrl, reader) req.Header.Set("content-type", "application/json") - req.Header.Set("User-Agent", "Kibana") + req.Header.Set("User-Agent", "infini-client") return alertClient.Do(req) } diff --git a/service/alerting/destination.go b/service/alerting/destination.go index b89aed56..4b09b441 100644 --- a/service/alerting/destination.go +++ b/service/alerting/destination.go @@ -99,38 +99,26 @@ func GetDestinations(w http.ResponseWriter, req *http.Request, ps httprouter.Par }, } - config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{})) - - res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) - - if err != nil { - writeError(w, err) - return - } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) + esConfig := getDefaultConfig() + esClient := elastic.GetClient(esConfig.ID) + res, err := esClient.SearchWithRawQueryDSL( orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(reqBody)) if err != nil { writeError(w, err) return } - totalDestinations := queryValue(resBody, "hits.total.value", 0) - rawHits := queryValue(resBody, "hits.hits", []interface{}{}) + totalDestinations := res.GetTotal() + rawHits := res.Hits.Hits dests := []IfaceMap{} - if rh, ok := rawHits.([]interface{}); ok { - for _, hit := range rh { - if destination, ok := hit.(map[string]interface{}); ok { - newItem := IfaceMap{} - newItem["id"] = queryValue(destination, "_id", "") - source := queryValue(destination, "_source."+DESTINATION_FIELD, nil) - if ms, ok := source.(map[string]interface{}); ok { - assignTo(newItem, ms) - } - dests = append(dests, newItem) - } + for _, hit := range rawHits { + newItem := IfaceMap{} + newItem["id"] = hit.ID + source := queryValue(hit.Source,DESTINATION_FIELD, nil) + if ms, ok := source.(map[string]interface{}); ok { + assignTo(newItem, ms) } + dests = append(dests, newItem) } writeJSON(w, IfaceMap{ @@ -141,9 +129,8 @@ func GetDestinations(w http.ResponseWriter, req *http.Request, ps httprouter.Par } func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - config := getDefaultConfig() + esConfig := getDefaultConfig() destId := util.GetUUID() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destId) var destination = &alerting.Destination{} err := decodeJSON(req.Body, &destination) if err != nil { @@ -167,9 +154,8 @@ func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P writeError(w, errors.New("type unsupported")) return } - res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ - "refresh": "wait_for", - }, IfaceMap{ + esClient := elastic.GetClient(esConfig.ID) + indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", destId, IfaceMap{ DESTINATION_FIELD: toSaveDest, }) if err != nil { @@ -177,20 +163,13 @@ func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } writeJSON(w, IfaceMap{ "ok": true, "resp": IfaceMap{ DESTINATION_FIELD: toSaveDest, - "_id": queryValue(resBody, "_id", ""), - "_version": queryValue(resBody, "_version", 0), + "_id": indexRes.ID, + "_version": indexRes.Version, }, }, http.StatusOK) @@ -200,7 +179,6 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P destinationId := ps.ByName("destinationId") config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId) var destination = &alerting.Destination{} err := decodeJSON(req.Body, &destination) if err != nil { @@ -224,9 +202,8 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P writeError(w, errors.New("type unsupported")) return } - res, err := doRequest(reqUrl, http.MethodPut, map[string]string{ - "refresh": "wait_for", - }, IfaceMap{ + esClient := elastic.GetClient(config.ID) + indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", destinationId, IfaceMap{ DESTINATION_FIELD: toSaveDest, }) if err != nil { @@ -234,17 +211,10 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } writeJSON(w, IfaceMap{ "ok": true, - "version": queryValue(resBody, "_version", ""), - "id": queryValue(resBody, "_id", ""), + "version": indexRes.Version, + "id": indexRes.ID, }, http.StatusOK) } @@ -253,24 +223,15 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P destinationId := ps.ByName("destinationId") config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s?refresh=wait_for", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId) - res, err := doRequest(reqUrl, http.MethodDelete, nil, nil) + esClient := elastic.GetClient(config.ID) + deleteRes, err := esClient.Delete(orm.GetIndexName(alerting.Config{}), "", destinationId) if err != nil { writeError(w, err) return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } - - resultIfce := queryValue(resBody, "result", "") var isOk = false - if result, ok := resultIfce.(string); ok && result == "deleted" { + if deleteRes.Result == "deleted" { isOk = true } writeJSON(w, IfaceMap{ @@ -312,7 +273,6 @@ func InitAppConfig(config *config.AppConfig){ func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{})) var emailAccount = &alerting.EmailAccount{} err := decodeJSON(req.Body, &emailAccount) if err != nil { @@ -321,9 +281,8 @@ func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. } //var password = emailAccount.Password //emailAccount.Password = "" - res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ - "refresh": "wait_for", - }, IfaceMap{ + esClient := elastic.GetClient(config.ID) + indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", util.GetUUID(), IfaceMap{ EMAIL_ACCOUNT_FIELD: emailAccount, }) if err != nil { @@ -331,14 +290,6 @@ func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } - //kk := fmt.Sprintf("search-center_alerting.destination.email.%s.password", emailAccount.Name) //secStr, _ := getKeystore().Retrieve(kk) //kst := getKeystore() @@ -357,8 +308,8 @@ func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. "ok": true, "resp": IfaceMap{ EMAIL_ACCOUNT_FIELD: emailAccount, - "_id": queryValue(resBody, "_id", ""), - "_version": queryValue(resBody, "_version", 0), + "_id": indexRes.ID, + "_version": indexRes.Version, }, }, http.StatusOK) } @@ -367,17 +318,14 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. emailAccountId := ps.ByName("emailAccountId") config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId) var emailAccount = &alerting.EmailAccount{} err := decodeJSON(req.Body, &emailAccount) if err != nil { writeError(w, err) return } - - res, err := doRequest(reqUrl, http.MethodPut, map[string]string{ - "refresh": "wait_for", - }, IfaceMap{ + esClient := elastic.GetClient(config.ID) + indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}),"", emailAccountId, IfaceMap{ EMAIL_ACCOUNT_FIELD: emailAccount, }) if err != nil { @@ -385,17 +333,9 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } - writeJSON(w, IfaceMap{ "ok": true, - "id": queryValue(resBody, "_id", ""), + "id": indexRes.ID, }, http.StatusOK) } @@ -403,28 +343,14 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. func DeleteEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { emailAccountId := ps.ByName("emailAccountId") config := getDefaultConfig() - - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId) - - res, err := doRequest(reqUrl, http.MethodDelete, map[string]string{ - "refresh": "wait_for", - }, nil) + esClient := elastic.GetClient(config.ID) + deleteRes, err := esClient.Delete(orm.GetIndexName(alerting.Config{}), "", emailAccountId, "wait_for") if err != nil { writeError(w, err) return } - - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } - - resultIfce := queryValue(resBody, "result", "") var isOk = false - if result, ok := resultIfce.(string); ok && result == "deleted" { + if deleteRes.Result == "deleted" { isOk = true } writeJSON(w, IfaceMap{ @@ -477,37 +403,25 @@ func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Pa } config := getDefaultConfig() - - reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{})) - - res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) + esClient := elastic.GetClient(config.ID) + searchRes, err := esClient.SearchWithRawQueryDSL( orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(reqBody)) if err != nil { writeError(w, err) return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - writeError(w, err) - return - } - totalEmailAccounts := queryValue(resBody, "hits.total.value", 0) - rawHits := queryValue(resBody, "hits.hits", []interface{}{}) + totalEmailAccounts := searchRes.GetTotal() + rawHits := searchRes.Hits.Hits emailAccounts := []IfaceMap{} - if rh, ok := rawHits.([]interface{}); ok { - for _, hit := range rh { - if emailAccount, ok := hit.(map[string]interface{}); ok { - newItem := IfaceMap{} - newItem["id"] = queryValue(emailAccount, "_id", "") - source := queryValue(emailAccount, "_source."+EMAIL_ACCOUNT_FIELD, nil) - if ms, ok := source.(map[string]interface{}); ok { - assignTo(newItem, ms) - } - emailAccounts = append(emailAccounts, newItem) - } + for _, hit := range rawHits { + newItem := IfaceMap{} + newItem["id"] = hit.ID + source := queryValue(hit.Source, EMAIL_ACCOUNT_FIELD, nil) + if ms, ok := source.(map[string]interface{}); ok { + assignTo(newItem, ms) } + emailAccounts = append(emailAccounts, newItem) } writeJSON(w, IfaceMap{ "ok": true, @@ -520,22 +434,13 @@ func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Par emailAccountId := ps.ByName("emailAccountId") config := getDefaultConfig() - - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId) - - res, err := doRequest(reqUrl, http.MethodGet,nil, nil) + esClient := elastic.GetClient(config.ID) + getRes, err := esClient.Get(orm.GetIndexName(alerting.Config{}), "", emailAccountId) if err != nil { writeError(w, err) return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } //name := queryValue(resBody,"_source.email_account.name", "") //kk := fmt.Sprintf("search-center_alerting.destination.email.%s.password", name) //secStr, _ := getKeystore().Retrieve(kk) @@ -546,7 +451,7 @@ func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Par writeJSON(w, IfaceMap{ "ok": true, - "resp": queryValue(resBody, "_source.email_account", nil), + "resp": queryValue(getRes.Source, "email_account", nil), }, http.StatusOK) } @@ -555,16 +460,14 @@ func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Par func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{})) var emailGroup = &alerting.EmailGroup{} err := decodeJSON(req.Body, &emailGroup) if err != nil { writeError(w, err) return } - res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ - "refresh": "wait_for", - }, IfaceMap{ + esClient := elastic.GetClient(config.ID) + indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", util.GetUUID(), IfaceMap{ EMAIL_GROUP_FIELD: emailGroup, }) if err != nil { @@ -572,14 +475,6 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } - writeJSON(w, IfaceMap{ "ok": true, "resp": IfaceMap{ @@ -588,8 +483,8 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa "name": emailGroup.Name, "schema_version": emailGroup.SchemaVersion, }, - "_id": queryValue(resBody, "_id", ""), - "_version": queryValue(resBody, "_version", 0), + "_id": indexRes.ID, + "_version": indexRes.Version, }, }, http.StatusOK) } @@ -597,16 +492,14 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { emailGroupId := ps.ByName("emailGroupId") config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId) var emailGroup = &alerting.EmailGroup{} err := decodeJSON(req.Body, &emailGroup) if err != nil { writeError(w, err) return } - res, err := doRequest(reqUrl, http.MethodPut, map[string]string{ - "refresh": "wait_for", - }, IfaceMap{ + esClient := elastic.GetClient(config.ID) + indexRes, err := esClient.Index( orm.GetIndexName(alerting.Config{}), "", emailGroupId, IfaceMap{ EMAIL_GROUP_FIELD: emailGroup, }) if err != nil { @@ -614,16 +507,9 @@ func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } writeJSON(w, IfaceMap{ "ok": true, - "id": queryValue(resBody, "_id", ""), + "id": indexRes.ID, }, http.StatusOK) } @@ -631,25 +517,15 @@ func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa func DeleteEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { emailGroupId := ps.ByName("emailGroupId") config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId) - - res, err := doRequest(reqUrl, http.MethodDelete, nil, nil) + esClient := elastic.GetClient(config.ID) + res, err := esClient.Delete(orm.GetIndexName(alerting.Config{}), "", emailGroupId) if err != nil { writeError(w, err) return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } - - resultIfce := queryValue(resBody, "result", "") var isOk = false - if result, ok := resultIfce.(string); ok && result == "deleted" { + if res.Result == "deleted" { isOk = true } writeJSON(w, IfaceMap{ @@ -703,36 +579,25 @@ func GetEmailGroups(w http.ResponseWriter, req *http.Request, ps httprouter.Para } config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{})) - - res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) + esClient := elastic.GetClient(config.ID) + res, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(reqBody)) if err != nil { writeError(w, err) return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - writeError(w, err) - return - } - totalEmailGroups := queryValue(resBody, "hits.total.value", 0) - rawHits := queryValue(resBody, "hits.hits", []interface{}{}) + totalEmailGroups := res.GetTotal() + rawHits := res.Hits.Hits emailGroups := []IfaceMap{} - if rh, ok := rawHits.([]interface{}); ok { - for _, hit := range rh { - if emailGroup, ok := hit.(map[string]interface{}); ok { - newItem := IfaceMap{} - newItem["id"] = queryValue(emailGroup, "_id", "") - source := queryValue(emailGroup, "_source."+EMAIL_GROUP_FIELD, nil) - if ms, ok := source.(map[string]interface{}); ok { - assignTo(newItem, ms) - } - emailGroups = append(emailGroups, newItem) - } + for _, hit := range rawHits { + newItem := IfaceMap{} + newItem["id"] = hit.ID + source := queryValue(hit.Source, EMAIL_GROUP_FIELD, nil) + if ms, ok := source.(map[string]interface{}); ok { + assignTo(newItem, ms) } + emailGroups = append(emailGroups, newItem) } writeJSON(w, IfaceMap{ "ok": true, @@ -745,21 +610,14 @@ func GetEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Param emailGroupId := ps.ByName("emailGroupId") config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId) - res, err := doRequest(reqUrl, http.MethodGet,nil, req.Body) + esClient := elastic.GetClient(config.ID) + getRes, err := esClient.Get(orm.GetIndexName(alerting.Config{}), "", emailGroupId) if err != nil { writeError(w, err) return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } - emailGroup := queryValue(resBody, "_source."+EMAIL_GROUP_FIELD, nil) + emailGroup := queryValue(getRes.Source, EMAIL_GROUP_FIELD, nil) if emailGroup == nil { writeJSON(w, IfaceMap{ "ok": false, diff --git a/service/alerting/elasticsearch.go b/service/alerting/elasticsearch.go index 73d6a844..667d1955 100644 --- a/service/alerting/elasticsearch.go +++ b/service/alerting/elasticsearch.go @@ -5,6 +5,7 @@ import ( "fmt" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/util" "net/http" "runtime/debug" "strings" @@ -24,17 +25,9 @@ func Search(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ return } config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, body.Index) - + esClient := elastic.GetClient(config.ID) body.Query["size"] = body.Size - res, err := doRequest(reqUrl, http.MethodPost, nil, body.Query) - if err != nil { - writeError(w, err) - return - } - defer res.Body.Close() - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) + searchRes, err := esClient.SearchWithRawQueryDSL(body.Index, util.MustToJSONBytes(body.Query)) if err != nil { writeError(w, err) return @@ -42,15 +35,15 @@ func Search(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody, + "resp": searchRes, }, http.StatusOK) } func GetIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.ByName("id") - meta := elastic.GetMetadata(id) - if meta == nil { + esClient := elastic.GetClient(id) + if esClient == nil { writeError(w, errors.New("cluster not found")) return } @@ -63,27 +56,19 @@ func GetIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) writeError(w, err) return } - reqUrl := fmt.Sprintf("%s/_cat/indices/%s", meta.GetActiveEndpoint(), body.Index) - params := map[string]string{ - "format": "json", - "h": "health,index,status", - } - res, err := doRequest(reqUrl, http.MethodGet, params, nil) - if err != nil { - writeError(w, err) - return - } - defer res.Body.Close() - var resBody = []IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - writeError(w, err) - return - } + indexInfos, err := esClient.GetIndices( body.Index) + if err != nil { + writeError(w, err) + return + } + indices := make([]elastic.IndexInfo, 0, len(*indexInfos)) + for _, info := range *indexInfos { + indices = append(indices, info) + } writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody, + "resp": indices, }, http.StatusOK) } @@ -94,8 +79,8 @@ func GetAliases(w http.ResponseWriter, req *http.Request, ps httprouter.Params) } }() id := ps.ByName("id") - meta := elastic.GetMetadata(id) - if meta == nil { + esClient := elastic.GetClient(id) + if esClient == nil { writeError(w, errors.New("cluster not found")) return } @@ -108,34 +93,30 @@ func GetAliases(w http.ResponseWriter, req *http.Request, ps httprouter.Params) writeError(w, err) return } - reqUrl := fmt.Sprintf("%s/_cat/aliases/%s", meta.GetActiveEndpoint(), body.Alias) - params := map[string]string{ - "format": "json", - "h": "alias,index", - } - res, err := doRequest(reqUrl, http.MethodGet, params, nil) + //reqUrl := fmt.Sprintf("%s/_cat/aliases/%s", meta.GetActiveEndpoint(), body.Alias) + //params := map[string]string{ + // "format": "json", + // "h": "alias,index", + //} + res, err := esClient.GetAliases() if err != nil { writeError(w, err) return } - defer res.Body.Close() - var resBody = []IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - writeError(w, err) - return + aliases := make([]elastic.AliasInfo, 0, len(*res)) + for _, alias := range *res { + aliases =append(aliases, alias) } - writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody, + "resp": aliases, }, http.StatusOK) } func GetMappings(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.ByName("id") - meta := elastic.GetMetadata(id) - if meta == nil { + esClient := elastic.GetClient(id) + if esClient == nil { writeError(w, errors.New("cluster not found")) return } @@ -148,15 +129,7 @@ func GetMappings(w http.ResponseWriter, req *http.Request, ps httprouter.Params) writeError(w, err) return } - reqUrl := fmt.Sprintf("%s/%s/_mapping", meta.GetActiveEndpoint(), strings.Join(body.Index, ",")) - res, err := doRequest(reqUrl, http.MethodGet, nil, nil) - if err != nil { - writeError(w, err) - return - } - defer res.Body.Close() - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) + _, _, mappings, err := esClient.GetMapping(false, strings.Join(body.Index, ",")) if err != nil { writeError(w, err) return @@ -164,7 +137,7 @@ func GetMappings(w http.ResponseWriter, req *http.Request, ps httprouter.Params) writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody, + "resp": mappings, }, http.StatusOK) } diff --git a/service/alerting/monitor.go b/service/alerting/monitor.go index 8debf656..c2e89866 100644 --- a/service/alerting/monitor.go +++ b/service/alerting/monitor.go @@ -1,6 +1,7 @@ package alerting import ( + "bytes" "errors" "fmt" "infini.sh/console/model/alerting" @@ -9,7 +10,10 @@ import ( "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" + "io" "net/http" + "src/github.com/buger/jsonparser" + "src/github.com/valyala/fasttemplate" "strconv" "strings" "time" @@ -25,21 +29,14 @@ func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) mid := ps.ByName("monitorID") config := getDefaultConfig() + esClient := elastic.GetClient(config.ID) + res, err := esClient.Get(orm.GetIndexName(alerting.Config{}), "", mid) + if err != nil { + writeError(w, err) + return + } - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), mid) - res, err := doRequest(reqUrl, http.MethodGet, nil, nil) - if err != nil { - writeError(w, err) - return - } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - writeError(w, err) - return - } - res.Body.Close() - if found, ok := resBody["found"].(bool); ok && !found { + if !res.Found { writeError(w, errors.New("monitor not found")) return } @@ -70,47 +67,40 @@ func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) } }` queryDSL = fmt.Sprintf(queryDSL, mid) - reqUrl = fmt.Sprintf("%s/%s/_search", config.Endpoint, getAlertIndexName(INDEX_ALL_ALERTS)) - res, err = doRequest(reqUrl, http.MethodPost,nil, queryDSL) - + searchRes, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(INDEX_ALL_ALERTS), []byte(queryDSL)) if err != nil { writeError(w, err) return } - - var searchResBody = IfaceMap{} - err = decodeJSON(res.Body, &searchResBody) - if err != nil { - writeError(w, err) - return + var dayCountBuckets interface{} + if agg, ok := searchRes.Aggregations["24_hour_count"]; ok { + dayCountBuckets = agg.Buckets } - dayCountBuckets := queryValue(searchResBody, "aggregations.24_hour_count.buckets", 0) dayCount := 0 - if dcb, ok := dayCountBuckets.([]interface{}); ok { - if dayAgg, ok := dcb[0].(map[string]interface{}); ok { - dayCount = int(dayAgg["doc_count"].(float64)) - } + if dcb, ok := dayCountBuckets.([]elastic.BucketBase); ok { + dayCount = int(dcb[0]["doc_count"].(float64)) } - activeBuckets := queryValue(searchResBody, "aggregations.active_count.buckets",[]interface{}{}) + var activeBuckets interface{} + if agg, ok := searchRes.Aggregations["active_count"]; ok { + activeBuckets = agg.Buckets + } activeCount := 0 - if ab, ok := activeBuckets.([]interface{}); ok { + if ab, ok := activeBuckets.([]elastic.BucketBase); ok { for _, bk := range ab { - if curr, ok := bk.(map[string]interface{}); ok { - if curr["key"].(string) == "ACTIVE" { - activeCount = int(curr["doc_count"].(float64)) - break - } + if bk["key"].(string) == "ACTIVE" { + activeCount = int(bk["doc_count"].(float64)) + break } } } - monitor := queryValue(resBody, "_source.monitor", nil) + monitor := queryValue(res.Source, "monitor", nil) writeJSON(w, IfaceMap{ "ok": true, "resp": monitor, "activeCount": activeCount, "dayCount": dayCount, - "version": queryValue(resBody, "_version", nil), + "version": res.Version, }, http.StatusOK) } @@ -199,44 +189,33 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) } assignTo(params, sortPageData) config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}) ) - res, err := doRequest(reqUrl, http.MethodGet, nil, params) + esClient := elastic.GetClient(config.ID) + resBody, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(params)) if err != nil { writeError(w, err) return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - writeError(w, err) - return - } - - totalMonitors := queryValue(resBody, "hits.total.value", 0) + totalMonitors := resBody.GetTotal() var monitors []IfaceMap - var hits = queryValue(resBody, "hits.hits", []IfaceMap{}) + var hits = resBody.Hits.Hits monitorIDs := []interface{}{} monitorMap := map[string]int{} - if hitsArr, ok := hits.([]interface{}); ok { - for i, hitIface := range hitsArr { - if hit, ok := hitIface.(map[string]interface{}); ok { - id := queryValue(hit, "_id", "") - monitorIDs = append(monitorIDs, id) - monitor := queryValue(hit, "_source.monitor", IfaceMap{}).(map[string]interface{}) - monitorMap[id.(string)] = i - monitors = append(monitors, IfaceMap{ - "id": id, - "version": queryValue(hit, "_version", ""), - "name": queryValue(monitor, "name", ""), - "enabled": queryValue(monitor, "enabled", false), - "monitor": monitor, - }) - } - } + for i, hit := range hits { + monitorIDs = append(monitorIDs, hit.ID) + monitor := hit.Source["monitor"].(map[string]interface{}) + monitorMap[hit.ID] = i + monitors = append(monitors, IfaceMap{ + "id": hit.ID, + //"version": hit., + "name": monitor["name"], + "enabled": monitor["enabled"], + "monitor": monitor, + }) } + aggsOrderData := IfaceMap{} aggsSorts := IfaceMap{ "active": "active", @@ -288,41 +267,35 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) }, } - reqUrl = fmt.Sprintf("%s/%s/_search", config.Endpoint, getAlertIndexName(INDEX_ALL_ALERTS)) - searchRes, err := doRequest(reqUrl, http.MethodPost, nil, aggsParams) + searchRes, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(INDEX_ALL_ALERTS), util.MustToJSONBytes(aggsParams)) if err != nil { writeError(w, err) return } - var searchResBody = IfaceMap{} - err = decodeJSON(searchRes.Body, &searchResBody) - if err != nil { - writeError(w, err) - return + var buckets interface{} + if agg, ok := searchRes.Aggregations["uniq_monitor_ids"]; ok { + buckets = agg.Buckets } - buckets := queryValue(searchResBody, "aggregations.uniq_monitor_ids.buckets",[]IfaceMap{}) - if bks, ok := buckets.([]interface{}); ok { + if bks, ok := buckets.([]elastic.BucketBase); ok { for _, bk := range bks { - if bk, ok := bk.(map[string]interface{}); ok { - id := queryValue(bk, "key", "") - monitor := monitors[monitorMap[id.(string)]] - monitor["lastNotificationTime"] = queryValue(bk, "last_notification_time.value", "") - monitor["ignored"] = queryValue(bk, "ignored.doc_count", 0) - alertHits := queryValue(bk, "latest_alert.hits.hits", nil) - var latestAlert interface{} - if hits, ok := alertHits.([]interface{}); ok && len(hits) > 0 { - if hitMap, ok := hits[0].(map[string]interface{}); ok { - latestAlert = queryValue(hitMap, "_source.trigger_name", nil) - } + id := queryValue(bk, "key", "") + monitor := monitors[monitorMap[id.(string)]] + monitor["lastNotificationTime"] = queryValue(bk, "last_notification_time.value", "") + monitor["ignored"] = queryValue(bk, "ignored.doc_count", 0) + alertHits := queryValue(bk, "latest_alert.hits.hits", nil) + var latestAlert interface{} + if hits, ok := alertHits.([]interface{}); ok && len(hits) > 0 { + if hitMap, ok := hits[0].(map[string]interface{}); ok { + latestAlert = queryValue(hitMap, "_source.trigger_name", nil) } - monitor["latestAlert"] = latestAlert - monitor["active"] = queryValue(bk, "active.doc_count", 0) - monitor["errors"] = queryValue(bk, "errors.doc_count", 0) - monitor["acknowledged"] = queryValue(bk, "acknowledged.doc_count", 0) - monitor["currentTime"] = time.Now().UnixNano() / 1e6 - delete(monitorMap, id.(string)) } + monitor["latestAlert"] = latestAlert + monitor["active"] = queryValue(bk, "active.doc_count", 0) + monitor["errors"] = queryValue(bk, "errors.doc_count", 0) + monitor["acknowledged"] = queryValue(bk, "acknowledged.doc_count", 0) + monitor["currentTime"] = time.Now().UnixNano() / 1e6 + delete(monitorMap, id.(string)) } } @@ -368,10 +341,9 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param monitor.LastUpdateTime = time.Now().UnixNano()/1e6 config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{})) - res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ - "refresh": "wait_for", - }, IfaceMap{ + esClient := elastic.GetClient(config.ID) + indexName := orm.GetIndexName(alerting.Config{}) + indexRes, err := esClient.Index(indexName,"",util.GetUUID(),IfaceMap{ "cluster_id": id, MONITOR_FIELD: monitor, }) @@ -380,14 +352,8 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } - monitorId := queryValue(resBody, "_id", "").(string) + + monitorId := indexRes.ID GetScheduler().AddMonitor(monitorId, &ScheduleMonitor{ Monitor: monitor, ClusterID: id, @@ -398,7 +364,7 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param "resp": IfaceMap{ MONITOR_FIELD: monitor, "_id": monitorId, - "_version": queryValue(resBody, "_version", 0), + "_version": indexRes.Version, }, }, http.StatusOK) } @@ -413,9 +379,8 @@ func DeleteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param monitorId := ps.ByName("monitorID") config := getDefaultConfig() - + esClient := elastic.GetClient(config.ID) //change alert state to deleted and move alert to history - reqUrl := fmt.Sprintf("%s/_reindex", config.Endpoint) query := IfaceMap{ "bool": IfaceMap{ "must": []IfaceMap{ @@ -437,47 +402,38 @@ func DeleteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param "source": fmt.Sprintf("ctx._source['state'] = '%s';", ALERT_DELETED), }, } - _, err := doRequest(reqUrl, http.MethodPost,nil, reqBody) + _, err := esClient.Reindex(util.MustToJSONBytes(reqBody)) if err != nil { writeError(w, err) return } //delete alert - reqUrl = fmt.Sprintf("%s/%s/_delete_by_query", config.Endpoint, getAlertIndexName(INDEX_ALERT)) - _, err = doRequest(reqUrl, http.MethodPost, nil, IfaceMap{ + _, err = esClient.DeleteByQuery(getAlertIndexName(INDEX_ALERT), util.MustToJSONBytes(IfaceMap{ "query" : query, - }) + }) ) + if err != nil { writeError(w, err) return } //logic delete monitor - reqUrl = fmt.Sprintf("%s/%s/_update/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), monitorId) - res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ - "refresh": "wait_for", - }, IfaceMap{ - "script" : IfaceMap{ - "source": "ctx._source.monitor.status = 'DELETED';", - "lang": "painless", - }, - }) + var indexName = orm.GetIndexName(alerting.Config{}) + getRes, err := esClient.Get(indexName, "", monitorId) + if err != nil { + writeError(w, err) + return + } + source := util.MapStr(getRes.Source) + source.Put("monitor.status", "DELETED") + indexRes, err := esClient.Index(indexName, "", monitorId, source) if err != nil { writeError(w, err) return } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } - - resultIfce := queryValue(resBody, "result", "") var isOk = false - if result, ok := resultIfce.(string); ok && result == "updated" { + if indexRes.Result == "updated" { isOk = true GetScheduler().RemoveMonitor(monitorId) } @@ -496,8 +452,6 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param } monitorId := ps.ByName("monitorID") - config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), monitorId) var monitor = &alerting.Monitor{} err := decodeJSON(req.Body, &monitor) @@ -520,20 +474,23 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param } } monitor.LastUpdateTime = time.Now().UnixNano()/1e6 - res, err := doRequest(reqUrl, http.MethodPut, map[string]string{ - "refresh": "wait_for", - }, IfaceMap{ - "cluster_id": id, - MONITOR_FIELD: monitor, - }) + config := getDefaultConfig() + esClient := elastic.GetClient(config.ID) + indexName := orm.GetIndexName(alerting.Config{}) + getRes, err := esClient.Get(indexName, "", monitorId) if err != nil { writeError(w, err) return } + if !getRes.Found { + w.WriteHeader(http.StatusNotFound) + return + } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() + indexRes, err := esClient.Index(indexName, "", monitorId, IfaceMap{ + "cluster_id": getRes.Source["cluster_id"], + MONITOR_FIELD: monitor, + }) if err != nil { writeError(w, err) return @@ -545,8 +502,8 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param }) writeJSON(w, IfaceMap{ "ok": true, - "version": queryValue(resBody, "_version", ""), - "id": queryValue(resBody, "_id", ""), + "version": indexRes.Version, + "id": indexRes.ID, }, http.StatusOK) } @@ -570,7 +527,6 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P } config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_update_by_query", config.Endpoint, getAlertIndexName(INDEX_ALERT)) reqBody := IfaceMap{ "query": IfaceMap{ "bool": IfaceMap{ @@ -591,24 +547,15 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P "source": fmt.Sprintf("ctx._source['state'] = '%s';ctx._source['acknowledged_time'] = %dL;", ALERT_ACKNOWLEDGED, time.Now().UnixNano()/1e6), }, } - res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ - "refresh":"", - }, reqBody) - if err != nil { - writeError(w, err) - return - } - - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() + esClient := elastic.GetClient(config.ID) + res, err := esClient.UpdateByQuery( getAlertIndexName(INDEX_ALERT), util.MustToJSONBytes(reqBody)) if err != nil { writeError(w, err) return } var isOk = false - if failed, ok := resBody["failures"].([]interface{}); ok && len(failed) == 0 { + if len(res.Failures) == 0 { isOk = true } @@ -616,7 +563,7 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P "ok": isOk, "resp": IfaceMap{ "success": ackAlertsReq.AlertIDs, - "failed": []string{}, + "failed": res.Failures, }, }, http.StatusOK) @@ -624,8 +571,8 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.ByName("id") - meta := elastic.GetMetadata(id) - if meta == nil { + esClient := elastic.GetClient(id) + if esClient == nil { writeError(w, errors.New("cluster not found")) return } @@ -648,26 +595,29 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para return } periodStart := time.Now() - reqUrl := fmt.Sprintf("%s/%s/_search", meta.GetActiveEndpoint(), strings.Join(monitor.Inputs[0].Search.Indices, ",")) - res, err := doRequest(reqUrl, http.MethodGet, nil, monitor.Inputs[0].Search.Query) + period := alertUtil.GetMonitorPeriod(periodStart, &monitor.Schedule) + //strQuery := string(util.MustToJSONBytes(monitor.Inputs[0].Search.Query)) + //resolveQuery(strQuery, IfaceMap{ + // + // "periodStart": period.Start, + // "periodEnd": period.End, + //}) + + queryDsl := util.MustToJSONBytes(monitor.Inputs[0].Search.Query) + searchRes, err := esClient.SearchWithRawQueryDSL(strings.Join(monitor.Inputs[0].Search.Indices, ","), queryDsl) if err != nil { writeError(w, err) return } var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - res.Body.Close() - if err != nil { - writeError(w, err) - return - } + util.MustFromJSONBytes(searchRes.RawResult.Body, &resBody) var triggerResults = IfaceMap{} sm := ScheduleMonitor{ Monitor: monitor, } - period := alertUtil.GetMonitorPeriod(periodStart, &monitor.Schedule) + var monitorCtx []byte if dryrun == "true" { for _, trigger := range monitor.Triggers { @@ -676,7 +626,10 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para "action_results": IfaceMap{}, "name": trigger.Name, } - monitorCtx, err = createMonitorContext(&trigger, resBody, &sm, IfaceMap{}) + monitorCtx, err = createMonitorContext(&trigger, resBody, &sm, IfaceMap{ + "periodStart": period.Start, + "periodEnd": period.End, + }) if err != nil { triggerResult["error"] = err.Error() triggerResults[trigger.ID] = triggerResult @@ -735,3 +688,20 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para }, }, http.StatusOK) } + +func resolveQuery(query string, ctx IfaceMap ) ([]byte, error){ + ctxBytes := util.MustToJSONBytes(ctx) + msg := query + tpl := fasttemplate.New(msg, "{{", "}}") + msgBuffer := bytes.NewBuffer(nil) + _, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){ + keyParts := strings.Split(tag,".") + value, _, _, err := jsonparser.Get(ctxBytes, keyParts...) + if err != nil { + return 0, err + } + return writer.Write(value) + }) + return msgBuffer.Bytes(), err + //return json.Marshal(msg) +} \ No newline at end of file diff --git a/service/alerting/overview.go b/service/alerting/overview.go index 5722a9a8..adab0c7f 100644 --- a/service/alerting/overview.go +++ b/service/alerting/overview.go @@ -1,10 +1,10 @@ package alerting import ( - "fmt" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" "net/http" ) @@ -56,11 +56,9 @@ func getAlertByState() (IfaceMap, error){ return nil, err } var metricData = IfaceMap{} - if bks, ok := buckets.([]interface{}); ok { + if bks, ok := buckets.([]elastic.BucketBase); ok { for _, bk := range bks { - if bkm, ok := bk.(map[string]interface{}); ok { - metricData[queryValue(bkm, "key", "").(string)]= queryValue(bkm, "doc_count", 0) - } + metricData[queryValue(bk, "key", "").(string)]= queryValue(bk, "doc_count", 0) } } return metricData, nil @@ -68,18 +66,15 @@ func getAlertByState() (IfaceMap, error){ func queryMetricBuckets(reqBody IfaceMap, metricKey, indexName string)(interface{}, error){ conf := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, getAlertIndexName(indexName)) - res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) + esClient := elastic.GetClient(conf.ID) + res, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(indexName), util.MustToJSONBytes(reqBody)) if err != nil { return nil, err } - result := IfaceMap{} - defer res.Body.Close() - err = decodeJSON(res.Body, &result) - if err != nil { - return nil, err + var buckets interface{} + if agg, ok := res.Aggregations[metricKey]; ok { + buckets = agg.Buckets } - buckets := queryValue(result, fmt.Sprintf("aggregations.%s.buckets", metricKey), []interface{}{}) return buckets, nil } @@ -109,11 +104,10 @@ func getTopTenAlertCluster()(interface{}, error){ } var metricData []IfaceMap var clusterIDs []interface{} - if bks, ok := buckets.([]interface{}); ok { + if bks, ok := buckets.([]elastic.BucketBase); ok { for _, bk := range bks { - if bkm, ok := bk.(map[string]interface{}); ok { - stateBuckets := queryValue(bkm, "group_by_state.buckets", nil ) - key := queryValue(bkm, "key", "" ) + stateBuckets := queryValue(bk, "group_by_state.buckets", nil ) + key := queryValue(bk, "key", "" ) clusterIDs = append(clusterIDs, key) if stateBKS, ok := stateBuckets.([]interface{}); ok{ for _, stateBK := range stateBKS { @@ -127,7 +121,6 @@ func getTopTenAlertCluster()(interface{}, error){ } } } - } } reqBody = IfaceMap{ "_source": "name", @@ -138,17 +131,11 @@ func getTopTenAlertCluster()(interface{}, error){ }, } config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(elastic.ElasticsearchConfig{})) - res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) + esClient := elastic.GetClient(config.ID) + resBody, err := esClient.SearchWithRawQueryDSL( orm.GetIndexName(elastic.ElasticsearchConfig{}), util.MustToJSONBytes(reqBody)) if err != nil { return nil, err } - var resBody = &elastic.SearchResponse{} - err = decodeJSON(res.Body, resBody) - if err != nil { - return nil, err - } - res.Body.Close() clusterMap := IfaceMap{} for _, hit := range resBody.Hits.Hits { clusterMap[hit.ID] = hit.Source["name"] @@ -189,14 +176,12 @@ func getLastAlertDayCount() (interface{}, error){ return nil, err } var metricData []interface{} - if bks, ok := buckets.([]interface{}); ok { + if bks, ok := buckets.([]elastic.BucketBase); ok { for _, bk := range bks { - if bkm, ok := bk.(map[string]interface{}); ok { - metricData = append(metricData, []interface{}{ - queryValue(bkm, "key", ""), - queryValue(bkm, "doc_count", 0), - }) - } + metricData = append(metricData, []interface{}{ + queryValue(bk, "key", ""), + queryValue(bk, "doc_count", 0), + }) } } return metricData, nil diff --git a/service/alerting/schedule.go b/service/alerting/schedule.go index 4e14901a..288112bc 100644 --- a/service/alerting/schedule.go +++ b/service/alerting/schedule.go @@ -12,12 +12,12 @@ import ( "github.com/valyala/fasttemplate" "infini.sh/console/model/alerting" "infini.sh/console/service/alerting/action" - "infini.sh/console/service/alerting/util" + util1 "infini.sh/console/service/alerting/util" "infini.sh/framework/core/conditions" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" "io" - "net/http" "strings" "sync" "time" @@ -154,7 +154,7 @@ func generateMonitorJob(smt *ScheduleMonitor) MonitorJob{ if err != nil { log.Error(err) } - periods := util.GetMonitorPeriod(startTime, &sm.Monitor.Schedule) + periods := util1.GetMonitorPeriod(startTime, &sm.Monitor.Schedule) for _, trigger := range sm.Monitor.Triggers { monitorCtx, err := createMonitorContext(&trigger, queryResult, &sm, IfaceMap{ "periodStart": periods.Start, @@ -274,7 +274,7 @@ func doAction(act alerting.Action, monitorCtx []byte) ([]byte, error) { func getLastAlert(monitorID, triggerID, clusterID string) (map[string]interface{}, error) { conf := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, getAlertIndexName(INDEX_ALERT)) + esClient := elastic.GetClient(conf.ID) reqBody := IfaceMap{ "size": 1, "query": IfaceMap{ @@ -303,12 +303,8 @@ func getLastAlert(monitorID, triggerID, clusterID string) (map[string]interface{ {"start_time": IfaceMap{"order":"desc"}}, }, } - res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) - if err != nil { - return nil, err - } - var resBody = &elastic.SearchResponse{} - err = decodeJSON(res.Body, resBody) + + resBody, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(INDEX_ALERT), util.MustToJSONBytes(reqBody)) if err != nil { return nil, err } @@ -320,8 +316,8 @@ func getLastAlert(monitorID, triggerID, clusterID string) (map[string]interface{ func saveAlertInfo(alertItem *alerting.Alert) error { conf := getDefaultConfig() + esClient := elastic.GetClient(conf.ID) indexName := getAlertIndexName(INDEX_ALERT) - reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, indexName) reqBody := IfaceMap{ "size": 1, "query": IfaceMap{ @@ -352,28 +348,20 @@ func saveAlertInfo(alertItem *alerting.Alert) error { }, } - res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) + resBody, err := esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(reqBody)) if err != nil { return err } - var resBody = elastic.SearchResponse{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - return err - } - res.Body.Close() if len(resBody.Hits.Hits) == 0 { if alertItem.State == ALERT_COMPLETED { return nil } - reqUrl = fmt.Sprintf("%s/%s/_doc", conf.Endpoint, indexName) - _,err = doRequest(reqUrl, http.MethodPost, nil, alertItem) + _, err = esClient.Index(indexName,"", util.GetUUID(), alertItem) return err } currentState := queryValue(resBody.Hits.Hits[0].Source, "state", "").(string) alertItem.Id = resBody.Hits.Hits[0].ID if currentState != alertItem.State { - reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, getAlertIndexName(INDEX_ALERT_HISTORY), alertItem.Id) source := resBody.Hits.Hits[0].Source source["end_time"] = time.Now().UnixNano()/1e6 if alertItem.State == ALERT_COMPLETED { @@ -381,15 +369,13 @@ func saveAlertInfo(alertItem *alerting.Alert) error { source["state"] = ALERT_COMPLETED } } - _,err = doRequest(reqUrl, http.MethodPut, nil, source) - reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, indexName, resBody.Hits.Hits[0].ID) - _,err = doRequest(reqUrl, http.MethodDelete, nil, alertItem) + esClient.Index( getAlertIndexName(INDEX_ALERT_HISTORY), "", alertItem.Id, source) + _,err = esClient.Delete(indexName, "", resBody.Hits.Hits[0].ID) return err } alertItem.StartTime = int64(queryValue(resBody.Hits.Hits[0].Source, "start_time", 0).(float64)) - reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, indexName, alertItem.Id) - _,err = doRequest(reqUrl, http.MethodPut, nil, alertItem) + _, err = esClient.Index(indexName, "", alertItem.Id, alertItem ) return err } @@ -416,74 +402,55 @@ func getEmailRecipient(recipients []alerting.Recipient) ([]string, error){ } func resolveDestination(ID string)(*alerting.Destination, error){ - //todo may be cache destination ? conf := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID) - res, err := doRequest(reqUrl, http.MethodGet, nil, nil) + esClient := elastic.GetClient(conf.ID) + res, err := esClient.Get( orm.GetIndexName(alerting.Config{}), "", ID) if err != nil { return nil,err } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - return nil, err - } - res.Body.Close() destination := &alerting.Destination{} - buf, _ := json.Marshal(queryValue(resBody, "_source."+DESTINATION_FIELD, IfaceMap{})) + buf, _ := json.Marshal(queryValue(res.Source, DESTINATION_FIELD, IfaceMap{})) _ = json.Unmarshal(buf, destination) return destination, nil } func resolveEmailAccount(ID string)(*alerting.EmailAccount, error){ conf := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID) - res, err := doRequest(reqUrl, http.MethodGet, nil, nil) + esClient := elastic.GetClient(conf.ID) + res, err := esClient.Get( orm.GetIndexName(alerting.Config{}), "", ID) if err != nil { return nil,err } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - return nil, err - } - res.Body.Close() email := &alerting.EmailAccount{} - buf, _ := json.Marshal(queryValue(resBody, "_source."+EMAIL_ACCOUNT_FIELD, IfaceMap{})) + buf, _ := json.Marshal(queryValue(res.Source, EMAIL_ACCOUNT_FIELD, IfaceMap{})) _ = json.Unmarshal(buf, email) return email, nil } func resolveEmailGroup(ID string)(*alerting.EmailGroup, error){ conf := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID) - res, err := doRequest(reqUrl, http.MethodGet, nil, nil) + esClient := elastic.GetClient(conf.ID) + res, err := esClient.Get( orm.GetIndexName(alerting.Config{}), "", ID) if err != nil { return nil,err } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - return nil, err - } - res.Body.Close() + emailGroup := &alerting.EmailGroup{} - buf, _ := json.Marshal(queryValue(resBody, "_source."+EMAIL_GROUP_FIELD, IfaceMap{})) - _ = json.Unmarshal(buf, emailGroup) - return emailGroup, nil + buf, _ := json.Marshal(queryValue(res.Source, EMAIL_GROUP_FIELD, IfaceMap{})) + err = json.Unmarshal(buf, emailGroup) + return emailGroup, err } func getQueryResult(clusterID string, input *alerting.MonitorInput) (IfaceMap, error) { - meta := elastic.GetMetadata(clusterID) - reqUrl := fmt.Sprintf("%s/%s/_search", meta.GetActiveEndpoint(), strings.Join(input.Search.Indices, ",")) - res, err := doRequest(reqUrl, http.MethodGet, nil, input.Search.Query) + esClient := elastic.GetClient(clusterID) + queryDsl := util.MustToJSONBytes(input.Search.Query) + searchRes, err := esClient.SearchWithRawQueryDSL(strings.Join(input.Search.Indices, ","), queryDsl) if err != nil { return nil, err } - defer res.Body.Close() - resBody := IfaceMap{} - err = decodeJSON(res.Body, &resBody) - return resBody, err + var resBody = IfaceMap{} + util.MustFromJSONBytes(searchRes.RawResult.Body, &resBody) + return resBody, nil } func resolveMessage(messageTemplate IfaceMap, monitorCtx []byte ) ([]byte, error){ @@ -544,7 +511,7 @@ func resolveTriggerResult(trigger *alerting.Trigger, monitorCtx []byte ) (bool, func getEnabledMonitors() (map[string]ScheduleMonitor, error){ config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{})) + esClient := elastic.GetClient(config.ID) must := []IfaceMap{ { "exists": IfaceMap{ @@ -562,15 +529,18 @@ func getEnabledMonitors() (map[string]ScheduleMonitor, error){ "query": IfaceMap{ "bool": IfaceMap{ "must": must, + "must_not": []IfaceMap{ + { + "match": IfaceMap{ + MONITOR_FIELD+".status": "DELETED", + }, + }, + }, }, }, } - resBody := elastic.SearchResponse{} - res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) - if err != nil { - return nil, err - } - err = decodeJSON(res.Body, &resBody) + queryDsl := util.MustToJSONBytes(reqBody) + resBody, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(alerting.Config{}),queryDsl) if err != nil { return nil, err }