update alert api
This commit is contained in:
parent
6c75b5aeba
commit
162cf120c9
9
main.go
9
main.go
|
@ -8,6 +8,7 @@ import (
|
|||
"infini.sh/console/model/alerting"
|
||||
"infini.sh/console/model/gateway"
|
||||
_ "infini.sh/console/plugin"
|
||||
alerting2 "infini.sh/console/service/alerting"
|
||||
"infini.sh/framework"
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/env"
|
||||
|
@ -27,6 +28,7 @@ import (
|
|||
_ "infini.sh/framework/plugins"
|
||||
api2 "infini.sh/gateway/api"
|
||||
_ "infini.sh/gateway/proxy"
|
||||
log "src/github.com/cihub/seelog"
|
||||
)
|
||||
|
||||
var appConfig *config.AppConfig
|
||||
|
@ -130,7 +132,12 @@ func main() {
|
|||
|
||||
api.RegisterSchema()
|
||||
|
||||
|
||||
go func() {
|
||||
err := alerting2.InitTasks()
|
||||
if err != nil {
|
||||
log.Errorf("init alerting task error: %v", err)
|
||||
}
|
||||
}()
|
||||
}, nil) {
|
||||
app.Run()
|
||||
}
|
||||
|
|
|
@ -26,11 +26,12 @@ type Alert struct {
|
|||
Error string `json:"error,omitempty"`
|
||||
IsNotified bool `json:"is_notified" elastic_mapping:"is_notified: { type: boolean }"` //标识本次检测是否发送了告警通知
|
||||
IsEscalated bool `json:"is_escalated" elastic_mapping:"is_escalated: { type: boolean }"` //标识本次检测是否发送了升级告警通知
|
||||
Conditions Condition `json:"condition"`
|
||||
ConditionResult *ConditionResult `json:"condition_result,omitempty" elastic_mapping:"condition_result: { type: object,enabled:false }"`
|
||||
SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"`
|
||||
}
|
||||
|
||||
type ActionExecutionResult struct {
|
||||
//ActionId string `json:"action_id"`
|
||||
LastExecutionTime int `json:"last_execution_time"`
|
||||
Error string `json:"error"`
|
||||
Result string `json:"result"`
|
||||
|
|
|
@ -19,6 +19,10 @@ type ConditionItem struct {
|
|||
}
|
||||
|
||||
type ConditionResult struct {
|
||||
ResultItems []ConditionResultItem `json:"result_items"`
|
||||
QueryResult *QueryResult `json:"query_result"`
|
||||
}
|
||||
type ConditionResultItem struct {
|
||||
GroupValues []string `json:"group_values"`
|
||||
ConditionItem *ConditionItem `json:"condition_item"`
|
||||
}
|
||||
|
|
|
@ -48,6 +48,12 @@ type MetricItem struct {
|
|||
Group []string `json:"group"` //bucket group
|
||||
}
|
||||
|
||||
type QueryResult struct {
|
||||
Query string `json:"query"`
|
||||
Raw string `json:"raw"`
|
||||
MetricData []MetricData `json:"metric_data"`
|
||||
}
|
||||
|
||||
type MetricData struct {
|
||||
GroupValues []string `json:"group_values"`
|
||||
Data map[string][]TimeMetricData `json:"data"`
|
||||
|
|
|
@ -27,10 +27,10 @@ type Rule struct {
|
|||
|
||||
type RuleChannel struct {
|
||||
Normal []Channel `json:"normal"`
|
||||
Escalation []Channel `json:"escalation"`
|
||||
Escalation []Channel `json:"escalation,omitempty"`
|
||||
ThrottlePeriod string `json:"throttle_period"` //沉默周期
|
||||
AcceptTimeRange TimeRange `json:"accept_time_range"`
|
||||
EscalationThrottlePeriod string `json:"escalation_throttle_period"`
|
||||
EscalationThrottlePeriod string `json:"escalation_throttle_period,omitempty"`
|
||||
EscalationEnabled bool `json:"escalation_enabled"`
|
||||
}
|
||||
|
||||
|
|
|
@ -31,5 +31,6 @@ func (alert *AlertAPI) Init() {
|
|||
|
||||
//just for test
|
||||
//api.HandleAPIMethod(api.GET, "/alerting/rule/test", alert.testRule)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -109,10 +109,10 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h
|
|||
|
||||
func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
id := ps.MustGetParameter("rule_id")
|
||||
obj := alerting.Rule{}
|
||||
obj := &alerting.Rule{}
|
||||
|
||||
obj.ID = id
|
||||
exists, err := orm.Get(&obj)
|
||||
exists, err := orm.Get(obj)
|
||||
if !exists || err != nil {
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"_id": id,
|
||||
|
@ -123,8 +123,8 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
|
|||
|
||||
id = obj.ID
|
||||
create := obj.Created
|
||||
obj = alerting.Rule{}
|
||||
err = alertAPI.DecodeJSON(req, &obj)
|
||||
obj = &alerting.Rule{}
|
||||
err = alertAPI.DecodeJSON(req, obj)
|
||||
if err != nil {
|
||||
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error(err)
|
||||
|
@ -135,7 +135,13 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
|
|||
obj.ID = id
|
||||
obj.Created = create
|
||||
obj.Updated = time.Now()
|
||||
err = orm.Update(&obj)
|
||||
err = obj.Metrics.RefreshExpression()
|
||||
if err != nil {
|
||||
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
err = orm.Update(obj)
|
||||
if err != nil {
|
||||
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error(err)
|
||||
|
@ -143,6 +149,13 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
|
|||
}
|
||||
|
||||
if obj.Enabled {
|
||||
exists, err = checkResourceExists(obj)
|
||||
if err != nil || !exists {
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"error": err.Error(),
|
||||
}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
//update task
|
||||
task.StopTask(id)
|
||||
eng := alerting2.GetEngine(obj.Resource.Type)
|
||||
|
@ -150,7 +163,7 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
|
|||
ID: obj.ID,
|
||||
Interval: obj.Schedule.Interval,
|
||||
Description: obj.Metrics.Expression,
|
||||
Task: eng.GenerateTask(&obj),
|
||||
Task: eng.GenerateTask(obj),
|
||||
}
|
||||
task.RegisterScheduleTask(ruleTask)
|
||||
task.StartTask(ruleTask.ID)
|
||||
|
|
|
@ -18,6 +18,8 @@ import (
|
|||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/util"
|
||||
"io"
|
||||
"math"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -44,7 +46,10 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) {
|
|||
}
|
||||
basicAggs := util.MapStr{}
|
||||
for _, metricItem := range rule.Metrics.Items {
|
||||
basicAggs[metricItem.Name] = engine.generateAgg(&metricItem)
|
||||
metricAggs := engine.generateAgg(&metricItem)
|
||||
if err = util.MergeFields(basicAggs, metricAggs, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
timeAggs := util.MapStr{
|
||||
"date_histogram": util.MapStr{
|
||||
|
@ -95,7 +100,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) {
|
|||
}, nil
|
||||
}
|
||||
//generateAgg convert statistic of metric item to elasticsearch aggregation
|
||||
func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{
|
||||
func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) map[string]interface{}{
|
||||
var (
|
||||
aggType = "value_count"
|
||||
field = metricItem.Field
|
||||
|
@ -104,11 +109,15 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{
|
|||
field = "_id"
|
||||
}
|
||||
var percent = 0.0
|
||||
var isPipeline = false
|
||||
switch metricItem.Statistic {
|
||||
case "max", "min", "sum", "avg":
|
||||
aggType = metricItem.Statistic
|
||||
case "count", "value_count":
|
||||
aggType = "value_count"
|
||||
case "rate":
|
||||
aggType = "max"
|
||||
isPipeline = true
|
||||
case "medium":
|
||||
aggType = "median_absolute_deviation"
|
||||
case "p99", "p95","p90","p80","p50":
|
||||
|
@ -122,9 +131,22 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{
|
|||
if aggType == "percentiles" {
|
||||
aggValue["percents"] = []interface{}{percent}
|
||||
}
|
||||
return util.MapStr{
|
||||
aggType: aggValue,
|
||||
aggs := util.MapStr{
|
||||
metricItem.Name: util.MapStr{
|
||||
aggType: aggValue,
|
||||
},
|
||||
}
|
||||
if !isPipeline{
|
||||
return aggs
|
||||
}
|
||||
pipelineAggID := util.GetUUID()
|
||||
aggs[pipelineAggID] = aggs[metricItem.Name]
|
||||
aggs[metricItem.Name] = util.MapStr{
|
||||
"derivative": util.MapStr{
|
||||
"buckets_path": pipelineAggID,
|
||||
},
|
||||
}
|
||||
return aggs
|
||||
}
|
||||
|
||||
func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[string]interface{}, error){
|
||||
|
@ -285,8 +307,10 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interfa
|
|||
must := []interface{}{
|
||||
timeQuery,
|
||||
}
|
||||
if _, ok := query["match_all"]; !ok {
|
||||
must = append(must, query)
|
||||
if len(query) > 0 {
|
||||
if _, ok = query["match_all"]; !ok {
|
||||
must = append(must, query)
|
||||
}
|
||||
}
|
||||
query = util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
|
@ -297,8 +321,9 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interfa
|
|||
return query, nil
|
||||
}
|
||||
|
||||
func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, error){
|
||||
func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error){
|
||||
esClient := elastic.GetClient(rule.Resource.ID)
|
||||
queryResult := &alerting.QueryResult{}
|
||||
indexName := strings.Join(rule.Resource.Objects, ",")
|
||||
queryDsl, err := engine.GenerateQuery(rule)
|
||||
if err != nil {
|
||||
|
@ -308,6 +333,7 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
queryResult.Query = string(queryDslBytes)
|
||||
searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -315,6 +341,7 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e
|
|||
if searchRes.StatusCode != 200 {
|
||||
return nil, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body))
|
||||
}
|
||||
queryResult.Raw = string(searchRes.RawResult.Body)
|
||||
searchResult := map[string]interface{}{}
|
||||
err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult)
|
||||
if err != nil {
|
||||
|
@ -322,18 +349,24 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e
|
|||
}
|
||||
metricData := []alerting.MetricData{}
|
||||
collectMetricData(searchResult["aggregations"], "", &metricData)
|
||||
return metricData, nil
|
||||
queryResult.MetricData = metricData
|
||||
return queryResult, nil
|
||||
}
|
||||
//CheckCondition check whether rule conditions triggered or not
|
||||
//if triggered returns an array of ConditionResult
|
||||
//sort conditions by severity desc before check , and then if condition is true, then continue check another group
|
||||
func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error){
|
||||
metricData, err := engine.ExecuteQuery(rule)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){
|
||||
queryResult, err := engine.ExecuteQuery(rule)
|
||||
conditionResult := &alerting.ConditionResult{
|
||||
QueryResult: queryResult,
|
||||
}
|
||||
var conditionResults []alerting.ConditionResult
|
||||
for _, md := range metricData {
|
||||
if err != nil {
|
||||
return conditionResult, err
|
||||
}
|
||||
|
||||
var resultItems []alerting.ConditionResultItem
|
||||
var targetMetricData []alerting.MetricData
|
||||
for _, md := range queryResult.MetricData {
|
||||
var targetData alerting.MetricData
|
||||
if len(rule.Metrics.Items) == 1 {
|
||||
targetData = md
|
||||
|
@ -344,7 +377,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
|
|||
}
|
||||
expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return conditionResult, err
|
||||
}
|
||||
dataLength := 0
|
||||
for _, v := range md.Data {
|
||||
|
@ -357,8 +390,14 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
|
|||
}
|
||||
var timestamp interface{}
|
||||
for k, v := range md.Data {
|
||||
if len(k) == 20 {
|
||||
continue
|
||||
}
|
||||
//drop nil value bucket
|
||||
if v[i][1] == nil {
|
||||
if len(v[i]) < 2 {
|
||||
continue DataLoop
|
||||
}
|
||||
if _, ok := v[i][1].(float64); !ok {
|
||||
continue DataLoop
|
||||
}
|
||||
parameters[k] = v[i][1]
|
||||
|
@ -366,11 +405,18 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
|
|||
}
|
||||
result, err := expression.Evaluate(parameters)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return conditionResult, err
|
||||
}
|
||||
if r, ok := result.(float64); ok {
|
||||
if math.IsNaN(r){
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, result})
|
||||
}
|
||||
}
|
||||
targetMetricData = append(targetMetricData, targetData)
|
||||
sort.Slice(rule.Conditions.Items, func(i, j int) bool {
|
||||
return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity]
|
||||
})
|
||||
|
@ -379,7 +425,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
|
|||
conditionExpression := ""
|
||||
valueLength := len(cond.Values)
|
||||
if valueLength == 0 {
|
||||
return nil, fmt.Errorf("condition values: %v should not be empty", cond.Values)
|
||||
return conditionResult, fmt.Errorf("condition values: %v should not be empty", cond.Values)
|
||||
}
|
||||
switch cond.Operator {
|
||||
case "equals":
|
||||
|
@ -394,15 +440,15 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
|
|||
conditionExpression = fmt.Sprintf("result < %v", cond.Values[0])
|
||||
case "range":
|
||||
if valueLength != 2 {
|
||||
return nil, fmt.Errorf("length of %s condition values should be 2", cond.Operator)
|
||||
return conditionResult, fmt.Errorf("length of %s condition values should be 2", cond.Operator)
|
||||
}
|
||||
conditionExpression = fmt.Sprintf("result >= %v && result <= %v", cond.Values[0], cond.Values[1])
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupport condition operator: %s", cond.Operator)
|
||||
return conditionResult, fmt.Errorf("unsupport condition operator: %s", cond.Operator)
|
||||
}
|
||||
expression, err := govaluate.NewEvaluableExpression(conditionExpression)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return conditionResult, err
|
||||
}
|
||||
dataLength := 0
|
||||
dataKey := ""
|
||||
|
@ -412,20 +458,20 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
|
|||
}
|
||||
triggerCount := 0
|
||||
for i := 0; i < dataLength; i++ {
|
||||
conditionResult, err := expression.Evaluate(map[string]interface{}{
|
||||
evaluateResult, err := expression.Evaluate(map[string]interface{}{
|
||||
"result": targetData.Data[dataKey][i][1],
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if conditionResult == true {
|
||||
if evaluateResult == true {
|
||||
triggerCount += 1
|
||||
}else {
|
||||
triggerCount = 0
|
||||
}
|
||||
if triggerCount >= cond.MinimumPeriodMatch {
|
||||
log.Debugf("triggered condition %v, groups: %v\n", cond, targetData.GroupValues)
|
||||
conditionResults = append(conditionResults, alerting.ConditionResult{
|
||||
resultItems = append(resultItems, alerting.ConditionResultItem{
|
||||
GroupValues: targetData.GroupValues,
|
||||
ConditionItem: &cond,
|
||||
})
|
||||
|
@ -435,7 +481,9 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
|
|||
|
||||
}
|
||||
}
|
||||
return conditionResults, nil
|
||||
conditionResult.QueryResult.MetricData = targetMetricData
|
||||
conditionResult.ResultItems = resultItems
|
||||
return conditionResult, nil
|
||||
}
|
||||
func (engine *Engine) Do(rule *alerting.Rule) error {
|
||||
|
||||
|
@ -474,7 +522,18 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
|
|||
}
|
||||
}()
|
||||
log.Tracef("start check condition of rule %s", rule.ID)
|
||||
conditionResults, err := engine.CheckCondition(rule)
|
||||
checkResults, err := engine.CheckCondition(rule)
|
||||
alertItem = &alerting.Alert{
|
||||
ID: util.GetUUID(),
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
RuleID: rule.ID,
|
||||
ResourceID: rule.Resource.ID,
|
||||
Expression: rule.Metrics.Expression,
|
||||
Objects: rule.Resource.Objects,
|
||||
ConditionResult: checkResults,
|
||||
Conditions: rule.Conditions,
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -483,21 +542,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conditionResults := checkResults.ResultItems
|
||||
if len(conditionResults) == 0 {
|
||||
if lastAlertItem.State != alerting.AlertStateNormal && lastAlertItem.ID != "" {
|
||||
alertItem = &alerting.Alert{
|
||||
ID: util.GetUUID(),
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
RuleID: rule.ID,
|
||||
ResourceID: rule.Resource.ID,
|
||||
Expression: rule.Metrics.Expression,
|
||||
Objects: rule.Resource.Objects,
|
||||
Severity: "info",
|
||||
Content: "",
|
||||
State: alerting.AlertStateNormal,
|
||||
}
|
||||
}
|
||||
alertItem.Severity = "info"
|
||||
alertItem.Content = ""
|
||||
alertItem.State = alerting.AlertStateNormal
|
||||
return nil
|
||||
}else{
|
||||
if lastAlertItem.State == "" || lastAlertItem.State == alerting.AlertStateNormal {
|
||||
|
@ -511,22 +560,12 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
|
|||
for _, conditionResult := range conditionResults {
|
||||
if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] {
|
||||
severity = conditionResult.ConditionItem.Severity
|
||||
content = conditionResult.ConditionItem.Message
|
||||
}
|
||||
content += conditionResult.ConditionItem.Message + ";"
|
||||
}
|
||||
alertItem = &alerting.Alert{
|
||||
ID: util.GetUUID(),
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
RuleID: rule.ID,
|
||||
ResourceID: rule.Resource.ID,
|
||||
ResourceName: rule.Resource.Name,
|
||||
Expression: rule.Metrics.Expression,
|
||||
Objects: rule.Resource.Objects,
|
||||
Severity: severity,
|
||||
Content: content,
|
||||
State: alerting.AlertStateActive,
|
||||
}
|
||||
alertItem.Severity = severity
|
||||
alertItem.Content = content
|
||||
alertItem.State = alerting.AlertStateActive
|
||||
}
|
||||
|
||||
if rule.Channels.AcceptTimeRange.Include(time.Now()) {
|
||||
|
@ -571,7 +610,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResult) []alerting.ActionExecutionResult {
|
||||
func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResultItem) []alerting.ActionExecutionResult {
|
||||
var message string
|
||||
for _, conditionResult := range conditionResults {
|
||||
message += fmt.Sprintf("severity: %s\t message:%s\t groups:%v\t timestamp: %v;", conditionResult.ConditionItem.Severity, conditionResult.ConditionItem.Message, conditionResult.GroupValues, time.Now())
|
||||
|
@ -632,6 +671,12 @@ func performChannel(channel *alerting.Channel, ctx []byte) ([]byte, error) {
|
|||
}
|
||||
func (engine *Engine) GenerateTask(rule *alerting.Rule) func(ctx context.Context) {
|
||||
return func(ctx context.Context) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
log.Error(err)
|
||||
debug.PrintStack()
|
||||
}
|
||||
}()
|
||||
err := engine.Do(rule)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
|
|
|
@ -126,6 +126,57 @@ func TestGenerateAgg(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGeneratePercentilesAggQuery(t *testing.T) {
|
||||
//rule := alerting.Rule{
|
||||
// ID: util.GetUUID(),
|
||||
// Created: time.Now(),
|
||||
// Updated: time.Now(),
|
||||
// Enabled: true,
|
||||
// Resource: alerting.Resource{
|
||||
// ID: "c8i18llath2blrusdjng",
|
||||
// Type: "elasticsearch",
|
||||
// Objects: []string{".infini_metrics*"},
|
||||
// TimeField: "timestamp",
|
||||
// RawFilter: map[string]interface{}{
|
||||
// "match_all": util.MapStr{
|
||||
//
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
//
|
||||
// Metrics: alerting.Metric{
|
||||
// PeriodInterval: "1m",
|
||||
// MaxPeriods: 15,
|
||||
// Items: []alerting.MetricItem{
|
||||
// {Name: "a", Field: "payload.elasticsearch.node_stats.os.cpu.percent", Statistic: "p99", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
|
||||
// },
|
||||
// },
|
||||
// Conditions: alerting.Condition{
|
||||
// Operator: "any",
|
||||
// Items: []alerting.ConditionItem{
|
||||
// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"},
|
||||
// },
|
||||
// },
|
||||
//
|
||||
// Channels: alerting.RuleChannel{
|
||||
// Normal: []alerting.Channel{
|
||||
// {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
|
||||
// HeaderParams: map[string]string{
|
||||
// "Content-Type": "application/json",
|
||||
// },
|
||||
// Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
|
||||
// Method: http.MethodPost,
|
||||
// URL: "https://oapi.dingtalk.com/robot/send?access_token=XXXXXX",
|
||||
// }},
|
||||
// },
|
||||
// ThrottlePeriod: "1h",
|
||||
// AcceptTimeRange: alerting.TimeRange{
|
||||
// Start: "8:00",
|
||||
// End: "21:00",
|
||||
// },
|
||||
// EscalationEnabled: true,
|
||||
// EscalationThrottlePeriod: "30m",
|
||||
// },
|
||||
//}
|
||||
rule := alerting.Rule{
|
||||
ID: util.GetUUID(),
|
||||
Created: time.Now(),
|
||||
|
@ -137,8 +188,16 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
|
|||
Objects: []string{".infini_metrics*"},
|
||||
TimeField: "timestamp",
|
||||
RawFilter: map[string]interface{}{
|
||||
"match_all": util.MapStr{
|
||||
|
||||
"bool": map[string]interface{}{
|
||||
"must": []interface{}{
|
||||
util.MapStr{
|
||||
"term": util.MapStr{
|
||||
"metadata.name": util.MapStr{
|
||||
"value": "index_stats",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -147,13 +206,15 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
|
|||
PeriodInterval: "1m",
|
||||
MaxPeriods: 15,
|
||||
Items: []alerting.MetricItem{
|
||||
{Name: "a", Field: "payload.elasticsearch.node_stats.os.cpu.percent", Statistic: "p99", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
|
||||
{Name: "a", Field: "payload.elasticsearch.index_stats.total.search.query_total", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},
|
||||
{Name: "b", Field: "payload.elasticsearch.index_stats.total.search.query_time_in_millis", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},
|
||||
},
|
||||
Formula: "b/a",
|
||||
},
|
||||
Conditions: alerting.Condition{
|
||||
Operator: "any",
|
||||
Items: []alerting.ConditionItem{
|
||||
{MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"},
|
||||
{MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"10"}, Severity: "warning", Message: "搜索延迟大于10ms"},
|
||||
},
|
||||
},
|
||||
|
||||
|
@ -170,7 +231,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
|
|||
},
|
||||
ThrottlePeriod: "1h",
|
||||
AcceptTimeRange: alerting.TimeRange{
|
||||
Start: "8:00",
|
||||
Start: "08:00",
|
||||
End: "21:00",
|
||||
},
|
||||
EscalationEnabled: true,
|
||||
|
@ -209,11 +270,13 @@ func TestConvertFilterQuery(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
|
||||
var targetDsl = `{"bool":{"must":[{"term":{"metadata.category":{"value":"elasticsearch"}}},{"terms":{"metadata.name":["index_stats","node_stats"]}},{"bool":{"must_not":[{"range":{"timestamp":{"gt":"2022-04-16T16:16:39.168605+08:00"}}}]}}]}}`
|
||||
eng := &Engine{}
|
||||
q, err := eng.ConvertFilterQueryToDsl(&fq)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fmt.Println(util.MustToJSON(q))
|
||||
if dsl := util.MustToJSON(q); dsl != targetDsl {
|
||||
t.Errorf("expect dsl %s but got %s", targetDsl, dsl)
|
||||
}
|
||||
}
|
|
@ -13,8 +13,8 @@ import (
|
|||
|
||||
type Engine interface {
|
||||
GenerateQuery(rule *alerting.Rule) (interface{}, error)
|
||||
ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, error)
|
||||
CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error)
|
||||
ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error)
|
||||
CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error)
|
||||
GenerateTask(rule *alerting.Rule) func(ctx context.Context)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue