From e1d85bc4f3139b1b52713bf944af4567c0f83237 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 18 Apr 2022 09:42:44 +0800 Subject: [PATCH] update alerting api --- model/alerting/alert.go | 16 +- model/alerting/filter.go | 4 + model/alerting/filter_query.go | 8 + model/alerting/resource.go | 15 +- model/alerting/rule.go | 3 + plugin/api/alerting/alert.go | 106 +++++++++ plugin/api/alerting/api.go | 10 +- plugin/api/alerting/rule.go | 107 +++++++++ plugin/api/init.go | 5 + service/alerting/elasticsearch/engine.go | 220 ++++++++++++++++-- service/alerting/elasticsearch/engine_test.go | 35 ++- 11 files changed, 499 insertions(+), 30 deletions(-) create mode 100644 plugin/api/alerting/alert.go diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 515550e1..efd93b03 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -12,17 +12,21 @@ type Alert struct { ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"` Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"` Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"` - RuleID string `json:"rule_id"` - ClusterID string `json:"cluster_id"` - Expression string `json:"expression"` - Objects []string `json:"objects"` - Severity string `json:"severity"` - Content string `json:"content"` + RuleID string `json:"rule_id" elastic_mapping:"rule_id: { type: keyword }"` + ResourceID string `json:"resource_id" elastic_mapping:"resource_id: { type: keyword }"` + ResourceName string `json:"resource_name" elastic_mapping:"resource_name: { type: keyword }"` + Expression string `json:"expression" elastic_mapping:"expression: { type: keyword, copy_to:search_text }"` + Objects []string `json:"objects" elastic_mapping:"objects: { type:keyword,copy_to:search_text }"` + Severity string `json:"severity" elastic_mapping:"severity: { type: keyword }"` + Content string `json:"content" elastic_mapping:"context: { type: keyword, copy_to:search_text }"` AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"` Users []string `json:"users,omitempty"` State string `json:"state"` Error string `json:"error,omitempty"` + IsNotified bool `json:"is_notified" elastic_mapping:"is_notified: { type: boolean }"` //标识本次检测是否发送了告警通知 + IsEscalated bool `json:"is_escalated" elastic_mapping:"is_escalated: { type: boolean }"` //标识本次检测是否发送了升级告警通知 + SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` } type ActionExecutionResult struct { diff --git a/model/alerting/filter.go b/model/alerting/filter.go index 6ae07519..f419c3f0 100644 --- a/model/alerting/filter.go +++ b/model/alerting/filter.go @@ -10,3 +10,7 @@ type Filter struct { Not []FilterQuery `json:"not,omitempty"` //MinimumShouldMatch int `json:"minimum_should_match"` } + +func (f Filter) IsEmpty() bool { + return len(f.And) == 0 && len(f.Or) == 0 && len(f.Not) == 0 +} \ No newline at end of file diff --git a/model/alerting/filter_query.go b/model/alerting/filter_query.go index ece64139..30a4ab67 100644 --- a/model/alerting/filter_query.go +++ b/model/alerting/filter_query.go @@ -12,3 +12,11 @@ type FilterQuery struct { Or []FilterQuery `json:"or,omitempty"` Not []FilterQuery `json:"not,omitempty"` } + +func (fq FilterQuery) IsComplex() bool { + return len(fq.And) > 0 || len(fq.Or) > 0 || len(fq.Not) > 0 +} + +func (f FilterQuery) IsEmpty() bool { + return !f.IsComplex() && f.Operator == "" +} \ No newline at end of file diff --git a/model/alerting/resource.go b/model/alerting/resource.go index 33764bde..6a126b67 100644 --- a/model/alerting/resource.go +++ b/model/alerting/resource.go @@ -4,12 +4,25 @@ package alerting +import ( + "fmt" +) + type Resource struct { ID string `json:"resource_id" elastic_mapping:"resource_id:{type:keyword}"` + Name string `json:"resource_name" elastic_mapping:"resource_name:{type:keyword}"` Type string `json:"type" elastic_mapping:"type:{type:keyword}"` Objects []string `json:"objects" elastic_mapping:"objects:{type:keyword,copy_to:search_text}"` - Filter Filter `json:"filter,omitempty" elastic_mapping:"-"` + Filter FilterQuery `json:"filter,omitempty" elastic_mapping:"-"` RawFilter map[string]interface{} `json:"raw_filter,omitempty"` TimeField string `json:"time_field,omitempty" elastic_mapping:"id:{type:keyword}"` Context Context `json:"context"` } + +func (r Resource) Validate() error{ + if r.TimeField == "" { + return fmt.Errorf("TimeField can not be empty") + } + return nil +} + diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 4da263dc..8e5339c6 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -19,6 +19,9 @@ type Rule struct { Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"` Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` + LastNotificationTime time.Time `json:"-" elastic_mapping:"last_notification_time:{type:date}"` + LastTermStartTime time.Time `json:"-"` //标识最近一轮告警的开始时间 + LastEscalationTime time.Time `json:"-"` //标识最近一轮告警的开始时间 SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` } diff --git a/plugin/api/alerting/alert.go b/plugin/api/alerting/alert.go new file mode 100644 index 00000000..63195d3a --- /dev/null +++ b/plugin/api/alerting/alert.go @@ -0,0 +1,106 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import ( + "fmt" + "infini.sh/console/model/alerting" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + log "src/github.com/cihub/seelog" + "strconv" + "strings" +) + +func (h *AlertAPI) getAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("alert_id") + + obj := alerting.Alert{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "found": true, + "_id": id, + "_source": obj, + }, 200) +} + +func (h *AlertAPI) acknowledgeAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + body := struct { + AlertIDs []string `json:"alert_ids"` + }{} + queryDsl := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "_id": body.AlertIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['state'] = '%s'", alerting.AlertStateAcknowledge), + }, + } + err := orm.UpdateBy(alerting.Alert{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "ids": body.AlertIDs, + "result": "updated", + }, 200) +} + + +func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + var ( + keyword = h.GetParameterOrDefault(req, "keyword", "") + queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + mustBuilder = &strings.Builder{} + ) + if keyword != "" { + mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword)) + } + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + q := orm.Query{} + queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from) + q.RawQuery = []byte(queryDSL) + + err, res := orm.Search(&alerting.Alert{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.Write(w, res.Raw) +} diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 4cf3e591..77fa1d64 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -4,20 +4,24 @@ package alerting -import "infini.sh/framework/core/api" +import ( + "infini.sh/console/config" + "infini.sh/framework/core/api" +) type AlertAPI struct { api.Handler + Config *config.AppConfig } -func init() { - alert:=AlertAPI{} +func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.getRule) api.HandleAPIMethod(api.POST, "/alerting/rule", alert.createRule) api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule) api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule) api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule) + api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.fetchAlertInfos) api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.getChannel) api.HandleAPIMethod(api.POST, "/alerting/channel", alert.createChannel) diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index bb3fde1b..a03f81ca 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -238,6 +238,110 @@ func (alertAPI *AlertAPI) searchRule(w http.ResponseWriter, req *http.Request, p w.Write(searchResult.Raw) } +func (alertAPI *AlertAPI) getRuleAlertNumbers(ruleIDs []string) ( map[string]interface{},error) { + esClient := elastic.GetClient(alertAPI.Config.Elasticsearch) + queryDsl := util.MapStr{ + "size": 0, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "rule_id": ruleIDs, + }, + }, + { + "terms": util.MapStr{ + "state": []string{alerting.AlertStateError, alerting.AlertStateActive}, + }, + }, + }, + }, + }, + "aggs": util.MapStr{ + "terms_rule_id": util.MapStr{ + "terms": util.MapStr{ + "field": "rule_id", + }, + }, + }, + } + + searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) + if err != nil { + return nil, err + } + + ruleAlertNumbers := map[string]interface{}{} + if termRules, ok := searchRes.Aggregations["terms_rule_id"]; ok { + for _, bk := range termRules.Buckets { + key := bk["key"].(string) + ruleAlertNumbers[key] = bk["doc_count"] + } + } + return ruleAlertNumbers, nil +} + +func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ruleIDs = []string{} + alertAPI.DecodeJSON(req, &ruleIDs) + + if len(ruleIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + esClient := elastic.GetClient(alertAPI.Config.Elasticsearch) + queryDsl := util.MapStr{ + "_source": []string{"state", "rule_id"}, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "rule_id", + }, + "query": util.MapStr{ + "terms": util.MapStr{ + "rule_id": ruleIDs, + }, + }, + } + + searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + if len(searchRes.Hits.Hits) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + aletNumbers, err := alertAPI.getRuleAlertNumbers(ruleIDs) + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + + latestAlertInfos := map[string]util.MapStr{} + for _, hit := range searchRes.Hits.Hits { + if ruleID, ok := hit.Source["rule_id"].(string); ok { + latestAlertInfos[ruleID] = util.MapStr{ + "status": hit.Source["state"], + "alert_count": aletNumbers[ruleID], + } + } + + } + alertAPI.WriteJSON(w, latestAlertInfos, http.StatusOK) +} + func checkResourceExists(rule *alerting.Rule) (bool, error) { if rule.Resource.ID == "" { return false, fmt.Errorf("resource id can not be empty") @@ -250,6 +354,9 @@ func checkResourceExists(rule *alerting.Rule) (bool, error) { if err != nil { return false, err } + if rule.Resource.Name == "" { + rule.Resource.Name = obj.Name + } return ok && obj.Name != "", nil default: return false, fmt.Errorf("unsupport resource type: %s", rule.Resource.Type) diff --git a/plugin/api/init.go b/plugin/api/init.go index 9e6d3a70..c7f323aa 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -2,6 +2,7 @@ package api import ( "infini.sh/console/config" + "infini.sh/console/plugin/api/alerting" "infini.sh/console/plugin/api/index_management" "infini.sh/framework/core/api" "path" @@ -53,5 +54,9 @@ func Init(cfg *config.AppConfig) { // } // }, //}) + alertAPI := alerting.AlertAPI{ + Config: cfg, + } + alertAPI.Init() } diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 649f7e90..f2096e72 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -27,7 +27,12 @@ import ( type Engine struct { } - +//GenerateQuery generate a final elasticsearch query dsl object +//when RawFilter of rule is not empty, priority use it, otherwise to covert from Filter of rule (todo) +//auto generate time filter query and then attach to final query +//auto generate elasticsearch aggregations by metrics of rule +//group of metric item converted to terms aggregation and TimeField of rule converted to date_histogram aggregation +//convert statistic of metric item to elasticsearch aggregation func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { filter, err := engine.GenerateRawFilter(rule) if err != nil { @@ -89,6 +94,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { "aggs": rootAggs, }, nil } +//generateAgg convert statistic of metric item to elasticsearch aggregation func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{ var ( aggType = "value_count" @@ -121,10 +127,117 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{ } } +func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[string]interface{}, error){ + if !fq.IsComplex(){ + q := map[string]interface{}{} + if len(fq.Values) == 0 { + return nil, fmt.Errorf("values should not be empty") + } + //equals/gte/gt/lt/lte/in/match/regexp/wildcard/range/prefix/suffix/contain/ + switch fq.Operator { + case "equals": + q["term"] = util.MapStr{ + fq.Field: util.MapStr{ + "value": fq.Values[0], + }, + } + case "in": + q["terms"] = util.MapStr{ + fq.Field: fq.Values, + } + case "match": + q[fq.Operator] = util.MapStr{ + fq.Field: fq.Values[0], + } + case "gte", "gt", "lt", "lte": + q["range"] = util.MapStr{ + fq.Field: util.MapStr{ + fq.Operator: fq.Values[0], + }, + } + case "range": + if len(fq.Values) != 2 { + return nil, fmt.Errorf("values length of range query must be 2, but got %d", len(fq.Values)) + } + q["range"] = util.MapStr{ + fq.Field: util.MapStr{ + "gte": fq.Values[0], + "lte": fq.Values[1], + }, + } + case "prefix": + q["prefix"] = util.MapStr{ + fq.Field: fq.Values[0], + } + case "regexp", "wildcard": + q[fq.Operator] = util.MapStr{ + fq.Field: util.MapStr{ + "value": fq.Values[0], + }, + } + default: + return nil, fmt.Errorf("unsupport query operator %s", fq.Operator) + } + return q, nil + } + if fq.Or != nil && fq.And != nil { + return nil, fmt.Errorf("filter format error: or, and bool operation in same level") + } + if fq.Or != nil && fq.Not != nil { + return nil, fmt.Errorf("filter format error: or, not bool operation in same level") + } + if fq.And != nil && fq.Not != nil { + return nil, fmt.Errorf("filter format error: and, not bool operation in same level") + } + var ( + boolOperator string + filterQueries []alerting.FilterQuery + ) + + if len(fq.Not) >0 { + boolOperator = "must_not" + filterQueries = fq.Not + + }else if len(fq.Or) > 0 { + boolOperator = "should" + filterQueries = fq.Or + }else { + boolOperator = "must" + filterQueries = fq.And + } + var subQueries []interface{} + for _, subQ := range filterQueries { + subQuery, err := engine.ConvertFilterQueryToDsl(&subQ) + if err != nil { + return nil, err + } + subQueries = append(subQueries, subQuery) + } + boolQuery := util.MapStr{ + boolOperator: subQueries, + } + if boolOperator == "should" { + boolQuery["minimum_should_match"] = 1 + } + resultQuery := map[string]interface{}{ + "bool": boolQuery, + } + + return resultQuery, nil +} + func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interface{}, error) { query := map[string]interface{}{} + var err error if rule.Resource.RawFilter != nil { query = rule.Resource.RawFilter + }else{ + if !rule.Resource.Filter.IsEmpty(){ + query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter) + if err != nil { + return nil, err + } + } } intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval) if err != nil { @@ -208,6 +321,9 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e collectMetricData(searchResult["aggregations"], "", &metricData) return metricData, nil } +//CheckCondition check whether rule conditions triggered or not +//if triggered returns an array of ConditionResult +//sort conditions by severity desc before check , and then if condition is true, then continue check another group func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error){ metricData, err := engine.ExecuteQuery(rule) if err != nil { @@ -325,7 +441,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return err } lastAlertItem := alerting.Alert{} - err = getLastAlert(rule.ID, rule.Resource.ID, &lastAlertItem) + err = getLastAlert(rule.ID, &lastAlertItem) if err != nil { return err } @@ -349,11 +465,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if len(conditionResults) == 0 { if lastAlertItem.State != alerting.AlertStateNormal && lastAlertItem.ID != "" { alertItem = &alerting.Alert{ - ID: util.GetUUID(), - Created: time.Now(), - Updated: time.Now(), + ID: util.GetUUID(), + Created: time.Now(), + Updated: time.Now(), RuleID: rule.ID, - ClusterID: rule.Resource.ID, + ResourceID: rule.Resource.ID, Expression: rule.Metrics.Expression, Objects: rule.Resource.Objects, Severity: "info", @@ -363,6 +479,9 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } return nil }else{ + if lastAlertItem.State == "" || lastAlertItem.State == alerting.AlertStateNormal { + rule.LastTermStartTime = time.Now() + } log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID ) var ( severity = conditionResults[0].ConditionItem.Severity @@ -379,7 +498,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { Created: time.Now(), Updated: time.Now(), RuleID: rule.ID, - ClusterID: rule.Resource.ID, + ResourceID: rule.Resource.ID, + ResourceName: rule.Resource.Name, Expression: rule.Metrics.Expression, Objects: rule.Resource.Objects, Severity: severity, @@ -394,20 +514,36 @@ func (engine *Engine) Do(rule *alerting.Rule) error { alertItem.Error = err.Error() return err } + period := time.Now().Sub(rule.LastNotificationTime) + //log.Error(lastAlertItem.ID, period, periodDuration) - if time.Now().Sub(lastAlertItem.Created) > periodDuration { + if lastAlertItem.ID == "" || period > periodDuration { actionResults := performChannels(rule.Channels.Normal, conditionResults) alertItem.ActionExecutionResults = actionResults - + //todo init last notification time when create task (by last alert item is notified) + rule.LastNotificationTime = time.Now() + alertItem.IsNotified = true } - if rule.Channels.EscalationEnabled && lastAlertItem.State != alerting.AlertStateNormal { - periodDuration, err = time.ParseDuration(rule.Channels.EscalationThrottlePeriod) + isAck, err := hasAcknowledgedRule(rule.ID, rule.LastTermStartTime) + if err != nil { + alertItem.Error = err.Error() + return err + } + if rule.Channels.EscalationEnabled && lastAlertItem.ID !="" && !isAck { + throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod) if err != nil { return err } - if time.Now().Sub(lastAlertItem.Created) > periodDuration { - actionResults := performChannels(rule.Channels.Escalation, conditionResults) - alertItem.ActionExecutionResults = actionResults + //todo init last term start time when create task (by last alert item of state normal) + if time.Now().Sub(rule.LastTermStartTime) > throttlePeriod { + if time.Now().Sub(rule.LastEscalationTime) > periodDuration { + actionResults := performChannels(rule.Channels.Escalation, conditionResults) + alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, actionResults...) + //todo init last escalation time when create task (by last alert item is escalated) + rule.LastEscalationTime = time.Now() + alertItem.IsEscalated = true + } + } } } @@ -555,8 +691,7 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti } } -func getLastAlert(ruleID, clusterID string, alertItem *alerting.Alert) error { - esClient := elastic.GetClient(clusterID) +func getLastAlert(ruleID string, alertItem *alerting.Alert) error { queryDsl := util.MapStr{ "size": 1, "sort": []util.MapStr{ @@ -574,13 +709,60 @@ func getLastAlert(ruleID, clusterID string, alertItem *alerting.Alert) error { }, }, } - searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alertItem), util.MustToJSONBytes(queryDsl) ) + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), + } + err, searchResult := orm.Search(alertItem, &q ) if err != nil { return err } - if len(searchRes.Hits.Hits) == 0 { + if len(searchResult.Result) == 0 { return nil } - alertBytes := util.MustToJSONBytes(searchRes.Hits.Hits[0].Source) + alertBytes := util.MustToJSONBytes(searchResult.Result[0]) return util.FromJSONBytes(alertBytes, alertItem) +} + +func hasAcknowledgedRule(ruleID string, startTime time.Time) (bool, error){ + queryDsl := util.MapStr{ + "size": 1, + "query": util.MapStr{ + "bool": util.MapStr{ + "must":[]util.MapStr{ + { + "term": util.MapStr{ + "rule_id": util.MapStr{ + "value": ruleID, + }, + }, + }, + { + "term": util.MapStr{ + "state": alerting.AlertStateAcknowledge, + }, + }, + { + "range": util.MapStr{ + "created": util.MapStr{ + "gte": startTime, + }, + }, + }, + }, + + }, + }, + } + q := orm.Query{ + WildcardIndex: true, + RawQuery: util.MustToJSONBytes(queryDsl), + } + err, searchResult := orm.Search(alerting.Alert{}, &q ) + if err != nil { + return false, err + } + if len(searchResult.Result) == 0 { + return false, nil + } + return true, nil } \ No newline at end of file diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go index 7db1ec2c..d9e3e609 100644 --- a/service/alerting/elasticsearch/engine_test.go +++ b/service/alerting/elasticsearch/engine_test.go @@ -25,7 +25,7 @@ func TestEngine( t *testing.T) { Type: "elasticsearch", Objects: []string{".infini_metrics*"}, TimeField: "timestamp", - Filter: alerting.Filter{ + Filter: alerting.FilterQuery{ And: []alerting.FilterQuery{ {Field: "timestamp", Operator: "gte", Values: []string{"now-15m"}}, //{Field: "payload.elasticsearch.cluster_health.status", Operator: "equals", Values: []string{"red"}}, @@ -184,3 +184,36 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { } fmt.Println(util.MustToJSON(q)) } + +func TestConvertFilterQuery(t *testing.T) { + fq := alerting.FilterQuery{ + And: []alerting.FilterQuery{ + { + Field: "metadata.category", + Values: []string{"elasticsearch"}, + Operator: "equals", + }, + { + Field: "metadata.name", + Values: []string{"index_stats", "node_stats"}, + Operator: "in", + }, + { + Not: []alerting.FilterQuery{ + { + Field: "timestamp", + Operator: "gt", + Values: []string{"2022-04-16T16:16:39.168605+08:00"}, + }, + }, + }, + }, + } + + eng := &Engine{} + q, err := eng.ConvertFilterQueryToDsl(&fq) + if err != nil { + t.Fatal(err) + } + fmt.Println(util.MustToJSON(q)) +} \ No newline at end of file