1464 lines
43 KiB
Go
1464 lines
43 KiB
Go
// Copyright (C) INFINI Labs & INFINI LIMITED.
|
|
//
|
|
// The INFINI Console is offered under the GNU Affero General Public License v3.0
|
|
// and as commercial software.
|
|
//
|
|
// For commercial licensing, contact us at:
|
|
// - Website: infinilabs.com
|
|
// - Email: hello@infini.ltd
|
|
//
|
|
// Open Source licensed under AGPL V3:
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
/* Copyright © INFINI Ltd. All rights reserved.
|
|
* web: https://infinilabs.com
|
|
* mail: hello#infini.ltd */
|
|
|
|
package elasticsearch
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/Knetic/govaluate"
|
|
log "github.com/cihub/seelog"
|
|
"infini.sh/console/model"
|
|
"infini.sh/console/model/alerting"
|
|
"infini.sh/console/model/insight"
|
|
alerting2 "infini.sh/console/service/alerting"
|
|
"infini.sh/console/service/alerting/common"
|
|
"infini.sh/framework/core/elastic"
|
|
"infini.sh/framework/core/kv"
|
|
"infini.sh/framework/core/orm"
|
|
"infini.sh/framework/core/util"
|
|
"math"
|
|
"runtime/debug"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// Engine implements the alerting engine backed by elasticsearch: it builds
// query DSL from alerting rules, executes the queries, evaluates rule
// conditions against the results and dispatches notifications.
// The zero value is ready to use; Engine carries no state of its own.
type Engine struct {
}
|
|
|
|
// GenerateQuery generate a final elasticsearch query dsl object
|
|
// when RawFilter of rule is not empty, priority use it, otherwise to covert from Filter of rule (todo)
|
|
// auto generate time filter query and then attach to final query
|
|
// auto generate elasticsearch aggregations by metrics of rule
|
|
// group of metric item converted to terms aggregation and TimeField of rule converted to date_histogram aggregation
|
|
// convert statistic of metric item to elasticsearch aggregation
|
|
func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.FilterParam) (interface{}, error) {
|
|
filter, err := engine.GenerateRawFilter(rule, filterParam)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
timeFilter, err := engine.generateTimeFilter(rule, filterParam)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(rule.Metrics.Items) == 0 {
|
|
return nil, fmt.Errorf("metric items should not be empty")
|
|
}
|
|
basicAggs := util.MapStr{}
|
|
//todo bucket sort (es 6.1) bucket script (es 2.0)
|
|
for _, metricItem := range rule.Metrics.Items {
|
|
metricAggs := engine.generateAgg(&metricItem)
|
|
if err = util.MergeFields(basicAggs, metricAggs, true); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
verInfo := elastic.GetClient(rule.Resource.ID).GetVersion()
|
|
var periodInterval = rule.Metrics.BucketSize
|
|
if filterParam != nil && filterParam.BucketSize != "" {
|
|
periodInterval = filterParam.BucketSize
|
|
}
|
|
|
|
if verInfo.Number == "" {
|
|
panic("invalid version")
|
|
}
|
|
|
|
intervalField, err := elastic.GetDateHistogramIntervalField(verInfo.Distribution, verInfo.Number, periodInterval)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get interval field error: %w", err)
|
|
}
|
|
timeAggs := util.MapStr{
|
|
"time_buckets": util.MapStr{
|
|
"date_histogram": util.MapStr{
|
|
"field": rule.Resource.TimeField,
|
|
intervalField: periodInterval,
|
|
},
|
|
"aggs": basicAggs,
|
|
},
|
|
}
|
|
|
|
var rootAggs util.MapStr
|
|
groups := rule.Metrics.Groups
|
|
if grpLength := len(groups); grpLength > 0 {
|
|
var lastGroupAgg util.MapStr
|
|
|
|
for i := grpLength - 1; i >= 0; i-- {
|
|
limit := groups[i].Limit
|
|
//top group 10
|
|
if limit <= 0 {
|
|
limit = 10
|
|
}
|
|
groupAgg := util.MapStr{
|
|
"terms": util.MapStr{
|
|
"field": groups[i].Field,
|
|
"size": limit,
|
|
},
|
|
}
|
|
groupID := util.GetUUID()
|
|
if lastGroupAgg != nil {
|
|
groupAgg["aggs"] = util.MapStr{
|
|
groupID: lastGroupAgg,
|
|
}
|
|
} else {
|
|
groupAgg["aggs"] = timeAggs
|
|
}
|
|
lastGroupAgg = groupAgg
|
|
}
|
|
rootAggs = util.MapStr{
|
|
util.GetUUID(): lastGroupAgg,
|
|
}
|
|
} else {
|
|
rootAggs = timeAggs
|
|
}
|
|
if len(filter) > 0 {
|
|
rootAggs = util.MapStr{
|
|
"filter_agg": util.MapStr{
|
|
"filter": filter,
|
|
"aggs": rootAggs,
|
|
},
|
|
}
|
|
}
|
|
|
|
return util.MapStr{
|
|
"size": 0,
|
|
"query": timeFilter,
|
|
"aggs": rootAggs,
|
|
}, nil
|
|
}
|
|
|
|
// generateAgg convert statistic of metric item to elasticsearch aggregation
|
|
func (engine *Engine) generateAgg(metricItem *insight.MetricItem) map[string]interface{} {
|
|
var (
|
|
aggType = "value_count"
|
|
field = metricItem.Field
|
|
)
|
|
if field == "" || field == "*" {
|
|
field = "_id"
|
|
}
|
|
var percent = 0.0
|
|
var isPipeline = false
|
|
switch metricItem.Statistic {
|
|
case "max", "min", "sum", "avg":
|
|
aggType = metricItem.Statistic
|
|
case "count", "value_count":
|
|
aggType = "value_count"
|
|
case "derivative":
|
|
aggType = "max"
|
|
isPipeline = true
|
|
case "medium": // from es version 6.6
|
|
aggType = "median_absolute_deviation"
|
|
case "p99", "p95", "p90", "p80", "p50":
|
|
aggType = "percentiles"
|
|
percentStr := strings.TrimPrefix(metricItem.Statistic, "p")
|
|
percent, _ = strconv.ParseFloat(percentStr, 32)
|
|
}
|
|
aggValue := util.MapStr{
|
|
"field": field,
|
|
}
|
|
if aggType == "percentiles" {
|
|
aggValue["percents"] = []interface{}{percent}
|
|
}
|
|
aggs := util.MapStr{
|
|
metricItem.Name: util.MapStr{
|
|
aggType: aggValue,
|
|
},
|
|
}
|
|
if !isPipeline {
|
|
return aggs
|
|
}
|
|
pipelineAggID := util.GetUUID()
|
|
aggs[pipelineAggID] = aggs[metricItem.Name]
|
|
aggs[metricItem.Name] = util.MapStr{
|
|
"derivative": util.MapStr{
|
|
"buckets_path": pipelineAggID,
|
|
},
|
|
}
|
|
return aggs
|
|
}
|
|
|
|
func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[string]interface{}, error) {
|
|
if !fq.IsComplex() {
|
|
q := map[string]interface{}{}
|
|
if len(fq.Values) == 0 {
|
|
return nil, fmt.Errorf("values should not be empty")
|
|
}
|
|
//equals/gte/gt/lt/lte/in/match/regexp/wildcard/range/prefix/suffix/contain/
|
|
switch fq.Operator {
|
|
case "equals":
|
|
q["term"] = util.MapStr{
|
|
fq.Field: util.MapStr{
|
|
"value": fq.Values[0],
|
|
},
|
|
}
|
|
case "in":
|
|
q["terms"] = util.MapStr{
|
|
fq.Field: fq.Values,
|
|
}
|
|
case "match":
|
|
q[fq.Operator] = util.MapStr{
|
|
fq.Field: fq.Values[0],
|
|
}
|
|
case "gte", "gt", "lt", "lte":
|
|
q["range"] = util.MapStr{
|
|
fq.Field: util.MapStr{
|
|
fq.Operator: fq.Values[0],
|
|
},
|
|
}
|
|
case "range":
|
|
if len(fq.Values) != 2 {
|
|
return nil, fmt.Errorf("values length of range query must be 2, but got %d", len(fq.Values))
|
|
}
|
|
q["range"] = util.MapStr{
|
|
fq.Field: util.MapStr{
|
|
"gte": fq.Values[0],
|
|
"lte": fq.Values[1],
|
|
},
|
|
}
|
|
case "prefix":
|
|
q["prefix"] = util.MapStr{
|
|
fq.Field: fq.Values[0],
|
|
}
|
|
case "regexp", "wildcard":
|
|
q[fq.Operator] = util.MapStr{
|
|
fq.Field: util.MapStr{
|
|
"value": fq.Values[0],
|
|
},
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("unsupport query operator %s", fq.Operator)
|
|
}
|
|
return q, nil
|
|
}
|
|
if fq.Or != nil && fq.And != nil {
|
|
return nil, fmt.Errorf("filter format error: or, and bool operation in same level")
|
|
}
|
|
if fq.Or != nil && fq.Not != nil {
|
|
return nil, fmt.Errorf("filter format error: or, not bool operation in same level")
|
|
}
|
|
if fq.And != nil && fq.Not != nil {
|
|
return nil, fmt.Errorf("filter format error: and, not bool operation in same level")
|
|
}
|
|
var (
|
|
boolOperator string
|
|
filterQueries []alerting.FilterQuery
|
|
)
|
|
|
|
if len(fq.Not) > 0 {
|
|
boolOperator = "must_not"
|
|
filterQueries = fq.Not
|
|
|
|
} else if len(fq.Or) > 0 {
|
|
boolOperator = "should"
|
|
filterQueries = fq.Or
|
|
} else {
|
|
boolOperator = "must"
|
|
filterQueries = fq.And
|
|
}
|
|
var subQueries []interface{}
|
|
for _, subQ := range filterQueries {
|
|
subQuery, err := engine.ConvertFilterQueryToDsl(&subQ)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
subQueries = append(subQueries, subQuery)
|
|
}
|
|
boolQuery := util.MapStr{
|
|
boolOperator: subQueries,
|
|
}
|
|
if boolOperator == "should" {
|
|
boolQuery["minimum_should_match"] = 1
|
|
}
|
|
resultQuery := map[string]interface{}{
|
|
"bool": boolQuery,
|
|
}
|
|
|
|
return resultQuery, nil
|
|
}
|
|
|
|
// getQueryTimeRange computes the [start, end] window for the alerting query.
// When filterParam is provided, its Start/End are used as-is; otherwise the
// window is derived from the rule's bucket size multiplied by the number of
// buckets the conditions need, ending at now (epoch milliseconds).
//
// NOTE(review): on failure this function does `return nil, fmt.Errorf(...)`,
// i.e. the error value is returned in the `end interface{}` slot. Callers
// (generateTimeFilter) never inspect it, so the error object silently ends up
// serialized inside the range query. The signature should be changed to
// return an explicit error — flagged here because fixing it requires a
// coordinated caller change.
func getQueryTimeRange(rule *alerting.Rule, filterParam *alerting.FilterParam) (start, end interface{}) {
	var (
		timeStart interface{}
		timeEnd   interface{}
	)
	if filterParam != nil {
		timeStart = filterParam.Start
		timeEnd = filterParam.End
	} else {
		var (
			units string
			value int
		)
		intervalDuration, err := time.ParseDuration(rule.Metrics.BucketSize)
		if err != nil {
			// NOTE(review): error returned as the `end` value, see above.
			return nil, fmt.Errorf("parse bucket size of rule [%s] error: %v", rule.Name, err)
		}
		// Normalize the bucket size to the largest whole unit (h/m/s) so it
		// can be re-rendered as a duration string below.
		if intervalDuration/time.Hour >= 1 {
			units = "h"
			value = int(intervalDuration / time.Hour)
		} else if intervalDuration/time.Minute >= 1 {
			units = "m"
			value = int(intervalDuration / time.Minute)
		} else if intervalDuration/time.Second >= 1 {
			units = "s"
			value = int(intervalDuration / time.Second)
		} else {
			return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.BucketSize)
		}
		var bucketCount int
		if rule.BucketConditions != nil {
			bucketCount = rule.BucketConditions.GetMaxBucketCount()
			//for removing first and last time bucket
			bucketCount += 2
		} else {
			// one extra bucket beyond the minimum the conditions require
			bucketCount = rule.Conditions.GetMinimumPeriodMatch() + 1
		}
		if bucketCount <= 0 {
			bucketCount = 2
		}
		duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value*bucketCount, units))
		if err != nil {
			return nil, err
		}
		timeStart = time.Now().Add(-duration).UnixMilli() //.Format(time.RFC3339Nano)
		timeEnd = time.Now().UnixMilli()
	}
	return timeStart, timeEnd
}
|
|
|
|
func (engine *Engine) generateTimeFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) {
|
|
timeStart, timeEnd := getQueryTimeRange(rule, filterParam)
|
|
timeQuery := util.MapStr{
|
|
"range": util.MapStr{
|
|
rule.Resource.TimeField: util.MapStr{
|
|
"gte": timeStart,
|
|
"lte": timeEnd,
|
|
},
|
|
},
|
|
}
|
|
return timeQuery, nil
|
|
}
|
|
|
|
func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) {
|
|
query := map[string]interface{}{}
|
|
var err error
|
|
if rule.Resource.RawFilter != nil {
|
|
query = util.DeepCopy(rule.Resource.RawFilter).(map[string]interface{})
|
|
} else {
|
|
if !rule.Resource.Filter.IsEmpty() {
|
|
query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
//timeQuery, err := engine.generateTimeFilter(rule, filterParam)
|
|
//if err != nil {
|
|
// return nil, err
|
|
//}
|
|
//
|
|
//if boolQ, ok := query["bool"].(map[string]interface{}); ok {
|
|
// if mustQ, ok := boolQ["must"]; ok {
|
|
//
|
|
// if mustArr, ok := mustQ.([]interface{}); ok {
|
|
// boolQ["must"] = append(mustArr, timeQuery)
|
|
//
|
|
// }else{
|
|
// return nil, fmt.Errorf("must query: %v is not valid in filter", mustQ)
|
|
// }
|
|
// }else{
|
|
// boolQ["must"] = []interface{}{
|
|
// timeQuery,
|
|
// }
|
|
// }
|
|
//}else{
|
|
// must := []interface{}{
|
|
// timeQuery,
|
|
// }
|
|
// if len(query) > 0 {
|
|
// if _, ok = query["match_all"]; !ok {
|
|
// must = append(must, query)
|
|
// }
|
|
// }
|
|
// query = util.MapStr{
|
|
// "bool": util.MapStr{
|
|
// "must": must,
|
|
// },
|
|
// }
|
|
//}
|
|
return query, nil
|
|
}
|
|
|
|
// ExecuteQuery generates the query DSL for the rule, runs it against the
// rule's cluster and collects the aggregation buckets into MetricData.
// The returned QueryResult also carries the rendered query, the raw response
// body, the query time window (Min/Max) and a Nodata flag when the search
// matched no documents. On search/parse failures the partially filled
// QueryResult is returned alongside the error so callers can still record it.
func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.FilterParam) (*alerting.QueryResult, error) {
	esClient := elastic.GetClient(rule.Resource.ID)
	queryResult := &alerting.QueryResult{}
	indexName := strings.Join(rule.Resource.Objects, ",")
	//todo cache queryDsl
	queryDsl, err := engine.GenerateQuery(rule, filterParam)
	if err != nil {
		return nil, err
	}
	// Extract the effective time window back out of the generated query.
	if vm, ok := queryDsl.(util.MapStr); ok {
		// NOTE(review): err from the first GetValue is assigned but never
		// checked (it is overwritten by ToJSONBytes below); presumably
		// intentional best-effort like the `_` on the next line — confirm.
		queryResult.Min, err = vm.GetValue(fmt.Sprintf("query.range.%s.gte", rule.Resource.TimeField))
		queryResult.Max, _ = vm.GetValue(fmt.Sprintf("query.range.%s.lte", rule.Resource.TimeField))
	}
	queryDslBytes, err := util.ToJSONBytes(queryDsl)
	if err != nil {
		return nil, err
	}
	queryResult.Query = string(queryDslBytes)
	searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes)
	if err != nil {
		return queryResult, err
	}
	// NOTE(review): Nodata is set before the status-code check, so an error
	// response can also mark the result as nodata — confirm this is intended.
	if searchRes.GetTotal() == 0 {
		queryResult.Nodata = true
	}
	if searchRes.StatusCode != 200 {
		return queryResult, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body))
	}
	queryResult.Raw = string(searchRes.RawResult.Body)
	searchResult := map[string]interface{}{}
	err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult)
	if err != nil {
		return queryResult, err
	}
	metricData := []alerting.MetricData{}
	CollectMetricData(searchResult["aggregations"], "", &metricData)
	// divide derivative data by the bucket size (unit: per second)
	//statisticM := map[string] string{}
	//for _, mi := range rule.Metrics.Items {
	//	statisticM[mi.Name] = mi.Statistic
	//}
	//var periodInterval = rule.Metrics.PeriodInterval
	//if filterParam != nil && filterParam.BucketSize != "" {
	//	periodInterval = filterParam.BucketSize
	//}
	//interval, err := time.ParseDuration(periodInterval)
	//if err != nil {
	//	log.Error(err)
	//}
	//for i, _ := range metricData {
	//	for k, d := range metricData[i].Data {
	//		if statisticM[k] == "derivative" {
	//			for _, td := range d {
	//				if len(td) > 1 {
	//					if v, ok := td[1].(float64); ok {
	//						td[1] = v / interval.Seconds()
	//					}
	//				}
	//			}
	//		}
	//	}
	//}
	queryResult.MetricData = metricData
	return queryResult, nil
}
|
|
// GetTargetMetricData executes the rule's query and reduces the raw metric
// series to the series the conditions are evaluated against.
// With a single metric item the data is passed through unchanged; with
// multiple items the rule's Formula is evaluated per time bucket over the
// item values and the result is stored under the "result" key.
// When isFilterNaN is true, NaN/Inf formula results are dropped; otherwise
// they are kept as NaN data points.
func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam) ([]alerting.MetricData, *alerting.QueryResult, error) {
	queryResult, err := engine.ExecuteQuery(rule, filterParam)
	if err != nil {
		return nil, queryResult, err
	}
	var targetMetricData []alerting.MetricData
	for _, md := range queryResult.MetricData {
		var targetData alerting.MetricData
		if len(rule.Metrics.Items) == 1 {
			// single metric: no formula to apply, use the series as-is
			targetData = md
		} else {
			targetData = alerting.MetricData{
				GroupValues: md.GroupValues,
				Data:        map[string][]alerting.MetricDataItem{},
			}
			expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula)
			if err != nil {
				return nil, queryResult, err
			}
			// take the length of an arbitrary series as the bucket count
			dataLength := 0
			for _, v := range md.Data {
				dataLength = len(v)
				break
			}
		DataLoop:
			for i := 0; i < dataLength; i++ {
				parameters := map[string]interface{}{}
				var timestamp interface{}
				for k, v := range md.Data {
					// NOTE(review): keys of length 20 are skipped —
					// presumably the generated pipeline-agg IDs from
					// util.GetUUID(); confirm UUID length is fixed at 20.
					if len(k) == 20 {
						continue
					}
					// skip series shorter than the chosen bucket count
					if len(v) < dataLength {
						continue
					}

					//drop nil value bucket
					if v == nil {
						continue DataLoop
					}
					// skip the whole bucket unless every value is numeric
					if _, ok := v[i].Value.(float64); !ok {
						continue DataLoop
					}
					parameters[k] = v[i].Value
					timestamp = v[i].Timestamp
				}
				if len(parameters) == 0 {
					continue
				}

				result, err := expression.Evaluate(parameters)
				if err != nil {
					return nil, queryResult, err
				}
				if r, ok := result.(float64); ok {
					if math.IsNaN(r) || math.IsInf(r, 0) {
						if !isFilterNaN {
							targetData.Data["result"] = append(targetData.Data["result"], alerting.MetricDataItem{Timestamp: timestamp, Value: math.NaN()})
						}
						continue
					}
				}

				targetData.Data["result"] = append(targetData.Data["result"], alerting.MetricDataItem{Timestamp: timestamp, Value: result})
			}
		}
		targetMetricData = append(targetMetricData, targetData)
	}
	return targetMetricData, queryResult, nil
}
|
|
|
|
// CheckCondition checks whether the rule's conditions are triggered.
// If triggered, it returns a ConditionResult containing one result item per
// matched group. Conditions are sorted by priority weight (descending) before
// checking, so the most severe condition is evaluated first; once one
// condition matches for a group, the remaining conditions for that group are
// skipped. Bucket-diff rules are delegated to CheckBucketCondition.
func (engine *Engine) CheckCondition(rule *alerting.Rule) (*alerting.ConditionResult, error) {
	var resultItems []alerting.ConditionResultItem
	// NaN results are filtered out here (isFilterNaN=true) so conditions
	// only see numeric data points.
	targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, true, nil)
	conditionResult := &alerting.ConditionResult{
		QueryResult: queryResult,
	}
	if err != nil {
		return conditionResult, err
	}
	if rule.BucketConditions != nil {
		return engine.CheckBucketCondition(rule, targetMetricData, queryResult)
	}
	for idx, targetData := range targetMetricData {
		if idx == 0 {
			// sort once, before checking the first group
			sort.Slice(rule.Conditions.Items, func(i, j int) bool {
				return alerting.PriorityWeights[rule.Conditions.Items[i].Priority] > alerting.PriorityWeights[rule.Conditions.Items[j].Priority]
			})
		}
	LoopCondition:
		for _, cond := range rule.Conditions.Items {
			conditionExpression, err := cond.GenerateConditionExpression()
			if err != nil {
				return conditionResult, err
			}
			expression, err := govaluate.NewEvaluableExpression(conditionExpression)
			if err != nil {
				return conditionResult, err
			}
			// targetData.Data is expected to hold a single series here
			// (key "result"); this loop captures its key and length.
			dataLength := 0
			dataKey := ""
			for k, v := range targetData.Data {
				dataLength = len(v)
				dataKey = k
			}
			triggerCount := 0
			for i := 0; i < dataLength; i++ {
				//clear nil value
				if targetData.Data[dataKey][i].Value == nil {
					continue
				}
				if r, ok := targetData.Data[dataKey][i].Value.(float64); ok {
					if math.IsNaN(r) {
						continue
					}
				}
				evaluateResult, err := expression.Evaluate(map[string]interface{}{
					"result": targetData.Data[dataKey][i].Value,
				})
				if err != nil {
					return conditionResult, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err)
				}
				// consecutive matches are required: any miss resets the count
				if evaluateResult == true {
					triggerCount += 1
				} else {
					triggerCount = 0
				}
				if triggerCount >= cond.MinimumPeriodMatch {
					log.Debugf("triggered condition %v, groups: %v\n", cond, targetData.GroupValues)
					resultItem := alerting.ConditionResultItem{
						GroupValues:    targetData.GroupValues,
						ConditionItem:  &cond,
						ResultValue:    targetData.Data[dataKey][i].Value,
						IssueTimestamp: targetData.Data[dataKey][i].Timestamp,
						RelationValues: map[string]interface{}{},
					}
					// record each raw metric value at the triggering bucket
					for _, metric := range rule.Metrics.Items {
						resultItem.RelationValues[metric.Name] = queryResult.MetricData[idx].Data[metric.Name][i].Value
					}
					resultItems = append(resultItems, resultItem)
					break LoopCondition
				}
			}

		}
	}
	conditionResult.QueryResult.MetricData = targetMetricData
	conditionResult.ResultItems = resultItems
	return conditionResult, nil
}
|
|
|
|
// BucketDiffState describes how one group's bucket changed between two
// adjacent time buckets when evaluating bucket-diff conditions.
type BucketDiffState struct {
	// ContentChangeState is 1 when the bucket newly appeared, -1 when it
	// disappeared, and 0 when it exists in both adjacent time buckets.
	ContentChangeState int
	// DocCount is the document-count delta versus the previous time bucket;
	// only populated when ContentChangeState is 0.
	DocCount int
}
|
|
|
|
// CheckBucketCondition evaluates the rule's bucket-diff conditions: it
// pivots the metric data into per-group doc counts keyed by timestamp,
// computes appear/disappear/delta states between adjacent time buckets
// (dropping the first and last, possibly partial, buckets), and then checks
// each group's state sequence against the bucket conditions sorted by
// priority. One result item is produced per triggered group.
func (engine *Engine) CheckBucketCondition(rule *alerting.Rule, targetMetricData []alerting.MetricData, queryResult *alerting.QueryResult) (*alerting.ConditionResult, error) {
	var resultItems []alerting.ConditionResultItem
	conditionResult := &alerting.ConditionResult{
		QueryResult: queryResult,
	}
	//transform targetMetricData
	var (
		times   = map[int64]struct{}{}
		buckets = map[string]map[int64]int{}
		maxTime int64
		minTime = time.Now().UnixMilli()
	)
	// Pivot: bucketKey ("*"-joined group values) -> timestamp -> doc count,
	// while tracking the global min/max timestamps and the set of all
	// observed timestamps.
	for _, targetData := range targetMetricData {
		for _, v := range targetData.Data {
			for _, item := range v {
				if tv, ok := item.Timestamp.(float64); ok {
					timestamp := int64(tv)
					if timestamp < minTime {
						minTime = timestamp
					}
					if timestamp > maxTime {
						maxTime = timestamp
					}
					if _, ok = times[timestamp]; !ok {
						times[timestamp] = struct{}{}
					}
					bucketKey := strings.Join(targetData.GroupValues, "*")
					if _, ok = buckets[bucketKey]; !ok {
						buckets[bucketKey] = map[int64]int{}
					}
					buckets[bucketKey][timestamp] = item.DocCount
				} else {
					log.Warnf("invalid timestamp type: %T", item.Timestamp)
				}
			}
		}
	}
	var timesArr []int64
	for t := range times {
		timesArr = append(timesArr, t)
	}
	sort.Slice(timesArr, func(i, j int) bool {
		return timesArr[i] < timesArr[j] // Ascending order
	})

	// Remove the first bucket if its timestamp equals minTime, and
	// the last bucket if its timestamp equals maxTime
	if len(timesArr) > 0 && timesArr[0] == minTime {
		// Remove first bucket if timestamp matches minTime
		timesArr = timesArr[1:]
	}
	if len(timesArr) > 0 && timesArr[len(timesArr)-1] == maxTime {
		// Remove last bucket if timestamp matches maxTime
		timesArr = timesArr[:len(timesArr)-1]
	}

	//check bucket diff
	diffResult := map[string]map[int64]BucketDiffState{}
	for grps, bk := range buckets {
		hasPre := false
		if _, ok := diffResult[grps]; !ok {
			diffResult[grps] = map[int64]BucketDiffState{}
		}
		for i, t := range timesArr {
			if v, ok := bk[t]; !ok {
				// bucket missing at this timestamp: record a disappearance
				// only if it existed at the previous timestamp
				if hasPre {
					diffResult[grps][t] = BucketDiffState{
						ContentChangeState: -1,
					}
				}
				// reset hasPre to false
				hasPre = false
			} else {
				if !hasPre {
					// bucket (re)appeared; skip i==0 since there is no
					// previous bucket to compare with
					if i > 0 {
						diffResult[grps][t] = BucketDiffState{
							ContentChangeState: 1,
						}
					}
				} else {
					// bucket present in both: record the doc-count delta
					diffResult[grps][t] = BucketDiffState{
						ContentChangeState: 0,
						DocCount:           v - bk[timesArr[i-1]],
					}
				}
				hasPre = true
			}
		}
	}

	sort.Slice(rule.BucketConditions.Items, func(i, j int) bool {
		return alerting.PriorityWeights[rule.BucketConditions.Items[i].Priority] > alerting.PriorityWeights[rule.BucketConditions.Items[j].Priority]
	})

	for grps, states := range diffResult {
	LoopCondition:
		for _, cond := range rule.BucketConditions.Items {
			conditionExpression, err := cond.GenerateConditionExpression()
			if err != nil {
				return conditionResult, err
			}
			expression, err := govaluate.NewEvaluableExpression(conditionExpression)
			if err != nil {
				return conditionResult, err
			}
			triggerCount := 0
			// NOTE(review): `states` is a map, so this iteration order is
			// randomized — the consecutive-match semantics of triggerCount
			// (reset on a miss) are therefore non-deterministic across runs.
			// Consider iterating timestamps in sorted order instead.
			for t, state := range states {
				resultValue := state.DocCount
				if cond.Type == alerting.BucketDiffTypeContent {
					resultValue = state.ContentChangeState
				}
				evaluateResult, err := expression.Evaluate(map[string]interface{}{
					"result": resultValue,
				})
				if err != nil {
					return conditionResult, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err)
				}
				if evaluateResult == true {
					triggerCount += 1
				} else {
					triggerCount = 0
				}
				if triggerCount >= cond.MinimumPeriodMatch {
					groupValues := strings.Split(grps, "*")
					log.Debugf("triggered condition %v, groups: %v\n", cond, groupValues)
					resultItem := alerting.ConditionResultItem{
						GroupValues:    groupValues,
						ConditionItem:  &cond,
						ResultValue:    resultValue,
						IssueTimestamp: t,
						RelationValues: map[string]interface{}{},
					}
					resultItems = append(resultItems, resultItem)
					break LoopCondition
				}
			}
		}

	}
	conditionResult.QueryResult.MetricData = targetMetricData
	conditionResult.ResultItems = resultItems
	return conditionResult, nil
}
|
|
// Do runs one full evaluation cycle for a rule: it checks the rule's
// conditions, creates/updates the alert and its alert message, sends
// recovery or alerting notifications through the configured channels
// (honoring throttle and escalation periods persisted in the local kv
// store), and always persists the resulting alert item via the deferred
// save below — including an error alert when evaluation itself failed.
func (engine *Engine) Do(rule *alerting.Rule) error {

	var (
		alertItem *alerting.Alert
		err       error
	)
	// The deferred block is the single persistence point for the alert:
	// it synthesizes an error alert if evaluation failed before one was
	// created, folds action errors into the alert state, and saves it.
	defer func() {
		if err != nil && alertItem == nil {
			alertItem = &alerting.Alert{
				ID:           util.GetUUID(),
				Created:      time.Now(),
				Updated:      time.Now(),
				RuleID:       rule.ID,
				RuleName:     rule.Name,
				ResourceID:   rule.Resource.ID,
				ResourceName: rule.Resource.Name,
				Expression:   rule.Metrics.Expression,
				Objects:      rule.Resource.Objects,
				State:        alerting.AlertStateError,
				//Priority: "undefine",
				Error: err.Error(),
			}
		}
		if alertItem != nil {
			if err != nil {
				alertItem.State = alerting.AlertStateError
				alertItem.Error = err.Error()
			} else {
				// surface the last failing channel action, if any
				for _, actionResult := range alertItem.ActionExecutionResults {
					if actionResult.Error != "" {
						alertItem.Error = actionResult.Error
						alertItem.State = alerting.AlertStateError
					}
				}
			}

			err = orm.Save(nil, alertItem)
			if err != nil {
				log.Error(err)
			}
		}
	}()
	log.Tracef("start check condition of rule %s", rule.ID)

	//todo do only once when rule not change
	// Render the human-readable expression of each condition by substituting
	// the metric formula for the "result" placeholder.
	metricExpression, _ := rule.Metrics.GenerateExpression()
	for i, cond := range rule.Conditions.Items {
		expression, _ := cond.GenerateConditionExpression()
		rule.Conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression)
	}
	alertItem = &alerting.Alert{
		ID:           util.GetUUID(),
		Created:      time.Now(),
		Updated:      time.Now(),
		RuleID:       rule.ID,
		RuleName:     rule.Name,
		ResourceID:   rule.Resource.ID,
		ResourceName: rule.Resource.Name,
		Expression:   rule.Metrics.Expression,
		Objects:      rule.Resource.Objects,
		Conditions:   rule.Conditions,
		State:        alerting.AlertStateOK,
	}
	checkResults, err := engine.CheckCondition(rule)
	alertItem.ConditionResult = checkResults
	if err != nil {
		return err
	}
	// Look up an existing (recent) alert message for this rule, used to
	// decide between opening a new alerting episode and updating one.
	alertMessage, err := getLastAlertMessage(rule.ID, 2*time.Minute)
	if err != nil {
		return fmt.Errorf("get alert message error: %w", err)
	}
	conditionResults := checkResults.ResultItems
	var paramsCtx map[string]interface{}
	if len(conditionResults) == 0 {
		// Nothing triggered: either nodata, or a recovery of a previously
		// alerting message.
		alertItem.Priority = ""
		if checkResults.QueryResult.Nodata {
			alertItem.State = alerting.AlertStateNodata
		}

		if alertMessage != nil && alertMessage.Status != alerting.MessageStateRecovered && !checkResults.QueryResult.Nodata {
			alertMessage.Status = alerting.MessageStateRecovered
			alertMessage.ResourceID = rule.Resource.ID
			alertMessage.ResourceName = rule.Resource.Name
			err = saveAlertMessage(alertMessage)
			if err != nil {
				return fmt.Errorf("save alert message error: %w", err)
			}
			// todo add recover notification to inner system message
			// send recover message to channel
			recoverCfg := rule.RecoveryNotificationConfig
			if recoverCfg != nil && recoverCfg.EventEnabled && recoverCfg.Enabled {
				paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{
					alerting2.ParamEventID:   alertMessage.ID,
					alerting2.ParamTimestamp: alertItem.Created.Unix(),
					"duration":               alertItem.Created.Sub(alertMessage.Created).String(),
					"trigger_at":             alertMessage.Created.Unix(),
				})
				err = attachTitleMessageToCtx(recoverCfg.Title, recoverCfg.Message, paramsCtx)
				if err != nil {
					return err
				}
				actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx, false)
				alertItem.RecoverActionResults = actionResults
				//clear history notification time
				rule.LastNotificationTime = time.Time{}
				rule.LastEscalationTime = time.Time{}
				_ = kv.DeleteKey(alerting2.KVLastNotificationTime, []byte(rule.ID))
				_ = kv.DeleteKey(alerting2.KVLastEscalationTime, []byte(rule.ID))
			}
		}
		return nil
	}
	alertItem.State = alerting.AlertStateAlerting

	// Alert priority is the highest priority among all triggered conditions.
	var (
		priority = conditionResults[0].ConditionItem.Priority
	)
	for _, conditionResult := range conditionResults {
		if alerting.PriorityWeights[priority] < alerting.PriorityWeights[conditionResult.ConditionItem.Priority] {
			priority = conditionResult.ConditionItem.Priority
		}
	}
	triggerAt := alertItem.Created
	if alertMessage != nil {
		triggerAt = alertMessage.Created
	}
	paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{
		alerting2.ParamTimestamp: alertItem.Created.Unix(),
		"duration":               alertItem.Created.Sub(triggerAt).String(),
		"trigger_at":             triggerAt.Unix(),
	})

	alertItem.Priority = priority
	var newAlertMessage *alerting.AlertMessage
	// Open a new alerting episode when there is no active message (none, or
	// the previous one has recovered); otherwise update the existing one.
	if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered {
		newAlertMessage = &alerting.AlertMessage{
			RuleID:       rule.ID,
			Created:      alertItem.Created,
			Updated:      time.Now(),
			ID:           util.GetUUID(),
			ResourceID:   rule.Resource.ID,
			ResourceName: rule.Resource.Name,
			Status:       alerting.MessageStateAlerting,
			Priority:     priority,
			Tags:         rule.Tags,
			Category:     rule.Category,
		}
		paramsCtx[alerting2.ParamEventID] = newAlertMessage.ID
	} else {
		paramsCtx[alerting2.ParamEventID] = alertMessage.ID
	}
	title, message := rule.GetNotificationTitleAndMessage()
	err = attachTitleMessageToCtx(title, message, paramsCtx)
	if err != nil {
		return err
	}
	alertItem.Message = paramsCtx[alerting2.ParamMessage].(string)
	alertItem.Title = paramsCtx[alerting2.ParamTitle].(string)
	if newAlertMessage != nil {
		alertMessage = newAlertMessage
		alertMessage.Title = alertItem.Title
		alertMessage.Message = alertItem.Message
		err = saveAlertMessage(newAlertMessage)
		if err != nil {
			return fmt.Errorf("save alert message error: %w", err)
		}
		// Create an in-app notification for the rule creator (or everyone
		// when the creator is unknown).
		userID := rule.Creator.Id
		if userID == "" {
			userID = "*"
		}
		notification := &model.Notification{
			UserId:      util.ToString(userID),
			Type:        model.NotificationTypeNotification,
			MessageType: model.MessageTypeAlerting,
			Status:      model.NotificationStatusNew,
			Title:       alertItem.Title,
			Body:        alertItem.Message,
			Link:        "/alerting/message",
		}
		err = orm.Create(nil, notification)
		if err != nil {
			return fmt.Errorf("failed to create notification, err: %w", err)
		}
	} else {
		alertMessage.Title = alertItem.Title
		alertMessage.Message = alertItem.Message
		alertMessage.ResourceID = rule.Resource.ID
		alertMessage.ResourceName = rule.Resource.Name
		alertMessage.Priority = priority
		err = saveAlertMessage(alertMessage)
		if err != nil {
			return fmt.Errorf("save alert message error: %w", err)
		}
	}
	// NOTE(review): arguments appear swapped — the format string is
	// "rule %s is %v" but conditionResults is passed first and rule.ID
	// second; confirm and fix the argument order.
	log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID)

	// if alert message status equals ignored , then skip sending message to channel
	if alertMessage.Status == alerting.MessageStateIgnored {
		return nil
	}
	if paramsCtx != nil {
		paramsCtx[alerting2.ParamEventID] = alertMessage.ID
	}
	// if channel is not enabled return
	notifyCfg := rule.GetNotificationConfig()
	if notifyCfg == nil || !notifyCfg.Enabled {
		return nil
	}

	if notifyCfg.AcceptTimeRange.Include(time.Now()) {
		periodDuration, err := time.ParseDuration(notifyCfg.ThrottlePeriod)
		if err != nil {
			alertItem.Error = err.Error()
			return err
		}
		// Restore the last notification time from the kv store if this
		// in-memory rule instance does not have it yet.
		if rule.LastNotificationTime.IsZero() {
			tm, err := readTimeFromKV(alerting2.KVLastNotificationTime, []byte(rule.ID))
			if err != nil {
				return fmt.Errorf("get last notification time from kv error: %w", err)
			}
			if !tm.IsZero() {
				rule.LastNotificationTime = tm
			}
		}
		period := time.Now().Sub(rule.LastNotificationTime.Local())

		//log.Error(lastAlertItem.ID, period, periodDuration)
		if paramsCtx == nil {
			paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{
				alerting2.ParamTimestamp: alertItem.Created.Unix(),
				"priority":               priority,
				"duration":               alertItem.Created.Sub(alertMessage.Created).String(),
				"trigger_at":             alertMessage.Created.Unix(),
			})
			if alertMessage != nil {
				paramsCtx[alerting2.ParamEventID] = alertMessage.ID
			}
		}

		// Send regular notifications only when outside the throttle window.
		if alertMessage == nil || period > periodDuration {
			actionResults, _ := performChannels(notifyCfg.Normal, paramsCtx, false)
			alertItem.ActionExecutionResults = actionResults
			//change and save last notification time in local kv store when action error count equals zero
			rule.LastNotificationTime = time.Now()
			strTime := time.Now().UTC().Format(time.RFC3339)
			kv.AddValue(alerting2.KVLastNotificationTime, []byte(rule.ID), []byte(strTime))
			alertItem.IsNotified = true
		}

		if notifyCfg.EscalationEnabled {
			throttlePeriod, err := time.ParseDuration(notifyCfg.EscalationThrottlePeriod)
			if err != nil {
				return err
			}

			rule.LastTermStartTime = time.Now()
			if alertMessage != nil {
				rule.LastTermStartTime = alertMessage.Created
			}
			// Escalate only when the episode has been alerting longer than
			// the escalation throttle period.
			if time.Now().Sub(rule.LastTermStartTime.Local()) > throttlePeriod {
				if rule.LastEscalationTime.IsZero() {
					tm, err := readTimeFromKV(alerting2.KVLastEscalationTime, []byte(rule.ID))
					if err != nil {
						return fmt.Errorf("get last escalation time from kv error: %w", err)
					}
					if !tm.IsZero() {
						rule.LastEscalationTime = tm
					}
				}
				// NOTE(review): this reuses periodDuration (the normal
				// notification throttle) to rate-limit escalations —
				// confirm a dedicated escalation interval was not intended.
				if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration {
					actionResults, _ := performChannels(notifyCfg.Escalation, paramsCtx, false)
					alertItem.EscalationActionResults = actionResults
					//todo init last escalation time when create task (by last alert item is escalated)
					rule.LastEscalationTime = time.Now()
					alertItem.IsEscalated = true
					strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339)
					kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime))
				}

			}
		}
	}
	return nil
}
|
|
|
|
func attachTitleMessageToCtx(title, message string, paramsCtx map[string]interface{}) error {
|
|
var (
|
|
tplBytes []byte
|
|
err error
|
|
)
|
|
tplBytes, err = common.ResolveMessage(message, paramsCtx)
|
|
if err != nil {
|
|
return fmt.Errorf("resolve message template error: %w", err)
|
|
}
|
|
paramsCtx[alerting2.ParamMessage] = string(tplBytes)
|
|
tplBytes, err = common.ResolveMessage(title, paramsCtx)
|
|
if err != nil {
|
|
return fmt.Errorf("resolve title template error: %w", err)
|
|
}
|
|
paramsCtx[alerting2.ParamTitle] = string(tplBytes)
|
|
return nil
|
|
}
|
|
|
|
// newParameterCtx builds the template-parameter map used to render alert
// titles/messages and channel payloads: rule metadata, the top condition
// results, env variables, and the query time range, merged with extraParams
// (extraParams wins on conflict).
func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, extraParams map[string]interface{}) map[string]interface{} {
	var (
		conditionParams []util.MapStr
		firstGroupValue string
		firstThreshold  string
		priority        string
	)
	if len(checkResults.ResultItems) > 0 {
		// NOTE(review): priority is captured from item 0 BEFORE the sorts
		// below reorder the slice — confirm this is intentional (it may be
		// meant to reflect the highest-weight item instead).
		priority = checkResults.ResultItems[0].ConditionItem.Priority
		// First pass: order by descending priority weight.
		sort.Slice(checkResults.ResultItems, func(i, j int) bool {
			if alerting.PriorityWeights[checkResults.ResultItems[i].ConditionItem.Priority] > alerting.PriorityWeights[checkResults.ResultItems[j].ConditionItem.Priority] {
				return true
			}
			return false
		})
		// Second pass: order by descending numeric result value.
		// NOTE(review): sort.Slice is not stable, so this largely overrides
		// the priority ordering above — verify the intended precedence.
		sort.Slice(checkResults.ResultItems, func(i, j int) bool {
			if vi, ok := checkResults.ResultItems[i].ResultValue.(float64); ok {
				if vj, ok := checkResults.ResultItems[j].ResultValue.(float64); ok {
					return vi > vj
				}
			}
			return false
		})
	}

	// Expose at most the top 10 result items to the template context.
	for i, resultItem := range checkResults.ResultItems {
		if i >= 10 {
			break
		}
		if i == 0 {
			// Remember the first (top-ranked) item's group values/threshold
			// for the convenience params below.
			firstGroupValue = strings.Join(resultItem.GroupValues, ",")
			firstThreshold = strings.Join(resultItem.ConditionItem.Values, ",")
		}
		conditionParams = append(conditionParams, util.MapStr{
			alerting2.ParamThreshold:      resultItem.ConditionItem.Values,
			alerting2.Priority:            resultItem.ConditionItem.Priority,
			alerting2.ParamGroupValues:    resultItem.GroupValues,
			alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp,
			alerting2.ParamResultValue:    resultItem.ResultValue,
			alerting2.ParamRelationValues: resultItem.RelationValues,
		})
	}
	envVariables, err := alerting2.GetEnvVariables()
	if err != nil {
		// Non-fatal: templates simply see an empty/partial "env" map.
		log.Errorf("get env variables error: %v", err)
	}
	var (
		min interface{}
		max interface{}
	)
	if checkResults.QueryResult != nil {
		min = checkResults.QueryResult.Min
		max = checkResults.QueryResult.Max
		// int64 min/max are epoch-millisecond bounds of the query window;
		// widen by 60s on each side and format as UTC timestamps.
		if v, ok := min.(int64); ok {
			//expand 60s
			min = time.UnixMilli(v).Add(-time.Second * 60).UTC().Format("2006-01-02T15:04:05.999Z")
		}
		if v, ok := max.(int64); ok {
			max = time.UnixMilli(v).Add(time.Second * 60).UTC().Format("2006-01-02T15:04:05.999Z")
		}
	}
	paramsCtx := util.MapStr{
		alerting2.ParamRuleID:       rule.ID,
		alerting2.ParamResourceID:   rule.Resource.ID,
		alerting2.ParamResourceName: rule.Resource.Name,
		alerting2.ParamResults:      conditionParams,
		"objects":                   rule.Resource.Objects,
		"first_group_value":         firstGroupValue,
		"first_threshold":           firstThreshold,
		"rule_name":                 rule.Name,
		"priority":                  priority,
		"min":                       min,
		"max":                       max,
		"env":                       envVariables,
	}
	// extraParams override the defaults above on key collision.
	err = util.MergeFields(paramsCtx, extraParams, true)
	if err != nil {
		log.Errorf("merge template params error: %v", err)
	}
	return paramsCtx
}
|
|
|
|
func (engine *Engine) Test(rule *alerting.Rule, msgType string) ([]alerting.ActionExecutionResult, error) {
|
|
checkResults, err := engine.CheckCondition(rule)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("check condition error:%w", err)
|
|
}
|
|
alertMessage, err := getLastAlertMessage(rule.ID, 2*time.Minute)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get alert message error: %w", err)
|
|
}
|
|
var actionResults []alerting.ActionExecutionResult
|
|
|
|
now := time.Now()
|
|
triggerAt := now
|
|
if alertMessage != nil {
|
|
triggerAt = alertMessage.Created
|
|
}
|
|
paramsCtx := newParameterCtx(rule, checkResults, util.MapStr{
|
|
alerting2.ParamEventID: util.GetUUID(),
|
|
alerting2.ParamTimestamp: now.Unix(),
|
|
"duration": now.Sub(triggerAt).String(),
|
|
"trigger_at": triggerAt.Unix(),
|
|
})
|
|
if msgType == "escalation" || msgType == "notification" {
|
|
title, message := rule.GetNotificationTitleAndMessage()
|
|
err = attachTitleMessageToCtx(title, message, paramsCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else if msgType == "recover_notification" {
|
|
if rule.RecoveryNotificationConfig == nil {
|
|
return nil, fmt.Errorf("recovery notification must not be empty")
|
|
}
|
|
err = attachTitleMessageToCtx(rule.RecoveryNotificationConfig.Title, rule.RecoveryNotificationConfig.Message, paramsCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
return nil, fmt.Errorf("unkonwn parameter msg type")
|
|
}
|
|
|
|
var channels []alerting.Channel
|
|
switch msgType {
|
|
case "escalation":
|
|
notifyCfg := rule.GetNotificationConfig()
|
|
if notifyCfg == nil {
|
|
return nil, nil
|
|
}
|
|
channels = notifyCfg.Escalation
|
|
case "recover_notification":
|
|
if rule.RecoveryNotificationConfig != nil {
|
|
channels = rule.RecoveryNotificationConfig.Normal
|
|
}
|
|
case "notification":
|
|
notifyCfg := rule.GetNotificationConfig()
|
|
if notifyCfg == nil {
|
|
return nil, nil
|
|
}
|
|
channels = notifyCfg.Normal
|
|
}
|
|
if len(channels) > 0 {
|
|
actionResults, _ = performChannels(channels, paramsCtx, true)
|
|
} else {
|
|
return nil, fmt.Errorf("no useable channel")
|
|
}
|
|
return actionResults, nil
|
|
}
|
|
|
|
func performChannels(channels []alerting.Channel, ctx map[string]interface{}, raiseChannelEnabledErr bool) ([]alerting.ActionExecutionResult, int) {
|
|
var errCount int
|
|
var actionResults []alerting.ActionExecutionResult
|
|
for _, channel := range channels {
|
|
var (
|
|
errStr string
|
|
resBytes []byte
|
|
messageBytes []byte
|
|
)
|
|
_, err := common.RetrieveChannel(&channel, raiseChannelEnabledErr)
|
|
if err != nil {
|
|
log.Error(err)
|
|
errCount++
|
|
errStr = err.Error()
|
|
} else {
|
|
if !channel.Enabled {
|
|
continue
|
|
}
|
|
resBytes, err, messageBytes = common.PerformChannel(&channel, ctx)
|
|
if err != nil {
|
|
errCount++
|
|
errStr = err.Error()
|
|
}
|
|
}
|
|
actionResults = append(actionResults, alerting.ActionExecutionResult{
|
|
Result: string(resBytes),
|
|
Error: errStr,
|
|
Message: string(messageBytes),
|
|
ExecutionTime: int(time.Now().UnixNano() / 1e6),
|
|
ChannelType: channel.SubType,
|
|
ChannelName: channel.Name,
|
|
ChannelID: channel.ID,
|
|
})
|
|
}
|
|
return actionResults, errCount
|
|
}
|
|
|
|
func (engine *Engine) GenerateTask(rule alerting.Rule) func(ctx context.Context) {
|
|
return func(ctx context.Context) {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Error(err)
|
|
debug.PrintStack()
|
|
}
|
|
}()
|
|
err := engine.Do(&rule)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func CollectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData) {
|
|
if aggM, ok := agg.(map[string]interface{}); ok {
|
|
if targetAgg, ok := aggM["filter_agg"]; ok {
|
|
collectMetricData(targetAgg, groupValues, metricData)
|
|
} else {
|
|
collectMetricData(aggM, groupValues, metricData)
|
|
}
|
|
}
|
|
}
|
|
|
|
// collectMetricData recursively walks an Elasticsearch aggregation tree.
// When it reaches a "time_buckets" date-histogram it flattens the per-bucket
// metric values into one alerting.MetricData entry (GroupValues is the
// accumulated "*"-joined group path); otherwise it descends into the first
// terms-style sub-aggregation, extending groupValues with each bucket key.
func collectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData) {
	if aggM, ok := agg.(map[string]interface{}); ok {
		if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok {
			if bks, ok := timeBks["buckets"].([]interface{}); ok {
				md := alerting.MetricData{
					Data:        map[string][]alerting.MetricDataItem{},
					GroupValues: strings.Split(groupValues, "*"),
				}
				for _, bk := range bks {
					if bkM, ok := bk.(map[string]interface{}); ok {

						var docCount int
						if v, ok := bkM["doc_count"]; ok {
							// JSON numbers decode as float64.
							docCount = int(v.(float64))
						}
						for k, v := range bkM {
							// Skip bucket bookkeeping fields.
							if k == "key" || k == "key_as_string" || k == "doc_count" {
								continue
							}
							// Only short metric names are collected
							// (generated metric aggs are named a, b, c, ...).
							if len(k) > 5 { //just store a,b,c
								continue
							}
							if vm, ok := v.(map[string]interface{}); ok {
								if metricVal, ok := vm["value"]; ok {
									// Simple single-value metric agg.
									md.Data[k] = append(md.Data[k], alerting.MetricDataItem{Timestamp: bkM["key"], Value: metricVal, DocCount: docCount})
								} else {
									//percentiles agg type
									switch vm["values"].(type) {
									case []interface{}:
										// Keyed=false percentiles: take only
										// the first entry (unconditional break).
										for _, val := range vm["values"].([]interface{}) {
											if valM, ok := val.(map[string]interface{}); ok {
												md.Data[k] = append(md.Data[k], alerting.MetricDataItem{Timestamp: bkM["key"], Value: valM["value"], DocCount: docCount})
											}
											break
										}
									case map[string]interface{}:
										// NOTE(review): map iteration order is
										// random, so this picks an arbitrary
										// percentile — confirm a single
										// percentile is always requested here.
										for _, val := range vm["values"].(map[string]interface{}) {
											md.Data[k] = append(md.Data[k], alerting.MetricDataItem{Timestamp: bkM["key"], Value: val, DocCount: docCount})
											break
										}
									}

								}

							}

						}

					}
				}

				*metricData = append(*metricData, md)
			}

		} else {
			// Not a time_buckets level: descend into the first non-bookkeeping
			// sub-aggregation's buckets, extending the group path with each
			// bucket key. The trailing break means only one sub-agg key is
			// processed per level.
			for k, v := range aggM {
				if k == "key" || k == "doc_count" {
					continue
				}
				if vm, ok := v.(map[string]interface{}); ok {
					if bks, ok := vm["buckets"].([]interface{}); ok {
						for _, bk := range bks {
							if bkVal, ok := bk.(map[string]interface{}); ok {
								currentGroup := bkVal["key"].(string)
								newGroupValues := currentGroup
								if groupValues != "" {
									newGroupValues = fmt.Sprintf("%s*%s", groupValues, currentGroup)
								}

								collectMetricData(bk, newGroupValues, metricData)
							}

						}
					}
				}
				break
			}
		}
	}
}
|
|
|
|
func getLastAlertMessageFromES(ruleID string) (*alerting.AlertMessage, error) {
|
|
queryDsl := util.MapStr{
|
|
"size": 1,
|
|
"sort": []util.MapStr{
|
|
{
|
|
"created": util.MapStr{
|
|
"order": "desc",
|
|
},
|
|
},
|
|
},
|
|
"query": util.MapStr{
|
|
"term": util.MapStr{
|
|
"rule_id": util.MapStr{
|
|
"value": ruleID,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
q := orm.Query{
|
|
RawQuery: util.MustToJSONBytes(queryDsl),
|
|
}
|
|
err, searchResult := orm.Search(alerting.AlertMessage{}, &q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(searchResult.Result) == 0 {
|
|
return nil, nil
|
|
}
|
|
messageBytes := util.MustToJSONBytes(searchResult.Result[0])
|
|
message := &alerting.AlertMessage{}
|
|
err = util.FromJSONBytes(messageBytes, message)
|
|
return message, err
|
|
}
|
|
|
|
func getLastAlertMessage(ruleID string, duration time.Duration) (*alerting.AlertMessage, error) {
|
|
messageBytes, err := kv.GetValue(alerting2.KVLastMessageState, []byte(ruleID))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
message := &alerting.AlertMessage{}
|
|
if messageBytes != nil {
|
|
|
|
err = util.FromJSONBytes(messageBytes, message)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if time.Now().Sub(message.Updated) <= duration {
|
|
return message, nil
|
|
}
|
|
}
|
|
message, err = getLastAlertMessageFromES(ruleID)
|
|
return message, err
|
|
}
|
|
|
|
func saveAlertMessageToES(message *alerting.AlertMessage) error {
|
|
message.Updated = time.Now()
|
|
return orm.Save(nil, message)
|
|
}
|
|
|
|
func saveAlertMessage(message *alerting.AlertMessage) error {
|
|
//todo diff message if not change , then skip save to es ?
|
|
err := saveAlertMessageToES(message)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
messageBytes, err := util.ToJSONBytes(message)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = kv.AddValue(alerting2.KVLastMessageState, []byte(message.RuleID), messageBytes)
|
|
return err
|
|
}
|
|
|
|
func readTimeFromKV(bucketKey string, key []byte) (time.Time, error) {
|
|
timeBytes, err := kv.GetValue(bucketKey, key)
|
|
zeroTime := time.Time{}
|
|
if err != nil {
|
|
return zeroTime, err
|
|
}
|
|
timeStr := string(timeBytes)
|
|
if timeStr != "" {
|
|
return time.ParseInLocation(time.RFC3339, string(timeBytes), time.UTC)
|
|
}
|
|
return zeroTime, nil
|
|
}
|