From 9e254de3e48d24d8f04534ce168ef747e78e0ff7 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 9 May 2022 20:40:43 +0800 Subject: [PATCH] add alert test api --- model/alerting/metric.go | 1 + plugin/api/alerting/api.go | 1 + plugin/api/alerting/rule.go | 23 ++++++++++++++++++++ service/alerting/elasticsearch/engine.go | 27 +++++++++++++++++++++++- service/alerting/engine.go | 1 + 5 files changed, 52 insertions(+), 1 deletion(-) diff --git a/model/alerting/metric.go b/model/alerting/metric.go index c698e000..ca40e945 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -44,6 +44,7 @@ type MetricItem struct { Field string `json:"field"` Statistic string `json:"statistic"` Group []string `json:"group"` //bucket group + Limit int `json:"limit"` } type QueryResult struct { diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index ca27be12..00b8769e 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -18,6 +18,7 @@ type AlertAPI struct { func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.getRule) api.HandleAPIMethod(api.POST, "/alerting/rule", alert.createRule) + api.HandleAPIMethod(api.POST, "/alerting/rule/test", alert.sendTestMessage) api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule) api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule) api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule) diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index bd556ab5..4890bed6 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -398,6 +398,29 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p "_id": id, }, http.StatusOK) } + +func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + rule := alerting.Rule{} + err := alertAPI.DecodeJSON(req, &rule) + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + eng := alerting2.GetEngine(rule.Resource.Type) + actionResults, err := eng.Test(&rule) + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + alertAPI.WriteJSON(w, util.MapStr{ + "action_results": actionResults, + }, http.StatusOK) + +} func checkResourceExists(rule *alerting.Rule) (bool, error) { if rule.Resource.ID == "" { return false, fmt.Errorf("resource id can not be empty") diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 669df8b3..46e81319 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -67,6 +67,11 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { } var rootAggs util.MapStr groups := rule.Metrics.Items[0].Group + limit := rule.Metrics.Items[0].Limit + //top group 10 + if limit <= 0 { + limit = 10 + } if grpLength := len(groups); grpLength > 0 { var lastGroupAgg util.MapStr @@ -74,7 +79,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { groupAgg := util.MapStr{ "terms": util.MapStr{ "field": groups[i], - "size": 500, + "size": limit, }, } groupID := util.GetUUID() @@ -634,11 +639,31 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } +func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error) { + checkResults, err := engine.CheckCondition(rule) + if err != nil { + return nil, fmt.Errorf("check condition error:%w", err) + } + conditionResults := checkResults.ResultItems + var actionResults []alerting.ActionExecutionResult + if len(rule.Channels.Normal) > 0 { + actionResults = performChannels(rule.Channels.Normal, conditionResults) + }else if len(rule.Channels.Escalation) > 0{ + actionResults = performChannels(rule.Channels.Escalation, conditionResults) + }else{ + return nil, fmt.Errorf("no useable channel") + } + return actionResults, nil +} + func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResultItem) []alerting.ActionExecutionResult { var message string for _, conditionResult := range conditionResults { message += fmt.Sprintf("severity: %s\t message:%s\t groups:%v\t timestamp: %v;", conditionResult.ConditionItem.Severity, conditionResult.ConditionItem.Message, conditionResult.GroupValues, time.Now()) } + if message == ""{ + message = "normal" + } ctx := util.MapStr{ "ctx": util.MapStr{ "message": message, diff --git a/service/alerting/engine.go b/service/alerting/engine.go index 5151b7d5..d9dece81 100644 --- a/service/alerting/engine.go +++ b/service/alerting/engine.go @@ -16,6 +16,7 @@ type Engine interface { ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error) GenerateTask(rule *alerting.Rule) func(ctx context.Context) + Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error) } var (