update alerting api

This commit is contained in:
liugq 2022-05-13 12:27:06 +08:00
parent c5727f331f
commit 8926460150
6 changed files with 72 additions and 40 deletions

View File

@ -35,6 +35,7 @@ type ActionExecutionResult struct {
LastExecutionTime int `json:"last_execution_time"`
Error string `json:"error"`
Result string `json:"result"`
Message string `json:"message"`
}
const (

View File

@ -75,6 +75,10 @@ func (tr *TimeRange) Include( t time.Time) bool {
return tr.Start <= currentTimeStr && currentTimeStr <= tr.End
}
type FilterParam struct {
Start interface{} `json:"start"`
End interface{} `json:"end"`
}
//ctx
//rule expression, rule_id, resource_id, resource_name, event_id, condition_name, preset_value,[group_tags, check_values],
//check_status ,timestamp,

View File

@ -6,13 +6,13 @@ package alerting
import (
"fmt"
log "github.com/cihub/seelog"
"infini.sh/console/model/alerting"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"net/http"
log "src/github.com/cihub/seelog"
"strconv"
"strings"
)

View File

@ -431,6 +431,9 @@ func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Reque
}, http.StatusInternalServerError)
return
}
if rule.ID == "" {
rule.ID = util.GetUUID()
}
eng := alerting2.GetEngine(rule.Resource.Type)
actionResults, err := eng.Test(&rule)
if err != nil {
@ -483,7 +486,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
return
}
eng := alerting2.GetEngine(rule.Resource.Type)
metricData, err := eng.GetTargetMetricData(&rule, true)
metricData, err := eng.GetTargetMetricData(&rule, true, nil)
if err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{

View File

@ -35,8 +35,8 @@ type Engine struct {
//auto generate elasticsearch aggregations by metrics of rule
//group of metric item converted to terms aggregation and TimeField of rule converted to date_histogram aggregation
//convert statistic of metric item to elasticsearch aggregation
func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) {
filter, err := engine.GenerateRawFilter(rule)
func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.FilterParam) (interface{}, error) {
filter, err := engine.GenerateRawFilter(rule, filterParam)
if err != nil {
return nil, err
}
@ -258,7 +258,7 @@ func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[str
return resultQuery, nil
}
func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interface{}, error) {
func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) {
query := map[string]interface{}{}
var err error
if rule.Resource.RawFilter != nil {
@ -271,30 +271,47 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interfa
}
}
}
intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval)
if err != nil {
return nil, err
}
var (
units string
value int
timeStart interface{}
timeEnd interface{}
)
if intervalDuration / time.Hour >= 1 {
units = "h"
value = int(intervalDuration / time.Hour)
}else if intervalDuration / time.Minute >= 1{
units = "m"
value = int(intervalDuration / time.Minute)
}else if intervalDuration / time.Second >= 1 {
units = "s"
value = int(intervalDuration / time.Second)
if filterParam != nil {
timeStart = filterParam.Start
timeEnd = filterParam.End
}else{
return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.PeriodInterval)
var (
units string
value int
)
intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval)
if err != nil {
return nil, err
}
if intervalDuration / time.Hour >= 1 {
units = "h"
value = int(intervalDuration / time.Hour)
}else if intervalDuration / time.Minute >= 1{
units = "m"
value = int(intervalDuration / time.Minute)
}else if intervalDuration / time.Second >= 1 {
units = "s"
value = int(intervalDuration / time.Second)
}else{
return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.PeriodInterval)
}
duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * 15, units))
if err != nil {
return nil, err
}
timeStart = time.Now().Add(-duration).Format(time.RFC3339Nano)
timeEnd = time.Now().Format(time.RFC3339Nano)
}
timeQuery := util.MapStr{
"range": util.MapStr{
rule.Resource.TimeField: util.MapStr{
"gte": fmt.Sprintf("now-%d%s", value * 15, units),
"gte": timeStart,
"lte": timeEnd,
},
},
}
@ -331,12 +348,12 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interfa
return query, nil
}
func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error){
func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.FilterParam)(*alerting.QueryResult, error){
esClient := elastic.GetClient(rule.Resource.ID)
queryResult := &alerting.QueryResult{}
indexName := strings.Join(rule.Resource.Objects, ",")
//todo cache queryDsl
queryDsl, err := engine.GenerateQuery(rule)
queryDsl, err := engine.GenerateQuery(rule, filterParam)
if err != nil {
return nil, err
}
@ -363,8 +380,8 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, e
queryResult.MetricData = metricData
return queryResult, nil
}
func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool)([]alerting.MetricData, error){
queryResult, err := engine.ExecuteQuery(rule)
func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error){
queryResult, err := engine.ExecuteQuery(rule, filterParam)
if err != nil {
return nil, err
}
@ -429,7 +446,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool)
//if triggered returns an array of ConditionResult
//sort conditions by severity desc before check , and then if condition is true, then continue check another group
func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){
queryResult, err := engine.ExecuteQuery(rule)
queryResult, err := engine.ExecuteQuery(rule, nil)
conditionResult := &alerting.ConditionResult{
QueryResult: queryResult,
}
@ -438,7 +455,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes
}
var resultItems []alerting.ConditionResultItem
targetMetricData, err := engine.GetTargetMetricData(rule, false)
targetMetricData, err := engine.GetTargetMetricData(rule, false, nil)
if err != nil {
return nil, err
}
@ -542,7 +559,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
}
}()
log.Tracef("start check condition of rule %s", rule.ID)
checkResults, err := engine.CheckCondition(rule)
alertItem = &alerting.Alert{
ID: util.GetUUID(),
Created: time.Now(),
@ -552,10 +569,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
ResourceName: rule.Resource.Name,
Expression: rule.Metrics.Expression,
Objects: rule.Resource.Objects,
ConditionResult: checkResults,
Conditions: rule.Conditions,
State: alerting.AlertStateNormal,
}
checkResults, err := engine.CheckCondition(rule)
alertItem.ConditionResult = checkResults
if err != nil {
return err
}
@ -716,7 +734,7 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([
var errCount int
var actionResults []alerting.ActionExecutionResult
for _, channel := range channels {
resBytes, err := performChannel(&channel, ctx)
resBytes, err, messageBytes := performChannel(&channel, ctx)
var errStr string
if err != nil {
errCount++
@ -725,6 +743,7 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([
actionResults = append(actionResults, alerting.ActionExecutionResult{
Result: string(resBytes),
Error: errStr,
Message: string(messageBytes),
LastExecutionTime: int(time.Now().UnixNano()/1e6),
})
}
@ -744,23 +763,28 @@ func resolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte,
return msgBuffer.Bytes(), err
}
func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error) {
var act action.Action
func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error, []byte) {
var (
act action.Action
message []byte
err error
)
switch channel.Type {
case alerting.ChannelWebhook:
message, err := resolveMessage(channel.Webhook.Body, ctx)
message, err = resolveMessage(channel.Webhook.Body, ctx)
if err != nil {
return nil, err
return nil, err, message
}
act = &action.WebhookAction{
Data: channel.Webhook,
Message: string(message),
}
default:
return nil, fmt.Errorf("unsupported action type: %s", channel.Type)
return nil, fmt.Errorf("unsupported action type: %s", channel.Type), message
}
return act.Execute()
executeResult, err := act.Execute()
return executeResult, err, message
}
func (engine *Engine) GenerateTask(rule *alerting.Rule) func(ctx context.Context) {
return func(ctx context.Context) {

View File

@ -12,12 +12,12 @@ import (
)
type Engine interface {
GenerateQuery(rule *alerting.Rule) (interface{}, error)
ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error)
GenerateQuery(rule *alerting.Rule, filterParam *alerting.FilterParam) (interface{}, error)
ExecuteQuery(rule *alerting.Rule, filterParam *alerting.FilterParam)(*alerting.QueryResult, error)
CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error)
GenerateTask(rule *alerting.Rule) func(ctx context.Context)
Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error)
GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool)([]alerting.MetricData, error)
GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error)
}
var (