update alerting api

This commit is contained in:
liugq 2022-03-09 18:07:30 +08:00
parent 1927b5e2e6
commit cc9c49f884
5 changed files with 132 additions and 97 deletions

View File

@ -13,6 +13,7 @@ func Init(cfg *config.AppConfig) {
handler := index_management.APIHandler{ handler := index_management.APIHandler{
Config: cfg, Config: cfg,
} }
alerting.InitAppConfig(cfg)
var pathPrefix = "/_search-center/" var pathPrefix = "/_search-center/"
var esPrefix = "/elasticsearch/:id/" var esPrefix = "/elasticsearch/:id/"
api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/overview"), handler.ElasticsearchOverviewAction) api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/overview"), handler.ElasticsearchOverviewAction)

View File

@ -12,7 +12,7 @@ func TestEmailAction(t *testing.T){
Host: "smtp.ym.163.com", Host: "smtp.ym.163.com",
Port: 994, Port: 994,
Method: "ssl", Method: "ssl",
Password: "<your email password>", Password: "hello@infini$", //"<your email password>",
}, },
Message: "hello world", Message: "hello world",
Subject: "test email", Subject: "test email",

View File

@ -3,11 +3,12 @@ package alerting
import ( import (
"errors" "errors"
"fmt" "fmt"
"infini.sh/console/config"
"infini.sh/console/model/alerting"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic" "infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"infini.sh/console/model/alerting"
"net/http" "net/http"
"strings" "strings"
"time" "time"
@ -252,7 +253,7 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
destinationId := ps.ByName("destinationId") destinationId := ps.ByName("destinationId")
config := getDefaultConfig() config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId) reqUrl := fmt.Sprintf("%s/%s/_doc/%s?refresh=wait_for", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId)
res, err := doRequest(reqUrl, http.MethodDelete, nil, nil) res, err := doRequest(reqUrl, http.MethodDelete, nil, nil)
if err != nil { if err != nil {
writeError(w, err) writeError(w, err)
@ -279,7 +280,15 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
} }
func getDefaultConfig() *elastic.ElasticsearchConfig { func getDefaultConfig() *elastic.ElasticsearchConfig {
return elastic.GetConfig("default") elasticsearch := "default"
if appConfig != nil {
elasticsearch = appConfig.Elasticsearch
}
return elastic.GetConfig(elasticsearch)
}
var appConfig *config.AppConfig
func InitAppConfig(config *config.AppConfig){
appConfig = config
} }
//var ( //var (

View File

@ -3,12 +3,12 @@ package alerting
import ( import (
"errors" "errors"
"fmt" "fmt"
"infini.sh/console/model/alerting"
alertUtil "infini.sh/console/service/alerting/util"
httprouter "infini.sh/framework/core/api/router" httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic" "infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"infini.sh/console/model/alerting"
alertUtil "infini.sh/console/service/alerting/util"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
@ -160,28 +160,27 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
}) })
} }
var monitorSorts = IfaceMap{ "name": "monitor.name.keyword" } var monitorSorts = IfaceMap{ "name": "monitor.name.keyword" }
sortPageData := IfaceMap{
"size": 1000,
"from": 0,
}
var ( var (
intSize int intSize int
intFrom int intFrom int
) )
if msort, ok := monitorSorts[sortField]; ok {
sortPageData["sort"] = []IfaceMap{
{ msort.(string): sortDirection },
}
intSize, _ = strconv.Atoi(size) intSize, _ = strconv.Atoi(size)
if intSize < 0 { if intSize < 0 {
intSize = 1000 intSize = 1000
} }
sortPageData["size"] = intSize
intFrom, _ = strconv.Atoi(from) intFrom, _ = strconv.Atoi(from)
if intFrom < 0 { if intFrom < 0 {
intFrom = 0 intFrom = 0
} }
sortPageData["from"] = intFrom
sortPageData := IfaceMap{
"size": intSize,
"from": intFrom,
}
if msort, ok := monitorSorts[sortField]; ok {
sortPageData["sort"] = []IfaceMap{
{ msort.(string): sortDirection },
}
} }
var params = IfaceMap{ var params = IfaceMap{
"query": IfaceMap{ "query": IfaceMap{
@ -215,22 +214,24 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
} }
totalMonitors := queryValue(resBody, "hits.total.value", 0) totalMonitors := queryValue(resBody, "hits.total.value", 0)
monitorMap:= map[string]IfaceMap{} var monitors []IfaceMap
var hits = queryValue(resBody, "hits.hits", []IfaceMap{}) var hits = queryValue(resBody, "hits.hits", []IfaceMap{})
monitorIDs := []interface{}{} monitorIDs := []interface{}{}
monitorMap := map[string]int{}
if hitsArr, ok := hits.([]interface{}); ok { if hitsArr, ok := hits.([]interface{}); ok {
for _, hitIface := range hitsArr { for i, hitIface := range hitsArr {
if hit, ok := hitIface.(map[string]interface{}); ok { if hit, ok := hitIface.(map[string]interface{}); ok {
id := queryValue(hit, "_id", "") id := queryValue(hit, "_id", "")
monitorIDs = append(monitorIDs, id) monitorIDs = append(monitorIDs, id)
monitor := queryValue(hit, "_source.monitor", IfaceMap{}).(map[string]interface{}) monitor := queryValue(hit, "_source.monitor", IfaceMap{}).(map[string]interface{})
monitorMap[id.(string)] = IfaceMap{ monitorMap[id.(string)] = i
monitors = append(monitors, IfaceMap{
"id": id, "id": id,
"version": queryValue(hit, "_version", ""), "version": queryValue(hit, "_version", ""),
"name": queryValue(monitor, "name", ""), "name": queryValue(monitor, "name", ""),
"enabled": queryValue(monitor, "enabled", false), "enabled": queryValue(monitor, "enabled", false),
"monitor": monitor, "monitor": monitor,
} })
} }
} }
} }
@ -301,12 +302,11 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
return return
} }
buckets := queryValue(searchResBody, "aggregations.uniq_monitor_ids.buckets",[]IfaceMap{}) buckets := queryValue(searchResBody, "aggregations.uniq_monitor_ids.buckets",[]IfaceMap{})
usedMonitors := []IfaceMap{}
if bks, ok := buckets.([]interface{}); ok { if bks, ok := buckets.([]interface{}); ok {
for _, bk := range bks { for _, bk := range bks {
if bk, ok := bk.(map[string]interface{}); ok { if bk, ok := bk.(map[string]interface{}); ok {
id := queryValue(bk, "key", "") id := queryValue(bk, "key", "")
monitor := monitorMap[id.(string)] monitor := monitors[monitorMap[id.(string)]]
monitor["lastNotificationTime"] = queryValue(bk, "last_notification_time.value", "") monitor["lastNotificationTime"] = queryValue(bk, "last_notification_time.value", "")
monitor["ignored"] = queryValue(bk, "ignored.doc_count", 0) monitor["ignored"] = queryValue(bk, "ignored.doc_count", 0)
alertHits := queryValue(bk, "latest_alert.hits.hits", nil) alertHits := queryValue(bk, "latest_alert.hits.hits", nil)
@ -321,15 +321,13 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
monitor["errors"] = queryValue(bk, "errors.doc_count", 0) monitor["errors"] = queryValue(bk, "errors.doc_count", 0)
monitor["acknowledged"] = queryValue(bk, "acknowledged.doc_count", 0) monitor["acknowledged"] = queryValue(bk, "acknowledged.doc_count", 0)
monitor["currentTime"] = time.Now().UnixNano() / 1e6 monitor["currentTime"] = time.Now().UnixNano() / 1e6
usedMonitors = append(usedMonitors, monitor)
delete(monitorMap, id.(string)) delete(monitorMap, id.(string))
} }
} }
} }
unusedMonitors := []IfaceMap{}
for _, m := range monitorMap { for _, idx := range monitorMap {
assignTo(m, IfaceMap{ assignTo(monitors[idx], IfaceMap{
"lastNotificationTime": nil, "lastNotificationTime": nil,
"ignored": 0, "ignored": 0,
"active": 0, "active": 0,
@ -338,14 +336,13 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
"latestAlert": "--", "latestAlert": "--",
"currentTime": time.Now().UnixNano()/1e6, "currentTime": time.Now().UnixNano()/1e6,
}) })
unusedMonitors = append(unusedMonitors,m)
} }
results := append(usedMonitors, unusedMonitors...) results := monitors
if _, ok := monitorSorts[sortField]; !ok { //if _, ok := monitorSorts[sortField]; !ok {
results = results[intFrom: intFrom + intSize] // results = results[intFrom: intFrom + intSize]
} //}
writeJSON(w, IfaceMap{ writeJSON(w, IfaceMap{
"ok": true, "ok": true,
"monitors": results, "monitors": results,
@ -667,39 +664,66 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para
} }
var triggerResults = IfaceMap{} var triggerResults = IfaceMap{}
if dryrun == "true" {
sm := ScheduleMonitor{ sm := ScheduleMonitor{
Monitor: monitor, Monitor: monitor,
} }
period := alertUtil.GetMonitorPeriod(periodStart, &monitor.Schedule)
var monitorCtx []byte
if dryrun == "true" {
for _, trigger := range monitor.Triggers { for _, trigger := range monitor.Triggers {
triggerResult := IfaceMap{ triggerResult := IfaceMap{
"error": nil, "error": nil,
"action_results": IfaceMap{}, "action_results": IfaceMap{},
"name": trigger.Name, "name": trigger.Name,
} }
monitorCtx, err := createMonitorContext(&trigger, resBody, &sm, IfaceMap{}) monitorCtx, err = createMonitorContext(&trigger, resBody, &sm, IfaceMap{})
if err != nil { if err != nil {
triggerResult["error"] = err.Error() triggerResult["error"] = err.Error()
triggerResults[trigger.ID] = triggerResult triggerResults[trigger.ID] = triggerResult
continue continue
} }
isTrigger, err := resolveTriggerResult(&trigger, monitorCtx) isTrigger, rerr := resolveTriggerResult(&trigger, monitorCtx)
triggerResult["triggered"] = isTrigger triggerResult["triggered"] = isTrigger
if err != nil { if rerr != nil {
triggerResult["error"] = err.Error() triggerResult["error"] = rerr.Error()
} }
if trigger.ID == "" { if trigger.ID == "" {
trigger.ID = util.GetUUID() trigger.ID = util.GetUUID()
} }
triggerResults[trigger.ID] = triggerResult triggerResults[trigger.ID] = triggerResult
} }
}else{
LOOP_TRIGGER:
for _, trigger := range monitor.Triggers {
monitorCtx, err = createMonitorContext(&trigger, resBody, &sm, IfaceMap{
"periodStart": period.Start,
"periodEnd": period.End,
})
if err != nil {
break
}
for _, action := range trigger.Actions {
_, err = doAction(action, monitorCtx)
if err != nil {
break LOOP_TRIGGER
}
}
}
} }
period := alertUtil.GetMonitorPeriod(periodStart, &monitor.Schedule)
var (
ok = true
errStr string
)
if err != nil {
ok = false
errStr = err.Error()
}
writeJSON(w, IfaceMap{ writeJSON(w, IfaceMap{
"ok": true, "ok": ok,
"resp": IfaceMap{ "resp": IfaceMap{
"error": nil, "error": errStr,
"monitor_name": monitor.Name, "monitor_name": monitor.Name,
"input_results": IfaceMap{ "input_results": IfaceMap{
"error": nil, "error": nil,

View File

@ -202,34 +202,57 @@ func generateMonitorJob(smt *ScheduleMonitor) MonitorJob{
alertItem.State = ALERT_ACTIVE alertItem.State = ALERT_ACTIVE
for _, act := range trigger.Actions { for _, act := range trigger.Actions {
actResult, err := doAction(act, monitorCtx)
var errMsg string
if err != nil {
errMsg = err.Error()
alertItem.ErrorMessage += errMsg
}
alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, alerting.ActionExecutionResult{
ActionID: act.ID,
LastExecutionTime: alertItem.LastNotificationTime,
Error: errMsg,
Result: string(actResult),
})
alertItem.LastNotificationTime = time.Now().UnixNano()/1e6
if alertItem.ErrorMessage != "" {
alertItem.State = ALERT_ERROR
}
err = saveAlertInfo(&alertItem)
if err != nil {
log.Error(err)
}
}
}
}
}
func doAction(act alerting.Action, monitorCtx []byte) ([]byte, error) {
message, err := resolveMessage(act.MessageTemplate, monitorCtx) message, err := resolveMessage(act.MessageTemplate, monitorCtx)
if err != nil { if err != nil {
alertItem.ErrorMessage = err.Error() //alertItem.ErrorMessage = err.Error()
continue return nil, err
} }
destination, err := resolveDestination(act.DestinationId) destination, err := resolveDestination(act.DestinationId)
if err != nil { if err != nil {
alertItem.ErrorMessage = err.Error() return nil, err
continue
} }
var tact action.Action var tact action.Action
alertItem.LastNotificationTime = time.Now().UnixNano()/1e6
switch destination.Type { switch destination.Type {
case action.ACTION_EMAIL: case action.ACTION_EMAIL:
sender, err := resolveEmailAccount(destination.Email.EmailAccountID) sender, err := resolveEmailAccount(destination.Email.EmailAccountID)
if err != nil { if err != nil {
alertItem.ErrorMessage = err.Error() return nil, err
continue
} }
subject, err := resolveMessage(act.SubjectTemplate, monitorCtx) subject, err := resolveMessage(act.SubjectTemplate, monitorCtx)
if err != nil { if err != nil {
alertItem.ErrorMessage = err.Error() return nil, err
continue
} }
receiver, err := getEmailRecipient(destination.Email.Recipients) receiver, err := getEmailRecipient(destination.Email.Recipients)
if err != nil { if err != nil {
alertItem.ErrorMessage = err.Error() return nil, err
continue
} }
tact = &action.EmailAction{ tact = &action.EmailAction{
Message: string(message), Message: string(message),
@ -242,32 +265,10 @@ func generateMonitorJob(smt *ScheduleMonitor) MonitorJob{
Data: &destination.CustomWebhook, Data: &destination.CustomWebhook,
Message: string(message), Message: string(message),
} }
default:
return nil, fmt.Errorf("unsupported action type: %s", destination.Type)
} }
if tact != nil { return tact.Execute()
actResult, err := tact.Execute()
var errStr string
if err != nil {
errStr = err.Error()
alertItem.ErrorMessage += errStr
}
alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, alerting.ActionExecutionResult{
ActionID: act.ID,
LastExecutionTime: alertItem.LastNotificationTime,
Error: errStr,
Result: string(actResult),
})
}
if alertItem.ErrorMessage != "" {
alertItem.State = ALERT_ERROR
}
err = saveAlertInfo(&alertItem)
if err != nil {
log.Error(err)
}
}
}
}
} }