From 9a4456be597865d9038837c6803cbd6b42657a0c Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 11 May 2022 11:55:41 +0800 Subject: [PATCH] add template parameters --- model/alerting/condition.go | 3 + model/alerting/rule.go | 4 + plugin/api/alerting/api.go | 1 + plugin/api/alerting/rule.go | 6 + service/alerting/constants.go | 19 +++ service/alerting/elasticsearch/engine.go | 160 +++++++++++++++-------- service/alerting/parameter.go | 32 +++++ 7 files changed, 168 insertions(+), 57 deletions(-) create mode 100644 service/alerting/parameter.go diff --git a/model/alerting/condition.go b/model/alerting/condition.go index 2d53dbd6..494f790a 100644 --- a/model/alerting/condition.go +++ b/model/alerting/condition.go @@ -53,6 +53,9 @@ type ConditionResult struct { type ConditionResultItem struct { GroupValues []string `json:"group_values"` ConditionItem *ConditionItem `json:"condition_item"` + IssueTimestamp interface{} `json:"issue_timestamp"` + ResultValue interface{} `json:"result_value"` //满足条件最后一个值 + RelationValues map[string]interface{} `json:"relation_values"` } type Severity string diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 293f0ec2..46829847 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -74,3 +74,7 @@ func (tr *TimeRange) Include( t time.Time) bool { currentTimeStr := t.Format("15:04") return tr.Start <= currentTimeStr && currentTimeStr <= tr.End } + +//ctx +//rule expression, rule_id, resource_id, resource_name, event_id, condition_name, preset_value,[group_tags, check_values], +//check_status ,timestamp, \ No newline at end of file diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 00b8769e..daf3c0a9 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -35,6 +35,7 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.searchAlert) api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.getAlert) 
api.HandleAPIMethod(api.POST, "/alerting/alert/_acknowledge", alert.acknowledgeAlert) + api.HandleAPIMethod(api.GET, "/alerting/template/parameters", alert.getTemplateParams) //just for test diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index 4890bed6..edf46a52 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -442,6 +442,12 @@ func checkResourceExists(rule *alerting.Rule) (bool, error) { } } +func (alertAPI *AlertAPI) getTemplateParams(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + alertAPI.WriteJSON(w, util.MapStr{ + "template_params": alerting2.GetTemplateParameters(), + }, http.StatusOK) +} + //func (alertAPI *AlertAPI) testRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { // rule := alerting.Rule{ // ID: util.GetUUID(), diff --git a/service/alerting/constants.go b/service/alerting/constants.go index 013f80b0..ad36c95b 100644 --- a/service/alerting/constants.go +++ b/service/alerting/constants.go @@ -9,3 +9,22 @@ const ( KVLastTermStartTime = "alert_last_term_start_time" KVLastEscalationTime = "alert_last_escalation_time" ) + + +const ( + ParamRuleID = "rule_id" //规则 UUID + ParamResourceID = "resource_id" // 资源 UUID + ParamResourceName = "resource_name" // 资源名称 如集群名称 es-v714 + ParamEventID = "event_id" // 检查事件 ID + ParamResults = "results" // + ParamMessage = "message" //检查消息 自定义 + ParamPresetValue = "preset_value" //检查预设值 float64 + ParamResultValue = "result_value" //检查结果 {group_tags:["cluster-xxx", "node-xxx"], check_values:[]} + ParamStatus = "status" //状态 + ParamTimestamp = "timestamp" //事件产生时间戳 + ParamGroupValues = "group_values" + ParamIssueTimestamp = "issue_timestamp" + ParamRelationValues = "relation_values" +//rule expression, rule_id, resource_id, resource_name, event_id, condition_name, preset_value,[group_tags, check_values], +//check_status ,timestamp, +) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go 
index 46e81319..886a1a3f 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -9,9 +9,7 @@ import ( "context" "fmt" "github.com/Knetic/govaluate" - "github.com/buger/jsonparser" log "github.com/cihub/seelog" - "github.com/valyala/fasttemplate" "infini.sh/console/model/alerting" alerting2 "infini.sh/console/service/alerting" "infini.sh/console/service/alerting/action" @@ -19,12 +17,12 @@ import ( "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" - "io" "math" "runtime/debug" "sort" "strconv" "strings" + "text/template" "time" ) @@ -365,42 +363,33 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, e queryResult.MetricData = metricData return queryResult, nil } -//CheckCondition check whether rule conditions triggered or not -//if triggered returns an array of ConditionResult -//sort conditions by severity desc before check , and then if condition is true, then continue check another group -func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){ +func (engine *Engine) GetTargetMetricData(rule *alerting.Rule)([]alerting.MetricData, error){ queryResult, err := engine.ExecuteQuery(rule) - conditionResult := &alerting.ConditionResult{ - QueryResult: queryResult, - } if err != nil { - return conditionResult, err + return nil, err } - - var resultItems []alerting.ConditionResultItem var targetMetricData []alerting.MetricData for _, md := range queryResult.MetricData { var targetData alerting.MetricData if len(rule.Metrics.Items) == 1 { targetData = md - }else{ + } else { targetData = alerting.MetricData{ GroupValues: md.GroupValues, - Data: map[string][]alerting.TimeMetricData{}, + Data: map[string][]alerting.TimeMetricData{}, } expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula) if err != nil { - return conditionResult, err + return nil, err } dataLength := 0 for _, v := range md.Data { 
dataLength = len(v) break } - DataLoop: + DataLoop: for i := 0; i < dataLength; i++ { - parameters := map[string]interface{}{ - } + parameters := map[string]interface{}{} var timestamp interface{} for k, v := range md.Data { if len(k) == 20 { @@ -418,10 +407,11 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } result, err := expression.Evaluate(parameters) if err != nil { - return conditionResult, err + return nil, err } if r, ok := result.(float64); ok { - if math.IsNaN(r) || math.IsInf(r, 0){ + if math.IsNaN(r) || math.IsInf(r, 0 ){ + targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()}) continue } } @@ -430,9 +420,32 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } } targetMetricData = append(targetMetricData, targetData) - sort.Slice(rule.Conditions.Items, func(i, j int) bool { - return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity] - }) + } + return targetMetricData, nil +} +//CheckCondition check whether rule conditions triggered or not +//if triggered returns an array of ConditionResult +//sort conditions by severity desc before check , and then if condition is true, then continue check another group +func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){ + queryResult, err := engine.ExecuteQuery(rule) + conditionResult := &alerting.ConditionResult{ + QueryResult: queryResult, + } + if err != nil { + return conditionResult, err + } + + var resultItems []alerting.ConditionResultItem + targetMetricData, err := engine.GetTargetMetricData(rule) + if err != nil { + return nil, err + } + for idx, targetData := range targetMetricData { + if idx == 0 { + sort.Slice(rule.Conditions.Items, func(i, j int) bool { + return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity] 
+ }) + } LoopCondition: for _, cond := range rule.Conditions.Items { conditionExpression, err := cond.GenerateConditionExpression() @@ -451,6 +464,11 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } triggerCount := 0 for i := 0; i < dataLength; i++ { + if r, ok := targetData.Data[dataKey][i][1].(float64); ok { + if math.IsNaN(r){ + continue + } + } evaluateResult, err := expression.Evaluate(map[string]interface{}{ "result": targetData.Data[dataKey][i][1], }) @@ -464,10 +482,17 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } if triggerCount >= cond.MinimumPeriodMatch { log.Debugf("triggered condition %v, groups: %v\n", cond, targetData.GroupValues) - resultItems = append(resultItems, alerting.ConditionResultItem{ + resultItem := alerting.ConditionResultItem{ GroupValues: targetData.GroupValues, ConditionItem: &cond, - }) + ResultValue: targetData.Data[dataKey][i][1], + IssueTimestamp: targetData.Data[dataKey][i][0], + RelationValues: map[string]interface{}{}, + } + for _, metric := range rule.Metrics.Items{ + resultItem.RelationValues[metric.Name] = queryResult.MetricData[idx].Data[metric.Name][i][1] + } + resultItems = append(resultItems, resultItem) break LoopCondition } } @@ -583,9 +608,10 @@ func (engine *Engine) Do(rule *alerting.Rule) error { period := time.Now().Sub(rule.LastNotificationTime.Local()) //log.Error(lastAlertItem.ID, period, periodDuration) + paramsCtx := newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Format(time.RFC3339)) if lastAlertItem.ID == "" || period > periodDuration { - actionResults := performChannels(rule.Channels.Normal, conditionResults) + actionResults := performChannels(rule.Channels.Normal, paramsCtx) alertItem.ActionExecutionResults = actionResults //todo init last notification time when create task (by last alert item is notified) rule.LastNotificationTime = time.Now() @@ -624,7 +650,7 @@ func (engine *Engine) Do(rule *alerting.Rule) 
error { } } if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration { - actionResults := performChannels(rule.Channels.Escalation, conditionResults) + actionResults := performChannels(rule.Channels.Escalation, paramsCtx) alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, actionResults...) //todo init last escalation time when create task (by last alert item is escalated) rule.LastEscalationTime = time.Now() @@ -639,39 +665,52 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } +func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, eventID string, eventTimestamp interface{} ) map[string]interface{}{ + var conditionParams []util.MapStr + for _, resultItem := range checkResults.ResultItems { + conditionParams = append(conditionParams, util.MapStr{ + alerting2.ParamMessage: resultItem.ConditionItem.Message, + alerting2.ParamPresetValue: resultItem.ConditionItem.Values, + alerting2.ParamStatus: resultItem.ConditionItem.Severity, + alerting2.ParamGroupValues: resultItem.GroupValues, + alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp, + alerting2.ParamResultValue: resultItem.ResultValue, + alerting2.ParamRelationValues: resultItem.RelationValues, + }) + } + paramsCtx := util.MapStr{ + alerting2.ParamRuleID: rule.ID, + alerting2.ParamResourceID: rule.Resource.ID, + alerting2.ParamResourceName: rule.Resource.Name, + alerting2.ParamEventID: eventID, + alerting2.ParamTimestamp: eventTimestamp, + alerting2.ParamResults: conditionParams, + } + return paramsCtx +} + func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error) { checkResults, err := engine.CheckCondition(rule) if err != nil { return nil, fmt.Errorf("check condition error:%w", err) } - conditionResults := checkResults.ResultItems var actionResults []alerting.ActionExecutionResult + paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Format(time.RFC3339)) if 
len(rule.Channels.Normal) > 0 { - actionResults = performChannels(rule.Channels.Normal, conditionResults) + actionResults = performChannels(rule.Channels.Normal, paramsCtx) }else if len(rule.Channels.Escalation) > 0{ - actionResults = performChannels(rule.Channels.Escalation, conditionResults) + actionResults = performChannels(rule.Channels.Escalation, paramsCtx) }else{ return nil, fmt.Errorf("no useable channel") } return actionResults, nil } -func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResultItem) []alerting.ActionExecutionResult { - var message string - for _, conditionResult := range conditionResults { - message += fmt.Sprintf("severity: %s\t message:%s\t groups:%v\t timestamp: %v;", conditionResult.ConditionItem.Severity, conditionResult.ConditionItem.Message, conditionResult.GroupValues, time.Now()) - } - if message == ""{ - message = "normal" - } - ctx := util.MapStr{ - "ctx": util.MapStr{ - "message": message, - }, - } +func performChannels(channels []alerting.Channel, ctx map[string]interface{}) []alerting.ActionExecutionResult { + var actionResults []alerting.ActionExecutionResult for _, channel := range channels { - resBytes, err := performChannel(&channel, util.MustToJSONBytes(ctx)) + resBytes, err := performChannel(&channel, ctx) var errStr string if err != nil { errStr = err.Error() @@ -685,22 +724,29 @@ func performChannels(channels []alerting.Channel, conditionResults []alerting.Co return actionResults } -func resolveMessage(messageTemplate string, ctx []byte) ([]byte, error){ +func resolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){ msg := messageTemplate - tpl := fasttemplate.New(msg, "{{", "}}") - msgBuffer := bytes.NewBuffer(nil) - _, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){ - keyParts := strings.Split(tag,".") - value, _, _, err := jsonparser.Get(ctx, keyParts...) 
- if err != nil { - return 0, err - } - return writer.Write(value) - }) + //tpl := fasttemplate.New(msg, "{{", "}}") + //msgBuffer := bytes.NewBuffer(nil) + //_, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){ + // keyParts := strings.Split(tag,".") + // value, _, _, err := jsonparser.Get(ctx, keyParts...) + // if err != nil { + // return 0, err + // } + // return writer.Write(value) + //}) + //return msgBuffer.Bytes(), err + tmpl, err := template.New("alert-message").Parse(msg) + if err !=nil { + return nil, fmt.Errorf("parse message temlate error: %w", err) + } + msgBuffer := &bytes.Buffer{} + err = tmpl.Execute(msgBuffer, ctx) return msgBuffer.Bytes(), err } -func performChannel(channel *alerting.Channel, ctx []byte) ([]byte, error) { +func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error) { var act action.Action switch channel.Type { diff --git a/service/alerting/parameter.go b/service/alerting/parameter.go new file mode 100644 index 00000000..f29c258c --- /dev/null +++ b/service/alerting/parameter.go @@ -0,0 +1,32 @@ +/* Copyright © INFINI Ltd. All rights reserved. 
+ * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +type ParameterMeta struct { + Name string `json:"name"` + Type string `json:"type"` // one of: int, float, string, date, array, object + Description string `json:"description"` + Eg string `json:"eg,omitempty"` + Properties []ParameterMeta `json:"properties,omitempty"` +} + +func GetTemplateParameters() []ParameterMeta { + return []ParameterMeta{ + {ParamRuleID, "string", "rule uuid", "c9f663tath2e5a0vksjg", nil}, + {ParamResourceID, "string", "resource uuid", "c9f663tath2e5a0vksjg", nil}, + {ParamResourceName, "string", "resource name", "es-v716", nil}, + {ParamEventID, "string", "identifier for check details", "c9f663tath2e5a0vksjx", nil}, + {ParamResults, "array", "", "", []ParameterMeta{ + {ParamMessage, "string", "", "disk used 90%", nil}, + {ParamPresetValue, "float", "", "", nil}, + {ParamStatus, "string", "", "error", nil}, + {ParamGroupValues, "array", "", "[\"cluster-xxx\", \"node-xxx\"]", nil}, + {ParamIssueTimestamp, "date", "", "1652184211252", nil}, + {ParamResultValue, "float", "", "91.2", nil}, + {ParamRelationValues, "object", "", "{a:100, b:91.2}", nil}, + }}, + {ParamTimestamp, "date", "", "", nil}, + } +}