diff --git a/plugin/api/init.go b/plugin/api/init.go index 96190254..1e57db0b 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -13,6 +13,7 @@ func Init(cfg *config.AppConfig) { handler := index_management.APIHandler{ Config: cfg, } + alerting.InitAppConfig(cfg) var pathPrefix = "/_search-center/" var esPrefix = "/elasticsearch/:id/" api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/overview"), handler.ElasticsearchOverviewAction) diff --git a/service/alerting/action/email_test.go b/service/alerting/action/email_test.go index 5b54cc7e..6ca8428c 100644 --- a/service/alerting/action/email_test.go +++ b/service/alerting/action/email_test.go @@ -12,7 +12,7 @@ func TestEmailAction(t *testing.T){ Host: "smtp.ym.163.com", Port: 994, Method: "ssl", - Password: "", + Password: "hello@infini$", //"", }, Message: "hello world", Subject: "test email", diff --git a/service/alerting/destination.go b/service/alerting/destination.go index 87286791..b89aed56 100644 --- a/service/alerting/destination.go +++ b/service/alerting/destination.go @@ -3,11 +3,12 @@ package alerting import ( "errors" "fmt" + "infini.sh/console/config" + "infini.sh/console/model/alerting" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" - "infini.sh/console/model/alerting" "net/http" "strings" "time" @@ -252,7 +253,7 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P destinationId := ps.ByName("destinationId") config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId) + reqUrl := fmt.Sprintf("%s/%s/_doc/%s?refresh=wait_for", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId) res, err := doRequest(reqUrl, http.MethodDelete, nil, nil) if err != nil { writeError(w, err) @@ -279,7 +280,15 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P } func getDefaultConfig() *elastic.ElasticsearchConfig { - return elastic.GetConfig("default") + elasticsearch := "default" + if appConfig != nil { + elasticsearch = appConfig.Elasticsearch + } + return elastic.GetConfig(elasticsearch) +} +var appConfig *config.AppConfig +func InitAppConfig(config *config.AppConfig){ + appConfig = config } //var ( diff --git a/service/alerting/monitor.go b/service/alerting/monitor.go index 6df6094a..8debf656 100644 --- a/service/alerting/monitor.go +++ b/service/alerting/monitor.go @@ -3,12 +3,12 @@ package alerting import ( "errors" "fmt" + "infini.sh/console/model/alerting" + alertUtil "infini.sh/console/service/alerting/util" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" - "infini.sh/console/model/alerting" - alertUtil "infini.sh/console/service/alerting/util" "net/http" "strconv" "strings" @@ -160,28 +160,27 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) }) } var monitorSorts = IfaceMap{ "name": "monitor.name.keyword" } - sortPageData := IfaceMap{ - "size": 1000, - "from": 0, - } var ( intSize int intFrom int ) + intSize, _ = strconv.Atoi(size) + if intSize < 0 { + intSize = 1000 + } + intFrom, _ = strconv.Atoi(from) + if intFrom < 0 { + intFrom = 0 + } + + sortPageData := IfaceMap{ + "size": intSize, + "from": intFrom, + } if msort, ok := monitorSorts[sortField]; ok { sortPageData["sort"] = []IfaceMap{ { msort.(string): sortDirection }, } - intSize, _ = strconv.Atoi(size) - if intSize < 0 { - intSize = 1000 - } - sortPageData["size"] = intSize - intFrom, _ = strconv.Atoi(from) - if intFrom < 0 { - intFrom = 0 - } - sortPageData["from"] = intFrom } var params = IfaceMap{ "query": IfaceMap{ @@ -215,22 +214,24 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) } totalMonitors := queryValue(resBody, "hits.total.value", 0) - monitorMap:= map[string]IfaceMap{} + var monitors []IfaceMap var hits = queryValue(resBody, "hits.hits", []IfaceMap{}) monitorIDs := []interface{}{} + monitorMap := map[string]int{} if hitsArr, ok := hits.([]interface{}); ok { - for _, hitIface := range hitsArr { + for i, hitIface := range hitsArr { if hit, ok := hitIface.(map[string]interface{}); ok { id := queryValue(hit, "_id", "") monitorIDs = append(monitorIDs, id) monitor := queryValue(hit, "_source.monitor", IfaceMap{}).(map[string]interface{}) - monitorMap[id.(string)] = IfaceMap{ + monitorMap[id.(string)] = i + monitors = append(monitors, IfaceMap{ "id": id, "version": queryValue(hit, "_version", ""), "name": queryValue(monitor, "name", ""), "enabled": queryValue(monitor, "enabled", false), "monitor": monitor, - } + }) } } } @@ -301,12 +302,11 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) return } buckets := queryValue(searchResBody, "aggregations.uniq_monitor_ids.buckets",[]IfaceMap{}) - usedMonitors := []IfaceMap{} if bks, ok := buckets.([]interface{}); ok { for _, bk := range bks { if bk, ok := bk.(map[string]interface{}); ok { id := queryValue(bk, "key", "") - monitor := monitorMap[id.(string)] + monitor := monitors[monitorMap[id.(string)]] monitor["lastNotificationTime"] = queryValue(bk, "last_notification_time.value", "") monitor["ignored"] = queryValue(bk, "ignored.doc_count", 0) alertHits := queryValue(bk, "latest_alert.hits.hits", nil) @@ -321,15 +321,13 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) monitor["errors"] = queryValue(bk, "errors.doc_count", 0) monitor["acknowledged"] = queryValue(bk, "acknowledged.doc_count", 0) monitor["currentTime"] = time.Now().UnixNano() / 1e6 - usedMonitors = append(usedMonitors, monitor) delete(monitorMap, id.(string)) } } } - unusedMonitors := []IfaceMap{} - for _, m := range monitorMap { - assignTo(m, IfaceMap{ + for _, idx := range monitorMap { + assignTo(monitors[idx], IfaceMap{ "lastNotificationTime": nil, "ignored": 0, "active": 0, @@ -338,14 +336,13 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) "latestAlert": "--", "currentTime": time.Now().UnixNano()/1e6, }) - unusedMonitors = append(unusedMonitors,m) } - results := append(usedMonitors, unusedMonitors...) + results := monitors - if _, ok := monitorSorts[sortField]; !ok { - results = results[intFrom: intFrom + intSize] - } + //if _, ok := monitorSorts[sortField]; !ok { + // results = results[intFrom: intFrom + intSize] + //} writeJSON(w, IfaceMap{ "ok": true, "monitors": results, @@ -667,39 +664,66 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para } var triggerResults = IfaceMap{} + sm := ScheduleMonitor{ + Monitor: monitor, + } + period := alertUtil.GetMonitorPeriod(periodStart, &monitor.Schedule) + var monitorCtx []byte if dryrun == "true" { - sm := ScheduleMonitor{ - Monitor: monitor, - } for _, trigger := range monitor.Triggers { triggerResult := IfaceMap{ "error": nil, "action_results": IfaceMap{}, "name": trigger.Name, } - monitorCtx, err := createMonitorContext(&trigger, resBody, &sm, IfaceMap{}) + monitorCtx, err = createMonitorContext(&trigger, resBody, &sm, IfaceMap{}) if err != nil { triggerResult["error"] = err.Error() triggerResults[trigger.ID] = triggerResult continue } - isTrigger, err := resolveTriggerResult(&trigger, monitorCtx) + isTrigger, rerr := resolveTriggerResult(&trigger, monitorCtx) triggerResult["triggered"] = isTrigger - if err != nil { - triggerResult["error"] = err.Error() + if rerr != nil { + triggerResult["error"] = rerr.Error() } if trigger.ID == "" { trigger.ID = util.GetUUID() } triggerResults[trigger.ID] = triggerResult } + }else{ + LOOP_TRIGGER: + for _, trigger := range monitor.Triggers { + monitorCtx, err = createMonitorContext(&trigger, resBody, &sm, IfaceMap{ + "periodStart": period.Start, + "periodEnd": period.End, + }) + if err != nil { + break + } + for _, action := range trigger.Actions { + _, err = doAction(action, monitorCtx) + if err != nil { + break LOOP_TRIGGER + } + } + } } - period := alertUtil.GetMonitorPeriod(periodStart, &monitor.Schedule) + + var ( + ok = true + errStr string + ) + if err != nil { + ok = false + errStr = err.Error() + } writeJSON(w, IfaceMap{ - "ok": true, + "ok": ok, "resp": IfaceMap{ - "error": nil, + "error": errStr, "monitor_name": monitor.Name, "input_results": IfaceMap{ "error": nil, diff --git a/service/alerting/schedule.go b/service/alerting/schedule.go index 44ead09b..4e14901a 100644 --- a/service/alerting/schedule.go +++ b/service/alerting/schedule.go @@ -202,62 +202,20 @@ func generateMonitorJob(smt *ScheduleMonitor) MonitorJob{ alertItem.State = ALERT_ACTIVE for _, act := range trigger.Actions { - message, err := resolveMessage(act.MessageTemplate, monitorCtx) + actResult, err := doAction(act, monitorCtx) + var errMsg string if err != nil { - alertItem.ErrorMessage = err.Error() - continue + errMsg = err.Error() + alertItem.ErrorMessage += errMsg } - destination, err := resolveDestination(act.DestinationId) - if err != nil { - alertItem.ErrorMessage = err.Error() - continue - } - var tact action.Action + alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, alerting.ActionExecutionResult{ + ActionID: act.ID, + LastExecutionTime: alertItem.LastNotificationTime, + Error: errMsg, + Result: string(actResult), + }) alertItem.LastNotificationTime = time.Now().UnixNano()/1e6 - switch destination.Type { - case action.ACTION_EMAIL: - sender, err := resolveEmailAccount(destination.Email.EmailAccountID) - if err != nil { - alertItem.ErrorMessage = err.Error() - continue - } - subject, err := resolveMessage(act.SubjectTemplate, monitorCtx) - if err != nil { - alertItem.ErrorMessage = err.Error() - continue - } - receiver, err := getEmailRecipient(destination.Email.Recipients) - if err != nil { - alertItem.ErrorMessage = err.Error() - continue - } - tact = &action.EmailAction{ - Message: string(message), - Subject: string(subject), - Sender: sender, - Receiver: receiver, - } - case action.ACTION_WEBHOOK: - tact = &action.WebhookAction{ - Data: &destination.CustomWebhook, - Message: string(message), - } - } - if tact != nil { - actResult, err := tact.Execute() - var errStr string - if err != nil { - errStr = err.Error() - alertItem.ErrorMessage += errStr - } - alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, alerting.ActionExecutionResult{ - ActionID: act.ID, - LastExecutionTime: alertItem.LastNotificationTime, - Error: errStr, - Result: string(actResult), - }) - } if alertItem.ErrorMessage != "" { alertItem.State = ALERT_ERROR } @@ -270,6 +228,49 @@ func generateMonitorJob(smt *ScheduleMonitor) MonitorJob{ } } +func doAction(act alerting.Action, monitorCtx []byte) ([]byte, error) { + message, err := resolveMessage(act.MessageTemplate, monitorCtx) + if err != nil { + //alertItem.ErrorMessage = err.Error() + return nil, err + } + destination, err := resolveDestination(act.DestinationId) + if err != nil { + return nil, err + } + var tact action.Action + + switch destination.Type { + case action.ACTION_EMAIL: + sender, err := resolveEmailAccount(destination.Email.EmailAccountID) + if err != nil { + return nil, err + } + subject, err := resolveMessage(act.SubjectTemplate, monitorCtx) + if err != nil { + return nil, err + } + receiver, err := getEmailRecipient(destination.Email.Recipients) + if err != nil { + return nil, err + } + tact = &action.EmailAction{ + Message: string(message), + Subject: string(subject), + Sender: sender, + Receiver: receiver, + } + case action.ACTION_WEBHOOK: + tact = &action.WebhookAction{ + Data: &destination.CustomWebhook, + Message: string(message), + } + default: + return nil, fmt.Errorf("unsupported action type: %s", destination.Type) + } + return tact.Execute() +} + func getLastAlert(monitorID, triggerID, clusterID string) (map[string]interface{}, error) { conf := getDefaultConfig()