add alert nodata status

This commit is contained in:
liugq 2022-05-27 18:35:57 +08:00
parent 1d8cd16774
commit e9fdb7ab22
3 changed files with 104 additions and 44 deletions

View File

@ -46,6 +46,7 @@ const (
AlertStateAlerting string = "alerting"
AlertStateOK = "ok"
AlertStateError = "error"
AlertStateNodata = "nodata"
)
const (

View File

@ -52,6 +52,7 @@ type QueryResult struct {
Query string `json:"query"`
Raw string `json:"raw"`
MetricData []MetricData `json:"metric_data"`
Nodata bool `json:"nodata"`
}
type MetricData struct {

View File

@ -41,6 +41,10 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
if err != nil {
return nil, err
}
timeFilter, err := engine.generateTimeFilter(rule, filterParam)
if err != nil {
return nil, err
}
if len(rule.Metrics.Items) == 0 {
return nil, fmt.Errorf("metric items should not be empty")
}
@ -52,6 +56,14 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
return nil, err
}
}
if len(filter) > 0 {
basicAggs = util.MapStr{
"filter_agg": util.MapStr{
"filter": filter,
"aggs": basicAggs,
},
}
}
targetESVersion := elastic.GetMetadata(rule.Resource.ID).Config.Version
intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion)
if err != nil {
@ -95,9 +107,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
"time_buckets": timeAggs,
}
}
lastGroupAgg = groupAgg
}
rootAggs = util.MapStr{
util.GetUUID(): lastGroupAgg,
@ -110,7 +120,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
return util.MapStr{
"size": 0,
"query": filter,
"query": timeFilter,
"aggs": rootAggs,
}, nil
}
@ -263,19 +273,7 @@ func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[str
return resultQuery, nil
}
func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) {
query := map[string]interface{}{}
var err error
if rule.Resource.RawFilter != nil {
query = util.DeepCopy(rule.Resource.RawFilter).(map[string]interface{})
}else{
if !rule.Resource.Filter.IsEmpty(){
query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter)
if err != nil {
return nil, err
}
}
}
func (engine *Engine) generateTimeFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error){
var (
timeStart interface{}
timeEnd interface{}
@ -324,36 +322,56 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerti
},
},
}
return timeQuery, nil
}
if boolQ, ok := query["bool"].(map[string]interface{}); ok {
if mustQ, ok := boolQ["must"]; ok {
if mustArr, ok := mustQ.([]interface{}); ok {
boolQ["must"] = append(mustArr, timeQuery)
func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) {
query := map[string]interface{}{}
var err error
if rule.Resource.RawFilter != nil {
query = util.DeepCopy(rule.Resource.RawFilter).(map[string]interface{})
}else{
return nil, fmt.Errorf("must query: %v is not valid in filter", mustQ)
}
}else{
boolQ["must"] = []interface{}{
timeQuery,
if !rule.Resource.Filter.IsEmpty(){
query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter)
if err != nil {
return nil, err
}
}
}else{
must := []interface{}{
timeQuery,
}
if len(query) > 0 {
if _, ok = query["match_all"]; !ok {
must = append(must, query)
}
}
query = util.MapStr{
"bool": util.MapStr{
"must": must,
},
}
}
//timeQuery, err := engine.generateTimeFilter(rule, filterParam)
//if err != nil {
// return nil, err
//}
//
//if boolQ, ok := query["bool"].(map[string]interface{}); ok {
// if mustQ, ok := boolQ["must"]; ok {
//
// if mustArr, ok := mustQ.([]interface{}); ok {
// boolQ["must"] = append(mustArr, timeQuery)
//
// }else{
// return nil, fmt.Errorf("must query: %v is not valid in filter", mustQ)
// }
// }else{
// boolQ["must"] = []interface{}{
// timeQuery,
// }
// }
//}else{
// must := []interface{}{
// timeQuery,
// }
// if len(query) > 0 {
// if _, ok = query["match_all"]; !ok {
// must = append(must, query)
// }
// }
// query = util.MapStr{
// "bool": util.MapStr{
// "must": must,
// },
// }
//}
return query, nil
}
@ -373,7 +391,10 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.Fi
queryResult.Query = string(queryDslBytes)
searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes)
if err != nil {
return nil, err
return queryResult, err
}
if searchRes.GetTotal() == 0 {
queryResult.Nodata = true
}
if searchRes.StatusCode != 200 {
return nil, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body))
@ -599,8 +620,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
var paramsCtx map[string]interface{}
if len(conditionResults) == 0 {
alertItem.Severity = "info"
alertItem.State = alerting.AlertStateOK
if alertMessage != nil && alertMessage.Status != alerting.MessageStateRecovered {
if checkResults.QueryResult.Nodata {
alertItem.State = alerting.AlertStateNodata
}
if alertMessage != nil && alertMessage.Status != alerting.MessageStateRecovered && !checkResults.QueryResult.Nodata {
alertMessage.Status = alerting.MessageStateRecovered
alertMessage.Updated = time.Now()
err = saveAlertMessage(alertMessage)
@ -929,6 +953,40 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti
if k == "key" || k == "key_as_string" || k== "doc_count"{
continue
}
//has filter
if k == "filter_agg" {
if filterM, ok := v.(map[string]interface{}); ok {
for fk, fv := range filterM {
if fk == "doc_count" {
continue
}
if vm, ok := fv.(map[string]interface{}); ok {
if metricVal, ok := vm["value"]; ok {
md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], metricVal})
}else{
//percentiles agg type
switch vm["values"].(type) {
case []interface{}:
for _, val := range vm["values"].([]interface{}) {
if valM, ok := val.(map[string]interface{}); ok {
md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], valM["value"]})
}
break
}
case map[string]interface{}:
for _, val := range vm["values"].(map[string]interface{}) {
md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], val})
break
}
}
}
}
}
}
continue
}
if vm, ok := v.(map[string]interface{}); ok {
if metricVal, ok := vm["value"]; ok {
md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal})