merge branch
This commit is contained in:
commit
40f50b710a
|
@ -53,6 +53,9 @@ type ConditionResult struct {
|
|||
type ConditionResultItem struct {
|
||||
GroupValues []string `json:"group_values"`
|
||||
ConditionItem *ConditionItem `json:"condition_item"`
|
||||
IssueTimestamp interface{} `json:"issue_timestamp"`
|
||||
ResultValue interface{} `json:"result_value"` //满足条件最后一个值
|
||||
RelationValues map[string]interface{} `json:"relation_values"`
|
||||
}
|
||||
|
||||
type Severity string
|
||||
|
|
|
@ -44,6 +44,7 @@ type MetricItem struct {
|
|||
Field string `json:"field"`
|
||||
Statistic string `json:"statistic"`
|
||||
Group []string `json:"group"` //bucket group
|
||||
Limit int `json:"limit"`
|
||||
}
|
||||
|
||||
type QueryResult struct {
|
||||
|
|
|
@ -74,3 +74,7 @@ func (tr *TimeRange) Include( t time.Time) bool {
|
|||
currentTimeStr := t.Format("15:04")
|
||||
return tr.Start <= currentTimeStr && currentTimeStr <= tr.End
|
||||
}
|
||||
|
||||
//ctx
|
||||
//rule expression, rule_id, resource_id, resource_name, event_id, condition_name, preset_value,[group_tags, check_values],
|
||||
//check_status ,timestamp,
|
|
@ -18,6 +18,7 @@ type AlertAPI struct {
|
|||
func (alert *AlertAPI) Init() {
|
||||
api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.getRule)
|
||||
api.HandleAPIMethod(api.POST, "/alerting/rule", alert.createRule)
|
||||
api.HandleAPIMethod(api.POST, "/alerting/rule/test", alert.sendTestMessage)
|
||||
api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule)
|
||||
api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule)
|
||||
api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule)
|
||||
|
@ -34,6 +35,7 @@ func (alert *AlertAPI) Init() {
|
|||
api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.searchAlert)
|
||||
api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.getAlert)
|
||||
api.HandleAPIMethod(api.POST, "/alerting/alert/_acknowledge", alert.acknowledgeAlert)
|
||||
api.HandleAPIMethod(api.GET, "/alerting/template/parameters", alert.getTemplateParams)
|
||||
|
||||
|
||||
//just for test
|
||||
|
|
|
@ -398,6 +398,29 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p
|
|||
"_id": id,
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
rule := alerting.Rule{}
|
||||
err := alertAPI.DecodeJSON(req, &rule)
|
||||
if err != nil {
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"error": err.Error(),
|
||||
}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
eng := alerting2.GetEngine(rule.Resource.Type)
|
||||
actionResults, err := eng.Test(&rule)
|
||||
if err != nil {
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"error": err.Error(),
|
||||
}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"action_results": actionResults,
|
||||
}, http.StatusOK)
|
||||
|
||||
}
|
||||
func checkResourceExists(rule *alerting.Rule) (bool, error) {
|
||||
if rule.Resource.ID == "" {
|
||||
return false, fmt.Errorf("resource id can not be empty")
|
||||
|
@ -419,6 +442,12 @@ func checkResourceExists(rule *alerting.Rule) (bool, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (alertAPI *AlertAPI) getTemplateParams(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"template_params": alerting2.GetTemplateParameters(),
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
//func (alertAPI *AlertAPI) testRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
// rule := alerting.Rule{
|
||||
// ID: util.GetUUID(),
|
||||
|
|
|
@ -9,3 +9,22 @@ const (
|
|||
KVLastTermStartTime = "alert_last_term_start_time"
|
||||
KVLastEscalationTime = "alert_last_escalation_time"
|
||||
)
|
||||
|
||||
|
||||
const (
|
||||
ParamRuleID = "rule_id" //规则 UUID
|
||||
ParamResourceID = "resource_id" // 资源 UUID
|
||||
ParamResourceName = "resource_name" // 资源名称 如集群名称 es-v714
|
||||
ParamEventID = "event_id" // 检查事件 ID
|
||||
ParamResults = "results" //
|
||||
ParamMessage = "message" //检查消息 自定义
|
||||
ParamPresetValue = "preset_value" //检查预设值 float64
|
||||
ParamResultValue = "result_value" //检查结果 {group_tags:["cluster-xxx", "node-xxx"], check_values:[]}
|
||||
ParamStatus = "status" //状态
|
||||
ParamTimestamp = "timestamp" //事件产生时间戳
|
||||
ParamGroupValues = "group_values"
|
||||
ParamIssueTimestamp = "issue_timestamp"
|
||||
ParamRelationValues = "relation_values"
|
||||
//rule expression, rule_id, resource_id, resource_name, event_id, condition_name, preset_value,[group_tags, check_values],
|
||||
//check_status ,timestamp,
|
||||
)
|
||||
|
|
|
@ -9,9 +9,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"github.com/Knetic/govaluate"
|
||||
"github.com/buger/jsonparser"
|
||||
log "github.com/cihub/seelog"
|
||||
"github.com/valyala/fasttemplate"
|
||||
"infini.sh/console/model/alerting"
|
||||
alerting2 "infini.sh/console/service/alerting"
|
||||
"infini.sh/console/service/alerting/action"
|
||||
|
@ -19,12 +17,12 @@ import (
|
|||
"infini.sh/framework/core/kv"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/util"
|
||||
"io"
|
||||
"math"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -67,6 +65,11 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) {
|
|||
}
|
||||
var rootAggs util.MapStr
|
||||
groups := rule.Metrics.Items[0].Group
|
||||
limit := rule.Metrics.Items[0].Limit
|
||||
//top group 10
|
||||
if limit <= 0 {
|
||||
limit = 10
|
||||
}
|
||||
if grpLength := len(groups); grpLength > 0 {
|
||||
var lastGroupAgg util.MapStr
|
||||
|
||||
|
@ -74,7 +77,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) {
|
|||
groupAgg := util.MapStr{
|
||||
"terms": util.MapStr{
|
||||
"field": groups[i],
|
||||
"size": 500,
|
||||
"size": limit,
|
||||
},
|
||||
}
|
||||
groupID := util.GetUUID()
|
||||
|
@ -360,42 +363,33 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, e
|
|||
queryResult.MetricData = metricData
|
||||
return queryResult, nil
|
||||
}
|
||||
//CheckCondition check whether rule conditions triggered or not
|
||||
//if triggered returns an array of ConditionResult
|
||||
//sort conditions by severity desc before check , and then if condition is true, then continue check another group
|
||||
func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){
|
||||
func (engine *Engine) GetTargetMetricData(rule *alerting.Rule)([]alerting.MetricData, error){
|
||||
queryResult, err := engine.ExecuteQuery(rule)
|
||||
conditionResult := &alerting.ConditionResult{
|
||||
QueryResult: queryResult,
|
||||
}
|
||||
if err != nil {
|
||||
return conditionResult, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var resultItems []alerting.ConditionResultItem
|
||||
var targetMetricData []alerting.MetricData
|
||||
for _, md := range queryResult.MetricData {
|
||||
var targetData alerting.MetricData
|
||||
if len(rule.Metrics.Items) == 1 {
|
||||
targetData = md
|
||||
}else{
|
||||
} else {
|
||||
targetData = alerting.MetricData{
|
||||
GroupValues: md.GroupValues,
|
||||
Data: map[string][]alerting.TimeMetricData{},
|
||||
Data: map[string][]alerting.TimeMetricData{},
|
||||
}
|
||||
expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula)
|
||||
if err != nil {
|
||||
return conditionResult, err
|
||||
return nil, err
|
||||
}
|
||||
dataLength := 0
|
||||
for _, v := range md.Data {
|
||||
dataLength = len(v)
|
||||
break
|
||||
}
|
||||
DataLoop:
|
||||
DataLoop:
|
||||
for i := 0; i < dataLength; i++ {
|
||||
parameters := map[string]interface{}{
|
||||
}
|
||||
parameters := map[string]interface{}{}
|
||||
var timestamp interface{}
|
||||
for k, v := range md.Data {
|
||||
if len(k) == 20 {
|
||||
|
@ -413,10 +407,11 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes
|
|||
}
|
||||
result, err := expression.Evaluate(parameters)
|
||||
if err != nil {
|
||||
return conditionResult, err
|
||||
return nil, err
|
||||
}
|
||||
if r, ok := result.(float64); ok {
|
||||
if math.IsNaN(r) || math.IsInf(r, 0){
|
||||
if math.IsNaN(r) || math.IsInf(r, 0 ){
|
||||
targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
@ -425,9 +420,32 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes
|
|||
}
|
||||
}
|
||||
targetMetricData = append(targetMetricData, targetData)
|
||||
sort.Slice(rule.Conditions.Items, func(i, j int) bool {
|
||||
return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity]
|
||||
})
|
||||
}
|
||||
return targetMetricData, nil
|
||||
}
|
||||
//CheckCondition check whether rule conditions triggered or not
|
||||
//if triggered returns an array of ConditionResult
|
||||
//sort conditions by severity desc before check , and then if condition is true, then continue check another group
|
||||
func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){
|
||||
queryResult, err := engine.ExecuteQuery(rule)
|
||||
conditionResult := &alerting.ConditionResult{
|
||||
QueryResult: queryResult,
|
||||
}
|
||||
if err != nil {
|
||||
return conditionResult, err
|
||||
}
|
||||
|
||||
var resultItems []alerting.ConditionResultItem
|
||||
targetMetricData, err := engine.GetTargetMetricData(rule)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for idx, targetData := range targetMetricData {
|
||||
if idx == 0 {
|
||||
sort.Slice(rule.Conditions.Items, func(i, j int) bool {
|
||||
return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity]
|
||||
})
|
||||
}
|
||||
LoopCondition:
|
||||
for _, cond := range rule.Conditions.Items {
|
||||
conditionExpression, err := cond.GenerateConditionExpression()
|
||||
|
@ -446,6 +464,11 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes
|
|||
}
|
||||
triggerCount := 0
|
||||
for i := 0; i < dataLength; i++ {
|
||||
if r, ok := targetData.Data[dataKey][i][1].(float64); ok {
|
||||
if math.IsNaN(r){
|
||||
continue
|
||||
}
|
||||
}
|
||||
evaluateResult, err := expression.Evaluate(map[string]interface{}{
|
||||
"result": targetData.Data[dataKey][i][1],
|
||||
})
|
||||
|
@ -459,10 +482,17 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes
|
|||
}
|
||||
if triggerCount >= cond.MinimumPeriodMatch {
|
||||
log.Debugf("triggered condition %v, groups: %v\n", cond, targetData.GroupValues)
|
||||
resultItems = append(resultItems, alerting.ConditionResultItem{
|
||||
resultItem := alerting.ConditionResultItem{
|
||||
GroupValues: targetData.GroupValues,
|
||||
ConditionItem: &cond,
|
||||
})
|
||||
ResultValue: targetData.Data[dataKey][i][1],
|
||||
IssueTimestamp: targetData.Data[dataKey][i][0],
|
||||
RelationValues: map[string]interface{}{},
|
||||
}
|
||||
for _, metric := range rule.Metrics.Items{
|
||||
resultItem.RelationValues[metric.Name] = queryResult.MetricData[idx].Data[metric.Name][i][1]
|
||||
}
|
||||
resultItems = append(resultItems, resultItem)
|
||||
break LoopCondition
|
||||
}
|
||||
}
|
||||
|
@ -578,9 +608,10 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
|
|||
period := time.Now().Sub(rule.LastNotificationTime.Local())
|
||||
|
||||
//log.Error(lastAlertItem.ID, period, periodDuration)
|
||||
paramsCtx := newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Format(time.RFC3339))
|
||||
|
||||
if lastAlertItem.ID == "" || period > periodDuration {
|
||||
actionResults := performChannels(rule.Channels.Normal, conditionResults)
|
||||
actionResults := performChannels(rule.Channels.Normal, paramsCtx)
|
||||
alertItem.ActionExecutionResults = actionResults
|
||||
//todo init last notification time when create task (by last alert item is notified)
|
||||
rule.LastNotificationTime = time.Now()
|
||||
|
@ -619,7 +650,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
|
|||
}
|
||||
}
|
||||
if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration {
|
||||
actionResults := performChannels(rule.Channels.Escalation, conditionResults)
|
||||
actionResults := performChannels(rule.Channels.Escalation, paramsCtx)
|
||||
alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, actionResults...)
|
||||
//todo init last escalation time when create task (by last alert item is escalated)
|
||||
rule.LastEscalationTime = time.Now()
|
||||
|
@ -634,19 +665,52 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResultItem) []alerting.ActionExecutionResult {
|
||||
var message string
|
||||
for _, conditionResult := range conditionResults {
|
||||
message += fmt.Sprintf("severity: %s\t message:%s\t groups:%v\t timestamp: %v;", conditionResult.ConditionItem.Severity, conditionResult.ConditionItem.Message, conditionResult.GroupValues, time.Now())
|
||||
func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, eventID string, eventTimestamp interface{} ) map[string]interface{}{
|
||||
var conditionParams []util.MapStr
|
||||
for _, resultItem := range checkResults.ResultItems {
|
||||
conditionParams = append(conditionParams, util.MapStr{
|
||||
alerting2.ParamMessage: resultItem.ConditionItem.Message,
|
||||
alerting2.ParamPresetValue: resultItem.ConditionItem.Values,
|
||||
alerting2.ParamStatus: resultItem.ConditionItem.Severity,
|
||||
alerting2.ParamGroupValues: resultItem.GroupValues,
|
||||
alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp,
|
||||
alerting2.ParamResultValue: resultItem.ResultValue,
|
||||
alerting2.ParamRelationValues: resultItem.RelationValues,
|
||||
})
|
||||
}
|
||||
ctx := util.MapStr{
|
||||
"ctx": util.MapStr{
|
||||
"message": message,
|
||||
},
|
||||
paramsCtx := util.MapStr{
|
||||
alerting2.ParamRuleID: rule.ID,
|
||||
alerting2.ParamResourceID: rule.Resource.ID,
|
||||
alerting2.ParamResourceName: rule.Resource.Name,
|
||||
alerting2.ParamEventID: eventID,
|
||||
alerting2.ParamTimestamp: eventTimestamp,
|
||||
alerting2.ParamResults: conditionParams,
|
||||
}
|
||||
return paramsCtx
|
||||
}
|
||||
|
||||
func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error) {
|
||||
checkResults, err := engine.CheckCondition(rule)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("check condition error:%w", err)
|
||||
}
|
||||
var actionResults []alerting.ActionExecutionResult
|
||||
paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Format(time.RFC3339))
|
||||
if len(rule.Channels.Normal) > 0 {
|
||||
actionResults = performChannels(rule.Channels.Normal, paramsCtx)
|
||||
}else if len(rule.Channels.Escalation) > 0{
|
||||
actionResults = performChannels(rule.Channels.Escalation, paramsCtx)
|
||||
}else{
|
||||
return nil, fmt.Errorf("no useable channel")
|
||||
}
|
||||
return actionResults, nil
|
||||
}
|
||||
|
||||
func performChannels(channels []alerting.Channel, ctx map[string]interface{}) []alerting.ActionExecutionResult {
|
||||
|
||||
var actionResults []alerting.ActionExecutionResult
|
||||
for _, channel := range channels {
|
||||
resBytes, err := performChannel(&channel, util.MustToJSONBytes(ctx))
|
||||
resBytes, err := performChannel(&channel, ctx)
|
||||
var errStr string
|
||||
if err != nil {
|
||||
errStr = err.Error()
|
||||
|
@ -660,22 +724,29 @@ func performChannels(channels []alerting.Channel, conditionResults []alerting.Co
|
|||
return actionResults
|
||||
}
|
||||
|
||||
func resolveMessage(messageTemplate string, ctx []byte) ([]byte, error){
|
||||
func resolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){
|
||||
msg := messageTemplate
|
||||
tpl := fasttemplate.New(msg, "{{", "}}")
|
||||
msgBuffer := bytes.NewBuffer(nil)
|
||||
_, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){
|
||||
keyParts := strings.Split(tag,".")
|
||||
value, _, _, err := jsonparser.Get(ctx, keyParts...)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return writer.Write(value)
|
||||
})
|
||||
//tpl := fasttemplate.New(msg, "{{", "}}")
|
||||
//msgBuffer := bytes.NewBuffer(nil)
|
||||
//_, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){
|
||||
// keyParts := strings.Split(tag,".")
|
||||
// value, _, _, err := jsonparser.Get(ctx, keyParts...)
|
||||
// if err != nil {
|
||||
// return 0, err
|
||||
// }
|
||||
// return writer.Write(value)
|
||||
//})
|
||||
//return msgBuffer.Bytes(), err
|
||||
tmpl, err := template.New("alert-message").Parse(msg)
|
||||
if err !=nil {
|
||||
return nil, fmt.Errorf("parse message temlate error: %w", err)
|
||||
}
|
||||
msgBuffer := &bytes.Buffer{}
|
||||
err = tmpl.Execute(msgBuffer, ctx)
|
||||
return msgBuffer.Bytes(), err
|
||||
}
|
||||
|
||||
func performChannel(channel *alerting.Channel, ctx []byte) ([]byte, error) {
|
||||
func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error) {
|
||||
var act action.Action
|
||||
switch channel.Type {
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ type Engine interface {
|
|||
ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error)
|
||||
CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error)
|
||||
GenerateTask(rule *alerting.Rule) func(ctx context.Context)
|
||||
Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error)
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* web: https://infinilabs.com
|
||||
* mail: hello#infini.ltd */
|
||||
|
||||
package alerting
|
||||
|
||||
type ParameterMeta struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"` //int, float, string, date, array, object
|
||||
Description string `json:"description"`
|
||||
Eg string `json:"eg,omitempty"`
|
||||
Properties []ParameterMeta `json:"properties,omitempty"`
|
||||
}
|
||||
|
||||
func GetTemplateParameters() []ParameterMeta {
|
||||
return []ParameterMeta{
|
||||
{ParamRuleID, "string", "rule uuid", "c9f663tath2e5a0vksjg", nil},
|
||||
{ParamResourceID, "string", "resource uuid", "c9f663tath2e5a0vksjg", nil},
|
||||
{ParamResourceName, "string", "resource name", "es-v716", nil},
|
||||
{ParamEventID, "string", "identifier for check details", "c9f663tath2e5a0vksjx", nil},
|
||||
{ParamResults, "array", "", "", []ParameterMeta{
|
||||
{ParamMessage, "string", "", "disk used 90%", nil},
|
||||
{ParamPresetValue, "float", "", "", nil},
|
||||
{ParamStatus, "string", "", "error", nil},
|
||||
{ParamGroupValues, "array", "", "[\"cluster-xxx\", \"node-xxx\"]", nil},
|
||||
{ParamIssueTimestamp, "date", "", "1652184211252", nil},
|
||||
{ParamResultValue, "float", "", "91.2", nil},
|
||||
{ParamRelationValues, "map", "", "{a:100, b:91.2}", nil},
|
||||
}},
|
||||
{ParamTimestamp, "date", "", "", nil},
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue