From e9fdb7ab2251294f4398052a7cbce6f7fe551745 Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 27 May 2022 18:35:57 +0800 Subject: [PATCH] add alert nodata status --- model/alerting/alert.go | 1 + model/alerting/metric.go | 1 + service/alerting/elasticsearch/engine.go | 146 ++++++++++++++++------- 3 files changed, 104 insertions(+), 44 deletions(-) diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 10b3421c..2307b888 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -46,6 +46,7 @@ const ( AlertStateAlerting string = "alerting" AlertStateOK = "ok" AlertStateError = "error" + AlertStateNodata = "nodata" ) const ( diff --git a/model/alerting/metric.go b/model/alerting/metric.go index 86f7979f..5336d628 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -52,6 +52,7 @@ type QueryResult struct { Query string `json:"query"` Raw string `json:"raw"` MetricData []MetricData `json:"metric_data"` + Nodata bool `json:"nodata"` } type MetricData struct { diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index be461b88..7e5eeb2b 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -41,6 +41,10 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F if err != nil { return nil, err } + timeFilter, err := engine.generateTimeFilter(rule, filterParam) + if err != nil { + return nil, err + } if len(rule.Metrics.Items) == 0 { return nil, fmt.Errorf("metric items should not be empty") } @@ -52,6 +56,14 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F return nil, err } } + if len(filter) > 0 { + basicAggs = util.MapStr{ + "filter_agg": util.MapStr{ + "filter": filter, + "aggs": basicAggs, + }, + } + } targetESVersion := elastic.GetMetadata(rule.Resource.ID).Config.Version intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion) if err != nil { @@ -95,9 +107,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F "time_buckets": timeAggs, } } - lastGroupAgg = groupAgg - } rootAggs = util.MapStr{ util.GetUUID(): lastGroupAgg, @@ -110,7 +120,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F return util.MapStr{ "size": 0, - "query": filter, + "query": timeFilter, "aggs": rootAggs, }, nil } @@ -263,19 +273,7 @@ func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[str return resultQuery, nil } -func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) { - query := map[string]interface{}{} - var err error - if rule.Resource.RawFilter != nil { - query = util.DeepCopy(rule.Resource.RawFilter).(map[string]interface{}) - }else{ - if !rule.Resource.Filter.IsEmpty(){ - query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter) - if err != nil { - return nil, err - } - } - } +func (engine *Engine) generateTimeFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error){ var ( timeStart interface{} timeEnd interface{} @@ -324,36 +322,56 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerti }, }, } + return timeQuery, nil +} - if boolQ, ok := query["bool"].(map[string]interface{}); ok { - if mustQ, ok := boolQ["must"]; ok { - - if mustArr, ok := mustQ.([]interface{}); ok { - boolQ["must"] = append(mustArr, timeQuery) - - }else{ - return nil, fmt.Errorf("must query: %v is not valid in filter", mustQ) - } - }else{ - boolQ["must"] = []interface{}{ - timeQuery, - } - } +func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) { + query := map[string]interface{}{} + var err error + if rule.Resource.RawFilter != nil { + query = util.DeepCopy(rule.Resource.RawFilter).(map[string]interface{}) }else{ - must := []interface{}{ - timeQuery, - } - if len(query) > 0 { - if _, ok = query["match_all"]; !ok { - must = append(must, query) + if !rule.Resource.Filter.IsEmpty(){ + query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter) + if err != nil { + return nil, err } } - query = util.MapStr{ - "bool": util.MapStr{ - "must": must, - }, - } } + //timeQuery, err := engine.generateTimeFilter(rule, filterParam) + //if err != nil { + // return nil, err + //} + // + //if boolQ, ok := query["bool"].(map[string]interface{}); ok { + // if mustQ, ok := boolQ["must"]; ok { + // + // if mustArr, ok := mustQ.([]interface{}); ok { + // boolQ["must"] = append(mustArr, timeQuery) + // + // }else{ + // return nil, fmt.Errorf("must query: %v is not valid in filter", mustQ) + // } + // }else{ + // boolQ["must"] = []interface{}{ + // timeQuery, + // } + // } + //}else{ + // must := []interface{}{ + // timeQuery, + // } + // if len(query) > 0 { + // if _, ok = query["match_all"]; !ok { + // must = append(must, query) + // } + // } + // query = util.MapStr{ + // "bool": util.MapStr{ + // "must": must, + // }, + // } + //} return query, nil } @@ -373,7 +391,10 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.Fi queryResult.Query = string(queryDslBytes) searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes) if err != nil { - return nil, err + return queryResult, err + } + if searchRes.GetTotal() == 0 { + queryResult.Nodata = true } if searchRes.StatusCode != 200 { return nil, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body)) @@ -599,8 +620,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error { var paramsCtx map[string]interface{} if len(conditionResults) == 0 { alertItem.Severity = "info" - alertItem.State = alerting.AlertStateOK - if alertMessage != nil && alertMessage.Status != alerting.MessageStateRecovered { + if checkResults.QueryResult.Nodata { + alertItem.State = alerting.AlertStateNodata + } + + if alertMessage != nil && alertMessage.Status != alerting.MessageStateRecovered && !checkResults.QueryResult.Nodata { alertMessage.Status = alerting.MessageStateRecovered alertMessage.Updated = time.Now() err = saveAlertMessage(alertMessage) @@ -929,6 +953,40 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti if k == "key" || k == "key_as_string" || k== "doc_count"{ continue } + //has filter + if k == "filter_agg" { + if filterM, ok := v.(map[string]interface{}); ok { + for fk, fv := range filterM { + if fk == "doc_count" { + continue + } + if vm, ok := fv.(map[string]interface{}); ok { + if metricVal, ok := vm["value"]; ok { + md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], metricVal}) + }else{ + //percentiles agg type + switch vm["values"].(type) { + case []interface{}: + for _, val := range vm["values"].([]interface{}) { + if valM, ok := val.(map[string]interface{}); ok { + md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], valM["value"]}) + } + break + } + case map[string]interface{}: + for _, val := range vm["values"].(map[string]interface{}) { + md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], val}) + break + } + } + + } + + } + } + } + continue + } if vm, ok := v.(map[string]interface{}); ok { if metricVal, ok := vm["value"]; ok { md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal})