From 8926460150fefafa9a78ac005a11f53d094e6fa5 Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 13 May 2022 12:27:06 +0800 Subject: [PATCH] update alerting api --- model/alerting/alert.go | 1 + model/alerting/rule.go | 4 + plugin/api/alerting/alert.go | 2 +- plugin/api/alerting/rule.go | 5 +- service/alerting/elasticsearch/engine.go | 94 +++++++++++++++--------- service/alerting/engine.go | 6 +- 6 files changed, 72 insertions(+), 40 deletions(-) diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 3952467f..7d9ac6d7 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -35,6 +35,7 @@ type ActionExecutionResult struct { LastExecutionTime int `json:"last_execution_time"` Error string `json:"error"` Result string `json:"result"` + Message string `json:"message"` } const ( diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 46829847..6848ff5b 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -75,6 +75,10 @@ func (tr *TimeRange) Include( t time.Time) bool { return tr.Start <= currentTimeStr && currentTimeStr <= tr.End } +type FilterParam struct { + Start interface{} `json:"start"` + End interface{} `json:"end"` +} //ctx //rule expression, rule_id, resource_id, resource_name, event_id, condition_name, preset_value,[group_tags, check_values], //check_status ,timestamp, \ No newline at end of file diff --git a/plugin/api/alerting/alert.go b/plugin/api/alerting/alert.go index 931f5a08..0fdf70c7 100644 --- a/plugin/api/alerting/alert.go +++ b/plugin/api/alerting/alert.go @@ -6,13 +6,13 @@ package alerting import ( "fmt" + log "github.com/cihub/seelog" "infini.sh/console/model/alerting" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" - log "src/github.com/cihub/seelog" "strconv" "strings" ) diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index 5f58a5fb..9c1171d2 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -431,6 +431,9 @@ func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Reque }, http.StatusInternalServerError) return } + if rule.ID == "" { + rule.ID = util.GetUUID() + } eng := alerting2.GetEngine(rule.Resource.Type) actionResults, err := eng.Test(&rule) if err != nil { @@ -483,7 +486,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request return } eng := alerting2.GetEngine(rule.Resource.Type) - metricData, err := eng.GetTargetMetricData(&rule, true) + metricData, err := eng.GetTargetMetricData(&rule, true, nil) if err != nil { log.Error(err) alertAPI.WriteJSON(w, util.MapStr{ diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 09b77f6c..ce900dff 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -35,8 +35,8 @@ type Engine struct { //auto generate elasticsearch aggregations by metrics of rule //group of metric item converted to terms aggregation and TimeField of rule converted to date_histogram aggregation //convert statistic of metric item to elasticsearch aggregation -func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { - filter, err := engine.GenerateRawFilter(rule) +func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.FilterParam) (interface{}, error) { + filter, err := engine.GenerateRawFilter(rule, filterParam) if err != nil { return nil, err } @@ -258,7 +258,7 @@ func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[str return resultQuery, nil } -func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interface{}, error) { +func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) { query := map[string]interface{}{} var err error if rule.Resource.RawFilter != nil { @@ -271,30 +271,47 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interfa } } } - intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval) - if err != nil { - return nil, err - } var ( - units string - value int + timeStart interface{} + timeEnd interface{} ) - if intervalDuration / time.Hour >= 1 { - units = "h" - value = int(intervalDuration / time.Hour) - }else if intervalDuration / time.Minute >= 1{ - units = "m" - value = int(intervalDuration / time.Minute) - }else if intervalDuration / time.Second >= 1 { - units = "s" - value = int(intervalDuration / time.Second) + if filterParam != nil { + timeStart = filterParam.Start + timeEnd = filterParam.End }else{ - return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.PeriodInterval) + var ( + units string + value int + ) + intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval) + if err != nil { + return nil, err + } + if intervalDuration / time.Hour >= 1 { + units = "h" + value = int(intervalDuration / time.Hour) + }else if intervalDuration / time.Minute >= 1{ + units = "m" + value = int(intervalDuration / time.Minute) + }else if intervalDuration / time.Second >= 1 { + units = "s" + value = int(intervalDuration / time.Second) + }else{ + return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.PeriodInterval) + } + duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * 15, units)) + if err != nil { + return nil, err + } + timeStart = time.Now().Add(-duration).Format(time.RFC3339Nano) + timeEnd = time.Now().Format(time.RFC3339Nano) } + timeQuery := util.MapStr{ "range": util.MapStr{ rule.Resource.TimeField: util.MapStr{ - "gte": fmt.Sprintf("now-%d%s", value * 15, units), + "gte": timeStart, + "lte": timeEnd, }, }, } @@ -331,12 +348,12 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interfa return query, nil } -func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error){ +func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.FilterParam)(*alerting.QueryResult, error){ esClient := elastic.GetClient(rule.Resource.ID) queryResult := &alerting.QueryResult{} indexName := strings.Join(rule.Resource.Objects, ",") //todo cache queryDsl - queryDsl, err := engine.GenerateQuery(rule) + queryDsl, err := engine.GenerateQuery(rule, filterParam) if err != nil { return nil, err } @@ -363,8 +380,8 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, e queryResult.MetricData = metricData return queryResult, nil } -func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool)([]alerting.MetricData, error){ - queryResult, err := engine.ExecuteQuery(rule) +func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error){ + queryResult, err := engine.ExecuteQuery(rule, filterParam) if err != nil { return nil, err } @@ -429,7 +446,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool) //if triggered returns an array of ConditionResult //sort conditions by severity desc before check , and then if condition is true, then continue check another group func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){ - queryResult, err := engine.ExecuteQuery(rule) + queryResult, err := engine.ExecuteQuery(rule, nil) conditionResult := &alerting.ConditionResult{ QueryResult: queryResult, } @@ -438,7 +455,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } var resultItems []alerting.ConditionResultItem - targetMetricData, err := engine.GetTargetMetricData(rule, false) + targetMetricData, err := engine.GetTargetMetricData(rule, false, nil) if err != nil { return nil, err } @@ -542,7 +559,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } }() log.Tracef("start check condition of rule %s", rule.ID) - checkResults, err := engine.CheckCondition(rule) + alertItem = &alerting.Alert{ ID: util.GetUUID(), Created: time.Now(), @@ -552,10 +569,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error { ResourceName: rule.Resource.Name, Expression: rule.Metrics.Expression, Objects: rule.Resource.Objects, - ConditionResult: checkResults, Conditions: rule.Conditions, State: alerting.AlertStateNormal, } + checkResults, err := engine.CheckCondition(rule) + alertItem.ConditionResult = checkResults if err != nil { return err } @@ -716,7 +734,7 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([ var errCount int var actionResults []alerting.ActionExecutionResult for _, channel := range channels { - resBytes, err := performChannel(&channel, ctx) + resBytes, err, messageBytes := performChannel(&channel, ctx) var errStr string if err != nil { errCount++ @@ -725,6 +743,7 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([ actionResults = append(actionResults, alerting.ActionExecutionResult{ Result: string(resBytes), Error: errStr, + Message: string(messageBytes), LastExecutionTime: int(time.Now().UnixNano()/1e6), }) } @@ -744,23 +763,28 @@ func resolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, return msgBuffer.Bytes(), err } -func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error) { - var act action.Action +func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error, []byte) { + var ( + act action.Action + message []byte + err error + ) switch channel.Type { case alerting.ChannelWebhook: - message, err := resolveMessage(channel.Webhook.Body, ctx) + message, err = resolveMessage(channel.Webhook.Body, ctx) if err != nil { - return nil, err + return nil, err, message } act = &action.WebhookAction{ Data: channel.Webhook, Message: string(message), } default: - return nil, fmt.Errorf("unsupported action type: %s", channel.Type) + return nil, fmt.Errorf("unsupported action type: %s", channel.Type), message } - return act.Execute() + executeResult, err := act.Execute() + return executeResult, err, message } func (engine *Engine) GenerateTask(rule *alerting.Rule) func(ctx context.Context) { return func(ctx context.Context) { diff --git a/service/alerting/engine.go b/service/alerting/engine.go index 46dc11c3..5f3c548b 100644 --- a/service/alerting/engine.go +++ b/service/alerting/engine.go @@ -12,12 +12,12 @@ import ( ) type Engine interface { - GenerateQuery(rule *alerting.Rule) (interface{}, error) - ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error) + GenerateQuery(rule *alerting.Rule, filterParam *alerting.FilterParam) (interface{}, error) + ExecuteQuery(rule *alerting.Rule, filterParam *alerting.FilterParam)(*alerting.QueryResult, error) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error) GenerateTask(rule *alerting.Rule) func(ctx context.Context) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error) - GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool)([]alerting.MetricData, error) + GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error) } var (