update alerting api

This commit is contained in:
liugq 2022-04-18 09:42:44 +08:00
parent fbcd324113
commit e1d85bc4f3
11 changed files with 499 additions and 30 deletions

View File

@ -12,17 +12,21 @@ type Alert struct {
ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"` ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"`
Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"` Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"`
Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"` Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"`
RuleID string `json:"rule_id"` RuleID string `json:"rule_id" elastic_mapping:"rule_id: { type: keyword }"`
ClusterID string `json:"cluster_id"` ResourceID string `json:"resource_id" elastic_mapping:"resource_id: { type: keyword }"`
Expression string `json:"expression"` ResourceName string `json:"resource_name" elastic_mapping:"resource_name: { type: keyword }"`
Objects []string `json:"objects"` Expression string `json:"expression" elastic_mapping:"expression: { type: keyword, copy_to:search_text }"`
Severity string `json:"severity"` Objects []string `json:"objects" elastic_mapping:"objects: { type:keyword,copy_to:search_text }"`
Content string `json:"content"` Severity string `json:"severity" elastic_mapping:"severity: { type: keyword }"`
Content string `json:"content" elastic_mapping:"context: { type: keyword, copy_to:search_text }"`
AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"`
ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"` ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"`
Users []string `json:"users,omitempty"` Users []string `json:"users,omitempty"`
State string `json:"state"` State string `json:"state"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
IsNotified bool `json:"is_notified" elastic_mapping:"is_notified: { type: boolean }"` //标识本次检测是否发送了告警通知
IsEscalated bool `json:"is_escalated" elastic_mapping:"is_escalated: { type: boolean }"` //标识本次检测是否发送了升级告警通知
SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"`
} }
type ActionExecutionResult struct { type ActionExecutionResult struct {

View File

@ -10,3 +10,7 @@ type Filter struct {
Not []FilterQuery `json:"not,omitempty"` Not []FilterQuery `json:"not,omitempty"`
//MinimumShouldMatch int `json:"minimum_should_match"` //MinimumShouldMatch int `json:"minimum_should_match"`
} }
func (f Filter) IsEmpty() bool {
return len(f.And) == 0 && len(f.Or) == 0 && len(f.Not) == 0
}

View File

@ -12,3 +12,11 @@ type FilterQuery struct {
Or []FilterQuery `json:"or,omitempty"` Or []FilterQuery `json:"or,omitempty"`
Not []FilterQuery `json:"not,omitempty"` Not []FilterQuery `json:"not,omitempty"`
} }
func (fq FilterQuery) IsComplex() bool {
return len(fq.And) > 0 || len(fq.Or) > 0 || len(fq.Not) > 0
}
func (f FilterQuery) IsEmpty() bool {
return !f.IsComplex() && f.Operator == ""
}

View File

@ -4,12 +4,25 @@
package alerting package alerting
import (
"fmt"
)
type Resource struct { type Resource struct {
ID string `json:"resource_id" elastic_mapping:"resource_id:{type:keyword}"` ID string `json:"resource_id" elastic_mapping:"resource_id:{type:keyword}"`
Name string `json:"resource_name" elastic_mapping:"resource_name:{type:keyword}"`
Type string `json:"type" elastic_mapping:"type:{type:keyword}"` Type string `json:"type" elastic_mapping:"type:{type:keyword}"`
Objects []string `json:"objects" elastic_mapping:"objects:{type:keyword,copy_to:search_text}"` Objects []string `json:"objects" elastic_mapping:"objects:{type:keyword,copy_to:search_text}"`
Filter Filter `json:"filter,omitempty" elastic_mapping:"-"` Filter FilterQuery `json:"filter,omitempty" elastic_mapping:"-"`
RawFilter map[string]interface{} `json:"raw_filter,omitempty"` RawFilter map[string]interface{} `json:"raw_filter,omitempty"`
TimeField string `json:"time_field,omitempty" elastic_mapping:"id:{type:keyword}"` TimeField string `json:"time_field,omitempty" elastic_mapping:"id:{type:keyword}"`
Context Context `json:"context"` Context Context `json:"context"`
} }
func (r Resource) Validate() error{
if r.TimeField == "" {
return fmt.Errorf("TimeField can not be empty")
}
return nil
}

View File

@ -19,6 +19,9 @@ type Rule struct {
Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"`
Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"` Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"`
Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"`
LastNotificationTime time.Time `json:"-" elastic_mapping:"last_notification_time:{type:date}"`
LastTermStartTime time.Time `json:"-"` //标识最近一轮告警的开始时间
LastEscalationTime time.Time `json:"-"` //标识最近一轮告警的开始时间
SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"`
} }

View File

@ -0,0 +1,106 @@
/* Copyright © INFINI Ltd. All rights reserved.
* web: https://infinilabs.com
* mail: hello#infini.ltd */
package alerting
import (
"fmt"
"infini.sh/console/model/alerting"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"net/http"
log "src/github.com/cihub/seelog"
"strconv"
"strings"
)
func (h *AlertAPI) getAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("alert_id")
obj := alerting.Alert{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
}, http.StatusNotFound)
return
}
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
h.WriteJSON(w, util.MapStr{
"found": true,
"_id": id,
"_source": obj,
}, 200)
}
func (h *AlertAPI) acknowledgeAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
body := struct {
AlertIDs []string `json:"alert_ids"`
}{}
queryDsl := util.MapStr{
"query": util.MapStr{
"terms": util.MapStr{
"_id": body.AlertIDs,
},
},
"script": util.MapStr{
"source": fmt.Sprintf("ctx._source['state'] = '%s'", alerting.AlertStateAcknowledge),
},
}
err := orm.UpdateBy(alerting.Alert{}, util.MustToJSONBytes(queryDsl))
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
h.WriteJSON(w, util.MapStr{
"ids": body.AlertIDs,
"result": "updated",
}, 200)
}
func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var (
keyword = h.GetParameterOrDefault(req, "keyword", "")
queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}`
strSize = h.GetParameterOrDefault(req, "size", "20")
strFrom = h.GetParameterOrDefault(req, "from", "0")
mustBuilder = &strings.Builder{}
)
if keyword != "" {
mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword))
}
size, _ := strconv.Atoi(strSize)
if size <= 0 {
size = 20
}
from, _ := strconv.Atoi(strFrom)
if from < 0 {
from = 0
}
q := orm.Query{}
queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from)
q.RawQuery = []byte(queryDSL)
err, res := orm.Search(&alerting.Alert{}, &q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.Write(w, res.Raw)
}

View File

@ -4,20 +4,24 @@
package alerting package alerting
import "infini.sh/framework/core/api" import (
"infini.sh/console/config"
"infini.sh/framework/core/api"
)
type AlertAPI struct { type AlertAPI struct {
api.Handler api.Handler
Config *config.AppConfig
} }
func init() { func (alert *AlertAPI) Init() {
alert:=AlertAPI{}
api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.getRule) api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.getRule)
api.HandleAPIMethod(api.POST, "/alerting/rule", alert.createRule) api.HandleAPIMethod(api.POST, "/alerting/rule", alert.createRule)
api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule) api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule)
api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule) api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule)
api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule) api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule)
api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.fetchAlertInfos)
api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.getChannel) api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.getChannel)
api.HandleAPIMethod(api.POST, "/alerting/channel", alert.createChannel) api.HandleAPIMethod(api.POST, "/alerting/channel", alert.createChannel)

View File

@ -238,6 +238,110 @@ func (alertAPI *AlertAPI) searchRule(w http.ResponseWriter, req *http.Request, p
w.Write(searchResult.Raw) w.Write(searchResult.Raw)
} }
func (alertAPI *AlertAPI) getRuleAlertNumbers(ruleIDs []string) ( map[string]interface{},error) {
esClient := elastic.GetClient(alertAPI.Config.Elasticsearch)
queryDsl := util.MapStr{
"size": 0,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"terms": util.MapStr{
"rule_id": ruleIDs,
},
},
{
"terms": util.MapStr{
"state": []string{alerting.AlertStateError, alerting.AlertStateActive},
},
},
},
},
},
"aggs": util.MapStr{
"terms_rule_id": util.MapStr{
"terms": util.MapStr{
"field": "rule_id",
},
},
},
}
searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) )
if err != nil {
return nil, err
}
ruleAlertNumbers := map[string]interface{}{}
if termRules, ok := searchRes.Aggregations["terms_rule_id"]; ok {
for _, bk := range termRules.Buckets {
key := bk["key"].(string)
ruleAlertNumbers[key] = bk["doc_count"]
}
}
return ruleAlertNumbers, nil
}
func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var ruleIDs = []string{}
alertAPI.DecodeJSON(req, &ruleIDs)
if len(ruleIDs) == 0 {
alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK)
return
}
esClient := elastic.GetClient(alertAPI.Config.Elasticsearch)
queryDsl := util.MapStr{
"_source": []string{"state", "rule_id"},
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "desc",
},
},
},
"collapse": util.MapStr{
"field": "rule_id",
},
"query": util.MapStr{
"terms": util.MapStr{
"rule_id": ruleIDs,
},
},
}
searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) )
if err != nil {
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
return
}
if len(searchRes.Hits.Hits) == 0 {
alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK)
return
}
aletNumbers, err := alertAPI.getRuleAlertNumbers(ruleIDs)
if err != nil {
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
return
}
latestAlertInfos := map[string]util.MapStr{}
for _, hit := range searchRes.Hits.Hits {
if ruleID, ok := hit.Source["rule_id"].(string); ok {
latestAlertInfos[ruleID] = util.MapStr{
"status": hit.Source["state"],
"alert_count": aletNumbers[ruleID],
}
}
}
alertAPI.WriteJSON(w, latestAlertInfos, http.StatusOK)
}
func checkResourceExists(rule *alerting.Rule) (bool, error) { func checkResourceExists(rule *alerting.Rule) (bool, error) {
if rule.Resource.ID == "" { if rule.Resource.ID == "" {
return false, fmt.Errorf("resource id can not be empty") return false, fmt.Errorf("resource id can not be empty")
@ -250,6 +354,9 @@ func checkResourceExists(rule *alerting.Rule) (bool, error) {
if err != nil { if err != nil {
return false, err return false, err
} }
if rule.Resource.Name == "" {
rule.Resource.Name = obj.Name
}
return ok && obj.Name != "", nil return ok && obj.Name != "", nil
default: default:
return false, fmt.Errorf("unsupport resource type: %s", rule.Resource.Type) return false, fmt.Errorf("unsupport resource type: %s", rule.Resource.Type)

View File

@ -2,6 +2,7 @@ package api
import ( import (
"infini.sh/console/config" "infini.sh/console/config"
"infini.sh/console/plugin/api/alerting"
"infini.sh/console/plugin/api/index_management" "infini.sh/console/plugin/api/index_management"
"infini.sh/framework/core/api" "infini.sh/framework/core/api"
"path" "path"
@ -53,5 +54,9 @@ func Init(cfg *config.AppConfig) {
// } // }
// }, // },
//}) //})
alertAPI := alerting.AlertAPI{
Config: cfg,
}
alertAPI.Init()
} }

View File

@ -27,7 +27,12 @@ import (
type Engine struct { type Engine struct {
} }
//GenerateQuery generate a final elasticsearch query dsl object
//when RawFilter of rule is not empty, priority use it, otherwise to covert from Filter of rule (todo)
//auto generate time filter query and then attach to final query
//auto generate elasticsearch aggregations by metrics of rule
//group of metric item converted to terms aggregation and TimeField of rule converted to date_histogram aggregation
//convert statistic of metric item to elasticsearch aggregation
func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) {
filter, err := engine.GenerateRawFilter(rule) filter, err := engine.GenerateRawFilter(rule)
if err != nil { if err != nil {
@ -89,6 +94,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) {
"aggs": rootAggs, "aggs": rootAggs,
}, nil }, nil
} }
//generateAgg convert statistic of metric item to elasticsearch aggregation
func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{
var ( var (
aggType = "value_count" aggType = "value_count"
@ -121,10 +127,117 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{
} }
} }
func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[string]interface{}, error){
if !fq.IsComplex(){
q := map[string]interface{}{}
if len(fq.Values) == 0 {
return nil, fmt.Errorf("values should not be empty")
}
//equals/gte/gt/lt/lte/in/match/regexp/wildcard/range/prefix/suffix/contain/
switch fq.Operator {
case "equals":
q["term"] = util.MapStr{
fq.Field: util.MapStr{
"value": fq.Values[0],
},
}
case "in":
q["terms"] = util.MapStr{
fq.Field: fq.Values,
}
case "match":
q[fq.Operator] = util.MapStr{
fq.Field: fq.Values[0],
}
case "gte", "gt", "lt", "lte":
q["range"] = util.MapStr{
fq.Field: util.MapStr{
fq.Operator: fq.Values[0],
},
}
case "range":
if len(fq.Values) != 2 {
return nil, fmt.Errorf("values length of range query must be 2, but got %d", len(fq.Values))
}
q["range"] = util.MapStr{
fq.Field: util.MapStr{
"gte": fq.Values[0],
"lte": fq.Values[1],
},
}
case "prefix":
q["prefix"] = util.MapStr{
fq.Field: fq.Values[0],
}
case "regexp", "wildcard":
q[fq.Operator] = util.MapStr{
fq.Field: util.MapStr{
"value": fq.Values[0],
},
}
default:
return nil, fmt.Errorf("unsupport query operator %s", fq.Operator)
}
return q, nil
}
if fq.Or != nil && fq.And != nil {
return nil, fmt.Errorf("filter format error: or, and bool operation in same level")
}
if fq.Or != nil && fq.Not != nil {
return nil, fmt.Errorf("filter format error: or, not bool operation in same level")
}
if fq.And != nil && fq.Not != nil {
return nil, fmt.Errorf("filter format error: and, not bool operation in same level")
}
var (
boolOperator string
filterQueries []alerting.FilterQuery
)
if len(fq.Not) >0 {
boolOperator = "must_not"
filterQueries = fq.Not
}else if len(fq.Or) > 0 {
boolOperator = "should"
filterQueries = fq.Or
}else {
boolOperator = "must"
filterQueries = fq.And
}
var subQueries []interface{}
for _, subQ := range filterQueries {
subQuery, err := engine.ConvertFilterQueryToDsl(&subQ)
if err != nil {
return nil, err
}
subQueries = append(subQueries, subQuery)
}
boolQuery := util.MapStr{
boolOperator: subQueries,
}
if boolOperator == "should" {
boolQuery["minimum_should_match"] = 1
}
resultQuery := map[string]interface{}{
"bool": boolQuery,
}
return resultQuery, nil
}
func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interface{}, error) { func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interface{}, error) {
query := map[string]interface{}{} query := map[string]interface{}{}
var err error
if rule.Resource.RawFilter != nil { if rule.Resource.RawFilter != nil {
query = rule.Resource.RawFilter query = rule.Resource.RawFilter
}else{
if !rule.Resource.Filter.IsEmpty(){
query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter)
if err != nil {
return nil, err
}
}
} }
intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval) intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval)
if err != nil { if err != nil {
@ -208,6 +321,9 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e
collectMetricData(searchResult["aggregations"], "", &metricData) collectMetricData(searchResult["aggregations"], "", &metricData)
return metricData, nil return metricData, nil
} }
//CheckCondition check whether rule conditions triggered or not
//if triggered returns an array of ConditionResult
//sort conditions by severity desc before check , and then if condition is true, then continue check another group
func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error){ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error){
metricData, err := engine.ExecuteQuery(rule) metricData, err := engine.ExecuteQuery(rule)
if err != nil { if err != nil {
@ -325,7 +441,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
return err return err
} }
lastAlertItem := alerting.Alert{} lastAlertItem := alerting.Alert{}
err = getLastAlert(rule.ID, rule.Resource.ID, &lastAlertItem) err = getLastAlert(rule.ID, &lastAlertItem)
if err != nil { if err != nil {
return err return err
} }
@ -353,7 +469,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
Created: time.Now(), Created: time.Now(),
Updated: time.Now(), Updated: time.Now(),
RuleID: rule.ID, RuleID: rule.ID,
ClusterID: rule.Resource.ID, ResourceID: rule.Resource.ID,
Expression: rule.Metrics.Expression, Expression: rule.Metrics.Expression,
Objects: rule.Resource.Objects, Objects: rule.Resource.Objects,
Severity: "info", Severity: "info",
@ -363,6 +479,9 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
} }
return nil return nil
}else{ }else{
if lastAlertItem.State == "" || lastAlertItem.State == alerting.AlertStateNormal {
rule.LastTermStartTime = time.Now()
}
log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID ) log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID )
var ( var (
severity = conditionResults[0].ConditionItem.Severity severity = conditionResults[0].ConditionItem.Severity
@ -379,7 +498,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
Created: time.Now(), Created: time.Now(),
Updated: time.Now(), Updated: time.Now(),
RuleID: rule.ID, RuleID: rule.ID,
ClusterID: rule.Resource.ID, ResourceID: rule.Resource.ID,
ResourceName: rule.Resource.Name,
Expression: rule.Metrics.Expression, Expression: rule.Metrics.Expression,
Objects: rule.Resource.Objects, Objects: rule.Resource.Objects,
Severity: severity, Severity: severity,
@ -394,20 +514,36 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
alertItem.Error = err.Error() alertItem.Error = err.Error()
return err return err
} }
period := time.Now().Sub(rule.LastNotificationTime)
//log.Error(lastAlertItem.ID, period, periodDuration)
if time.Now().Sub(lastAlertItem.Created) > periodDuration { if lastAlertItem.ID == "" || period > periodDuration {
actionResults := performChannels(rule.Channels.Normal, conditionResults) actionResults := performChannels(rule.Channels.Normal, conditionResults)
alertItem.ActionExecutionResults = actionResults alertItem.ActionExecutionResults = actionResults
//todo init last notification time when create task (by last alert item is notified)
rule.LastNotificationTime = time.Now()
alertItem.IsNotified = true
} }
if rule.Channels.EscalationEnabled && lastAlertItem.State != alerting.AlertStateNormal { isAck, err := hasAcknowledgedRule(rule.ID, rule.LastTermStartTime)
periodDuration, err = time.ParseDuration(rule.Channels.EscalationThrottlePeriod) if err != nil {
alertItem.Error = err.Error()
return err
}
if rule.Channels.EscalationEnabled && lastAlertItem.ID !="" && !isAck {
throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod)
if err != nil { if err != nil {
return err return err
} }
if time.Now().Sub(lastAlertItem.Created) > periodDuration { //todo init last term start time when create task (by last alert item of state normal)
if time.Now().Sub(rule.LastTermStartTime) > throttlePeriod {
if time.Now().Sub(rule.LastEscalationTime) > periodDuration {
actionResults := performChannels(rule.Channels.Escalation, conditionResults) actionResults := performChannels(rule.Channels.Escalation, conditionResults)
alertItem.ActionExecutionResults = actionResults alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, actionResults...)
//todo init last escalation time when create task (by last alert item is escalated)
rule.LastEscalationTime = time.Now()
alertItem.IsEscalated = true
}
} }
} }
} }
@ -555,8 +691,7 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti
} }
} }
func getLastAlert(ruleID, clusterID string, alertItem *alerting.Alert) error { func getLastAlert(ruleID string, alertItem *alerting.Alert) error {
esClient := elastic.GetClient(clusterID)
queryDsl := util.MapStr{ queryDsl := util.MapStr{
"size": 1, "size": 1,
"sort": []util.MapStr{ "sort": []util.MapStr{
@ -574,13 +709,60 @@ func getLastAlert(ruleID, clusterID string, alertItem *alerting.Alert) error {
}, },
}, },
} }
searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alertItem), util.MustToJSONBytes(queryDsl) ) q := orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl),
}
err, searchResult := orm.Search(alertItem, &q )
if err != nil { if err != nil {
return err return err
} }
if len(searchRes.Hits.Hits) == 0 { if len(searchResult.Result) == 0 {
return nil return nil
} }
alertBytes := util.MustToJSONBytes(searchRes.Hits.Hits[0].Source) alertBytes := util.MustToJSONBytes(searchResult.Result[0])
return util.FromJSONBytes(alertBytes, alertItem) return util.FromJSONBytes(alertBytes, alertItem)
} }
func hasAcknowledgedRule(ruleID string, startTime time.Time) (bool, error){
queryDsl := util.MapStr{
"size": 1,
"query": util.MapStr{
"bool": util.MapStr{
"must":[]util.MapStr{
{
"term": util.MapStr{
"rule_id": util.MapStr{
"value": ruleID,
},
},
},
{
"term": util.MapStr{
"state": alerting.AlertStateAcknowledge,
},
},
{
"range": util.MapStr{
"created": util.MapStr{
"gte": startTime,
},
},
},
},
},
},
}
q := orm.Query{
WildcardIndex: true,
RawQuery: util.MustToJSONBytes(queryDsl),
}
err, searchResult := orm.Search(alerting.Alert{}, &q )
if err != nil {
return false, err
}
if len(searchResult.Result) == 0 {
return false, nil
}
return true, nil
}

View File

@ -25,7 +25,7 @@ func TestEngine( t *testing.T) {
Type: "elasticsearch", Type: "elasticsearch",
Objects: []string{".infini_metrics*"}, Objects: []string{".infini_metrics*"},
TimeField: "timestamp", TimeField: "timestamp",
Filter: alerting.Filter{ Filter: alerting.FilterQuery{
And: []alerting.FilterQuery{ And: []alerting.FilterQuery{
{Field: "timestamp", Operator: "gte", Values: []string{"now-15m"}}, {Field: "timestamp", Operator: "gte", Values: []string{"now-15m"}},
//{Field: "payload.elasticsearch.cluster_health.status", Operator: "equals", Values: []string{"red"}}, //{Field: "payload.elasticsearch.cluster_health.status", Operator: "equals", Values: []string{"red"}},
@ -184,3 +184,36 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
} }
fmt.Println(util.MustToJSON(q)) fmt.Println(util.MustToJSON(q))
} }
func TestConvertFilterQuery(t *testing.T) {
fq := alerting.FilterQuery{
And: []alerting.FilterQuery{
{
Field: "metadata.category",
Values: []string{"elasticsearch"},
Operator: "equals",
},
{
Field: "metadata.name",
Values: []string{"index_stats", "node_stats"},
Operator: "in",
},
{
Not: []alerting.FilterQuery{
{
Field: "timestamp",
Operator: "gt",
Values: []string{"2022-04-16T16:16:39.168605+08:00"},
},
},
},
},
}
eng := &Engine{}
q, err := eng.ConvertFilterQueryToDsl(&fq)
if err != nil {
t.Fatal(err)
}
fmt.Println(util.MustToJSON(q))
}