update alert api

This commit is contained in:
liugq 2022-05-22 14:50:30 +08:00
parent f4b87b0122
commit 5ef6bc1965
4 changed files with 66 additions and 44 deletions

View File

@ -10,6 +10,15 @@ type Condition struct {
Operator string `json:"operator"` Operator string `json:"operator"`
Items []ConditionItem `json:"items"` Items []ConditionItem `json:"items"`
} }
func (cond *Condition) GetMinimumPeriodMatch() int{
var minPeriodMatch = 0
for _, citem := range cond.Items {
if citem.MinimumPeriodMatch > minPeriodMatch {
minPeriodMatch = citem.MinimumPeriodMatch
}
}
return minPeriodMatch
}
type ConditionItem struct { type ConditionItem struct {
//MetricName string `json:"metric"` //MetricName string `json:"metric"`

View File

@ -577,7 +577,7 @@ func (alertAPI *AlertAPI) getPreviewMetricData(w http.ResponseWriter, req *http.
End: max, End: max,
BucketSize: fmt.Sprintf("%ds", bucketSize), BucketSize: fmt.Sprintf("%ds", bucketSize),
} }
metricItem, err := getRuleMetricData(rule, filterParam) metricItem, _, err := getRuleMetricData(rule, filterParam)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{ alertAPI.WriteJSON(w, util.MapStr{
@ -612,7 +612,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
End: max, End: max,
BucketSize: fmt.Sprintf("%ds", bucketSize), BucketSize: fmt.Sprintf("%ds", bucketSize),
} }
metricItem, err := getRuleMetricData(rule, filterParam) metricItem, queryResult, err := getRuleMetricData(rule, filterParam)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{ alertAPI.WriteJSON(w, util.MapStr{
@ -620,16 +620,20 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
}, http.StatusInternalServerError) }, http.StatusInternalServerError)
return return
} }
alertAPI.WriteJSON(w, util.MapStr{ resBody := util.MapStr{
"metric": metricItem, "metric": metricItem,
}, http.StatusOK) }
if alertAPI.GetParameter(req, "debug") == "1" {
resBody["query"] = queryResult.Query
}
alertAPI.WriteJSON(w,resBody, http.StatusOK)
} }
func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) (*common.MetricItem, error) { func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) (*common.MetricItem, *alerting.QueryResult, error) {
eng := alerting2.GetEngine(rule.Resource.Type) eng := alerting2.GetEngine(rule.Resource.Type)
metricData, err := eng.GetTargetMetricData(rule, true, filterParam) metricData, queryResult, err := eng.GetTargetMetricData(rule, true, filterParam)
if err != nil { if err != nil {
return nil, err return nil,queryResult, err
} }
//var filteredMetricData []alerting.MetricData //var filteredMetricData []alerting.MetricData
//title := rule.Metrics.Formula //title := rule.Metrics.Formula
@ -696,7 +700,7 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam)
// } // }
// } // }
//} //}
return &metricItem, nil return &metricItem,queryResult, nil
} }

View File

@ -41,7 +41,6 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
if err != nil { if err != nil {
return nil, err return nil, err
} }
//todo generate agg
if len(rule.Metrics.Items) == 0 { if len(rule.Metrics.Items) == 0 {
return nil, fmt.Errorf("metric items should not be empty") return nil, fmt.Errorf("metric items should not be empty")
} }
@ -133,7 +132,7 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) map[string]in
case "rate": case "rate":
aggType = "max" aggType = "max"
isPipeline = true isPipeline = true
case "medium": case "medium": // from es version 6.6
aggType = "median_absolute_deviation" aggType = "median_absolute_deviation"
case "p99", "p95","p90","p80","p50": case "p99", "p95","p90","p80","p50":
aggType = "percentiles" aggType = "percentiles"
@ -304,7 +303,11 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerti
}else{ }else{
return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.PeriodInterval) return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.PeriodInterval)
} }
duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * 15, units)) bucketCount := rule.Conditions.GetMinimumPeriodMatch() + 1
if bucketCount <= 0 {
bucketCount = 1
}
duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * bucketCount, units))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -385,10 +388,10 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.Fi
queryResult.MetricData = metricData queryResult.MetricData = metricData
return queryResult, nil return queryResult, nil
} }
func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error){ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, *alerting.QueryResult, error){
queryResult, err := engine.ExecuteQuery(rule, filterParam) queryResult, err := engine.ExecuteQuery(rule, filterParam)
if err != nil { if err != nil {
return nil, err return nil, queryResult, err
} }
var targetMetricData []alerting.MetricData var targetMetricData []alerting.MetricData
for _, md := range queryResult.MetricData { for _, md := range queryResult.MetricData {
@ -402,7 +405,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool,
} }
expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula) expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula)
if err != nil { if err != nil {
return nil, err return nil, queryResult, err
} }
dataLength := 0 dataLength := 0
for _, v := range md.Data { for _, v := range md.Data {
@ -429,7 +432,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool,
} }
result, err := expression.Evaluate(parameters) result, err := expression.Evaluate(parameters)
if err != nil { if err != nil {
return nil, err return nil, queryResult, err
} }
if r, ok := result.(float64); ok { if r, ok := result.(float64); ok {
if math.IsNaN(r) || math.IsInf(r, 0 ){ if math.IsNaN(r) || math.IsInf(r, 0 ){
@ -445,25 +448,20 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool,
} }
targetMetricData = append(targetMetricData, targetData) targetMetricData = append(targetMetricData, targetData)
} }
return targetMetricData, nil return targetMetricData, queryResult, nil
} }
//CheckCondition check whether rule conditions triggered or not //CheckCondition check whether rule conditions triggered or not
//if triggered returns an array of ConditionResult //if triggered returns an ConditionResult
//sort conditions by severity desc before check , and then if condition is true, then continue check another group //sort conditions by severity desc before check , and then if condition is true, then continue check another group
func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){
queryResult, err := engine.ExecuteQuery(rule, nil) var resultItems []alerting.ConditionResultItem
targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, false, nil)
conditionResult := &alerting.ConditionResult{ conditionResult := &alerting.ConditionResult{
QueryResult: queryResult, QueryResult: queryResult,
} }
if err != nil { if err != nil {
return conditionResult, err return conditionResult, err
} }
var resultItems []alerting.ConditionResultItem
targetMetricData, err := engine.GetTargetMetricData(rule, false, nil)
if err != nil {
return nil, err
}
for idx, targetData := range targetMetricData { for idx, targetData := range targetMetricData {
if idx == 0 { if idx == 0 {
sort.Slice(rule.Conditions.Items, func(i, j int) bool { sort.Slice(rule.Conditions.Items, func(i, j int) bool {
@ -604,22 +602,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
paramsCtx = newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Unix()) paramsCtx = newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Unix())
var ( var (
severity = conditionResults[0].ConditionItem.Severity severity = conditionResults[0].ConditionItem.Severity
tplBytes []byte
message string
title string
) )
tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx) err = attachTitleMessageToCtx(rule, paramsCtx)
if err != nil { if err != nil {
return fmt.Errorf("resolve content template error: %w", err) return err
} }
message = string(tplBytes)
paramsCtx[alerting2.ParamMessage] = message
tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx)
if err != nil {
return fmt.Errorf("resolve title template error: %w", err)
}
title = string(tplBytes)
paramsCtx[alerting2.ParamTitle] = title
for _, conditionResult := range conditionResults { for _, conditionResult := range conditionResults {
if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] { if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] {
severity = conditionResult.ConditionItem.Severity severity = conditionResult.ConditionItem.Severity
@ -627,8 +614,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
} }
alertItem.Severity = severity alertItem.Severity = severity
alertItem.Message = message alertItem.Message = paramsCtx[alerting2.ParamMessage].(string)
alertItem.Title = title alertItem.Title = paramsCtx[alerting2.ParamTitle].(string)
alertItem.State = alerting.AlertStateAlerting alertItem.State = alerting.AlertStateAlerting
if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered { if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered {
msg := &alerting.AlertMessage{ msg := &alerting.AlertMessage{
@ -638,16 +625,16 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
ID: util.GetUUID(), ID: util.GetUUID(),
Status: alerting.MessageStateAlerting, Status: alerting.MessageStateAlerting,
Severity: severity, Severity: severity,
Title: title, Title: alertItem.Title,
Message: message, Message: alertItem.Message,
} }
err = saveAlertMessage(msg) err = saveAlertMessage(msg)
if err != nil { if err != nil {
return fmt.Errorf("save alert message error: %w", err) return fmt.Errorf("save alert message error: %w", err)
} }
}else{ }else{
alertMessage.Title = title alertMessage.Title = alertItem.Title
alertMessage.Message = message alertMessage.Message = alertItem.Message
err = saveAlertMessage(alertMessage) err = saveAlertMessage(alertMessage)
if err != nil { if err != nil {
return fmt.Errorf("save alert message error: %w", err) return fmt.Errorf("save alert message error: %w", err)
@ -736,6 +723,24 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
return nil return nil
} }
func attachTitleMessageToCtx(rule *alerting.Rule, paramsCtx map[string]interface{}) error{
var (
tplBytes []byte
err error
)
tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx)
if err != nil {
return fmt.Errorf("resolve message template error: %w", err)
}
paramsCtx[alerting2.ParamMessage] = string(tplBytes)
tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx)
if err != nil {
return fmt.Errorf("resolve title template error: %w", err)
}
paramsCtx[alerting2.ParamTitle] = string(tplBytes)
return nil
}
func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, eventID string, eventTimestamp interface{} ) map[string]interface{}{ func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, eventID string, eventTimestamp interface{} ) map[string]interface{}{
var conditionParams []util.MapStr var conditionParams []util.MapStr
for _, resultItem := range checkResults.ResultItems { for _, resultItem := range checkResults.ResultItems {
@ -766,6 +771,10 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul
} }
var actionResults []alerting.ActionExecutionResult var actionResults []alerting.ActionExecutionResult
paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Unix()) paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Unix())
err = attachTitleMessageToCtx(rule, paramsCtx)
if err != nil {
return nil, err
}
if len(rule.Channels.Normal) > 0 { if len(rule.Channels.Normal) > 0 {
actionResults, _ = performChannels(rule.Channels.Normal, paramsCtx) actionResults, _ = performChannels(rule.Channels.Normal, paramsCtx)
}else if len(rule.Channels.Escalation) > 0{ }else if len(rule.Channels.Escalation) > 0{

View File

@ -17,7 +17,7 @@ type Engine interface {
CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error)
GenerateTask(rule *alerting.Rule) func(ctx context.Context) GenerateTask(rule *alerting.Rule) func(ctx context.Context)
Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error)
GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, *alerting.QueryResult, error)
} }
var ( var (