diff --git a/model/alerting/condition.go b/model/alerting/condition.go index 2d53dbd6..494f790a 100644 --- a/model/alerting/condition.go +++ b/model/alerting/condition.go @@ -53,6 +53,9 @@ type ConditionResult struct { type ConditionResultItem struct { GroupValues []string `json:"group_values"` ConditionItem *ConditionItem `json:"condition_item"` + IssueTimestamp interface{} `json:"issue_timestamp"` + ResultValue interface{} `json:"result_value"` //满足条件最后一个值 + RelationValues map[string]interface{} `json:"relation_values"` } type Severity string diff --git a/model/alerting/metric.go b/model/alerting/metric.go index c698e000..ca40e945 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -44,6 +44,7 @@ type MetricItem struct { Field string `json:"field"` Statistic string `json:"statistic"` Group []string `json:"group"` //bucket group + Limit int `json:"limit"` } type QueryResult struct { diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 293f0ec2..46829847 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -74,3 +74,7 @@ func (tr *TimeRange) Include( t time.Time) bool { currentTimeStr := t.Format("15:04") return tr.Start <= currentTimeStr && currentTimeStr <= tr.End } + +//ctx +//rule expression, rule_id, resource_id, resource_name, event_id, condition_name, preset_value,[group_tags, check_values], +//check_status ,timestamp, \ No newline at end of file diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index ca27be12..daf3c0a9 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -18,6 +18,7 @@ type AlertAPI struct { func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.getRule) api.HandleAPIMethod(api.POST, "/alerting/rule", alert.createRule) + api.HandleAPIMethod(api.POST, "/alerting/rule/test", alert.sendTestMessage) api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule) api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule) api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule) @@ -34,6 +35,7 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.searchAlert) api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.getAlert) api.HandleAPIMethod(api.POST, "/alerting/alert/_acknowledge", alert.acknowledgeAlert) + api.HandleAPIMethod(api.GET, "/alerting/template/parameters", alert.getTemplateParams) //just for test diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index bd556ab5..edf46a52 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -398,6 +398,29 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p "_id": id, }, http.StatusOK) } + +func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + rule := alerting.Rule{} + err := alertAPI.DecodeJSON(req, &rule) + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + eng := alerting2.GetEngine(rule.Resource.Type) + actionResults, err := eng.Test(&rule) + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + alertAPI.WriteJSON(w, util.MapStr{ + "action_results": actionResults, + }, http.StatusOK) + +} func checkResourceExists(rule *alerting.Rule) (bool, error) { if rule.Resource.ID == "" { return false, fmt.Errorf("resource id can not be empty") @@ -419,6 +442,12 @@ func checkResourceExists(rule *alerting.Rule) (bool, error) { } } +func (alertAPI *AlertAPI) getTemplateParams(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + alertAPI.WriteJSON(w, util.MapStr{ + "template_params": alerting2.GetTemplateParameters(), + }, http.StatusOK) +} + //func (alertAPI *AlertAPI) testRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { // rule := alerting.Rule{ // ID: util.GetUUID(), diff --git a/service/alerting/constants.go b/service/alerting/constants.go index 013f80b0..ad36c95b 100644 --- a/service/alerting/constants.go +++ b/service/alerting/constants.go @@ -9,3 +9,22 @@ const ( KVLastTermStartTime = "alert_last_term_start_time" KVLastEscalationTime = "alert_last_escalation_time" ) + + +const ( + ParamRuleID = "rule_id" //规则 UUID + ParamResourceID = "resource_id" // 资源 UUID + ParamResourceName = "resource_name" // 资源名称 如集群名称 es-v714 + ParamEventID = "event_id" // 检查事件 ID + ParamResults = "results" // + ParamMessage = "message" //检查消息 自定义 + ParamPresetValue = "preset_value" //检查预设值 float64 + ParamResultValue = "result_value" //检查结果 {group_tags:["cluster-xxx", "node-xxx"], check_values:[]} + ParamStatus = "status" //状态 + ParamTimestamp = "timestamp" //事件产生时间戳 + ParamGroupValues = "group_values" + ParamIssueTimestamp = "issue_timestamp" + ParamRelationValues = "relation_values" +//rule expression, rule_id, resource_id, resource_name, event_id, condition_name, preset_value,[group_tags, check_values], +//check_status ,timestamp, +) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 669df8b3..886a1a3f 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -9,9 +9,7 @@ import ( "context" "fmt" "github.com/Knetic/govaluate" - "github.com/buger/jsonparser" log "github.com/cihub/seelog" - "github.com/valyala/fasttemplate" "infini.sh/console/model/alerting" alerting2 "infini.sh/console/service/alerting" "infini.sh/console/service/alerting/action" @@ -19,12 +17,12 @@ import ( "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" - "io" "math" "runtime/debug" "sort" "strconv" "strings" + "text/template" "time" ) @@ -67,6 +65,11 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { } var rootAggs util.MapStr groups := rule.Metrics.Items[0].Group + limit := rule.Metrics.Items[0].Limit + //top group 10 + if limit <= 0 { + limit = 10 + } if grpLength := len(groups); grpLength > 0 { var lastGroupAgg util.MapStr @@ -74,7 +77,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { groupAgg := util.MapStr{ "terms": util.MapStr{ "field": groups[i], - "size": 500, + "size": limit, }, } groupID := util.GetUUID() @@ -360,42 +363,33 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, e queryResult.MetricData = metricData return queryResult, nil } -//CheckCondition check whether rule conditions triggered or not -//if triggered returns an array of ConditionResult -//sort conditions by severity desc before check , and then if condition is true, then continue check another group -func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){ +func (engine *Engine) GetTargetMetricData(rule *alerting.Rule)([]alerting.MetricData, error){ queryResult, err := engine.ExecuteQuery(rule) - conditionResult := &alerting.ConditionResult{ - QueryResult: queryResult, - } if err != nil { - return conditionResult, err + return nil, err } - - var resultItems []alerting.ConditionResultItem var targetMetricData []alerting.MetricData for _, md := range queryResult.MetricData { var targetData alerting.MetricData if len(rule.Metrics.Items) == 1 { targetData = md - }else{ + } else { targetData = alerting.MetricData{ GroupValues: md.GroupValues, - Data: map[string][]alerting.TimeMetricData{}, + Data: map[string][]alerting.TimeMetricData{}, } expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula) if err != nil { - return conditionResult, err + return nil, err } dataLength := 0 for _, v := range md.Data { dataLength = len(v) break } - DataLoop: + DataLoop: for i := 0; i < dataLength; i++ { - parameters := map[string]interface{}{ - } + parameters := map[string]interface{}{} var timestamp interface{} for k, v := range md.Data { if len(k) == 20 { @@ -413,10 +407,11 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } result, err := expression.Evaluate(parameters) if err != nil { - return conditionResult, err + return nil, err } if r, ok := result.(float64); ok { - if math.IsNaN(r) || math.IsInf(r, 0){ + if math.IsNaN(r) || math.IsInf(r, 0 ){ + targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()}) continue } } @@ -425,9 +420,32 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } } targetMetricData = append(targetMetricData, targetData) - sort.Slice(rule.Conditions.Items, func(i, j int) bool { - return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity] - }) + } + return targetMetricData, nil +} +//CheckCondition check whether rule conditions triggered or not +//if triggered returns an array of ConditionResult +//sort conditions by severity desc before check , and then if condition is true, then continue check another group +func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){ + queryResult, err := engine.ExecuteQuery(rule) + conditionResult := &alerting.ConditionResult{ + QueryResult: queryResult, + } + if err != nil { + return conditionResult, err + } + + var resultItems []alerting.ConditionResultItem + targetMetricData, err := engine.GetTargetMetricData(rule) + if err != nil { + return nil, err + } + for idx, targetData := range targetMetricData { + if idx == 0 { + sort.Slice(rule.Conditions.Items, func(i, j int) bool { + return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity] + }) + } LoopCondition: for _, cond := range rule.Conditions.Items { conditionExpression, err := cond.GenerateConditionExpression() @@ -446,6 +464,11 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } triggerCount := 0 for i := 0; i < dataLength; i++ { + if r, ok := targetData.Data[dataKey][i][1].(float64); ok { + if math.IsNaN(r){ + continue + } + } evaluateResult, err := expression.Evaluate(map[string]interface{}{ "result": targetData.Data[dataKey][i][1], }) @@ -459,10 +482,17 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } if triggerCount >= cond.MinimumPeriodMatch { log.Debugf("triggered condition %v, groups: %v\n", cond, targetData.GroupValues) - resultItems = append(resultItems, alerting.ConditionResultItem{ + resultItem := alerting.ConditionResultItem{ GroupValues: targetData.GroupValues, ConditionItem: &cond, - }) + ResultValue: targetData.Data[dataKey][i][1], + IssueTimestamp: targetData.Data[dataKey][i][0], + RelationValues: map[string]interface{}{}, + } + for _, metric := range rule.Metrics.Items{ + resultItem.RelationValues[metric.Name] = queryResult.MetricData[idx].Data[metric.Name][i][1] + } + resultItems = append(resultItems, resultItem) break LoopCondition } } @@ -578,9 +608,10 @@ func (engine *Engine) Do(rule *alerting.Rule) error { period := time.Now().Sub(rule.LastNotificationTime.Local()) //log.Error(lastAlertItem.ID, period, periodDuration) + paramsCtx := newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Format(time.RFC3339)) if lastAlertItem.ID == "" || period > periodDuration { - actionResults := performChannels(rule.Channels.Normal, conditionResults) + actionResults := performChannels(rule.Channels.Normal, paramsCtx) alertItem.ActionExecutionResults = actionResults //todo init last notification time when create task (by last alert item is notified) rule.LastNotificationTime = time.Now() @@ -619,7 +650,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration { - actionResults := performChannels(rule.Channels.Escalation, conditionResults) + actionResults := performChannels(rule.Channels.Escalation, paramsCtx) alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, actionResults...) //todo init last escalation time when create task (by last alert item is escalated) rule.LastEscalationTime = time.Now() @@ -634,19 +665,52 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } -func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResultItem) []alerting.ActionExecutionResult { - var message string - for _, conditionResult := range conditionResults { - message += fmt.Sprintf("severity: %s\t message:%s\t groups:%v\t timestamp: %v;", conditionResult.ConditionItem.Severity, conditionResult.ConditionItem.Message, conditionResult.GroupValues, time.Now()) +func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, eventID string, eventTimestamp interface{} ) map[string]interface{}{ + var conditionParams []util.MapStr + for _, resultItem := range checkResults.ResultItems { + conditionParams = append(conditionParams, util.MapStr{ + alerting2.ParamMessage: resultItem.ConditionItem.Message, + alerting2.ParamPresetValue: resultItem.ConditionItem.Values, + alerting2.ParamStatus: resultItem.ConditionItem.Severity, + alerting2.ParamGroupValues: resultItem.GroupValues, + alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp, + alerting2.ParamResultValue: resultItem.ResultValue, + alerting2.ParamRelationValues: resultItem.RelationValues, + }) } - ctx := util.MapStr{ - "ctx": util.MapStr{ - "message": message, - }, + paramsCtx := util.MapStr{ + alerting2.ParamRuleID: rule.ID, + alerting2.ParamResourceID: rule.Resource.ID, + alerting2.ParamResourceName: rule.Resource.Name, + alerting2.ParamEventID: eventID, + alerting2.ParamTimestamp: eventTimestamp, + alerting2.ParamResults: conditionParams, + } + return paramsCtx +} + +func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error) { + checkResults, err := engine.CheckCondition(rule) + if err != nil { + return nil, fmt.Errorf("check condition error:%w", err) } + var actionResults []alerting.ActionExecutionResult + paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Format(time.RFC3339)) + if len(rule.Channels.Normal) > 0 { + actionResults = performChannels(rule.Channels.Normal, paramsCtx) + }else if len(rule.Channels.Escalation) > 0{ + actionResults = performChannels(rule.Channels.Escalation, paramsCtx) + }else{ + return nil, fmt.Errorf("no useable channel") + } + return actionResults, nil +} + +func performChannels(channels []alerting.Channel, ctx map[string]interface{}) []alerting.ActionExecutionResult { + var actionResults []alerting.ActionExecutionResult for _, channel := range channels { - resBytes, err := performChannel(&channel, util.MustToJSONBytes(ctx)) + resBytes, err := performChannel(&channel, ctx) var errStr string if err != nil { errStr = err.Error() @@ -660,22 +724,29 @@ func performChannels(channels []alerting.Channel, conditionResults []alerting.Co return actionResults } -func resolveMessage(messageTemplate string, ctx []byte) ([]byte, error){ +func resolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){ msg := messageTemplate - tpl := fasttemplate.New(msg, "{{", "}}") - msgBuffer := bytes.NewBuffer(nil) - _, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){ - keyParts := strings.Split(tag,".") - value, _, _, err := jsonparser.Get(ctx, keyParts...) - if err != nil { - return 0, err - } - return writer.Write(value) - }) + //tpl := fasttemplate.New(msg, "{{", "}}") + //msgBuffer := bytes.NewBuffer(nil) + //_, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){ + // keyParts := strings.Split(tag,".") + // value, _, _, err := jsonparser.Get(ctx, keyParts...) + // if err != nil { + // return 0, err + // } + // return writer.Write(value) + //}) + //return msgBuffer.Bytes(), err + tmpl, err := template.New("alert-message").Parse(msg) + if err !=nil { + return nil, fmt.Errorf("parse message temlate error: %w", err) + } + msgBuffer := &bytes.Buffer{} + err = tmpl.Execute(msgBuffer, ctx) return msgBuffer.Bytes(), err } -func performChannel(channel *alerting.Channel, ctx []byte) ([]byte, error) { +func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error) { var act action.Action switch channel.Type { diff --git a/service/alerting/engine.go b/service/alerting/engine.go index 5151b7d5..d9dece81 100644 --- a/service/alerting/engine.go +++ b/service/alerting/engine.go @@ -16,6 +16,7 @@ type Engine interface { ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error) GenerateTask(rule *alerting.Rule) func(ctx context.Context) + Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error) } var ( diff --git a/service/alerting/parameter.go b/service/alerting/parameter.go new file mode 100644 index 00000000..f29c258c --- /dev/null +++ b/service/alerting/parameter.go @@ -0,0 +1,32 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +type ParameterMeta struct { + Name string `json:"name"` + Type string `json:"type"` //int, float, string, date, array, object + Description string `json:"description"` + Eg string `json:"eg,omitempty"` + Properties []ParameterMeta `json:"properties,omitempty"` +} + +func GetTemplateParameters() []ParameterMeta { + return []ParameterMeta{ + {ParamRuleID, "string", "rule uuid", "c9f663tath2e5a0vksjg", nil}, + {ParamResourceID, "string", "resource uuid", "c9f663tath2e5a0vksjg", nil}, + {ParamResourceName, "string", "resource name", "es-v716", nil}, + {ParamEventID, "string", "identifier for check details", "c9f663tath2e5a0vksjx", nil}, + {ParamResults, "array", "", "", []ParameterMeta{ + {ParamMessage, "string", "", "disk used 90%", nil}, + {ParamPresetValue, "float", "", "", nil}, + {ParamStatus, "string", "", "error", nil}, + {ParamGroupValues, "array", "", "[\"cluster-xxx\", \"node-xxx\"]", nil}, + {ParamIssueTimestamp, "date", "", "1652184211252", nil}, + {ParamResultValue, "float", "", "91.2", nil}, + {ParamRelationValues, "map", "", "{a:100, b:91.2}", nil}, + }}, + {ParamTimestamp, "date", "", "", nil}, + } +}