1296 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			1296 lines
		
	
	
		
			37 KiB
		
	
	
	
		
			Go
		
	
	
	
// Copyright (C) INFINI Labs & INFINI LIMITED.
 | 
						|
//
 | 
						|
// The INFINI Console is offered under the GNU Affero General Public License v3.0
 | 
						|
// and as commercial software.
 | 
						|
//
 | 
						|
// For commercial licensing, contact us at:
 | 
						|
//   - Website: infinilabs.com
 | 
						|
//   - Email: hello@infini.ltd
 | 
						|
//
 | 
						|
// Open Source licensed under AGPL V3:
 | 
						|
// This program is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Affero General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// This program is distributed in the hope that it will be useful,
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | 
						|
// GNU Affero General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Affero General Public License
 | 
						|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
/* Copyright © INFINI Ltd. All rights reserved.
 | 
						|
 * web: https://infinilabs.com
 | 
						|
 * mail: hello#infini.ltd */
 | 
						|
 | 
						|
package elasticsearch
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"github.com/Knetic/govaluate"
 | 
						|
	log "github.com/cihub/seelog"
 | 
						|
	"infini.sh/console/model"
 | 
						|
	"infini.sh/console/model/alerting"
 | 
						|
	alerting2 "infini.sh/console/service/alerting"
 | 
						|
	"infini.sh/console/service/alerting/common"
 | 
						|
	"infini.sh/framework/core/elastic"
 | 
						|
	"infini.sh/console/model/insight"
 | 
						|
	"infini.sh/framework/core/kv"
 | 
						|
	"infini.sh/framework/core/orm"
 | 
						|
	"infini.sh/framework/core/util"
 | 
						|
	"math"
 | 
						|
	"runtime/debug"
 | 
						|
	"sort"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
type Engine struct {
 | 
						|
 | 
						|
}
 | 
						|
//GenerateQuery generate a final elasticsearch query dsl object
 | 
						|
//when RawFilter of rule is not empty, priority use it, otherwise to covert from Filter of rule (todo)
 | 
						|
//auto generate time filter query and then attach to final query
 | 
						|
//auto generate elasticsearch aggregations by metrics of rule
 | 
						|
//group of metric item converted to terms aggregation and TimeField of rule converted to date_histogram aggregation
 | 
						|
//convert statistic of metric item to elasticsearch aggregation
 | 
						|
func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.FilterParam) (interface{}, error) {
 | 
						|
	filter, err := engine.GenerateRawFilter(rule, filterParam)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	timeFilter, err := engine.generateTimeFilter(rule, filterParam)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if len(rule.Metrics.Items) == 0 {
 | 
						|
		return nil, fmt.Errorf("metric items should not be empty")
 | 
						|
	}
 | 
						|
	basicAggs := util.MapStr{}
 | 
						|
	//todo bucket sort (es 6.1) bucket script (es 2.0)
 | 
						|
	for _, metricItem  := range rule.Metrics.Items {
 | 
						|
		metricAggs := engine.generateAgg(&metricItem)
 | 
						|
		if err = util.MergeFields(basicAggs, metricAggs, true); err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}
 | 
						|
	verInfo := elastic.GetClient(rule.Resource.ID).GetVersion()
 | 
						|
	var periodInterval = rule.Metrics.BucketSize
 | 
						|
	if filterParam != nil && filterParam.BucketSize != "" {
 | 
						|
		periodInterval =  filterParam.BucketSize
 | 
						|
	}
 | 
						|
 | 
						|
	if verInfo.Number==""{
 | 
						|
		panic("invalid version")
 | 
						|
	}
 | 
						|
 | 
						|
	intervalField, err := elastic.GetDateHistogramIntervalField(verInfo.Distribution,verInfo.Number, periodInterval )
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("get interval field error: %w", err)
 | 
						|
	}
 | 
						|
	timeAggs := util.MapStr{
 | 
						|
		"time_buckets": util.MapStr{
 | 
						|
			"date_histogram": util.MapStr{
 | 
						|
				"field":    rule.Resource.TimeField,
 | 
						|
				intervalField: periodInterval,
 | 
						|
			},
 | 
						|
			"aggs": basicAggs,
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	var rootAggs util.MapStr
 | 
						|
	groups := rule.Metrics.Groups
 | 
						|
	if grpLength := len(groups); grpLength > 0 {
 | 
						|
		var lastGroupAgg util.MapStr
 | 
						|
 | 
						|
		for i := grpLength-1; i>=0; i-- {
 | 
						|
			limit := groups[i].Limit
 | 
						|
			//top group 10
 | 
						|
			if limit <= 0 {
 | 
						|
				limit = 10
 | 
						|
			}
 | 
						|
			groupAgg := util.MapStr{
 | 
						|
				"terms": util.MapStr{
 | 
						|
					"field": groups[i].Field,
 | 
						|
					"size": limit,
 | 
						|
				},
 | 
						|
			}
 | 
						|
			groupID := util.GetUUID()
 | 
						|
			if lastGroupAgg != nil {
 | 
						|
				groupAgg["aggs"] = util.MapStr{
 | 
						|
					groupID: lastGroupAgg,
 | 
						|
				}
 | 
						|
			}else{
 | 
						|
				groupAgg["aggs"] = timeAggs
 | 
						|
			}
 | 
						|
			lastGroupAgg = groupAgg
 | 
						|
		}
 | 
						|
		rootAggs = util.MapStr{
 | 
						|
			util.GetUUID(): lastGroupAgg,
 | 
						|
		}
 | 
						|
	}else{
 | 
						|
		rootAggs = timeAggs
 | 
						|
	}
 | 
						|
	if len(filter) > 0 {
 | 
						|
		rootAggs = util.MapStr{
 | 
						|
			"filter_agg": util.MapStr{
 | 
						|
				"filter": filter,
 | 
						|
				"aggs": rootAggs,
 | 
						|
			},
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return util.MapStr{
 | 
						|
		"size": 0,
 | 
						|
		"query": timeFilter,
 | 
						|
		"aggs": rootAggs,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
//generateAgg convert statistic of metric item to elasticsearch aggregation
 | 
						|
func (engine *Engine) generateAgg(metricItem *insight.MetricItem) map[string]interface{}{
 | 
						|
	var (
 | 
						|
		aggType = "value_count"
 | 
						|
		field = metricItem.Field
 | 
						|
	)
 | 
						|
	if field == "" || field == "*" {
 | 
						|
		field = "_id"
 | 
						|
	}
 | 
						|
	var percent = 0.0
 | 
						|
	var isPipeline = false
 | 
						|
	switch metricItem.Statistic {
 | 
						|
	case "max", "min", "sum", "avg":
 | 
						|
		aggType = metricItem.Statistic
 | 
						|
	case "count", "value_count":
 | 
						|
		aggType = "value_count"
 | 
						|
	case "derivative":
 | 
						|
		aggType = "max"
 | 
						|
		isPipeline = true
 | 
						|
	case "medium": // from es version 6.6
 | 
						|
		aggType = "median_absolute_deviation"
 | 
						|
	case "p99", "p95","p90","p80","p50":
 | 
						|
		aggType = "percentiles"
 | 
						|
		percentStr := strings.TrimPrefix(metricItem.Statistic, "p")
 | 
						|
		percent, _ = strconv.ParseFloat(percentStr, 32)
 | 
						|
	}
 | 
						|
	aggValue := util.MapStr{
 | 
						|
		"field": field,
 | 
						|
	}
 | 
						|
	if aggType == "percentiles" {
 | 
						|
		aggValue["percents"] = []interface{}{percent}
 | 
						|
	}
 | 
						|
	aggs := util.MapStr{
 | 
						|
		metricItem.Name: util.MapStr{
 | 
						|
			aggType: aggValue,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	if !isPipeline{
 | 
						|
		return aggs
 | 
						|
	}
 | 
						|
	pipelineAggID := util.GetUUID()
 | 
						|
	aggs[pipelineAggID] = aggs[metricItem.Name]
 | 
						|
	aggs[metricItem.Name] = util.MapStr{
 | 
						|
		"derivative": util.MapStr{
 | 
						|
			"buckets_path": pipelineAggID,
 | 
						|
		},
 | 
						|
	}
 | 
						|
	return aggs
 | 
						|
}
 | 
						|
 | 
						|
func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[string]interface{}, error){
 | 
						|
	if !fq.IsComplex(){
 | 
						|
		q := map[string]interface{}{}
 | 
						|
		if len(fq.Values) == 0 {
 | 
						|
			return nil, fmt.Errorf("values should not be empty")
 | 
						|
		}
 | 
						|
		//equals/gte/gt/lt/lte/in/match/regexp/wildcard/range/prefix/suffix/contain/
 | 
						|
		switch fq.Operator {
 | 
						|
		case "equals":
 | 
						|
			q["term"] = util.MapStr{
 | 
						|
				fq.Field: util.MapStr{
 | 
						|
					"value": fq.Values[0],
 | 
						|
				},
 | 
						|
			}
 | 
						|
		case "in":
 | 
						|
			q["terms"] = util.MapStr{
 | 
						|
				fq.Field: fq.Values,
 | 
						|
			}
 | 
						|
		case "match":
 | 
						|
			q[fq.Operator] = util.MapStr{
 | 
						|
				fq.Field: fq.Values[0],
 | 
						|
			}
 | 
						|
		case "gte", "gt", "lt", "lte":
 | 
						|
			q["range"] = util.MapStr{
 | 
						|
				fq.Field: util.MapStr{
 | 
						|
					fq.Operator: fq.Values[0],
 | 
						|
				},
 | 
						|
			}
 | 
						|
		case "range":
 | 
						|
			if len(fq.Values) != 2 {
 | 
						|
				return nil, fmt.Errorf("values length of range query must be 2, but got %d", len(fq.Values))
 | 
						|
			}
 | 
						|
			q["range"] = util.MapStr{
 | 
						|
				fq.Field: util.MapStr{
 | 
						|
					"gte": fq.Values[0],
 | 
						|
					"lte": fq.Values[1],
 | 
						|
				},
 | 
						|
			}
 | 
						|
		case "prefix":
 | 
						|
			q["prefix"] = util.MapStr{
 | 
						|
				fq.Field: fq.Values[0],
 | 
						|
			}
 | 
						|
		case "regexp", "wildcard":
 | 
						|
			q[fq.Operator] = util.MapStr{
 | 
						|
				fq.Field: util.MapStr{
 | 
						|
					"value": fq.Values[0],
 | 
						|
				},
 | 
						|
			}
 | 
						|
		default:
 | 
						|
			return nil, fmt.Errorf("unsupport query operator %s", fq.Operator)
 | 
						|
		}
 | 
						|
		return q, nil
 | 
						|
	}
 | 
						|
	if fq.Or != nil && fq.And != nil {
 | 
						|
		return nil, fmt.Errorf("filter format error: or, and bool operation in same level")
 | 
						|
	}
 | 
						|
	if fq.Or != nil && fq.Not != nil {
 | 
						|
		return nil, fmt.Errorf("filter format error: or, not bool operation in same level")
 | 
						|
	}
 | 
						|
	if fq.And != nil && fq.Not != nil {
 | 
						|
		return nil, fmt.Errorf("filter format error: and, not bool operation in same level")
 | 
						|
	}
 | 
						|
	var (
 | 
						|
		boolOperator  string
 | 
						|
		filterQueries []alerting.FilterQuery
 | 
						|
	)
 | 
						|
 | 
						|
	if len(fq.Not) >0 {
 | 
						|
		boolOperator = "must_not"
 | 
						|
		filterQueries = fq.Not
 | 
						|
 | 
						|
	}else if len(fq.Or) > 0 {
 | 
						|
		boolOperator = "should"
 | 
						|
		filterQueries = fq.Or
 | 
						|
	}else {
 | 
						|
		boolOperator = "must"
 | 
						|
		filterQueries = fq.And
 | 
						|
	}
 | 
						|
	var subQueries []interface{}
 | 
						|
	for _, subQ := range filterQueries {
 | 
						|
		subQuery, err := engine.ConvertFilterQueryToDsl(&subQ)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		subQueries = append(subQueries, subQuery)
 | 
						|
	}
 | 
						|
	boolQuery := util.MapStr{
 | 
						|
		boolOperator: subQueries,
 | 
						|
	}
 | 
						|
	if boolOperator == "should" {
 | 
						|
		boolQuery["minimum_should_match"] = 1
 | 
						|
	}
 | 
						|
	resultQuery := map[string]interface{}{
 | 
						|
		"bool": boolQuery,
 | 
						|
	}
 | 
						|
 | 
						|
	return resultQuery, nil
 | 
						|
}
 | 
						|
 | 
						|
func getQueryTimeRange(rule *alerting.Rule, filterParam *alerting.FilterParam) (start, end interface{}){
 | 
						|
	var (
 | 
						|
		timeStart interface{}
 | 
						|
		timeEnd interface{}
 | 
						|
	)
 | 
						|
	if filterParam != nil {
 | 
						|
		timeStart = filterParam.Start
 | 
						|
		timeEnd = filterParam.End
 | 
						|
	}else{
 | 
						|
		var (
 | 
						|
			units string
 | 
						|
			value int
 | 
						|
		)
 | 
						|
		intervalDuration, err := time.ParseDuration(rule.Metrics.BucketSize)
 | 
						|
		if err != nil {
 | 
						|
			return nil, fmt.Errorf("parse bucket size of rule [%s] error: %v", rule.Name, err)
 | 
						|
		}
 | 
						|
		if intervalDuration / time.Hour >= 1 {
 | 
						|
			units = "h"
 | 
						|
			value = int(intervalDuration / time.Hour)
 | 
						|
		}else if intervalDuration / time.Minute >= 1{
 | 
						|
			units = "m"
 | 
						|
			value = int(intervalDuration / time.Minute)
 | 
						|
		}else if intervalDuration / time.Second >= 1 {
 | 
						|
			units = "s"
 | 
						|
			value = int(intervalDuration / time.Second)
 | 
						|
		}else{
 | 
						|
			return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.BucketSize)
 | 
						|
		}
 | 
						|
		bucketCount := rule.Conditions.GetMinimumPeriodMatch() + 1
 | 
						|
		if bucketCount <= 0 {
 | 
						|
			bucketCount = 1
 | 
						|
		}
 | 
						|
		duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * bucketCount, units))
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		timeStart = time.Now().Add(-duration).UnixMilli() //.Format(time.RFC3339Nano)
 | 
						|
		timeEnd = time.Now().UnixMilli()
 | 
						|
	}
 | 
						|
	return timeStart, timeEnd
 | 
						|
}
 | 
						|
 | 
						|
func (engine *Engine) generateTimeFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error){
 | 
						|
	timeStart, timeEnd := getQueryTimeRange(rule, filterParam)
 | 
						|
	timeQuery := util.MapStr{
 | 
						|
		"range": util.MapStr{
 | 
						|
			rule.Resource.TimeField: util.MapStr{
 | 
						|
				"gte": timeStart,
 | 
						|
				"lte": timeEnd,
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
	return timeQuery, nil
 | 
						|
}
 | 
						|
 | 
						|
func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) {
 | 
						|
	query := map[string]interface{}{}
 | 
						|
	var err error
 | 
						|
	if rule.Resource.RawFilter != nil {
 | 
						|
		query = util.DeepCopy(rule.Resource.RawFilter).(map[string]interface{})
 | 
						|
	}else{
 | 
						|
		if !rule.Resource.Filter.IsEmpty(){
 | 
						|
			query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	//timeQuery, err := engine.generateTimeFilter(rule, filterParam)
 | 
						|
	//if err != nil {
 | 
						|
	//	return nil, err
 | 
						|
	//}
 | 
						|
	//
 | 
						|
	//if boolQ, ok := query["bool"].(map[string]interface{}); ok {
 | 
						|
	//	if mustQ, ok := boolQ["must"]; ok {
 | 
						|
	//
 | 
						|
	//		if mustArr, ok := mustQ.([]interface{}); ok {
 | 
						|
	//			boolQ["must"] = append(mustArr, timeQuery)
 | 
						|
	//
 | 
						|
	//		}else{
 | 
						|
	//			return nil, fmt.Errorf("must query: %v is not valid in filter", mustQ)
 | 
						|
	//		}
 | 
						|
	//	}else{
 | 
						|
	//		boolQ["must"] = []interface{}{
 | 
						|
	//			timeQuery,
 | 
						|
	//		}
 | 
						|
	//	}
 | 
						|
	//}else{
 | 
						|
	//	must := []interface{}{
 | 
						|
	//		timeQuery,
 | 
						|
	//	}
 | 
						|
	//	if len(query) > 0 {
 | 
						|
	//		if _, ok = query["match_all"]; !ok {
 | 
						|
	//			must = append(must, query)
 | 
						|
	//		}
 | 
						|
	//	}
 | 
						|
	//	query = util.MapStr{
 | 
						|
	//		"bool":  util.MapStr{
 | 
						|
	//			"must": must,
 | 
						|
	//		},
 | 
						|
	//	}
 | 
						|
	//}
 | 
						|
	return query, nil
 | 
						|
}
 | 
						|
 | 
						|
func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.FilterParam)(*alerting.QueryResult, error){
 | 
						|
	esClient := elastic.GetClient(rule.Resource.ID)
 | 
						|
	queryResult := &alerting.QueryResult{}
 | 
						|
	indexName := strings.Join(rule.Resource.Objects, ",")
 | 
						|
	//todo cache queryDsl
 | 
						|
	queryDsl, err := engine.GenerateQuery(rule, filterParam)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	if vm, ok := queryDsl.(util.MapStr); ok {
 | 
						|
		queryResult.Min, err = vm.GetValue(fmt.Sprintf("query.range.%s.gte", rule.Resource.TimeField))
 | 
						|
		queryResult.Max, _ = vm.GetValue(fmt.Sprintf("query.range.%s.lte", rule.Resource.TimeField))
 | 
						|
	}
 | 
						|
	queryDslBytes, err := util.ToJSONBytes(queryDsl)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	queryResult.Query = string(queryDslBytes)
 | 
						|
	searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes)
 | 
						|
	if err != nil {
 | 
						|
		return queryResult, err
 | 
						|
	}
 | 
						|
	if searchRes.GetTotal() == 0 {
 | 
						|
		queryResult.Nodata = true
 | 
						|
	}
 | 
						|
	if searchRes.StatusCode != 200 {
 | 
						|
		return queryResult, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body))
 | 
						|
	}
 | 
						|
	queryResult.Raw = string(searchRes.RawResult.Body)
 | 
						|
	searchResult := map[string]interface{}{}
 | 
						|
	err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult)
 | 
						|
	if err != nil {
 | 
						|
		return queryResult, err
 | 
						|
	}
 | 
						|
	metricData := []alerting.MetricData{}
 | 
						|
	CollectMetricData(searchResult["aggregations"], "", &metricData)
 | 
						|
	//将 derivative 求导数据 除以 bucket size (单位 /s)
 | 
						|
	//statisticM := map[string] string{}
 | 
						|
	//for _, mi := range rule.Metrics.Items {
 | 
						|
	//	statisticM[mi.Name] = mi.Statistic
 | 
						|
	//}
 | 
						|
	//var periodInterval = rule.Metrics.PeriodInterval
 | 
						|
	//if filterParam != nil && filterParam.BucketSize != "" {
 | 
						|
	//	periodInterval =  filterParam.BucketSize
 | 
						|
	//}
 | 
						|
	//interval, err := time.ParseDuration(periodInterval)
 | 
						|
	//if err != nil {
 | 
						|
	//	log.Error(err)
 | 
						|
	//}
 | 
						|
	//for i, _ := range metricData {
 | 
						|
	//	for k, d := range metricData[i].Data {
 | 
						|
	//		if statisticM[k] == "derivative" {
 | 
						|
	//			for _, td := range d {
 | 
						|
	//				if len(td) > 1 {
 | 
						|
	//					if v, ok := td[1].(float64); ok {
 | 
						|
	//						td[1] = v / interval.Seconds()
 | 
						|
	//					}
 | 
						|
	//				}
 | 
						|
	//			}
 | 
						|
	//		}
 | 
						|
	//	}
 | 
						|
	//}
 | 
						|
	queryResult.MetricData = metricData
 | 
						|
	return queryResult, nil
 | 
						|
}
 | 
						|
func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, *alerting.QueryResult, error){
 | 
						|
	queryResult, err := engine.ExecuteQuery(rule, filterParam)
 | 
						|
	if err != nil {
 | 
						|
		return nil, queryResult, err
 | 
						|
	}
 | 
						|
	var targetMetricData []alerting.MetricData
 | 
						|
	for _, md := range queryResult.MetricData {
 | 
						|
		var targetData alerting.MetricData
 | 
						|
		if len(rule.Metrics.Items) == 1 {
 | 
						|
			targetData = md
 | 
						|
		} else {
 | 
						|
			targetData = alerting.MetricData{
 | 
						|
				GroupValues: md.GroupValues,
 | 
						|
				Data:        map[string][]alerting.TimeMetricData{},
 | 
						|
			}
 | 
						|
			expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula)
 | 
						|
			if err != nil {
 | 
						|
				return nil, queryResult, err
 | 
						|
			}
 | 
						|
			dataLength := 0
 | 
						|
			for _, v := range md.Data {
 | 
						|
				dataLength = len(v)
 | 
						|
				break
 | 
						|
			}
 | 
						|
		DataLoop:
 | 
						|
			for i := 0; i < dataLength; i++ {
 | 
						|
				parameters := map[string]interface{}{}
 | 
						|
				var timestamp interface{}
 | 
						|
				for k, v := range md.Data {
 | 
						|
					if len(k) == 20 {
 | 
						|
						continue
 | 
						|
					}
 | 
						|
					if len(v) < dataLength {
 | 
						|
						continue
 | 
						|
					}
 | 
						|
 | 
						|
					//drop nil value bucket
 | 
						|
					if v == nil || len(v[i]) < 2 {
 | 
						|
						continue DataLoop
 | 
						|
					}
 | 
						|
					if _, ok := v[i][1].(float64); !ok {
 | 
						|
						continue DataLoop
 | 
						|
					}
 | 
						|
					parameters[k] = v[i][1]
 | 
						|
					timestamp = v[i][0]
 | 
						|
				}
 | 
						|
				if len(parameters) == 0 {
 | 
						|
					continue
 | 
						|
				}
 | 
						|
 | 
						|
				result, err := expression.Evaluate(parameters)
 | 
						|
				if err != nil {
 | 
						|
					return nil, queryResult, err
 | 
						|
				}
 | 
						|
				if r, ok := result.(float64); ok {
 | 
						|
					if math.IsNaN(r) || math.IsInf(r, 0 ){
 | 
						|
						if !isFilterNaN {
 | 
						|
							targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
 | 
						|
						}
 | 
						|
						continue
 | 
						|
					}
 | 
						|
				}
 | 
						|
 | 
						|
				targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, result})
 | 
						|
			}
 | 
						|
		}
 | 
						|
		targetMetricData = append(targetMetricData, targetData)
 | 
						|
	}
 | 
						|
	return targetMetricData, queryResult, nil
 | 
						|
}
 | 
						|
//CheckCondition check whether rule conditions triggered or not
 | 
						|
//if triggered returns an ConditionResult
 | 
						|
//sort conditions by priority desc  before check , and then if condition is true, then continue check another group
 | 
						|
func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){
 | 
						|
	var resultItems []alerting.ConditionResultItem
 | 
						|
	targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, true, nil)
 | 
						|
	conditionResult := &alerting.ConditionResult{
 | 
						|
		QueryResult: queryResult,
 | 
						|
	}
 | 
						|
	if err != nil {
 | 
						|
		return conditionResult, err
 | 
						|
	}
 | 
						|
	for idx, targetData := range targetMetricData {
 | 
						|
		if idx == 0 {
 | 
						|
			sort.Slice(rule.Conditions.Items, func(i, j int) bool {
 | 
						|
				return alerting.PriorityWeights[rule.Conditions.Items[i].Priority] > alerting.PriorityWeights[rule.Conditions.Items[j].Priority]
 | 
						|
			})
 | 
						|
		}
 | 
						|
		LoopCondition:
 | 
						|
		for _, cond := range rule.Conditions.Items {
 | 
						|
			conditionExpression, err := cond.GenerateConditionExpression()
 | 
						|
			if err != nil {
 | 
						|
				return conditionResult, err
 | 
						|
			}
 | 
						|
			expression, err := govaluate.NewEvaluableExpression(conditionExpression)
 | 
						|
			if err != nil {
 | 
						|
				return conditionResult, err
 | 
						|
			}
 | 
						|
			dataLength := 0
 | 
						|
			dataKey := ""
 | 
						|
			for k, v := range targetData.Data {
 | 
						|
				dataLength = len(v)
 | 
						|
				dataKey = k
 | 
						|
			}
 | 
						|
			triggerCount := 0
 | 
						|
			for i := 0; i < dataLength; i++ {
 | 
						|
				//clear nil value
 | 
						|
				if targetData.Data[dataKey][i][1] == nil {
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				if r, ok :=  targetData.Data[dataKey][i][1].(float64); ok {
 | 
						|
					if math.IsNaN(r){
 | 
						|
						continue
 | 
						|
					}
 | 
						|
				}
 | 
						|
				evaluateResult, err := expression.Evaluate(map[string]interface{}{
 | 
						|
					"result": targetData.Data[dataKey][i][1],
 | 
						|
				})
 | 
						|
				if err != nil {
 | 
						|
					return conditionResult, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err)
 | 
						|
				}
 | 
						|
				if evaluateResult == true {
 | 
						|
					triggerCount += 1
 | 
						|
				}else {
 | 
						|
					triggerCount = 0
 | 
						|
				}
 | 
						|
				if triggerCount >= cond.MinimumPeriodMatch {
 | 
						|
					log.Debugf("triggered condition  %v, groups: %v\n", cond, targetData.GroupValues)
 | 
						|
					resultItem := alerting.ConditionResultItem{
 | 
						|
						GroupValues: targetData.GroupValues,
 | 
						|
						ConditionItem: &cond,
 | 
						|
						ResultValue: targetData.Data[dataKey][i][1],
 | 
						|
						IssueTimestamp: targetData.Data[dataKey][i][0],
 | 
						|
						RelationValues: map[string]interface{}{},
 | 
						|
					}
 | 
						|
					for _, metric := range rule.Metrics.Items{
 | 
						|
						resultItem.RelationValues[metric.Name] = queryResult.MetricData[idx].Data[metric.Name][i][1]
 | 
						|
					}
 | 
						|
					resultItems = append(resultItems, resultItem)
 | 
						|
					break LoopCondition
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
		}
 | 
						|
	}
 | 
						|
	conditionResult.QueryResult.MetricData = targetMetricData
 | 
						|
	conditionResult.ResultItems = resultItems
 | 
						|
	return conditionResult, nil
 | 
						|
}
 | 
						|
func (engine *Engine) Do(rule *alerting.Rule) error {
 | 
						|
 | 
						|
	var (
 | 
						|
		alertItem *alerting.Alert
 | 
						|
		err error
 | 
						|
	)
 | 
						|
	defer func() {
 | 
						|
		if err != nil && alertItem == nil {
 | 
						|
			alertItem = &alerting.Alert{
 | 
						|
				ID: util.GetUUID(),
 | 
						|
				Created: time.Now(),
 | 
						|
				Updated: time.Now(),
 | 
						|
				RuleID: rule.ID,
 | 
						|
				RuleName: rule.Name,
 | 
						|
				ResourceID: rule.Resource.ID,
 | 
						|
				ResourceName: rule.Resource.Name,
 | 
						|
				Expression: rule.Metrics.Expression,
 | 
						|
				Objects: rule.Resource.Objects,
 | 
						|
				State: alerting.AlertStateError,
 | 
						|
				//Priority: "undefine",
 | 
						|
				Error: err.Error(),
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if alertItem != nil {
 | 
						|
			if err != nil{
 | 
						|
				alertItem.State = alerting.AlertStateError
 | 
						|
				alertItem.Error = err.Error()
 | 
						|
			}else {
 | 
						|
				for _, actionResult := range alertItem.ActionExecutionResults {
 | 
						|
					if actionResult.Error != "" {
 | 
						|
						alertItem.Error = actionResult.Error
 | 
						|
						alertItem.State = alerting.AlertStateError
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			err = orm.Save(nil, alertItem)
 | 
						|
			if err != nil {
 | 
						|
				log.Error(err)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	log.Tracef("start check condition of rule %s", rule.ID)
 | 
						|
 | 
						|
	//todo do only once when rule not change
 | 
						|
	metricExpression, _ := rule.Metrics.GenerateExpression()
 | 
						|
	for i, cond := range rule.Conditions.Items {
 | 
						|
		expression, _ := cond.GenerateConditionExpression()
 | 
						|
		rule.Conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression)
 | 
						|
	}
 | 
						|
	alertItem = &alerting.Alert{
 | 
						|
		ID: util.GetUUID(),
 | 
						|
		Created: time.Now(),
 | 
						|
		Updated: time.Now(),
 | 
						|
		RuleID: rule.ID,
 | 
						|
		RuleName: rule.Name,
 | 
						|
		ResourceID: rule.Resource.ID,
 | 
						|
		ResourceName: rule.Resource.Name,
 | 
						|
		Expression: rule.Metrics.Expression,
 | 
						|
		Objects: rule.Resource.Objects,
 | 
						|
		Conditions: rule.Conditions,
 | 
						|
		State: alerting.AlertStateOK,
 | 
						|
	}
 | 
						|
	checkResults, err := engine.CheckCondition(rule)
 | 
						|
	alertItem.ConditionResult = checkResults
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	alertMessage, err := getLastAlertMessage(rule.ID, 2 * time.Minute)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("get alert message error: %w", err)
 | 
						|
	}
 | 
						|
	conditionResults := checkResults.ResultItems
 | 
						|
	var paramsCtx map[string]interface{}
 | 
						|
	if len(conditionResults) == 0 {
 | 
						|
		alertItem.Priority = ""
 | 
						|
		if checkResults.QueryResult.Nodata {
 | 
						|
			alertItem.State =  alerting.AlertStateNodata
 | 
						|
		}
 | 
						|
 | 
						|
		if alertMessage != nil  &&  alertMessage.Status != alerting.MessageStateRecovered && !checkResults.QueryResult.Nodata {
 | 
						|
			alertMessage.Status = alerting.MessageStateRecovered
 | 
						|
			alertMessage.ResourceID =  rule.Resource.ID
 | 
						|
			alertMessage.ResourceName = rule.Resource.Name
 | 
						|
			err = saveAlertMessage(alertMessage)
 | 
						|
			if err != nil {
 | 
						|
				return fmt.Errorf("save alert message error: %w", err)
 | 
						|
			}
 | 
						|
			// todo add recover notification to inner system message
 | 
						|
			// send recover message to channel
 | 
						|
			recoverCfg := rule.RecoveryNotificationConfig
 | 
						|
			if  recoverCfg != nil && recoverCfg.EventEnabled && recoverCfg.Enabled {
 | 
						|
				paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{
 | 
						|
					alerting2.ParamEventID: alertMessage.ID,
 | 
						|
					alerting2.ParamTimestamp:  alertItem.Created.Unix(),
 | 
						|
					"duration": alertItem.Created.Sub(alertMessage.Created).String(),
 | 
						|
					"trigger_at": alertMessage.Created.Unix(),
 | 
						|
				})
 | 
						|
				err = attachTitleMessageToCtx(recoverCfg.Title, recoverCfg.Message, paramsCtx)
 | 
						|
				if err != nil {
 | 
						|
					return err
 | 
						|
				}
 | 
						|
				actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx, false)
 | 
						|
				alertItem.RecoverActionResults = actionResults
 | 
						|
				//clear history notification time
 | 
						|
				rule.LastNotificationTime = time.Time{}
 | 
						|
				rule.LastEscalationTime = time.Time{}
 | 
						|
				_ = kv.DeleteKey(alerting2.KVLastNotificationTime, []byte(rule.ID))
 | 
						|
				_ = kv.DeleteKey(alerting2.KVLastEscalationTime, []byte(rule.ID))
 | 
						|
			}
 | 
						|
		}
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	alertItem.State = alerting.AlertStateAlerting
 | 
						|
 | 
						|
	var (
 | 
						|
		priority = conditionResults[0].ConditionItem.Priority
 | 
						|
	)
 | 
						|
	for _, conditionResult := range conditionResults {
 | 
						|
		if alerting.PriorityWeights[priority] < alerting.PriorityWeights[conditionResult.ConditionItem.Priority] {
 | 
						|
			priority = conditionResult.ConditionItem.Priority
 | 
						|
		}
 | 
						|
	}
 | 
						|
	triggerAt := alertItem.Created
 | 
						|
	if alertMessage != nil {
 | 
						|
		triggerAt = alertMessage.Created
 | 
						|
	}
 | 
						|
	paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{
 | 
						|
		alerting2.ParamTimestamp:  alertItem.Created.Unix(),
 | 
						|
		"duration": alertItem.Created.Sub(triggerAt).String(),
 | 
						|
		"trigger_at": triggerAt.Unix(),
 | 
						|
	})
 | 
						|
 | 
						|
	alertItem.Priority = priority
 | 
						|
	title, message := rule.GetNotificationTitleAndMessage()
 | 
						|
	err = attachTitleMessageToCtx(title, message, paramsCtx)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	alertItem.Message = paramsCtx[alerting2.ParamMessage].(string)
 | 
						|
	alertItem.Title = paramsCtx[alerting2.ParamTitle].(string)
 | 
						|
	if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered {
 | 
						|
		msg := &alerting.AlertMessage{
 | 
						|
			RuleID:       rule.ID,
 | 
						|
			Created:      alertItem.Created,
 | 
						|
			Updated:      time.Now(),
 | 
						|
			ID:           util.GetUUID(),
 | 
						|
			ResourceID:   rule.Resource.ID,
 | 
						|
			ResourceName: rule.Resource.Name,
 | 
						|
			Status:       alerting.MessageStateAlerting,
 | 
						|
			Priority:     priority,
 | 
						|
			Title:        alertItem.Title,
 | 
						|
			Message:      alertItem.Message,
 | 
						|
			Tags: rule.Tags,
 | 
						|
			Category: rule.Category,
 | 
						|
		}
 | 
						|
		alertMessage = msg
 | 
						|
		err = saveAlertMessage(msg)
 | 
						|
		if err != nil {
 | 
						|
			return fmt.Errorf("save alert message error: %w", err)
 | 
						|
		}
 | 
						|
		userID := rule.Creator.Id
 | 
						|
		if userID == "" {
 | 
						|
			userID = "*"
 | 
						|
		}
 | 
						|
		notification := &model.Notification{
 | 
						|
			UserId:      util.ToString(userID),
 | 
						|
			Type:        model.NotificationTypeNotification,
 | 
						|
			MessageType: model.MessageTypeAlerting,
 | 
						|
			Status:      model.NotificationStatusNew,
 | 
						|
			Title:       alertItem.Title,
 | 
						|
			Body:        alertItem.Message,
 | 
						|
			Link:        "/alerting/message",
 | 
						|
		}
 | 
						|
		err = orm.Create(nil, notification)
 | 
						|
		if err != nil {
 | 
						|
			return fmt.Errorf("failed to create notification, err: %w", err)
 | 
						|
		}
 | 
						|
	}else{
 | 
						|
		alertMessage.Title = alertItem.Title
 | 
						|
		alertMessage.Message = alertItem.Message
 | 
						|
		alertMessage.ResourceID =  rule.Resource.ID
 | 
						|
		alertMessage.ResourceName= rule.Resource.Name
 | 
						|
		alertMessage.Priority = priority
 | 
						|
		err = saveAlertMessage(alertMessage)
 | 
						|
		if err != nil {
 | 
						|
			return fmt.Errorf("save alert message error: %w", err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID )
 | 
						|
 | 
						|
	// if alert message status equals ignored , then skip sending message to channel
 | 
						|
	if alertMessage != nil && alertMessage.Status == alerting.MessageStateIgnored {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	if alertMessage != nil && paramsCtx != nil {
 | 
						|
		paramsCtx[alerting2.ParamEventID] = alertMessage.ID
 | 
						|
	}
 | 
						|
	// if channel is not enabled return
 | 
						|
	notifyCfg := rule.GetNotificationConfig()
 | 
						|
	if notifyCfg == nil || !notifyCfg.Enabled {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	if notifyCfg.AcceptTimeRange.Include(time.Now()) {
 | 
						|
		periodDuration, err := time.ParseDuration(notifyCfg.ThrottlePeriod)
 | 
						|
		if err != nil {
 | 
						|
			alertItem.Error = err.Error()
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if rule.LastNotificationTime.IsZero() {
 | 
						|
			tm, err := readTimeFromKV(alerting2.KVLastNotificationTime, []byte(rule.ID))
 | 
						|
			if err != nil {
 | 
						|
				return fmt.Errorf("get last notification time from kv error: %w", err)
 | 
						|
			}
 | 
						|
			if !tm.IsZero(){
 | 
						|
				rule.LastNotificationTime = tm
 | 
						|
			}
 | 
						|
		}
 | 
						|
		period := time.Now().Sub(rule.LastNotificationTime.Local())
 | 
						|
 | 
						|
		//log.Error(lastAlertItem.ID, period, periodDuration)
 | 
						|
		if paramsCtx == nil {
 | 
						|
			paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{
 | 
						|
				alerting2.ParamTimestamp: alertItem.Created.Unix(),
 | 
						|
				"priority":               priority,
 | 
						|
				"duration": alertItem.Created.Sub(alertMessage.Created).String(),
 | 
						|
				"trigger_at": alertMessage.Created.Unix(),
 | 
						|
			})
 | 
						|
			if alertMessage != nil {
 | 
						|
				paramsCtx[alerting2.ParamEventID] = alertMessage.ID
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if alertMessage == nil || period > periodDuration {
 | 
						|
			actionResults, _ := performChannels(notifyCfg.Normal, paramsCtx, false)
 | 
						|
			alertItem.ActionExecutionResults = actionResults
 | 
						|
			//change and save last notification time in local kv store when action error count equals zero
 | 
						|
			rule.LastNotificationTime = time.Now()
 | 
						|
			strTime := time.Now().UTC().Format(time.RFC3339)
 | 
						|
			kv.AddValue(alerting2.KVLastNotificationTime, []byte(rule.ID), []byte(strTime))
 | 
						|
			alertItem.IsNotified = true
 | 
						|
		}
 | 
						|
 | 
						|
		if notifyCfg.EscalationEnabled {
 | 
						|
			throttlePeriod, err := time.ParseDuration(notifyCfg.EscalationThrottlePeriod)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
 | 
						|
			rule.LastTermStartTime = time.Now()
 | 
						|
			if alertMessage != nil {
 | 
						|
				rule.LastTermStartTime = alertMessage.Created
 | 
						|
			}
 | 
						|
			if time.Now().Sub(rule.LastTermStartTime.Local()) > throttlePeriod {
 | 
						|
				if rule.LastEscalationTime.IsZero(){
 | 
						|
					tm, err := readTimeFromKV(alerting2.KVLastEscalationTime, []byte(rule.ID))
 | 
						|
					if err != nil {
 | 
						|
						return fmt.Errorf("get last escalation time from kv error: %w", err)
 | 
						|
					}
 | 
						|
					if !tm.IsZero(){
 | 
						|
						rule.LastEscalationTime = tm
 | 
						|
					}
 | 
						|
				}
 | 
						|
				if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration {
 | 
						|
					actionResults, _ := performChannels(notifyCfg.Escalation, paramsCtx, false)
 | 
						|
					alertItem.EscalationActionResults = actionResults
 | 
						|
					//todo init last escalation time when create task (by last alert item is escalated)
 | 
						|
					rule.LastEscalationTime = time.Now()
 | 
						|
					alertItem.IsEscalated = true
 | 
						|
					strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339)
 | 
						|
					kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime))
 | 
						|
				}
 | 
						|
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func attachTitleMessageToCtx(title, message string, paramsCtx map[string]interface{}) error{
 | 
						|
	var (
 | 
						|
		tplBytes []byte
 | 
						|
		err error
 | 
						|
	)
 | 
						|
	tplBytes, err = common.ResolveMessage(message, paramsCtx)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("resolve message template error: %w", err)
 | 
						|
	}
 | 
						|
	paramsCtx[alerting2.ParamMessage] = string(tplBytes)
 | 
						|
	tplBytes, err = common.ResolveMessage(title, paramsCtx)
 | 
						|
	if err != nil {
 | 
						|
		return fmt.Errorf("resolve title template error: %w", err)
 | 
						|
	}
 | 
						|
	paramsCtx[alerting2.ParamTitle] = string(tplBytes)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, extraParams map[string]interface{} ) map[string]interface{}{
 | 
						|
	var (
 | 
						|
		conditionParams []util.MapStr
 | 
						|
		firstGroupValue string
 | 
						|
		firstThreshold string
 | 
						|
		priority       string
 | 
						|
	)
 | 
						|
	if len(checkResults.ResultItems) > 0 {
 | 
						|
		priority = checkResults.ResultItems[0].ConditionItem.Priority
 | 
						|
		sort.Slice(checkResults.ResultItems, func(i, j int) bool {
 | 
						|
			if  alerting.PriorityWeights[checkResults.ResultItems[i].ConditionItem.Priority] > alerting.PriorityWeights[checkResults.ResultItems[j].ConditionItem.Priority] {
 | 
						|
				return true
 | 
						|
			}
 | 
						|
			return false
 | 
						|
		})
 | 
						|
		sort.Slice(checkResults.ResultItems, func(i, j int) bool {
 | 
						|
			if  vi, ok := checkResults.ResultItems[i].ResultValue.(float64); ok {
 | 
						|
				if vj, ok := checkResults.ResultItems[j].ResultValue.(float64); ok {
 | 
						|
					return vi > vj
 | 
						|
				}
 | 
						|
			}
 | 
						|
			return false
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	for i, resultItem := range checkResults.ResultItems {
 | 
						|
		if i >= 10 {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		if i == 0 {
 | 
						|
			firstGroupValue = strings.Join(resultItem.GroupValues, ",")
 | 
						|
			firstThreshold = strings.Join(resultItem.ConditionItem.Values, ",")
 | 
						|
		}
 | 
						|
		conditionParams = append(conditionParams, util.MapStr{
 | 
						|
			alerting2.ParamThreshold:      resultItem.ConditionItem.Values,
 | 
						|
			alerting2.Priority:            resultItem.ConditionItem.Priority,
 | 
						|
			alerting2.ParamGroupValues:    resultItem.GroupValues,
 | 
						|
			alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp,
 | 
						|
			alerting2.ParamResultValue:    resultItem.ResultValue,
 | 
						|
			alerting2.ParamRelationValues: resultItem.RelationValues,
 | 
						|
		})
 | 
						|
	}
 | 
						|
	envVariables, err := alerting2.GetEnvVariables()
 | 
						|
	if err != nil {
 | 
						|
		log.Errorf("get env variables error: %v", err)
 | 
						|
	}
 | 
						|
	var (
 | 
						|
		min interface{}
 | 
						|
		max interface{}
 | 
						|
	)
 | 
						|
	if checkResults.QueryResult != nil {
 | 
						|
		min = checkResults.QueryResult.Min
 | 
						|
		max = checkResults.QueryResult.Max
 | 
						|
		if v, ok := min.(int64); ok {
 | 
						|
			//expand 60s
 | 
						|
			min = time.UnixMilli(v).Add(-time.Second*60).UTC().Format("2006-01-02T15:04:05.999Z")
 | 
						|
		}
 | 
						|
		if v, ok := max.(int64); ok {
 | 
						|
			max = time.UnixMilli(v).Add(time.Second*60).UTC().Format("2006-01-02T15:04:05.999Z")
 | 
						|
		}
 | 
						|
	}
 | 
						|
	paramsCtx := util.MapStr{
 | 
						|
		alerting2.ParamRuleID:       rule.ID,
 | 
						|
		alerting2.ParamResourceID:   rule.Resource.ID,
 | 
						|
		alerting2.ParamResourceName: rule.Resource.Name,
 | 
						|
		alerting2.ParamResults:      conditionParams,
 | 
						|
		"objects": rule.Resource.Objects,
 | 
						|
		"first_group_value":         firstGroupValue,
 | 
						|
		"first_threshold":           firstThreshold,
 | 
						|
		"rule_name":                 rule.Name,
 | 
						|
		"priority":                  priority,
 | 
						|
		"min":  min,
 | 
						|
		"max": max,
 | 
						|
		"env": envVariables,
 | 
						|
	}
 | 
						|
	err = util.MergeFields(paramsCtx, extraParams, true)
 | 
						|
	if err != nil {
 | 
						|
		log.Errorf("merge template params error: %v", err)
 | 
						|
	}
 | 
						|
	return paramsCtx
 | 
						|
}
 | 
						|
 | 
						|
func (engine *Engine) Test(rule *alerting.Rule, msgType string) ([]alerting.ActionExecutionResult, error) {
 | 
						|
	checkResults, err := engine.CheckCondition(rule)
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("check condition error:%w", err)
 | 
						|
	}
 | 
						|
	alertMessage, err := getLastAlertMessage(rule.ID, 2 * time.Minute)
 | 
						|
	if err != nil {
 | 
						|
		return nil, fmt.Errorf("get alert message error: %w", err)
 | 
						|
	}
 | 
						|
	var actionResults []alerting.ActionExecutionResult
 | 
						|
 | 
						|
	now :=  time.Now()
 | 
						|
	triggerAt := now
 | 
						|
	if alertMessage != nil {
 | 
						|
		triggerAt = alertMessage.Created
 | 
						|
	}
 | 
						|
	paramsCtx := newParameterCtx(rule, checkResults,util.MapStr{
 | 
						|
		alerting2.ParamEventID: util.GetUUID(),
 | 
						|
		alerting2.ParamTimestamp:  now.Unix(),
 | 
						|
		"duration": now.Sub(triggerAt).String(),
 | 
						|
		"trigger_at": triggerAt.Unix(),
 | 
						|
	} )
 | 
						|
	if msgType == "escalation" || msgType == "notification" {
 | 
						|
		title, message := rule.GetNotificationTitleAndMessage()
 | 
						|
		err = attachTitleMessageToCtx(title, message, paramsCtx)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}else if msgType == "recover_notification" {
 | 
						|
		if rule.RecoveryNotificationConfig == nil {
 | 
						|
			return nil, fmt.Errorf("recovery notification must not be empty")
 | 
						|
		}
 | 
						|
		err = attachTitleMessageToCtx(rule.RecoveryNotificationConfig.Title, rule.RecoveryNotificationConfig.Message, paramsCtx)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
	}else{
 | 
						|
		return nil, fmt.Errorf("unkonwn parameter msg type")
 | 
						|
	}
 | 
						|
 | 
						|
	var channels []alerting.Channel
 | 
						|
	switch msgType {
 | 
						|
	case "escalation":
 | 
						|
		notifyCfg := rule.GetNotificationConfig()
 | 
						|
		if notifyCfg == nil {
 | 
						|
			return nil, nil
 | 
						|
		}
 | 
						|
		channels = notifyCfg.Escalation
 | 
						|
	case "recover_notification":
 | 
						|
		if rule.RecoveryNotificationConfig != nil {
 | 
						|
			channels = rule.RecoveryNotificationConfig.Normal
 | 
						|
		}
 | 
						|
	case "notification":
 | 
						|
		notifyCfg := rule.GetNotificationConfig()
 | 
						|
		if notifyCfg == nil {
 | 
						|
			return nil, nil
 | 
						|
		}
 | 
						|
		channels = notifyCfg.Normal
 | 
						|
	}
 | 
						|
	if len(channels) > 0 {
 | 
						|
		actionResults, _ = performChannels(channels, paramsCtx, true)
 | 
						|
	}else{
 | 
						|
		return nil, fmt.Errorf("no useable channel")
 | 
						|
	}
 | 
						|
	return actionResults, nil
 | 
						|
}
 | 
						|
 | 
						|
func performChannels(channels []alerting.Channel, ctx map[string]interface{}, raiseChannelEnabledErr bool) ([]alerting.ActionExecutionResult, int) {
 | 
						|
	var errCount int
 | 
						|
	var actionResults []alerting.ActionExecutionResult
 | 
						|
	for _, channel := range channels {
 | 
						|
		var (
 | 
						|
			errStr string
 | 
						|
			resBytes []byte
 | 
						|
			messageBytes []byte
 | 
						|
		)
 | 
						|
		_, err := common.RetrieveChannel(&channel, raiseChannelEnabledErr)
 | 
						|
		if err != nil {
 | 
						|
			log.Error(err)
 | 
						|
			errCount++
 | 
						|
			errStr = err.Error()
 | 
						|
		}else{
 | 
						|
			if !channel.Enabled {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
			resBytes, err, messageBytes = common.PerformChannel(&channel, ctx)
 | 
						|
			if err != nil {
 | 
						|
				errCount++
 | 
						|
				errStr = err.Error()
 | 
						|
			}
 | 
						|
		}
 | 
						|
		actionResults = append(actionResults, alerting.ActionExecutionResult{
 | 
						|
			Result:        string(resBytes),
 | 
						|
			Error:         errStr,
 | 
						|
			Message:       string(messageBytes),
 | 
						|
			ExecutionTime: int(time.Now().UnixNano()/1e6),
 | 
						|
			ChannelType:   channel.SubType,
 | 
						|
			ChannelName:   channel.Name,
 | 
						|
			ChannelID: channel.ID,
 | 
						|
		})
 | 
						|
	}
 | 
						|
	return actionResults, errCount
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
 | 
						|
func (engine *Engine) GenerateTask(rule alerting.Rule) func(ctx context.Context) {
 | 
						|
	return func(ctx context.Context) {
 | 
						|
		defer func() {
 | 
						|
			if err := recover(); err != nil {
 | 
						|
				log.Error(err)
 | 
						|
				debug.PrintStack()
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		err := engine.Do(&rule)
 | 
						|
		if err != nil {
 | 
						|
			log.Error(err)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func CollectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData){
 | 
						|
	if aggM, ok := agg.(map[string]interface{}); ok {
 | 
						|
		if targetAgg, ok := aggM["filter_agg"]; ok {
 | 
						|
			collectMetricData(targetAgg, groupValues, metricData)
 | 
						|
		}else{
 | 
						|
			collectMetricData(aggM, groupValues, metricData)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func collectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData){
 | 
						|
	if aggM, ok := agg.(map[string]interface{}); ok {
 | 
						|
		if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok {
 | 
						|
			if bks, ok := timeBks["buckets"].([]interface{}); ok {
 | 
						|
				md := alerting.MetricData{
 | 
						|
					Data: map[string][]alerting.TimeMetricData{},
 | 
						|
					GroupValues: strings.Split(groupValues, "*"),
 | 
						|
				}
 | 
						|
				for _, bk := range bks {
 | 
						|
					if bkM, ok := bk.(map[string]interface{}); ok{
 | 
						|
 | 
						|
						for k, v := range bkM {
 | 
						|
							if k == "key" || k == "key_as_string" || k== "doc_count"{
 | 
						|
								continue
 | 
						|
							}
 | 
						|
							if len(k) > 5 { //just store a,b,c
 | 
						|
								continue
 | 
						|
							}
 | 
						|
							if vm, ok := v.(map[string]interface{}); ok {
 | 
						|
								if metricVal, ok := vm["value"]; ok {
 | 
						|
									md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal})
 | 
						|
								}else{
 | 
						|
									//percentiles agg type
 | 
						|
									switch  vm["values"].(type) {
 | 
						|
									case []interface{}:
 | 
						|
										for _, val := range vm["values"].([]interface{}) {
 | 
						|
											if valM, ok := val.(map[string]interface{}); ok {
 | 
						|
												md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], valM["value"]})
 | 
						|
											}
 | 
						|
											break
 | 
						|
										}
 | 
						|
									case map[string]interface{}:
 | 
						|
										for _, val := range vm["values"].(map[string]interface{}) {
 | 
						|
												md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], val})
 | 
						|
											break
 | 
						|
										}
 | 
						|
									}
 | 
						|
 | 
						|
								}
 | 
						|
 | 
						|
							}
 | 
						|
 | 
						|
						}
 | 
						|
					}
 | 
						|
 | 
						|
				}
 | 
						|
				*metricData = append(*metricData,md)
 | 
						|
			}
 | 
						|
 | 
						|
		}else{
 | 
						|
			for k, v := range aggM {
 | 
						|
				if k == "key" || k== "doc_count"{
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				if vm, ok := v.(map[string]interface{}); ok {
 | 
						|
					if bks, ok := vm["buckets"].([]interface{}); ok {
 | 
						|
						for _, bk := range bks {
 | 
						|
							if bkVal, ok :=  bk.(map[string]interface{}); ok {
 | 
						|
								currentGroup := bkVal["key"].(string)
 | 
						|
								newGroupValues := currentGroup
 | 
						|
								if groupValues != "" {
 | 
						|
									newGroupValues = fmt.Sprintf("%s*%s", groupValues, currentGroup)
 | 
						|
								}
 | 
						|
 | 
						|
								collectMetricData(bk, newGroupValues, metricData)
 | 
						|
							}
 | 
						|
 | 
						|
						}
 | 
						|
					}
 | 
						|
				}
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func getLastAlertMessageFromES(ruleID string) (*alerting.AlertMessage, error) {
 | 
						|
	queryDsl := util.MapStr{
 | 
						|
		"size": 1,
 | 
						|
		"sort": []util.MapStr{
 | 
						|
			{
 | 
						|
				"created": util.MapStr{
 | 
						|
					"order": "desc",
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
		"query": util.MapStr{
 | 
						|
			"term": util.MapStr{
 | 
						|
				"rule_id": util.MapStr{
 | 
						|
					"value": ruleID,
 | 
						|
				},
 | 
						|
			},
 | 
						|
		},
 | 
						|
	}
 | 
						|
	q := orm.Query{
 | 
						|
		RawQuery: util.MustToJSONBytes(queryDsl),
 | 
						|
	}
 | 
						|
	err, searchResult := orm.Search(alerting.AlertMessage{}, &q )
 | 
						|
	if err != nil {
 | 
						|
		return  nil, err
 | 
						|
	}
 | 
						|
	if len(searchResult.Result) == 0 {
 | 
						|
		return  nil, nil
 | 
						|
	}
 | 
						|
	messageBytes := util.MustToJSONBytes(searchResult.Result[0])
 | 
						|
	message := &alerting.AlertMessage{}
 | 
						|
	err =  util.FromJSONBytes(messageBytes, message)
 | 
						|
	return message, err
 | 
						|
}
 | 
						|
 | 
						|
func getLastAlertMessage(ruleID string, duration time.Duration) (*alerting.AlertMessage, error ){
 | 
						|
	messageBytes, err := kv.GetValue(alerting2.KVLastMessageState, []byte(ruleID))
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	message := &alerting.AlertMessage{}
 | 
						|
	if messageBytes != nil {
 | 
						|
 | 
						|
		err = util.FromJSONBytes(messageBytes, message)
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		if time.Now().Sub(message.Updated) <= duration {
 | 
						|
			return message, nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	message, err = getLastAlertMessageFromES(ruleID)
 | 
						|
	return message, err
 | 
						|
}
 | 
						|
 | 
						|
func saveAlertMessageToES(message *alerting.AlertMessage) error {
 | 
						|
	message.Updated = time.Now()
 | 
						|
	return orm.Save(nil, message)
 | 
						|
}
 | 
						|
 | 
						|
func saveAlertMessage(message *alerting.AlertMessage) error {
 | 
						|
	//todo diff message if not change , then skip save to es ?
 | 
						|
	err := saveAlertMessageToES(message)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	messageBytes, err := util.ToJSONBytes(message)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	err = kv.AddValue(alerting2.KVLastMessageState, []byte(message.RuleID), messageBytes)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
func readTimeFromKV(bucketKey string, key []byte)(time.Time, error){
 | 
						|
	timeBytes, err := kv.GetValue(bucketKey, key)
 | 
						|
	zeroTime := time.Time{}
 | 
						|
	if err != nil {
 | 
						|
		return zeroTime, err
 | 
						|
	}
 | 
						|
	timeStr :=  string(timeBytes)
 | 
						|
	if timeStr != ""{
 | 
						|
		return time.ParseInLocation(time.RFC3339, string(timeBytes), time.UTC)
 | 
						|
	}
 | 
						|
	return zeroTime, nil
 | 
						|
}
 |