change request es using elastic client

This commit is contained in:
liugq 2022-03-11 16:05:03 +08:00
parent 24e1c81d51
commit 8f44364441
7 changed files with 312 additions and 571 deletions

View File

@ -14,7 +14,6 @@ import (
"infini.sh/framework/core/env"
"infini.sh/framework/core/module"
"infini.sh/framework/core/orm"
_ "infini.sh/framework/plugins"
queue2 "infini.sh/framework/modules/disk_queue"
elastic2 "infini.sh/framework/modules/elastic"
"infini.sh/framework/modules/filter"
@ -24,6 +23,7 @@ import (
"infini.sh/framework/modules/stats"
"infini.sh/framework/modules/task"
"infini.sh/framework/modules/ui"
_ "infini.sh/framework/plugins"
api2 "infini.sh/gateway/api"
_ "infini.sh/gateway/proxy"
)

View File

@ -6,9 +6,11 @@ import (
"encoding/json"
"errors"
"fmt"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/orm"
"infini.sh/console/model/alerting"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"io"
"net/http"
"net/url"
@ -130,39 +132,22 @@ func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){
indexName := getAlertIndexName(alertType)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, indexName )
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
if err != nil {
writeError(w, err)
return
}
var alertRes = IfaceMap{}
err = decodeJSON(res.Body, &alertRes)
defer res.Body.Close()
esClient := elastic.GetClient(config.ID)
res, err := esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(reqBody))
if err != nil {
writeError(w, err)
return
}
alerts := []interface{}{}
rawAlerts := queryValue(alertRes, "hits.hits", nil)
if ds, ok := rawAlerts.([]interface{}); ok {
for _, alert := range ds {
if alertItem, ok := alert.(map[string]interface{}); ok {
//alertItem["version"] = queryValue(alertItem, "alert_version", "")
if alertID, ok := queryValue(alertItem, "_source.id", "").(string); ok && alertID == "" {
if source, ok := alertItem["_source"].(map[string]interface{}); ok {
source["id"] = alertItem["_id"]
}
}
alerts = append(alerts, alertItem["_source"])
}
}
rawAlerts := res.Hits.Hits
for _, alert := range rawAlerts {
alert.Source["id"] = alert.ID
alerts = append(alerts, alert.Source)
}
writeJSON(w, IfaceMap{
"ok": true,
"alerts": alerts,
"totalAlerts": queryValue(alertRes, "hits.total.value", 0),
"totalAlerts": res.GetTotal(),
}, http.StatusOK)
}
@ -235,7 +220,7 @@ func doRequest(requestUrl string, method string, params map[string]string, body
}
req, _ = http.NewRequest(method, requestUrl, reader)
req.Header.Set("content-type", "application/json")
req.Header.Set("User-Agent", "Kibana")
req.Header.Set("User-Agent", "infini-client")
return alertClient.Do(req)
}

View File

@ -99,38 +99,26 @@ func GetDestinations(w http.ResponseWriter, req *http.Request, ps httprouter.Par
},
}
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}))
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
esConfig := getDefaultConfig()
esClient := elastic.GetClient(esConfig.ID)
res, err := esClient.SearchWithRawQueryDSL( orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(reqBody))
if err != nil {
writeError(w, err)
return
}
totalDestinations := queryValue(resBody, "hits.total.value", 0)
rawHits := queryValue(resBody, "hits.hits", []interface{}{})
totalDestinations := res.GetTotal()
rawHits := res.Hits.Hits
dests := []IfaceMap{}
if rh, ok := rawHits.([]interface{}); ok {
for _, hit := range rh {
if destination, ok := hit.(map[string]interface{}); ok {
newItem := IfaceMap{}
newItem["id"] = queryValue(destination, "_id", "")
source := queryValue(destination, "_source."+DESTINATION_FIELD, nil)
if ms, ok := source.(map[string]interface{}); ok {
assignTo(newItem, ms)
}
dests = append(dests, newItem)
}
for _, hit := range rawHits {
newItem := IfaceMap{}
newItem["id"] = hit.ID
source := queryValue(hit.Source,DESTINATION_FIELD, nil)
if ms, ok := source.(map[string]interface{}); ok {
assignTo(newItem, ms)
}
dests = append(dests, newItem)
}
writeJSON(w, IfaceMap{
@ -141,9 +129,8 @@ func GetDestinations(w http.ResponseWriter, req *http.Request, ps httprouter.Par
}
func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
config := getDefaultConfig()
esConfig := getDefaultConfig()
destId := util.GetUUID()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destId)
var destination = &alerting.Destination{}
err := decodeJSON(req.Body, &destination)
if err != nil {
@ -167,9 +154,8 @@ func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
writeError(w, errors.New("type unsupported"))
return
}
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh": "wait_for",
}, IfaceMap{
esClient := elastic.GetClient(esConfig.ID)
indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", destId, IfaceMap{
DESTINATION_FIELD: toSaveDest,
})
if err != nil {
@ -177,20 +163,13 @@ func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
writeJSON(w, IfaceMap{
"ok": true,
"resp": IfaceMap{
DESTINATION_FIELD: toSaveDest,
"_id": queryValue(resBody, "_id", ""),
"_version": queryValue(resBody, "_version", 0),
"_id": indexRes.ID,
"_version": indexRes.Version,
},
}, http.StatusOK)
@ -200,7 +179,6 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
destinationId := ps.ByName("destinationId")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId)
var destination = &alerting.Destination{}
err := decodeJSON(req.Body, &destination)
if err != nil {
@ -224,9 +202,8 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
writeError(w, errors.New("type unsupported"))
return
}
res, err := doRequest(reqUrl, http.MethodPut, map[string]string{
"refresh": "wait_for",
}, IfaceMap{
esClient := elastic.GetClient(config.ID)
indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", destinationId, IfaceMap{
DESTINATION_FIELD: toSaveDest,
})
if err != nil {
@ -234,17 +211,10 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
writeJSON(w, IfaceMap{
"ok": true,
"version": queryValue(resBody, "_version", ""),
"id": queryValue(resBody, "_id", ""),
"version": indexRes.Version,
"id": indexRes.ID,
}, http.StatusOK)
}
@ -253,24 +223,15 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
destinationId := ps.ByName("destinationId")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s?refresh=wait_for", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId)
res, err := doRequest(reqUrl, http.MethodDelete, nil, nil)
esClient := elastic.GetClient(config.ID)
deleteRes, err := esClient.Delete(orm.GetIndexName(alerting.Config{}), "", destinationId)
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
resultIfce := queryValue(resBody, "result", "")
var isOk = false
if result, ok := resultIfce.(string); ok && result == "deleted" {
if deleteRes.Result == "deleted" {
isOk = true
}
writeJSON(w, IfaceMap{
@ -312,7 +273,6 @@ func InitAppConfig(config *config.AppConfig){
func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{}))
var emailAccount = &alerting.EmailAccount{}
err := decodeJSON(req.Body, &emailAccount)
if err != nil {
@ -321,9 +281,8 @@ func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
}
//var password = emailAccount.Password
//emailAccount.Password = ""
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh": "wait_for",
}, IfaceMap{
esClient := elastic.GetClient(config.ID)
indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", util.GetUUID(), IfaceMap{
EMAIL_ACCOUNT_FIELD: emailAccount,
})
if err != nil {
@ -331,14 +290,6 @@ func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
//kk := fmt.Sprintf("search-center_alerting.destination.email.%s.password", emailAccount.Name)
//secStr, _ := getKeystore().Retrieve(kk)
//kst := getKeystore()
@ -357,8 +308,8 @@ func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
"ok": true,
"resp": IfaceMap{
EMAIL_ACCOUNT_FIELD: emailAccount,
"_id": queryValue(resBody, "_id", ""),
"_version": queryValue(resBody, "_version", 0),
"_id": indexRes.ID,
"_version": indexRes.Version,
},
}, http.StatusOK)
}
@ -367,17 +318,14 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
emailAccountId := ps.ByName("emailAccountId")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId)
var emailAccount = &alerting.EmailAccount{}
err := decodeJSON(req.Body, &emailAccount)
if err != nil {
writeError(w, err)
return
}
res, err := doRequest(reqUrl, http.MethodPut, map[string]string{
"refresh": "wait_for",
}, IfaceMap{
esClient := elastic.GetClient(config.ID)
indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}),"", emailAccountId, IfaceMap{
EMAIL_ACCOUNT_FIELD: emailAccount,
})
if err != nil {
@ -385,17 +333,9 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
writeJSON(w, IfaceMap{
"ok": true,
"id": queryValue(resBody, "_id", ""),
"id": indexRes.ID,
}, http.StatusOK)
}
@ -403,28 +343,14 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
func DeleteEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
emailAccountId := ps.ByName("emailAccountId")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId)
res, err := doRequest(reqUrl, http.MethodDelete, map[string]string{
"refresh": "wait_for",
}, nil)
esClient := elastic.GetClient(config.ID)
deleteRes, err := esClient.Delete(orm.GetIndexName(alerting.Config{}), "", emailAccountId, "wait_for")
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
resultIfce := queryValue(resBody, "result", "")
var isOk = false
if result, ok := resultIfce.(string); ok && result == "deleted" {
if deleteRes.Result == "deleted" {
isOk = true
}
writeJSON(w, IfaceMap{
@ -477,37 +403,25 @@ func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
}
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}))
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
esClient := elastic.GetClient(config.ID)
searchRes, err := esClient.SearchWithRawQueryDSL( orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(reqBody))
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
writeError(w, err)
return
}
totalEmailAccounts := queryValue(resBody, "hits.total.value", 0)
rawHits := queryValue(resBody, "hits.hits", []interface{}{})
totalEmailAccounts := searchRes.GetTotal()
rawHits := searchRes.Hits.Hits
emailAccounts := []IfaceMap{}
if rh, ok := rawHits.([]interface{}); ok {
for _, hit := range rh {
if emailAccount, ok := hit.(map[string]interface{}); ok {
newItem := IfaceMap{}
newItem["id"] = queryValue(emailAccount, "_id", "")
source := queryValue(emailAccount, "_source."+EMAIL_ACCOUNT_FIELD, nil)
if ms, ok := source.(map[string]interface{}); ok {
assignTo(newItem, ms)
}
emailAccounts = append(emailAccounts, newItem)
}
for _, hit := range rawHits {
newItem := IfaceMap{}
newItem["id"] = hit.ID
source := queryValue(hit.Source, EMAIL_ACCOUNT_FIELD, nil)
if ms, ok := source.(map[string]interface{}); ok {
assignTo(newItem, ms)
}
emailAccounts = append(emailAccounts, newItem)
}
writeJSON(w, IfaceMap{
"ok": true,
@ -520,22 +434,13 @@ func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Par
emailAccountId := ps.ByName("emailAccountId")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId)
res, err := doRequest(reqUrl, http.MethodGet,nil, nil)
esClient := elastic.GetClient(config.ID)
getRes, err := esClient.Get(orm.GetIndexName(alerting.Config{}), "", emailAccountId)
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
//name := queryValue(resBody,"_source.email_account.name", "")
//kk := fmt.Sprintf("search-center_alerting.destination.email.%s.password", name)
//secStr, _ := getKeystore().Retrieve(kk)
@ -546,7 +451,7 @@ func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Par
writeJSON(w, IfaceMap{
"ok": true,
"resp": queryValue(resBody, "_source.email_account", nil),
"resp": queryValue(getRes.Source, "email_account", nil),
}, http.StatusOK)
}
@ -555,16 +460,14 @@ func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Par
func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{}))
var emailGroup = &alerting.EmailGroup{}
err := decodeJSON(req.Body, &emailGroup)
if err != nil {
writeError(w, err)
return
}
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh": "wait_for",
}, IfaceMap{
esClient := elastic.GetClient(config.ID)
indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", util.GetUUID(), IfaceMap{
EMAIL_GROUP_FIELD: emailGroup,
})
if err != nil {
@ -572,14 +475,6 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
writeJSON(w, IfaceMap{
"ok": true,
"resp": IfaceMap{
@ -588,8 +483,8 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
"name": emailGroup.Name,
"schema_version": emailGroup.SchemaVersion,
},
"_id": queryValue(resBody, "_id", ""),
"_version": queryValue(resBody, "_version", 0),
"_id": indexRes.ID,
"_version": indexRes.Version,
},
}, http.StatusOK)
}
@ -597,16 +492,14 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
emailGroupId := ps.ByName("emailGroupId")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId)
var emailGroup = &alerting.EmailGroup{}
err := decodeJSON(req.Body, &emailGroup)
if err != nil {
writeError(w, err)
return
}
res, err := doRequest(reqUrl, http.MethodPut, map[string]string{
"refresh": "wait_for",
}, IfaceMap{
esClient := elastic.GetClient(config.ID)
indexRes, err := esClient.Index( orm.GetIndexName(alerting.Config{}), "", emailGroupId, IfaceMap{
EMAIL_GROUP_FIELD: emailGroup,
})
if err != nil {
@ -614,16 +507,9 @@ func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
writeJSON(w, IfaceMap{
"ok": true,
"id": queryValue(resBody, "_id", ""),
"id": indexRes.ID,
}, http.StatusOK)
}
@ -631,25 +517,15 @@ func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
func DeleteEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
emailGroupId := ps.ByName("emailGroupId")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId)
res, err := doRequest(reqUrl, http.MethodDelete, nil, nil)
esClient := elastic.GetClient(config.ID)
res, err := esClient.Delete(orm.GetIndexName(alerting.Config{}), "", emailGroupId)
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
resultIfce := queryValue(resBody, "result", "")
var isOk = false
if result, ok := resultIfce.(string); ok && result == "deleted" {
if res.Result == "deleted" {
isOk = true
}
writeJSON(w, IfaceMap{
@ -703,36 +579,25 @@ func GetEmailGroups(w http.ResponseWriter, req *http.Request, ps httprouter.Para
}
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}))
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
esClient := elastic.GetClient(config.ID)
res, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(reqBody))
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
writeError(w, err)
return
}
totalEmailGroups := queryValue(resBody, "hits.total.value", 0)
rawHits := queryValue(resBody, "hits.hits", []interface{}{})
totalEmailGroups := res.GetTotal()
rawHits := res.Hits.Hits
emailGroups := []IfaceMap{}
if rh, ok := rawHits.([]interface{}); ok {
for _, hit := range rh {
if emailGroup, ok := hit.(map[string]interface{}); ok {
newItem := IfaceMap{}
newItem["id"] = queryValue(emailGroup, "_id", "")
source := queryValue(emailGroup, "_source."+EMAIL_GROUP_FIELD, nil)
if ms, ok := source.(map[string]interface{}); ok {
assignTo(newItem, ms)
}
emailGroups = append(emailGroups, newItem)
}
for _, hit := range rawHits {
newItem := IfaceMap{}
newItem["id"] = hit.ID
source := queryValue(hit.Source, EMAIL_GROUP_FIELD, nil)
if ms, ok := source.(map[string]interface{}); ok {
assignTo(newItem, ms)
}
emailGroups = append(emailGroups, newItem)
}
writeJSON(w, IfaceMap{
"ok": true,
@ -745,21 +610,14 @@ func GetEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Param
emailGroupId := ps.ByName("emailGroupId")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId)
res, err := doRequest(reqUrl, http.MethodGet,nil, req.Body)
esClient := elastic.GetClient(config.ID)
getRes, err := esClient.Get(orm.GetIndexName(alerting.Config{}), "", emailGroupId)
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
emailGroup := queryValue(resBody, "_source."+EMAIL_GROUP_FIELD, nil)
emailGroup := queryValue(getRes.Source, EMAIL_GROUP_FIELD, nil)
if emailGroup == nil {
writeJSON(w, IfaceMap{
"ok": false,

View File

@ -5,6 +5,7 @@ import (
"fmt"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/util"
"net/http"
"runtime/debug"
"strings"
@ -24,17 +25,9 @@ func Search(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
return
}
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, body.Index)
esClient := elastic.GetClient(config.ID)
body.Query["size"] = body.Size
res, err := doRequest(reqUrl, http.MethodPost, nil, body.Query)
if err != nil {
writeError(w, err)
return
}
defer res.Body.Close()
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
searchRes, err := esClient.SearchWithRawQueryDSL(body.Index, util.MustToJSONBytes(body.Query))
if err != nil {
writeError(w, err)
return
@ -42,15 +35,15 @@ func Search(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody,
"resp": searchRes,
}, http.StatusOK)
}
func GetIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.ByName("id")
meta := elastic.GetMetadata(id)
if meta == nil {
esClient := elastic.GetClient(id)
if esClient == nil {
writeError(w, errors.New("cluster not found"))
return
}
@ -63,27 +56,19 @@ func GetIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
writeError(w, err)
return
}
reqUrl := fmt.Sprintf("%s/_cat/indices/%s", meta.GetActiveEndpoint(), body.Index)
params := map[string]string{
"format": "json",
"h": "health,index,status",
}
res, err := doRequest(reqUrl, http.MethodGet, params, nil)
if err != nil {
writeError(w, err)
return
}
defer res.Body.Close()
var resBody = []IfaceMap{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
writeError(w, err)
return
}
indexInfos, err := esClient.GetIndices( body.Index)
if err != nil {
writeError(w, err)
return
}
indices := make([]elastic.IndexInfo, 0, len(*indexInfos))
for _, info := range *indexInfos {
indices = append(indices, info)
}
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody,
"resp": indices,
}, http.StatusOK)
}
@ -94,8 +79,8 @@ func GetAliases(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
}
}()
id := ps.ByName("id")
meta := elastic.GetMetadata(id)
if meta == nil {
esClient := elastic.GetClient(id)
if esClient == nil {
writeError(w, errors.New("cluster not found"))
return
}
@ -108,34 +93,30 @@ func GetAliases(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
writeError(w, err)
return
}
reqUrl := fmt.Sprintf("%s/_cat/aliases/%s", meta.GetActiveEndpoint(), body.Alias)
params := map[string]string{
"format": "json",
"h": "alias,index",
}
res, err := doRequest(reqUrl, http.MethodGet, params, nil)
//reqUrl := fmt.Sprintf("%s/_cat/aliases/%s", meta.GetActiveEndpoint(), body.Alias)
//params := map[string]string{
// "format": "json",
// "h": "alias,index",
//}
res, err := esClient.GetAliases()
if err != nil {
writeError(w, err)
return
}
defer res.Body.Close()
var resBody = []IfaceMap{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
writeError(w, err)
return
aliases := make([]elastic.AliasInfo, 0, len(*res))
for _, alias := range *res {
aliases =append(aliases, alias)
}
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody,
"resp": aliases,
}, http.StatusOK)
}
func GetMappings(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.ByName("id")
meta := elastic.GetMetadata(id)
if meta == nil {
esClient := elastic.GetClient(id)
if esClient == nil {
writeError(w, errors.New("cluster not found"))
return
}
@ -148,15 +129,7 @@ func GetMappings(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
writeError(w, err)
return
}
reqUrl := fmt.Sprintf("%s/%s/_mapping", meta.GetActiveEndpoint(), strings.Join(body.Index, ","))
res, err := doRequest(reqUrl, http.MethodGet, nil, nil)
if err != nil {
writeError(w, err)
return
}
defer res.Body.Close()
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
_, _, mappings, err := esClient.GetMapping(false, strings.Join(body.Index, ","))
if err != nil {
writeError(w, err)
return
@ -164,7 +137,7 @@ func GetMappings(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody,
"resp": mappings,
}, http.StatusOK)
}

View File

@ -1,6 +1,7 @@
package alerting
import (
"bytes"
"errors"
"fmt"
"infini.sh/console/model/alerting"
@ -9,7 +10,10 @@ import (
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"io"
"net/http"
"src/github.com/buger/jsonparser"
"src/github.com/valyala/fasttemplate"
"strconv"
"strings"
"time"
@ -25,21 +29,14 @@ func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
mid := ps.ByName("monitorID")
config := getDefaultConfig()
esClient := elastic.GetClient(config.ID)
res, err := esClient.Get(orm.GetIndexName(alerting.Config{}), "", mid)
if err != nil {
writeError(w, err)
return
}
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), mid)
res, err := doRequest(reqUrl, http.MethodGet, nil, nil)
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
writeError(w, err)
return
}
res.Body.Close()
if found, ok := resBody["found"].(bool); ok && !found {
if !res.Found {
writeError(w, errors.New("monitor not found"))
return
}
@ -70,47 +67,40 @@ func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
}
}`
queryDSL = fmt.Sprintf(queryDSL, mid)
reqUrl = fmt.Sprintf("%s/%s/_search", config.Endpoint, getAlertIndexName(INDEX_ALL_ALERTS))
res, err = doRequest(reqUrl, http.MethodPost,nil, queryDSL)
searchRes, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(INDEX_ALL_ALERTS), []byte(queryDSL))
if err != nil {
writeError(w, err)
return
}
var searchResBody = IfaceMap{}
err = decodeJSON(res.Body, &searchResBody)
if err != nil {
writeError(w, err)
return
var dayCountBuckets interface{}
if agg, ok := searchRes.Aggregations["24_hour_count"]; ok {
dayCountBuckets = agg.Buckets
}
dayCountBuckets := queryValue(searchResBody, "aggregations.24_hour_count.buckets", 0)
dayCount := 0
if dcb, ok := dayCountBuckets.([]interface{}); ok {
if dayAgg, ok := dcb[0].(map[string]interface{}); ok {
dayCount = int(dayAgg["doc_count"].(float64))
}
if dcb, ok := dayCountBuckets.([]elastic.BucketBase); ok {
dayCount = int(dcb[0]["doc_count"].(float64))
}
activeBuckets := queryValue(searchResBody, "aggregations.active_count.buckets",[]interface{}{})
var activeBuckets interface{}
if agg, ok := searchRes.Aggregations["active_count"]; ok {
activeBuckets = agg.Buckets
}
activeCount := 0
if ab, ok := activeBuckets.([]interface{}); ok {
if ab, ok := activeBuckets.([]elastic.BucketBase); ok {
for _, bk := range ab {
if curr, ok := bk.(map[string]interface{}); ok {
if curr["key"].(string) == "ACTIVE" {
activeCount = int(curr["doc_count"].(float64))
break
}
if bk["key"].(string) == "ACTIVE" {
activeCount = int(bk["doc_count"].(float64))
break
}
}
}
monitor := queryValue(resBody, "_source.monitor", nil)
monitor := queryValue(res.Source, "monitor", nil)
writeJSON(w, IfaceMap{
"ok": true,
"resp": monitor,
"activeCount": activeCount,
"dayCount": dayCount,
"version": queryValue(resBody, "_version", nil),
"version": res.Version,
}, http.StatusOK)
}
@ -199,44 +189,33 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
}
assignTo(params, sortPageData)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}) )
res, err := doRequest(reqUrl, http.MethodGet, nil, params)
esClient := elastic.GetClient(config.ID)
resBody, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(params))
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
writeError(w, err)
return
}
totalMonitors := queryValue(resBody, "hits.total.value", 0)
totalMonitors := resBody.GetTotal()
var monitors []IfaceMap
var hits = queryValue(resBody, "hits.hits", []IfaceMap{})
var hits = resBody.Hits.Hits
monitorIDs := []interface{}{}
monitorMap := map[string]int{}
if hitsArr, ok := hits.([]interface{}); ok {
for i, hitIface := range hitsArr {
if hit, ok := hitIface.(map[string]interface{}); ok {
id := queryValue(hit, "_id", "")
monitorIDs = append(monitorIDs, id)
monitor := queryValue(hit, "_source.monitor", IfaceMap{}).(map[string]interface{})
monitorMap[id.(string)] = i
monitors = append(monitors, IfaceMap{
"id": id,
"version": queryValue(hit, "_version", ""),
"name": queryValue(monitor, "name", ""),
"enabled": queryValue(monitor, "enabled", false),
"monitor": monitor,
})
}
}
for i, hit := range hits {
monitorIDs = append(monitorIDs, hit.ID)
monitor := hit.Source["monitor"].(map[string]interface{})
monitorMap[hit.ID] = i
monitors = append(monitors, IfaceMap{
"id": hit.ID,
//"version": hit.,
"name": monitor["name"],
"enabled": monitor["enabled"],
"monitor": monitor,
})
}
aggsOrderData := IfaceMap{}
aggsSorts := IfaceMap{
"active": "active",
@ -288,41 +267,35 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
},
}
reqUrl = fmt.Sprintf("%s/%s/_search", config.Endpoint, getAlertIndexName(INDEX_ALL_ALERTS))
searchRes, err := doRequest(reqUrl, http.MethodPost, nil, aggsParams)
searchRes, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(INDEX_ALL_ALERTS), util.MustToJSONBytes(aggsParams))
if err != nil {
writeError(w, err)
return
}
var searchResBody = IfaceMap{}
err = decodeJSON(searchRes.Body, &searchResBody)
if err != nil {
writeError(w, err)
return
var buckets interface{}
if agg, ok := searchRes.Aggregations["uniq_monitor_ids"]; ok {
buckets = agg.Buckets
}
buckets := queryValue(searchResBody, "aggregations.uniq_monitor_ids.buckets",[]IfaceMap{})
if bks, ok := buckets.([]interface{}); ok {
if bks, ok := buckets.([]elastic.BucketBase); ok {
for _, bk := range bks {
if bk, ok := bk.(map[string]interface{}); ok {
id := queryValue(bk, "key", "")
monitor := monitors[monitorMap[id.(string)]]
monitor["lastNotificationTime"] = queryValue(bk, "last_notification_time.value", "")
monitor["ignored"] = queryValue(bk, "ignored.doc_count", 0)
alertHits := queryValue(bk, "latest_alert.hits.hits", nil)
var latestAlert interface{}
if hits, ok := alertHits.([]interface{}); ok && len(hits) > 0 {
if hitMap, ok := hits[0].(map[string]interface{}); ok {
latestAlert = queryValue(hitMap, "_source.trigger_name", nil)
}
id := queryValue(bk, "key", "")
monitor := monitors[monitorMap[id.(string)]]
monitor["lastNotificationTime"] = queryValue(bk, "last_notification_time.value", "")
monitor["ignored"] = queryValue(bk, "ignored.doc_count", 0)
alertHits := queryValue(bk, "latest_alert.hits.hits", nil)
var latestAlert interface{}
if hits, ok := alertHits.([]interface{}); ok && len(hits) > 0 {
if hitMap, ok := hits[0].(map[string]interface{}); ok {
latestAlert = queryValue(hitMap, "_source.trigger_name", nil)
}
monitor["latestAlert"] = latestAlert
monitor["active"] = queryValue(bk, "active.doc_count", 0)
monitor["errors"] = queryValue(bk, "errors.doc_count", 0)
monitor["acknowledged"] = queryValue(bk, "acknowledged.doc_count", 0)
monitor["currentTime"] = time.Now().UnixNano() / 1e6
delete(monitorMap, id.(string))
}
monitor["latestAlert"] = latestAlert
monitor["active"] = queryValue(bk, "active.doc_count", 0)
monitor["errors"] = queryValue(bk, "errors.doc_count", 0)
monitor["acknowledged"] = queryValue(bk, "acknowledged.doc_count", 0)
monitor["currentTime"] = time.Now().UnixNano() / 1e6
delete(monitorMap, id.(string))
}
}
@ -368,10 +341,9 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
monitor.LastUpdateTime = time.Now().UnixNano()/1e6
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{}))
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh": "wait_for",
}, IfaceMap{
esClient := elastic.GetClient(config.ID)
indexName := orm.GetIndexName(alerting.Config{})
indexRes, err := esClient.Index(indexName,"",util.GetUUID(),IfaceMap{
"cluster_id": id,
MONITOR_FIELD: monitor,
})
@ -380,14 +352,8 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
monitorId := queryValue(resBody, "_id", "").(string)
monitorId := indexRes.ID
GetScheduler().AddMonitor(monitorId, &ScheduleMonitor{
Monitor: monitor,
ClusterID: id,
@ -398,7 +364,7 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
"resp": IfaceMap{
MONITOR_FIELD: monitor,
"_id": monitorId,
"_version": queryValue(resBody, "_version", 0),
"_version": indexRes.Version,
},
}, http.StatusOK)
}
@ -413,9 +379,8 @@ func DeleteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
monitorId := ps.ByName("monitorID")
config := getDefaultConfig()
esClient := elastic.GetClient(config.ID)
//change alert state to deleted and move alert to history
reqUrl := fmt.Sprintf("%s/_reindex", config.Endpoint)
query := IfaceMap{
"bool": IfaceMap{
"must": []IfaceMap{
@ -437,47 +402,38 @@ func DeleteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
"source": fmt.Sprintf("ctx._source['state'] = '%s';", ALERT_DELETED),
},
}
_, err := doRequest(reqUrl, http.MethodPost,nil, reqBody)
_, err := esClient.Reindex(util.MustToJSONBytes(reqBody))
if err != nil {
writeError(w, err)
return
}
//delete alert
reqUrl = fmt.Sprintf("%s/%s/_delete_by_query", config.Endpoint, getAlertIndexName(INDEX_ALERT))
_, err = doRequest(reqUrl, http.MethodPost, nil, IfaceMap{
_, err = esClient.DeleteByQuery(getAlertIndexName(INDEX_ALERT), util.MustToJSONBytes(IfaceMap{
"query" : query,
})
}) )
if err != nil {
writeError(w, err)
return
}
//logic delete monitor
reqUrl = fmt.Sprintf("%s/%s/_update/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), monitorId)
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh": "wait_for",
}, IfaceMap{
"script" : IfaceMap{
"source": "ctx._source.monitor.status = 'DELETED';",
"lang": "painless",
},
})
var indexName = orm.GetIndexName(alerting.Config{})
getRes, err := esClient.Get(indexName, "", monitorId)
if err != nil {
writeError(w, err)
return
}
source := util.MapStr(getRes.Source)
source.Put("monitor.status", "DELETED")
indexRes, err := esClient.Index(indexName, "", monitorId, source)
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
resultIfce := queryValue(resBody, "result", "")
var isOk = false
if result, ok := resultIfce.(string); ok && result == "updated" {
if indexRes.Result == "updated" {
isOk = true
GetScheduler().RemoveMonitor(monitorId)
}
@ -496,8 +452,6 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
}
monitorId := ps.ByName("monitorID")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), monitorId)
var monitor = &alerting.Monitor{}
err := decodeJSON(req.Body, &monitor)
@ -520,20 +474,23 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
}
}
monitor.LastUpdateTime = time.Now().UnixNano()/1e6
res, err := doRequest(reqUrl, http.MethodPut, map[string]string{
"refresh": "wait_for",
}, IfaceMap{
"cluster_id": id,
MONITOR_FIELD: monitor,
})
config := getDefaultConfig()
esClient := elastic.GetClient(config.ID)
indexName := orm.GetIndexName(alerting.Config{})
getRes, err := esClient.Get(indexName, "", monitorId)
if err != nil {
writeError(w, err)
return
}
if !getRes.Found {
w.WriteHeader(http.StatusNotFound)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
indexRes, err := esClient.Index(indexName, "", monitorId, IfaceMap{
"cluster_id": getRes.Source["cluster_id"],
MONITOR_FIELD: monitor,
})
if err != nil {
writeError(w, err)
return
@ -545,8 +502,8 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
})
writeJSON(w, IfaceMap{
"ok": true,
"version": queryValue(resBody, "_version", ""),
"id": queryValue(resBody, "_id", ""),
"version": indexRes.Version,
"id": indexRes.ID,
}, http.StatusOK)
}
@ -570,7 +527,6 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P
}
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_update_by_query", config.Endpoint, getAlertIndexName(INDEX_ALERT))
reqBody := IfaceMap{
"query": IfaceMap{
"bool": IfaceMap{
@ -591,24 +547,15 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P
"source": fmt.Sprintf("ctx._source['state'] = '%s';ctx._source['acknowledged_time'] = %dL;", ALERT_ACKNOWLEDGED, time.Now().UnixNano()/1e6),
},
}
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh":"",
}, reqBody)
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
esClient := elastic.GetClient(config.ID)
res, err := esClient.UpdateByQuery( getAlertIndexName(INDEX_ALERT), util.MustToJSONBytes(reqBody))
if err != nil {
writeError(w, err)
return
}
var isOk = false
if failed, ok := resBody["failures"].([]interface{}); ok && len(failed) == 0 {
if len(res.Failures) == 0 {
isOk = true
}
@ -616,7 +563,7 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P
"ok": isOk,
"resp": IfaceMap{
"success": ackAlertsReq.AlertIDs,
"failed": []string{},
"failed": res.Failures,
},
}, http.StatusOK)
@ -624,8 +571,8 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P
func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.ByName("id")
meta := elastic.GetMetadata(id)
if meta == nil {
esClient := elastic.GetClient(id)
if esClient == nil {
writeError(w, errors.New("cluster not found"))
return
}
@ -648,26 +595,29 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para
return
}
periodStart := time.Now()
reqUrl := fmt.Sprintf("%s/%s/_search", meta.GetActiveEndpoint(), strings.Join(monitor.Inputs[0].Search.Indices, ","))
res, err := doRequest(reqUrl, http.MethodGet, nil, monitor.Inputs[0].Search.Query)
period := alertUtil.GetMonitorPeriod(periodStart, &monitor.Schedule)
//strQuery := string(util.MustToJSONBytes(monitor.Inputs[0].Search.Query))
//resolveQuery(strQuery, IfaceMap{
//
// "periodStart": period.Start,
// "periodEnd": period.End,
//})
queryDsl := util.MustToJSONBytes(monitor.Inputs[0].Search.Query)
searchRes, err := esClient.SearchWithRawQueryDSL(strings.Join(monitor.Inputs[0].Search.Indices, ","), queryDsl)
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
res.Body.Close()
if err != nil {
writeError(w, err)
return
}
util.MustFromJSONBytes(searchRes.RawResult.Body, &resBody)
var triggerResults = IfaceMap{}
sm := ScheduleMonitor{
Monitor: monitor,
}
period := alertUtil.GetMonitorPeriod(periodStart, &monitor.Schedule)
var monitorCtx []byte
if dryrun == "true" {
for _, trigger := range monitor.Triggers {
@ -676,7 +626,10 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para
"action_results": IfaceMap{},
"name": trigger.Name,
}
monitorCtx, err = createMonitorContext(&trigger, resBody, &sm, IfaceMap{})
monitorCtx, err = createMonitorContext(&trigger, resBody, &sm, IfaceMap{
"periodStart": period.Start,
"periodEnd": period.End,
})
if err != nil {
triggerResult["error"] = err.Error()
triggerResults[trigger.ID] = triggerResult
@ -735,3 +688,20 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para
},
}, http.StatusOK)
}
func resolveQuery(query string, ctx IfaceMap ) ([]byte, error){
ctxBytes := util.MustToJSONBytes(ctx)
msg := query
tpl := fasttemplate.New(msg, "{{", "}}")
msgBuffer := bytes.NewBuffer(nil)
_, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){
keyParts := strings.Split(tag,".")
value, _, _, err := jsonparser.Get(ctxBytes, keyParts...)
if err != nil {
return 0, err
}
return writer.Write(value)
})
return msgBuffer.Bytes(), err
//return json.Marshal(msg)
}

View File

@ -1,10 +1,10 @@
package alerting
import (
"fmt"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"net/http"
)
@ -56,11 +56,9 @@ func getAlertByState() (IfaceMap, error){
return nil, err
}
var metricData = IfaceMap{}
if bks, ok := buckets.([]interface{}); ok {
if bks, ok := buckets.([]elastic.BucketBase); ok {
for _, bk := range bks {
if bkm, ok := bk.(map[string]interface{}); ok {
metricData[queryValue(bkm, "key", "").(string)]= queryValue(bkm, "doc_count", 0)
}
metricData[queryValue(bk, "key", "").(string)]= queryValue(bk, "doc_count", 0)
}
}
return metricData, nil
@ -68,18 +66,15 @@ func getAlertByState() (IfaceMap, error){
func queryMetricBuckets(reqBody IfaceMap, metricKey, indexName string)(interface{}, error){
conf := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, getAlertIndexName(indexName))
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
esClient := elastic.GetClient(conf.ID)
res, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(indexName), util.MustToJSONBytes(reqBody))
if err != nil {
return nil, err
}
result := IfaceMap{}
defer res.Body.Close()
err = decodeJSON(res.Body, &result)
if err != nil {
return nil, err
var buckets interface{}
if agg, ok := res.Aggregations[metricKey]; ok {
buckets = agg.Buckets
}
buckets := queryValue(result, fmt.Sprintf("aggregations.%s.buckets", metricKey), []interface{}{})
return buckets, nil
}
@ -109,11 +104,10 @@ func getTopTenAlertCluster()(interface{}, error){
}
var metricData []IfaceMap
var clusterIDs []interface{}
if bks, ok := buckets.([]interface{}); ok {
if bks, ok := buckets.([]elastic.BucketBase); ok {
for _, bk := range bks {
if bkm, ok := bk.(map[string]interface{}); ok {
stateBuckets := queryValue(bkm, "group_by_state.buckets", nil )
key := queryValue(bkm, "key", "" )
stateBuckets := queryValue(bk, "group_by_state.buckets", nil )
key := queryValue(bk, "key", "" )
clusterIDs = append(clusterIDs, key)
if stateBKS, ok := stateBuckets.([]interface{}); ok{
for _, stateBK := range stateBKS {
@ -127,7 +121,6 @@ func getTopTenAlertCluster()(interface{}, error){
}
}
}
}
}
reqBody = IfaceMap{
"_source": "name",
@ -138,17 +131,11 @@ func getTopTenAlertCluster()(interface{}, error){
},
}
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(elastic.ElasticsearchConfig{}))
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
esClient := elastic.GetClient(config.ID)
resBody, err := esClient.SearchWithRawQueryDSL( orm.GetIndexName(elastic.ElasticsearchConfig{}), util.MustToJSONBytes(reqBody))
if err != nil {
return nil, err
}
var resBody = &elastic.SearchResponse{}
err = decodeJSON(res.Body, resBody)
if err != nil {
return nil, err
}
res.Body.Close()
clusterMap := IfaceMap{}
for _, hit := range resBody.Hits.Hits {
clusterMap[hit.ID] = hit.Source["name"]
@ -189,14 +176,12 @@ func getLastAlertDayCount() (interface{}, error){
return nil, err
}
var metricData []interface{}
if bks, ok := buckets.([]interface{}); ok {
if bks, ok := buckets.([]elastic.BucketBase); ok {
for _, bk := range bks {
if bkm, ok := bk.(map[string]interface{}); ok {
metricData = append(metricData, []interface{}{
queryValue(bkm, "key", ""),
queryValue(bkm, "doc_count", 0),
})
}
metricData = append(metricData, []interface{}{
queryValue(bk, "key", ""),
queryValue(bk, "doc_count", 0),
})
}
}
return metricData, nil

View File

@ -12,12 +12,12 @@ import (
"github.com/valyala/fasttemplate"
"infini.sh/console/model/alerting"
"infini.sh/console/service/alerting/action"
"infini.sh/console/service/alerting/util"
util1 "infini.sh/console/service/alerting/util"
"infini.sh/framework/core/conditions"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"io"
"net/http"
"strings"
"sync"
"time"
@ -154,7 +154,7 @@ func generateMonitorJob(smt *ScheduleMonitor) MonitorJob{
if err != nil {
log.Error(err)
}
periods := util.GetMonitorPeriod(startTime, &sm.Monitor.Schedule)
periods := util1.GetMonitorPeriod(startTime, &sm.Monitor.Schedule)
for _, trigger := range sm.Monitor.Triggers {
monitorCtx, err := createMonitorContext(&trigger, queryResult, &sm, IfaceMap{
"periodStart": periods.Start,
@ -274,7 +274,7 @@ func doAction(act alerting.Action, monitorCtx []byte) ([]byte, error) {
func getLastAlert(monitorID, triggerID, clusterID string) (map[string]interface{}, error) {
conf := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, getAlertIndexName(INDEX_ALERT))
esClient := elastic.GetClient(conf.ID)
reqBody := IfaceMap{
"size": 1,
"query": IfaceMap{
@ -303,12 +303,8 @@ func getLastAlert(monitorID, triggerID, clusterID string) (map[string]interface{
{"start_time": IfaceMap{"order":"desc"}},
},
}
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
if err != nil {
return nil, err
}
var resBody = &elastic.SearchResponse{}
err = decodeJSON(res.Body, resBody)
resBody, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(INDEX_ALERT), util.MustToJSONBytes(reqBody))
if err != nil {
return nil, err
}
@ -320,8 +316,8 @@ func getLastAlert(monitorID, triggerID, clusterID string) (map[string]interface{
func saveAlertInfo(alertItem *alerting.Alert) error {
conf := getDefaultConfig()
esClient := elastic.GetClient(conf.ID)
indexName := getAlertIndexName(INDEX_ALERT)
reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, indexName)
reqBody := IfaceMap{
"size": 1,
"query": IfaceMap{
@ -352,28 +348,20 @@ func saveAlertInfo(alertItem *alerting.Alert) error {
},
}
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
resBody, err := esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(reqBody))
if err != nil {
return err
}
var resBody = elastic.SearchResponse{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
return err
}
res.Body.Close()
if len(resBody.Hits.Hits) == 0 {
if alertItem.State == ALERT_COMPLETED {
return nil
}
reqUrl = fmt.Sprintf("%s/%s/_doc", conf.Endpoint, indexName)
_,err = doRequest(reqUrl, http.MethodPost, nil, alertItem)
_, err = esClient.Index(indexName,"", util.GetUUID(), alertItem)
return err
}
currentState := queryValue(resBody.Hits.Hits[0].Source, "state", "").(string)
alertItem.Id = resBody.Hits.Hits[0].ID
if currentState != alertItem.State {
reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, getAlertIndexName(INDEX_ALERT_HISTORY), alertItem.Id)
source := resBody.Hits.Hits[0].Source
source["end_time"] = time.Now().UnixNano()/1e6
if alertItem.State == ALERT_COMPLETED {
@ -381,15 +369,13 @@ func saveAlertInfo(alertItem *alerting.Alert) error {
source["state"] = ALERT_COMPLETED
}
}
_,err = doRequest(reqUrl, http.MethodPut, nil, source)
reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, indexName, resBody.Hits.Hits[0].ID)
_,err = doRequest(reqUrl, http.MethodDelete, nil, alertItem)
esClient.Index( getAlertIndexName(INDEX_ALERT_HISTORY), "", alertItem.Id, source)
_,err = esClient.Delete(indexName, "", resBody.Hits.Hits[0].ID)
return err
}
alertItem.StartTime = int64(queryValue(resBody.Hits.Hits[0].Source, "start_time", 0).(float64))
reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, indexName, alertItem.Id)
_,err = doRequest(reqUrl, http.MethodPut, nil, alertItem)
_, err = esClient.Index(indexName, "", alertItem.Id, alertItem )
return err
}
@ -416,74 +402,55 @@ func getEmailRecipient(recipients []alerting.Recipient) ([]string, error){
}
func resolveDestination(ID string)(*alerting.Destination, error){
//todo may be cache destination ?
conf := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID)
res, err := doRequest(reqUrl, http.MethodGet, nil, nil)
esClient := elastic.GetClient(conf.ID)
res, err := esClient.Get( orm.GetIndexName(alerting.Config{}), "", ID)
if err != nil {
return nil,err
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
return nil, err
}
res.Body.Close()
destination := &alerting.Destination{}
buf, _ := json.Marshal(queryValue(resBody, "_source."+DESTINATION_FIELD, IfaceMap{}))
buf, _ := json.Marshal(queryValue(res.Source, DESTINATION_FIELD, IfaceMap{}))
_ = json.Unmarshal(buf, destination)
return destination, nil
}
func resolveEmailAccount(ID string)(*alerting.EmailAccount, error){
conf := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID)
res, err := doRequest(reqUrl, http.MethodGet, nil, nil)
esClient := elastic.GetClient(conf.ID)
res, err := esClient.Get( orm.GetIndexName(alerting.Config{}), "", ID)
if err != nil {
return nil,err
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
return nil, err
}
res.Body.Close()
email := &alerting.EmailAccount{}
buf, _ := json.Marshal(queryValue(resBody, "_source."+EMAIL_ACCOUNT_FIELD, IfaceMap{}))
buf, _ := json.Marshal(queryValue(res.Source, EMAIL_ACCOUNT_FIELD, IfaceMap{}))
_ = json.Unmarshal(buf, email)
return email, nil
}
func resolveEmailGroup(ID string)(*alerting.EmailGroup, error){
conf := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID)
res, err := doRequest(reqUrl, http.MethodGet, nil, nil)
esClient := elastic.GetClient(conf.ID)
res, err := esClient.Get( orm.GetIndexName(alerting.Config{}), "", ID)
if err != nil {
return nil,err
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
return nil, err
}
res.Body.Close()
emailGroup := &alerting.EmailGroup{}
buf, _ := json.Marshal(queryValue(resBody, "_source."+EMAIL_GROUP_FIELD, IfaceMap{}))
_ = json.Unmarshal(buf, emailGroup)
return emailGroup, nil
buf, _ := json.Marshal(queryValue(res.Source, EMAIL_GROUP_FIELD, IfaceMap{}))
err = json.Unmarshal(buf, emailGroup)
return emailGroup, err
}
func getQueryResult(clusterID string, input *alerting.MonitorInput) (IfaceMap, error) {
meta := elastic.GetMetadata(clusterID)
reqUrl := fmt.Sprintf("%s/%s/_search", meta.GetActiveEndpoint(), strings.Join(input.Search.Indices, ","))
res, err := doRequest(reqUrl, http.MethodGet, nil, input.Search.Query)
esClient := elastic.GetClient(clusterID)
queryDsl := util.MustToJSONBytes(input.Search.Query)
searchRes, err := esClient.SearchWithRawQueryDSL(strings.Join(input.Search.Indices, ","), queryDsl)
if err != nil {
return nil, err
}
defer res.Body.Close()
resBody := IfaceMap{}
err = decodeJSON(res.Body, &resBody)
return resBody, err
var resBody = IfaceMap{}
util.MustFromJSONBytes(searchRes.RawResult.Body, &resBody)
return resBody, nil
}
func resolveMessage(messageTemplate IfaceMap, monitorCtx []byte ) ([]byte, error){
@ -544,7 +511,7 @@ func resolveTriggerResult(trigger *alerting.Trigger, monitorCtx []byte ) (bool,
func getEnabledMonitors() (map[string]ScheduleMonitor, error){
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}))
esClient := elastic.GetClient(config.ID)
must := []IfaceMap{
{
"exists": IfaceMap{
@ -562,15 +529,18 @@ func getEnabledMonitors() (map[string]ScheduleMonitor, error){
"query": IfaceMap{
"bool": IfaceMap{
"must": must,
"must_not": []IfaceMap{
{
"match": IfaceMap{
MONITOR_FIELD+".status": "DELETED",
},
},
},
},
},
}
resBody := elastic.SearchResponse{}
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
if err != nil {
return nil, err
}
err = decodeJSON(res.Body, &resBody)
queryDsl := util.MustToJSONBytes(reqBody)
resBody, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(alerting.Config{}),queryDsl)
if err != nil {
return nil, err
}