console/service/alerting/elasticsearch/engine.go

1262 lines
36 KiB
Go

/* Copyright © INFINI Ltd. All rights reserved.
* web: https://infinilabs.com
* mail: hello#infini.ltd */
package elasticsearch
import (
"context"
"fmt"
"github.com/Knetic/govaluate"
log "github.com/cihub/seelog"
"infini.sh/console/model"
"infini.sh/console/model/alerting"
alerting2 "infini.sh/console/service/alerting"
"infini.sh/console/service/alerting/common"
"infini.sh/framework/core/elastic"
"infini.sh/console/model/insight"
"infini.sh/framework/core/kv"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"math"
"runtime/debug"
"sort"
"strconv"
"strings"
"time"
)
type Engine struct {
}
//GenerateQuery generate a final elasticsearch query dsl object
//when RawFilter of rule is not empty, priority use it, otherwise to covert from Filter of rule (todo)
//auto generate time filter query and then attach to final query
//auto generate elasticsearch aggregations by metrics of rule
//group of metric item converted to terms aggregation and TimeField of rule converted to date_histogram aggregation
//convert statistic of metric item to elasticsearch aggregation
func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.FilterParam) (interface{}, error) {
filter, err := engine.GenerateRawFilter(rule, filterParam)
if err != nil {
return nil, err
}
timeFilter, err := engine.generateTimeFilter(rule, filterParam)
if err != nil {
return nil, err
}
if len(rule.Metrics.Items) == 0 {
return nil, fmt.Errorf("metric items should not be empty")
}
basicAggs := util.MapStr{}
//todo bucket sort (es 6.1) bucket script (es 2.0)
for _, metricItem := range rule.Metrics.Items {
metricAggs := engine.generateAgg(&metricItem)
if err = util.MergeFields(basicAggs, metricAggs, true); err != nil {
return nil, err
}
}
verInfo := elastic.GetClient(rule.Resource.ID).GetVersion()
var periodInterval = rule.Metrics.BucketSize
if filterParam != nil && filterParam.BucketSize != "" {
periodInterval = filterParam.BucketSize
}
if verInfo.Number==""{
panic("invalid version")
}
intervalField, err := elastic.GetDateHistogramIntervalField(verInfo.Distribution,verInfo.Number, periodInterval )
if err != nil {
return nil, fmt.Errorf("get interval field error: %w", err)
}
timeAggs := util.MapStr{
"time_buckets": util.MapStr{
"date_histogram": util.MapStr{
"field": rule.Resource.TimeField,
intervalField: periodInterval,
},
"aggs": basicAggs,
},
}
var rootAggs util.MapStr
groups := rule.Metrics.Groups
if grpLength := len(groups); grpLength > 0 {
var lastGroupAgg util.MapStr
for i := grpLength-1; i>=0; i-- {
limit := groups[i].Limit
//top group 10
if limit <= 0 {
limit = 10
}
groupAgg := util.MapStr{
"terms": util.MapStr{
"field": groups[i].Field,
"size": limit,
},
}
groupID := util.GetUUID()
if lastGroupAgg != nil {
groupAgg["aggs"] = util.MapStr{
groupID: lastGroupAgg,
}
}else{
groupAgg["aggs"] = timeAggs
}
lastGroupAgg = groupAgg
}
rootAggs = util.MapStr{
util.GetUUID(): lastGroupAgg,
}
}else{
rootAggs = timeAggs
}
if len(filter) > 0 {
rootAggs = util.MapStr{
"filter_agg": util.MapStr{
"filter": filter,
"aggs": rootAggs,
},
}
}
return util.MapStr{
"size": 0,
"query": timeFilter,
"aggs": rootAggs,
}, nil
}
//generateAgg convert statistic of metric item to elasticsearch aggregation
func (engine *Engine) generateAgg(metricItem *insight.MetricItem) map[string]interface{}{
var (
aggType = "value_count"
field = metricItem.Field
)
if field == "" || field == "*" {
field = "_id"
}
var percent = 0.0
var isPipeline = false
switch metricItem.Statistic {
case "max", "min", "sum", "avg":
aggType = metricItem.Statistic
case "count", "value_count":
aggType = "value_count"
case "derivative":
aggType = "max"
isPipeline = true
case "medium": // from es version 6.6
aggType = "median_absolute_deviation"
case "p99", "p95","p90","p80","p50":
aggType = "percentiles"
percentStr := strings.TrimPrefix(metricItem.Statistic, "p")
percent, _ = strconv.ParseFloat(percentStr, 32)
}
aggValue := util.MapStr{
"field": field,
}
if aggType == "percentiles" {
aggValue["percents"] = []interface{}{percent}
}
aggs := util.MapStr{
metricItem.Name: util.MapStr{
aggType: aggValue,
},
}
if !isPipeline{
return aggs
}
pipelineAggID := util.GetUUID()
aggs[pipelineAggID] = aggs[metricItem.Name]
aggs[metricItem.Name] = util.MapStr{
"derivative": util.MapStr{
"buckets_path": pipelineAggID,
},
}
return aggs
}
func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[string]interface{}, error){
if !fq.IsComplex(){
q := map[string]interface{}{}
if len(fq.Values) == 0 {
return nil, fmt.Errorf("values should not be empty")
}
//equals/gte/gt/lt/lte/in/match/regexp/wildcard/range/prefix/suffix/contain/
switch fq.Operator {
case "equals":
q["term"] = util.MapStr{
fq.Field: util.MapStr{
"value": fq.Values[0],
},
}
case "in":
q["terms"] = util.MapStr{
fq.Field: fq.Values,
}
case "match":
q[fq.Operator] = util.MapStr{
fq.Field: fq.Values[0],
}
case "gte", "gt", "lt", "lte":
q["range"] = util.MapStr{
fq.Field: util.MapStr{
fq.Operator: fq.Values[0],
},
}
case "range":
if len(fq.Values) != 2 {
return nil, fmt.Errorf("values length of range query must be 2, but got %d", len(fq.Values))
}
q["range"] = util.MapStr{
fq.Field: util.MapStr{
"gte": fq.Values[0],
"lte": fq.Values[1],
},
}
case "prefix":
q["prefix"] = util.MapStr{
fq.Field: fq.Values[0],
}
case "regexp", "wildcard":
q[fq.Operator] = util.MapStr{
fq.Field: util.MapStr{
"value": fq.Values[0],
},
}
default:
return nil, fmt.Errorf("unsupport query operator %s", fq.Operator)
}
return q, nil
}
if fq.Or != nil && fq.And != nil {
return nil, fmt.Errorf("filter format error: or, and bool operation in same level")
}
if fq.Or != nil && fq.Not != nil {
return nil, fmt.Errorf("filter format error: or, not bool operation in same level")
}
if fq.And != nil && fq.Not != nil {
return nil, fmt.Errorf("filter format error: and, not bool operation in same level")
}
var (
boolOperator string
filterQueries []alerting.FilterQuery
)
if len(fq.Not) >0 {
boolOperator = "must_not"
filterQueries = fq.Not
}else if len(fq.Or) > 0 {
boolOperator = "should"
filterQueries = fq.Or
}else {
boolOperator = "must"
filterQueries = fq.And
}
var subQueries []interface{}
for _, subQ := range filterQueries {
subQuery, err := engine.ConvertFilterQueryToDsl(&subQ)
if err != nil {
return nil, err
}
subQueries = append(subQueries, subQuery)
}
boolQuery := util.MapStr{
boolOperator: subQueries,
}
if boolOperator == "should" {
boolQuery["minimum_should_match"] = 1
}
resultQuery := map[string]interface{}{
"bool": boolQuery,
}
return resultQuery, nil
}
func getQueryTimeRange(rule *alerting.Rule, filterParam *alerting.FilterParam) (start, end interface{}){
var (
timeStart interface{}
timeEnd interface{}
)
if filterParam != nil {
timeStart = filterParam.Start
timeEnd = filterParam.End
}else{
var (
units string
value int
)
intervalDuration, err := time.ParseDuration(rule.Metrics.BucketSize)
if err != nil {
return nil, fmt.Errorf("parse bucket size of rule [%s] error: %v", rule.Name, err)
}
if intervalDuration / time.Hour >= 1 {
units = "h"
value = int(intervalDuration / time.Hour)
}else if intervalDuration / time.Minute >= 1{
units = "m"
value = int(intervalDuration / time.Minute)
}else if intervalDuration / time.Second >= 1 {
units = "s"
value = int(intervalDuration / time.Second)
}else{
return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.BucketSize)
}
bucketCount := rule.Conditions.GetMinimumPeriodMatch() + 1
if bucketCount <= 0 {
bucketCount = 1
}
duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * bucketCount, units))
if err != nil {
return nil, err
}
timeStart = time.Now().Add(-duration).UnixMilli() //.Format(time.RFC3339Nano)
timeEnd = time.Now().UnixMilli()
}
return timeStart, timeEnd
}
func (engine *Engine) generateTimeFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error){
timeStart, timeEnd := getQueryTimeRange(rule, filterParam)
timeQuery := util.MapStr{
"range": util.MapStr{
rule.Resource.TimeField: util.MapStr{
"gte": timeStart,
"lte": timeEnd,
},
},
}
return timeQuery, nil
}
func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) {
query := map[string]interface{}{}
var err error
if rule.Resource.RawFilter != nil {
query = util.DeepCopy(rule.Resource.RawFilter).(map[string]interface{})
}else{
if !rule.Resource.Filter.IsEmpty(){
query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter)
if err != nil {
return nil, err
}
}
}
//timeQuery, err := engine.generateTimeFilter(rule, filterParam)
//if err != nil {
// return nil, err
//}
//
//if boolQ, ok := query["bool"].(map[string]interface{}); ok {
// if mustQ, ok := boolQ["must"]; ok {
//
// if mustArr, ok := mustQ.([]interface{}); ok {
// boolQ["must"] = append(mustArr, timeQuery)
//
// }else{
// return nil, fmt.Errorf("must query: %v is not valid in filter", mustQ)
// }
// }else{
// boolQ["must"] = []interface{}{
// timeQuery,
// }
// }
//}else{
// must := []interface{}{
// timeQuery,
// }
// if len(query) > 0 {
// if _, ok = query["match_all"]; !ok {
// must = append(must, query)
// }
// }
// query = util.MapStr{
// "bool": util.MapStr{
// "must": must,
// },
// }
//}
return query, nil
}
func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.FilterParam)(*alerting.QueryResult, error){
esClient := elastic.GetClient(rule.Resource.ID)
queryResult := &alerting.QueryResult{}
indexName := strings.Join(rule.Resource.Objects, ",")
//todo cache queryDsl
queryDsl, err := engine.GenerateQuery(rule, filterParam)
if err != nil {
return nil, err
}
if vm, ok := queryDsl.(util.MapStr); ok {
queryResult.Min, err = vm.GetValue(fmt.Sprintf("query.range.%s.gte", rule.Resource.TimeField))
queryResult.Max, _ = vm.GetValue(fmt.Sprintf("query.range.%s.lte", rule.Resource.TimeField))
}
queryDslBytes, err := util.ToJSONBytes(queryDsl)
if err != nil {
return nil, err
}
queryResult.Query = string(queryDslBytes)
searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes)
if err != nil {
return queryResult, err
}
if searchRes.GetTotal() == 0 {
queryResult.Nodata = true
}
if searchRes.StatusCode != 200 {
return queryResult, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body))
}
queryResult.Raw = string(searchRes.RawResult.Body)
searchResult := map[string]interface{}{}
err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult)
if err != nil {
return queryResult, err
}
metricData := []alerting.MetricData{}
CollectMetricData(searchResult["aggregations"], "", &metricData)
//将 derivative 求导数据 除以 bucket size (单位 /s)
//statisticM := map[string] string{}
//for _, mi := range rule.Metrics.Items {
// statisticM[mi.Name] = mi.Statistic
//}
//var periodInterval = rule.Metrics.PeriodInterval
//if filterParam != nil && filterParam.BucketSize != "" {
// periodInterval = filterParam.BucketSize
//}
//interval, err := time.ParseDuration(periodInterval)
//if err != nil {
// log.Error(err)
//}
//for i, _ := range metricData {
// for k, d := range metricData[i].Data {
// if statisticM[k] == "derivative" {
// for _, td := range d {
// if len(td) > 1 {
// if v, ok := td[1].(float64); ok {
// td[1] = v / interval.Seconds()
// }
// }
// }
// }
// }
//}
queryResult.MetricData = metricData
return queryResult, nil
}
func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, *alerting.QueryResult, error){
queryResult, err := engine.ExecuteQuery(rule, filterParam)
if err != nil {
return nil, queryResult, err
}
var targetMetricData []alerting.MetricData
for _, md := range queryResult.MetricData {
var targetData alerting.MetricData
if len(rule.Metrics.Items) == 1 {
targetData = md
} else {
targetData = alerting.MetricData{
GroupValues: md.GroupValues,
Data: map[string][]alerting.TimeMetricData{},
}
expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula)
if err != nil {
return nil, queryResult, err
}
dataLength := 0
for _, v := range md.Data {
dataLength = len(v)
break
}
DataLoop:
for i := 0; i < dataLength; i++ {
parameters := map[string]interface{}{}
var timestamp interface{}
for k, v := range md.Data {
if len(k) == 20 {
continue
}
if len(v) < dataLength {
continue
}
//drop nil value bucket
if v == nil || len(v[i]) < 2 {
continue DataLoop
}
if _, ok := v[i][1].(float64); !ok {
continue DataLoop
}
parameters[k] = v[i][1]
timestamp = v[i][0]
}
if len(parameters) == 0 {
continue
}
result, err := expression.Evaluate(parameters)
if err != nil {
return nil, queryResult, err
}
if r, ok := result.(float64); ok {
if math.IsNaN(r) || math.IsInf(r, 0 ){
if !isFilterNaN {
targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
}
continue
}
}
targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, result})
}
}
targetMetricData = append(targetMetricData, targetData)
}
return targetMetricData, queryResult, nil
}
//CheckCondition check whether rule conditions triggered or not
//if triggered returns an ConditionResult
//sort conditions by priority desc before check , and then if condition is true, then continue check another group
func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){
var resultItems []alerting.ConditionResultItem
targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, true, nil)
conditionResult := &alerting.ConditionResult{
QueryResult: queryResult,
}
if err != nil {
return conditionResult, err
}
for idx, targetData := range targetMetricData {
if idx == 0 {
sort.Slice(rule.Conditions.Items, func(i, j int) bool {
return alerting.PriorityWeights[rule.Conditions.Items[i].Priority] > alerting.PriorityWeights[rule.Conditions.Items[j].Priority]
})
}
LoopCondition:
for _, cond := range rule.Conditions.Items {
conditionExpression, err := cond.GenerateConditionExpression()
if err != nil {
return conditionResult, err
}
expression, err := govaluate.NewEvaluableExpression(conditionExpression)
if err != nil {
return conditionResult, err
}
dataLength := 0
dataKey := ""
for k, v := range targetData.Data {
dataLength = len(v)
dataKey = k
}
triggerCount := 0
for i := 0; i < dataLength; i++ {
//clear nil value
if targetData.Data[dataKey][i][1] == nil {
continue
}
if r, ok := targetData.Data[dataKey][i][1].(float64); ok {
if math.IsNaN(r){
continue
}
}
evaluateResult, err := expression.Evaluate(map[string]interface{}{
"result": targetData.Data[dataKey][i][1],
})
if err != nil {
return conditionResult, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err)
}
if evaluateResult == true {
triggerCount += 1
}else {
triggerCount = 0
}
if triggerCount >= cond.MinimumPeriodMatch {
log.Debugf("triggered condition %v, groups: %v\n", cond, targetData.GroupValues)
resultItem := alerting.ConditionResultItem{
GroupValues: targetData.GroupValues,
ConditionItem: &cond,
ResultValue: targetData.Data[dataKey][i][1],
IssueTimestamp: targetData.Data[dataKey][i][0],
RelationValues: map[string]interface{}{},
}
for _, metric := range rule.Metrics.Items{
resultItem.RelationValues[metric.Name] = queryResult.MetricData[idx].Data[metric.Name][i][1]
}
resultItems = append(resultItems, resultItem)
break LoopCondition
}
}
}
}
conditionResult.QueryResult.MetricData = targetMetricData
conditionResult.ResultItems = resultItems
return conditionResult, nil
}
func (engine *Engine) Do(rule *alerting.Rule) error {
var (
alertItem *alerting.Alert
err error
)
defer func() {
if err != nil && alertItem == nil {
alertItem = &alerting.Alert{
ID: util.GetUUID(),
Created: time.Now(),
Updated: time.Now(),
RuleID: rule.ID,
RuleName: rule.Name,
ResourceID: rule.Resource.ID,
ResourceName: rule.Resource.Name,
Expression: rule.Metrics.Expression,
Objects: rule.Resource.Objects,
State: alerting.AlertStateError,
//Priority: "undefine",
Error: err.Error(),
}
}
if alertItem != nil {
if err != nil{
alertItem.State = alerting.AlertStateError
alertItem.Error = err.Error()
}else {
for _, actionResult := range alertItem.ActionExecutionResults {
if actionResult.Error != "" {
alertItem.Error = actionResult.Error
alertItem.State = alerting.AlertStateError
}
}
}
err = orm.Save(nil, alertItem)
if err != nil {
log.Error(err)
}
}
}()
log.Tracef("start check condition of rule %s", rule.ID)
//todo do only once when rule not change
metricExpression, _ := rule.Metrics.GenerateExpression()
for i, cond := range rule.Conditions.Items {
expression, _ := cond.GenerateConditionExpression()
rule.Conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression)
}
alertItem = &alerting.Alert{
ID: util.GetUUID(),
Created: time.Now(),
Updated: time.Now(),
RuleID: rule.ID,
RuleName: rule.Name,
ResourceID: rule.Resource.ID,
ResourceName: rule.Resource.Name,
Expression: rule.Metrics.Expression,
Objects: rule.Resource.Objects,
Conditions: rule.Conditions,
State: alerting.AlertStateOK,
}
checkResults, err := engine.CheckCondition(rule)
alertItem.ConditionResult = checkResults
if err != nil {
return err
}
alertMessage, err := getLastAlertMessage(rule.ID, 2 * time.Minute)
if err != nil {
return fmt.Errorf("get alert message error: %w", err)
}
conditionResults := checkResults.ResultItems
var paramsCtx map[string]interface{}
if len(conditionResults) == 0 {
alertItem.Priority = "info"
if checkResults.QueryResult.Nodata {
alertItem.State = alerting.AlertStateNodata
}
if alertMessage != nil && alertMessage.Status != alerting.MessageStateRecovered && !checkResults.QueryResult.Nodata {
alertMessage.Status = alerting.MessageStateRecovered
alertMessage.ResourceID = rule.Resource.ID
alertMessage.ResourceName = rule.Resource.Name
err = saveAlertMessage(alertMessage)
if err != nil {
return fmt.Errorf("save alert message error: %w", err)
}
// todo add recover notification to inner system message
// send recover message to channel
recoverCfg := rule.RecoveryNotificationConfig
if recoverCfg != nil && recoverCfg.EventEnabled && recoverCfg.Enabled {
paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{
alerting2.ParamEventID: alertItem.ID,
alerting2.ParamTimestamp: alertItem.Created.Unix(),
"duration": alertItem.Created.Sub(alertMessage.Created).String(),
"trigger_at": alertMessage.Created.Unix(),
})
err = attachTitleMessageToCtx(recoverCfg.Title, recoverCfg.Message, paramsCtx)
if err != nil {
return err
}
actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx, false)
alertItem.RecoverActionResults = actionResults
}
}
return nil
}
alertItem.State = alerting.AlertStateAlerting
var (
priority = conditionResults[0].ConditionItem.Priority
)
for _, conditionResult := range conditionResults {
if alerting.PriorityWeights[priority] < alerting.PriorityWeights[conditionResult.ConditionItem.Priority] {
priority = conditionResult.ConditionItem.Priority
}
}
triggerAt := alertItem.Created
if alertMessage != nil {
triggerAt = alertMessage.Created
}
paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{
alerting2.ParamEventID: alertItem.ID,
alerting2.ParamTimestamp: alertItem.Created.Unix(),
"duration": alertItem.Created.Sub(triggerAt).String(),
"trigger_at": triggerAt.Unix(),
})
alertItem.Priority = priority
title, message := rule.GetNotificationTitleAndMessage()
err = attachTitleMessageToCtx(title, message, paramsCtx)
if err != nil {
return err
}
alertItem.Message = paramsCtx[alerting2.ParamMessage].(string)
alertItem.Title = paramsCtx[alerting2.ParamTitle].(string)
if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered {
msg := &alerting.AlertMessage{
RuleID: rule.ID,
Created: alertItem.Created,
Updated: time.Now(),
ID: util.GetUUID(),
ResourceID: rule.Resource.ID,
ResourceName: rule.Resource.Name,
Status: alerting.MessageStateAlerting,
Priority: priority,
Title: alertItem.Title,
Message: alertItem.Message,
Tags: rule.Tags,
Category: rule.Category,
}
err = saveAlertMessage(msg)
if err != nil {
return fmt.Errorf("save alert message error: %w", err)
}
userID := rule.Creator.Id
if userID == "" {
userID = "*"
}
notification := &model.Notification{
UserId: util.ToString(userID),
Type: model.NotificationTypeNotification,
MessageType: model.MessageTypeAlerting,
Status: model.NotificationStatusNew,
Title: alertItem.Title,
Body: alertItem.Message,
Link: "/alerting/message",
}
err = orm.Create(nil, notification)
if err != nil {
return fmt.Errorf("failed to create notification, err: %w", err)
}
}else{
alertMessage.Title = alertItem.Title
alertMessage.Message = alertItem.Message
alertMessage.ResourceID = rule.Resource.ID
alertMessage.ResourceName= rule.Resource.Name
alertMessage.Priority = priority
err = saveAlertMessage(alertMessage)
if err != nil {
return fmt.Errorf("save alert message error: %w", err)
}
}
log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID )
// if alert message status equals ignored , then skip sending message to channel
if alertMessage != nil && alertMessage.Status == alerting.MessageStateIgnored {
return nil
}
// if channel is not enabled return
notifyCfg := rule.GetNotificationConfig()
if notifyCfg == nil || !notifyCfg.Enabled {
return nil
}
if notifyCfg.AcceptTimeRange.Include(time.Now()) {
periodDuration, err := time.ParseDuration(notifyCfg.ThrottlePeriod)
if err != nil {
alertItem.Error = err.Error()
return err
}
if rule.LastNotificationTime.IsZero() {
tm, err := readTimeFromKV(alerting2.KVLastNotificationTime, []byte(rule.ID))
if err != nil {
return fmt.Errorf("get last notification time from kv error: %w", err)
}
if !tm.IsZero(){
rule.LastNotificationTime = tm
}
}
period := time.Now().Sub(rule.LastNotificationTime.Local())
//log.Error(lastAlertItem.ID, period, periodDuration)
if paramsCtx == nil {
paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{
alerting2.ParamEventID: alertItem.ID,
alerting2.ParamTimestamp: alertItem.Created.Unix(),
"priority": priority,
"duration": alertItem.Created.Sub(alertMessage.Created).String(),
"trigger_at": alertMessage.Created.Unix(),
})
}
if alertMessage == nil || period > periodDuration {
actionResults, _ := performChannels(notifyCfg.Normal, paramsCtx, false)
alertItem.ActionExecutionResults = actionResults
//change and save last notification time in local kv store when action error count equals zero
rule.LastNotificationTime = time.Now()
strTime := time.Now().UTC().Format(time.RFC3339)
kv.AddValue(alerting2.KVLastNotificationTime, []byte(rule.ID), []byte(strTime))
alertItem.IsNotified = true
}
if notifyCfg.EscalationEnabled {
throttlePeriod, err := time.ParseDuration(notifyCfg.EscalationThrottlePeriod)
if err != nil {
return err
}
rule.LastTermStartTime = time.Now()
if alertMessage != nil {
rule.LastTermStartTime = alertMessage.Created
}
if time.Now().Sub(rule.LastTermStartTime.Local()) > throttlePeriod {
if rule.LastEscalationTime.IsZero(){
tm, err := readTimeFromKV(alerting2.KVLastEscalationTime, []byte(rule.ID))
if err != nil {
return fmt.Errorf("get last escalation time from kv error: %w", err)
}
if !tm.IsZero(){
rule.LastEscalationTime = tm
}
}
if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration {
actionResults, _ := performChannels(notifyCfg.Escalation, paramsCtx, false)
alertItem.EscalationActionResults = actionResults
//todo init last escalation time when create task (by last alert item is escalated)
rule.LastEscalationTime = time.Now()
alertItem.IsEscalated = true
strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339)
kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime))
}
}
}
}
return nil
}
func attachTitleMessageToCtx(title, message string, paramsCtx map[string]interface{}) error{
var (
tplBytes []byte
err error
)
tplBytes, err = common.ResolveMessage(message, paramsCtx)
if err != nil {
return fmt.Errorf("resolve message template error: %w", err)
}
paramsCtx[alerting2.ParamMessage] = string(tplBytes)
tplBytes, err = common.ResolveMessage(title, paramsCtx)
if err != nil {
return fmt.Errorf("resolve title template error: %w", err)
}
paramsCtx[alerting2.ParamTitle] = string(tplBytes)
return nil
}
func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, extraParams map[string]interface{} ) map[string]interface{}{
var (
conditionParams []util.MapStr
firstGroupValue string
firstThreshold string
priority string
)
if len(checkResults.ResultItems) > 0 {
priority = checkResults.ResultItems[0].ConditionItem.Priority
sort.Slice(checkResults.ResultItems, func(i, j int) bool {
if alerting.PriorityWeights[checkResults.ResultItems[i].ConditionItem.Priority] > alerting.PriorityWeights[checkResults.ResultItems[j].ConditionItem.Priority] {
return true
}
return false
})
sort.Slice(checkResults.ResultItems, func(i, j int) bool {
if vi, ok := checkResults.ResultItems[i].ResultValue.(float64); ok {
if vj, ok := checkResults.ResultItems[j].ResultValue.(float64); ok {
return vi > vj
}
}
return false
})
}
for i, resultItem := range checkResults.ResultItems {
if i >= 10 {
break
}
if i == 0 {
firstGroupValue = strings.Join(resultItem.GroupValues, ",")
firstThreshold = strings.Join(resultItem.ConditionItem.Values, ",")
}
conditionParams = append(conditionParams, util.MapStr{
alerting2.ParamThreshold: resultItem.ConditionItem.Values,
alerting2.Priority: resultItem.ConditionItem.Priority,
alerting2.ParamGroupValues: resultItem.GroupValues,
alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp,
alerting2.ParamResultValue: resultItem.ResultValue,
alerting2.ParamRelationValues: resultItem.RelationValues,
})
}
envVariables, err := alerting2.GetEnvVariables()
if err != nil {
log.Errorf("get env variables error: %v", err)
}
var (
min interface{}
max interface{}
)
if checkResults.QueryResult != nil {
min = checkResults.QueryResult.Min
max = checkResults.QueryResult.Max
if v, ok := min.(int64); ok {
min = time.Unix(v/1000, v%1000 * 1e5).UTC().Format("2006-01-02T15:04:05.999Z")
}
if v, ok := max.(int64); ok {
max = time.Unix(v/1000, v%1000 * 1e5).UTC().Format("2006-01-02T15:04:05.999Z")
}
}
paramsCtx := util.MapStr{
alerting2.ParamRuleID: rule.ID,
alerting2.ParamResourceID: rule.Resource.ID,
alerting2.ParamResourceName: rule.Resource.Name,
alerting2.ParamResults: conditionParams,
"objects": rule.Resource.Objects,
"first_group_value": firstGroupValue,
"first_threshold": firstThreshold,
"rule_name": rule.Name,
"priority": priority,
"min": min,
"max": max,
"env": envVariables,
}
err = util.MergeFields(paramsCtx, extraParams, true)
if err != nil {
log.Errorf("merge template params error: %v", err)
}
return paramsCtx
}
func (engine *Engine) Test(rule *alerting.Rule, msgType string) ([]alerting.ActionExecutionResult, error) {
checkResults, err := engine.CheckCondition(rule)
if err != nil {
return nil, fmt.Errorf("check condition error:%w", err)
}
alertMessage, err := getLastAlertMessage(rule.ID, 2 * time.Minute)
if err != nil {
return nil, fmt.Errorf("get alert message error: %w", err)
}
var actionResults []alerting.ActionExecutionResult
now := time.Now()
triggerAt := now
if alertMessage != nil {
triggerAt = alertMessage.Created
}
paramsCtx := newParameterCtx(rule, checkResults,util.MapStr{
alerting2.ParamEventID: util.GetUUID(),
alerting2.ParamTimestamp: now.Unix(),
"duration": now.Sub(triggerAt).String(),
"trigger_at": triggerAt.Unix(),
} )
if msgType == "escalation" || msgType == "notification" {
title, message := rule.GetNotificationTitleAndMessage()
err = attachTitleMessageToCtx(title, message, paramsCtx)
if err != nil {
return nil, err
}
}else if msgType == "recover_notification" {
if rule.RecoveryNotificationConfig == nil {
return nil, fmt.Errorf("recovery notification must not be empty")
}
err = attachTitleMessageToCtx(rule.RecoveryNotificationConfig.Title, rule.RecoveryNotificationConfig.Message, paramsCtx)
if err != nil {
return nil, err
}
}else{
return nil, fmt.Errorf("unkonwn parameter msg type")
}
var channels []alerting.Channel
switch msgType {
case "escalation":
notifyCfg := rule.GetNotificationConfig()
if notifyCfg == nil {
return nil, nil
}
channels = notifyCfg.Escalation
case "recover_notification":
if rule.RecoveryNotificationConfig != nil {
channels = rule.RecoveryNotificationConfig.Normal
}
case "notification":
notifyCfg := rule.GetNotificationConfig()
if notifyCfg == nil {
return nil, nil
}
channels = notifyCfg.Normal
}
if len(channels) > 0 {
actionResults, _ = performChannels(channels, paramsCtx, true)
}else{
return nil, fmt.Errorf("no useable channel")
}
return actionResults, nil
}
func performChannels(channels []alerting.Channel, ctx map[string]interface{}, raiseChannelEnabledErr bool) ([]alerting.ActionExecutionResult, int) {
var errCount int
var actionResults []alerting.ActionExecutionResult
for _, channel := range channels {
var (
errStr string
resBytes []byte
messageBytes []byte
)
_, err := common.RetrieveChannel(&channel, raiseChannelEnabledErr)
if err != nil {
log.Error(err)
errCount++
errStr = err.Error()
}else{
if !channel.Enabled {
continue
}
resBytes, err, messageBytes = common.PerformChannel(&channel, ctx)
if err != nil {
errCount++
errStr = err.Error()
}
}
actionResults = append(actionResults, alerting.ActionExecutionResult{
Result: string(resBytes),
Error: errStr,
Message: string(messageBytes),
ExecutionTime: int(time.Now().UnixNano()/1e6),
ChannelType: channel.SubType,
ChannelName: channel.Name,
ChannelID: channel.ID,
})
}
return actionResults, errCount
}
func (engine *Engine) GenerateTask(rule alerting.Rule) func(ctx context.Context) {
return func(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
log.Error(err)
debug.PrintStack()
}
}()
err := engine.Do(&rule)
if err != nil {
log.Error(err)
}
}
}
func CollectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData){
if aggM, ok := agg.(map[string]interface{}); ok {
if targetAgg, ok := aggM["filter_agg"]; ok {
collectMetricData(targetAgg, groupValues, metricData)
}else{
collectMetricData(aggM, groupValues, metricData)
}
}
}
func collectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData){
if aggM, ok := agg.(map[string]interface{}); ok {
if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok {
if bks, ok := timeBks["buckets"].([]interface{}); ok {
md := alerting.MetricData{
Data: map[string][]alerting.TimeMetricData{},
GroupValues: strings.Split(groupValues, "*"),
}
for _, bk := range bks {
if bkM, ok := bk.(map[string]interface{}); ok{
for k, v := range bkM {
if k == "key" || k == "key_as_string" || k== "doc_count"{
continue
}
if len(k) > 5 { //just store a,b,c
continue
}
if vm, ok := v.(map[string]interface{}); ok {
if metricVal, ok := vm["value"]; ok {
md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal})
}else{
//percentiles agg type
switch vm["values"].(type) {
case []interface{}:
for _, val := range vm["values"].([]interface{}) {
if valM, ok := val.(map[string]interface{}); ok {
md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], valM["value"]})
}
break
}
case map[string]interface{}:
for _, val := range vm["values"].(map[string]interface{}) {
md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], val})
break
}
}
}
}
}
}
}
*metricData = append(*metricData,md)
}
}else{
for k, v := range aggM {
if k == "key" || k== "doc_count"{
continue
}
if vm, ok := v.(map[string]interface{}); ok {
if bks, ok := vm["buckets"].([]interface{}); ok {
for _, bk := range bks {
if bkVal, ok := bk.(map[string]interface{}); ok {
currentGroup := bkVal["key"].(string)
newGroupValues := currentGroup
if groupValues != "" {
newGroupValues = fmt.Sprintf("%s*%s", groupValues, currentGroup)
}
collectMetricData(bk, newGroupValues, metricData)
}
}
}
}
break
}
}
}
}
func getLastAlertMessageFromES(ruleID string) (*alerting.AlertMessage, error) {
queryDsl := util.MapStr{
"size": 1,
"sort": []util.MapStr{
{
"created": util.MapStr{
"order": "desc",
},
},
},
"query": util.MapStr{
"term": util.MapStr{
"rule_id": util.MapStr{
"value": ruleID,
},
},
},
}
q := orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl),
}
err, searchResult := orm.Search(alerting.AlertMessage{}, &q )
if err != nil {
return nil, err
}
if len(searchResult.Result) == 0 {
return nil, nil
}
messageBytes := util.MustToJSONBytes(searchResult.Result[0])
message := &alerting.AlertMessage{}
err = util.FromJSONBytes(messageBytes, message)
return message, err
}
func getLastAlertMessage(ruleID string, duration time.Duration) (*alerting.AlertMessage, error ){
messageBytes, err := kv.GetValue(alerting2.KVLastMessageState, []byte(ruleID))
if err != nil {
return nil, err
}
message := &alerting.AlertMessage{}
if messageBytes != nil {
err = util.FromJSONBytes(messageBytes, message)
if err != nil {
return nil, err
}
if time.Now().Sub(message.Updated) <= duration {
return message, nil
}
}
message, err = getLastAlertMessageFromES(ruleID)
return message, err
}
func saveAlertMessageToES(message *alerting.AlertMessage) error {
message.Updated = time.Now()
return orm.Save(nil, message)
}
func saveAlertMessage(message *alerting.AlertMessage) error {
//todo diff message if not change , then skip save to es ?
err := saveAlertMessageToES(message)
if err != nil {
return err
}
messageBytes, err := util.ToJSONBytes(message)
if err != nil {
return err
}
err = kv.AddValue(alerting2.KVLastMessageState, []byte(message.RuleID), messageBytes)
return err
}
func readTimeFromKV(bucketKey string, key []byte)(time.Time, error){
timeBytes, err := kv.GetValue(bucketKey, key)
zeroTime := time.Time{}
if err != nil {
return zeroTime, err
}
timeStr := string(timeBytes)
if timeStr != ""{
return time.ParseInLocation(time.RFC3339, string(timeBytes), time.UTC)
}
return zeroTime, nil
}