diff --git a/model/alerting/condition.go b/model/alerting/condition.go
index 6c2784c4..cec3d075 100644
--- a/model/alerting/condition.go
+++ b/model/alerting/condition.go
@@ -10,6 +10,15 @@ type Condition struct {
   Operator string `json:"operator"`
   Items []ConditionItem `json:"items"`
 }
+func (cond *Condition) GetMinimumPeriodMatch() int{
+  var minPeriodMatch = 0
+  for _, citem := range cond.Items {
+    if citem.MinimumPeriodMatch > minPeriodMatch {
+      minPeriodMatch = citem.MinimumPeriodMatch
+    }
+  }
+  return minPeriodMatch
+}
 
 type ConditionItem struct {
   //MetricName string `json:"metric"`
diff --git a/model/alerting/metric.go b/model/alerting/metric.go
index 5f8fd0cb..86f7979f 100644
--- a/model/alerting/metric.go
+++ b/model/alerting/metric.go
@@ -11,7 +11,6 @@ import (
 type Metric struct {
   PeriodInterval string `json:"period_interval"`
-  MaxPeriods int `json:"max_periods"`
   Items []MetricItem `json:"items"`
   Formula string `json:"formula,omitempty"`
   Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` // alert expression, auto-generated, e.g. avg(cpu) > 80
diff --git a/model/alerting/rule.go b/model/alerting/rule.go
index c6e4c223..e676c61d 100644
--- a/model/alerting/rule.go
+++ b/model/alerting/rule.go
@@ -13,7 +13,7 @@ type Rule struct {
   ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"`
   Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"`
   Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"`
-  //Name string `json:"name" elastic_mapping:"name:{type:keyword,copy_to:search_text}"`
+  Name string `json:"name" elastic_mapping:"name:{type:keyword,copy_to:search_text}"`
   Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:keyword}"`
   Resource Resource `json:"resource" elastic_mapping:"resource:{type:object}"`
   Metrics Metric `json:"metrics" elastic_mapping:"metrics:{type:object}"`
diff --git a/model/alerting/rule_test.go b/model/alerting/rule_test.go
index 93e1e257..1f1b4e43 100644
--- a/model/alerting/rule_test.go
+++ b/model/alerting/rule_test.go
@@ -61,7 +61,6 @@ func TestCreateRule( t *testing.T) {
     Metrics: Metric{
       PeriodInterval: "1m",
-      MaxPeriods: 15,
       Items: []MetricItem{
         {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
         {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
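Dropping the hard-coded MaxPeriods pairs with the new Condition.GetMinimumPeriodMatch helper: the engine now sizes its query window from the largest minimum-period-match among the rule's condition items (see the GenerateRawFilter change further down). A minimal, self-contained sketch of that sizing logic, assuming only the Condition/ConditionItem shapes shown above:

    package main

    import (
        "fmt"
        "time"
    )

    type ConditionItem struct{ MinimumPeriodMatch int }
    type Condition struct{ Items []ConditionItem }

    func (cond *Condition) GetMinimumPeriodMatch() int {
        minPeriodMatch := 0
        for _, citem := range cond.Items {
            if citem.MinimumPeriodMatch > minPeriodMatch {
                minPeriodMatch = citem.MinimumPeriodMatch
            }
        }
        return minPeriodMatch
    }

    func main() {
        cond := Condition{Items: []ConditionItem{{MinimumPeriodMatch: 1}, {MinimumPeriodMatch: 5}}}
        bucketCount := cond.GetMinimumPeriodMatch() + 1
        // period_interval "1m" parses into value=1, units="m" inside GenerateRawFilter
        window, _ := time.ParseDuration(fmt.Sprintf("%d%s", 1*bucketCount, "m"))
        fmt.Println(bucketCount, window) // 6 6m0s
    }

So a rule whose strictest condition needs 5 matching periods now queries 6 one-minute buckets instead of the previous fixed 15.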
diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go
index e5bbe3b3..f9296ed1 100644
--- a/plugin/api/alerting/message.go
+++ b/plugin/api/alerting/message.go
@@ -8,8 +8,10 @@ import (
   "fmt"
   log "github.com/cihub/seelog"
   "infini.sh/console/model/alerting"
+  alerting2 "infini.sh/console/service/alerting"
   httprouter "infini.sh/framework/core/api/router"
   "infini.sh/framework/core/elastic"
+  "infini.sh/framework/core/kv"
   "infini.sh/framework/core/orm"
   "infini.sh/framework/core/util"
   "net/http"
@@ -20,7 +22,7 @@
 func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
   body := struct {
-    MessageIDs []string `json:"ids"`
+    Messages []alerting.AlertMessage `json:"messages"`
   }{}
   err := h.DecodeJSON(req, &body)
   if err != nil {
     return
   }
-  if len(body.MessageIDs) == 0 {
-    h.WriteError(w, "alert ids should not be empty", http.StatusInternalServerError)
+  if len(body.Messages) == 0 {
+    h.WriteError(w, "messages should not be empty", http.StatusInternalServerError)
     return
   }
+  messageIDs := make([]string, 0, len(body.Messages))
+  for _, msg := range body.Messages {
+    messageIDs = append(messageIDs, msg.ID)
+  }
   queryDsl := util.MapStr{
     "query": util.MapStr{
-      "terms": util.MapStr{
-        "_id": body.MessageIDs,
+      "bool": util.MapStr{
+        "must": []util.MapStr{
+          {
+            "terms": util.MapStr{
+              "_id": messageIDs,
+            },
+          },
+          {
+            "term": util.MapStr{
+              "status": util.MapStr{
+                "value": alerting.MessageStateAlerting,
+              },
+            },
+          },
+        },
       },
     },
     "script": util.MapStr{
@@ -48,9 +67,14 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request,
     log.Error(err)
     return
   }
+  //delete kv cache
+  for _, msg := range body.Messages {
+    _ = kv.DeleteKey(alerting2.KVLastMessageState, []byte(msg.RuleID))
+  }
+
   h.WriteJSON(w, util.MapStr{
-    "ids": body.MessageIDs,
+    "ids": messageIDs,
     "result": "updated",
   }, 200)
 }
@@ -215,7 +239,7 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps
   exists, err = orm.Get(rule)
   if !exists || err != nil {
     log.Error(err)
-    h.WriteError(w, fmt.Sprintf("rule[%s] not found", rule.ID), http.StatusInternalServerError)
+    h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError)
     return
   }
   metricExpression, _ := rule.Metrics.GenerateExpression()
@@ -238,7 +262,7 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps
     "created": message.Created,
     "updated": message.Updated,
     "resource_name": rule.Resource.Name,
-    "resource_object": rule.Resource.Objects,
+    "resource_objects": rule.Resource.Objects,
     "conditions": rule.Conditions,
     "duration": duration.Milliseconds(),
     "ignored_time": message.IgnoredTime,
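The ignore endpoint now receives whole message objects rather than bare ids, so the handler can restrict the update to messages still in the alerting state and clear the per-rule KVLastMessageState cache entry afterwards. A sketch of a request body a client might send, assuming alerting.AlertMessage serializes its ID and RuleID fields with the usual id / rule_id JSON tags:

    payload := map[string]interface{}{
        "messages": []map[string]string{
            {"id": "message-id-1", "rule_id": "rule-id-1"}, // illustrative identifiers
        },
    }
    body := util.MustToJSONBytes(payload) // POST this to the route registered for ignoreAlertMessage

Messages whose status is no longer alerting are simply skipped by the query above.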
diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go
index ee9cc4e0..f0ecb1b8 100644
--- a/plugin/api/alerting/rule.go
+++ b/plugin/api/alerting/rule.go
@@ -7,13 +7,16 @@ package alerting
 import (
   "fmt"
   log "github.com/cihub/seelog"
+  "github.com/r3labs/diff/v2"
   "infini.sh/console/model/alerting"
   alerting2 "infini.sh/console/service/alerting"
   _ "infini.sh/console/service/alerting/elasticsearch"
   httprouter "infini.sh/framework/core/api/router"
   "infini.sh/framework/core/elastic"
+  "infini.sh/framework/core/event"
   "infini.sh/framework/core/kv"
   "infini.sh/framework/core/orm"
+  "infini.sh/framework/core/queue"
   "infini.sh/framework/core/task"
   "infini.sh/framework/core/util"
   "infini.sh/framework/modules/elastic/api"
@@ -55,7 +58,6 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p
     ids = append(ids, rule.ID)
     rule.Created = time.Now()
     rule.Updated = time.Now()
-    rule.Metrics.MaxPeriods = 15
     if rule.Schedule.Interval == ""{
       rule.Schedule.Interval = "1m"
     }
@@ -68,6 +70,11 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p
       }, http.StatusInternalServerError)
       return
     }
+    saveAlertActivity("alerting_rule_change", "create", util.MapStr{
+      "cluster_id": rule.Resource.ID,
+      "rule_id": rule.ID,
+      "cluster_name": rule.Resource.Name,
+    },nil, &rule)
     eng := alerting2.GetEngine(rule.Resource.Type)
     if rule.Enabled {
       ruleTask := task.ScheduleTask{
@@ -189,12 +196,55 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request
 }
 
+func saveActivity(activityInfo *event.Activity){
+  queueConfig := queue.GetOrInitConfig("platform##activities")
+  if queueConfig.Labels == nil {
+    queueConfig.Labels = map[string]interface{}{
+      "type": "platform",
+      "name": "activity",
+      "category": "elasticsearch",
+      "activity": true,
+    }
+  }
+  err := queue.Push(queueConfig, util.MustToJSONBytes(event.Event{
+    Timestamp: time.Now(),
+    Metadata: event.EventMetadata{
+      Category: "elasticsearch",
+      Name: "activity",
+    },
+    Fields: util.MapStr{
+      "activity": activityInfo,
+    }}))
+  if err != nil {
+    log.Error(err)
+  }
+}
+
+func saveAlertActivity(name, typ string, labels map[string]interface{}, changelog diff.Changelog, oldState interface{}){
+  activityInfo := &event.Activity{
+    ID: util.GetUUID(),
+    Timestamp: time.Now(),
+    Metadata: event.ActivityMetadata{
+      Category: "elasticsearch",
+      Group: "platform",
+      Name: name,
+      Type: typ,
+      Labels: labels,
+    },
+    Changelog: changelog,
+    Fields: util.MapStr{
+      "rule": oldState,
+    },
+  }
+  saveActivity(activityInfo)
+}
+
 func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
   id := ps.MustGetParameter("rule_id")
-  obj := &alerting.Rule{}
+  oldRule := &alerting.Rule{}
 
-  obj.ID = id
-  exists, err := orm.Get(obj)
+  oldRule.ID = id
+  exists, err := orm.Get(oldRule)
   if !exists || err != nil {
     log.Error(err)
     alertAPI.WriteJSON(w, util.MapStr{
@@ -204,35 +254,46 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
     return
   }
 
-  id = obj.ID
-  create := obj.Created
-  obj = &alerting.Rule{}
-  err = alertAPI.DecodeJSON(req, obj)
+  id = oldRule.ID
+  create := oldRule.Created
+  rule := &alerting.Rule{
+  }
+  err = alertAPI.DecodeJSON(req, rule)
   if err != nil {
     alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
     log.Error(err)
     return
   }
+  rule.Metrics.Expression, err = rule.Metrics.GenerateExpression()
+  if err != nil {
+    alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
+    log.Error(err)
+    return
+  }
+  changeLog, err := util.DiffTwoObject(oldRule, rule)
+  if err != nil {
+    log.Error(err)
+  }
   //protect
-  obj.ID = id
-  obj.Created = create
-  obj.Updated = time.Now()
-  obj.Metrics.Expression, err = obj.Metrics.GenerateExpression()
-  if err != nil {
-    alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
-    log.Error(err)
-    return
-  }
-  err = orm.Update(obj)
-  if err != nil {
-    alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
-    log.Error(err)
-    return
-  }
+  rule.ID = id
+  rule.Created = create
+  rule.Updated = time.Now()
 
-  if obj.Enabled {
-    exists, err = checkResourceExists(obj)
+  err = orm.Update(rule)
+  if err != nil {
+    alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
+    log.Error(err)
+    return
+  }
+  saveAlertActivity("alerting_rule_change", "update", util.MapStr{
+    "cluster_id": rule.Resource.ID,
+    "rule_id": rule.ID,
+    "cluster_name": rule.Resource.Name,
+  },changeLog, oldRule)
+
+  if rule.Enabled {
+    exists, err = checkResourceExists(rule)
     if err != nil || !exists {
       log.Error(err)
       alertAPI.WriteJSON(w, util.MapStr{
@@ -242,22 +303,22 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
     }
     //update task
     task.StopTask(id)
-    eng := alerting2.GetEngine(obj.Resource.Type)
+    eng := alerting2.GetEngine(rule.Resource.Type)
     ruleTask := task.ScheduleTask{
-      ID: obj.ID,
-      Interval: obj.Schedule.Interval,
-      Description: obj.Metrics.Expression,
-      Task: eng.GenerateTask(obj),
+      ID: rule.ID,
+      Interval: rule.Schedule.Interval,
+      Description: rule.Metrics.Expression,
+      Task: eng.GenerateTask(rule),
     }
     task.RegisterScheduleTask(ruleTask)
     task.StartTask(ruleTask.ID)
   }else{
     task.DeleteTask(id)
   }
-  clearKV(obj.ID)
+  clearKV(rule.ID)
   alertAPI.WriteJSON(w, util.MapStr{
-    "_id": obj.ID,
+    "_id": rule.ID,
     "result": "updated",
   }, 200)
 }
@@ -289,6 +350,11 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p
     log.Error(err)
     return
   }
+  saveAlertActivity("alerting_rule_change", "delete", util.MapStr{
+    "cluster_id": obj.Resource.ID,
+    "rule_id": obj.ID,
+    "cluster_name": obj.Resource.Name,
+  },nil, &obj)
   task.DeleteTask(obj.ID)
   clearKV(obj.ID)
@@ -577,7 +643,7 @@ func (alertAPI *AlertAPI) getPreviewMetricData(w http.ResponseWriter, req *http.
     End: max,
     BucketSize: fmt.Sprintf("%ds", bucketSize),
   }
-  metricItem, err := getRuleMetricData(rule, filterParam)
+  metricItem, _, err := getRuleMetricData(rule, filterParam)
   if err != nil {
     log.Error(err)
     alertAPI.WriteJSON(w, util.MapStr{
@@ -612,7 +678,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
     End: max,
     BucketSize: fmt.Sprintf("%ds", bucketSize),
   }
-  metricItem, err := getRuleMetricData(rule, filterParam)
+  metricItem, queryResult, err := getRuleMetricData(rule, filterParam)
   if err != nil {
     log.Error(err)
     alertAPI.WriteJSON(w, util.MapStr{
@@ -620,16 +686,20 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
     }, http.StatusInternalServerError)
     return
   }
-  alertAPI.WriteJSON(w, util.MapStr{
+  resBody := util.MapStr{
     "metric": metricItem,
-  }, http.StatusOK)
+  }
+  if alertAPI.GetParameter(req, "debug") == "1" {
+    resBody["query"] = queryResult.Query
+  }
+  alertAPI.WriteJSON(w,resBody, http.StatusOK)
 }
 
-func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) (*common.MetricItem, error) {
+func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) (*common.MetricItem, *alerting.QueryResult, error) {
   eng := alerting2.GetEngine(rule.Resource.Type)
-  metricData, err := eng.GetTargetMetricData(rule, true, filterParam)
+  metricData, queryResult, err := eng.GetTargetMetricData(rule, true, filterParam)
   if err != nil {
-    return nil, err
+    return nil,queryResult, err
   }
   //var filteredMetricData []alerting.MetricData
   //title := rule.Metrics.Formula
@@ -650,12 +720,18 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam)
     if len(md.Data) == 0 {
       continue
     }
-    //filteredMetricData = append(filteredMetricData, md)
+    targetData := md.Data["result"]
+    if len(rule.Metrics.Items) == 1 {
+      for k, _ := range md.Data {
+        targetData = md.Data[k]
+        break
+      }
+    }
     if sampleData == nil {
-      sampleData = md.Data["result"]
+      sampleData = targetData
     }
     metricItem.Lines = append(metricItem.Lines, &common.MetricLine{
-      Data: md.Data["result"],
+      Data: targetData,
       BucketSize: filterParam.BucketSize,
       Metric: common.MetricSummary{
         Label: strings.Join(md.GroupValues, "-"),
@@ -696,7 +772,7 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam)
   //  }
   //  }
   //}
-  return &metricItem, nil
+  return &metricItem,queryResult, nil
 }
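One subtle behaviour change in getRuleMetricData: a rule with exactly one metric item and no formula keeps its series under that item's own key rather than under "result", so the handler now falls back to whichever single key is present. A small sketch of that selection, assuming MetricData.Data is the map of bucket series indexed above (the variable names and types here are illustrative):

    singleMetricItem := true // stands in for len(rule.Metrics.Items) == 1
    data := map[string][][]interface{}{
        "a": {{int64(1652234400), 42.0}}, // one metric item named "a", no formula key
    }
    targetData := data["result"] // nil for single-item rules
    if singleMetricItem {
        for k := range data {
            targetData = data[k] // pick the only series there is
            break
        }
    }
    _ = targetData

The debug=1 branch above is purely additive: without the parameter the response stays {"metric": ...} as before.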
diff --git a/service/alerting/constants.go b/service/alerting/constants.go
index 80860944..c546ca8c 100644
--- a/service/alerting/constants.go
+++ b/service/alerting/constants.go
@@ -19,11 +19,11 @@ const (
   ParamEventID = "event_id" // event ID of this check
   ParamResults = "results" //
   ParamMessage = "message" // check message, user-defined (rendered from a template)
-  ParamTitle       = "title"
-  ParamPresetValue = "preset_value" // preset check value, float64
-  ParamResultValue = "result_value" // check result, e.g. {group_tags:["cluster-xxx", "node-xxx"], check_values:[]}
-  Severity         = "severity" // alert severity
-  ParamTimestamp   = "timestamp" // timestamp when the event was produced
+  ParamTitle     = "title"
+  ParamThreshold = "threshold" // preset check values (threshold), []string
+  ParamResultValue = "result_value" // check result, e.g. {group_tags:["cluster-xxx", "node-xxx"], check_values:[]}
+  Severity       = "severity" // alert severity
+  ParamTimestamp = "timestamp" // timestamp when the event was produced
   ParamGroupValues = "group_values"
   ParamIssueTimestamp = "issue_timestamp"
   ParamRelationValues = "relation_values"
diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go
index 4fac2a4d..b19b4cfb 100644
--- a/service/alerting/elasticsearch/engine.go
+++ b/service/alerting/elasticsearch/engine.go
@@ -41,7 +41,6 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
   if err != nil {
     return nil, err
   }
-  //todo generate agg
   if len(rule.Metrics.Items) == 0 {
     return nil, fmt.Errorf("metric items should not be empty")
   }
@@ -133,7 +132,7 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) map[string]in
   case "rate":
     aggType = "max"
     isPipeline = true
-  case "medium":
+  case "medium": // from es version 6.6
     aggType = "median_absolute_deviation"
   case "p99", "p95","p90","p80","p50":
     aggType = "percentiles"
@@ -304,7 +303,11 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerti
   }else{
     return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.PeriodInterval)
   }
-  duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * 15, units))
+  bucketCount := rule.Conditions.GetMinimumPeriodMatch() + 1
+  if bucketCount <= 0 {
+    bucketCount = 1
+  }
+  duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * bucketCount, units))
   if err != nil {
     return nil, err
   }
@@ -385,10 +388,10 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.Fi
   queryResult.MetricData = metricData
   return queryResult, nil
 }
-func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error){
+func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, *alerting.QueryResult, error){
   queryResult, err := engine.ExecuteQuery(rule, filterParam)
   if err != nil {
-    return nil, err
+    return nil, queryResult, err
   }
   var targetMetricData []alerting.MetricData
   for _, md := range queryResult.MetricData {
@@ -402,7 +405,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool,
     }
     expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula)
     if err != nil {
-      return nil, err
+      return nil, queryResult, err
     }
     dataLength := 0
     for _, v := range md.Data {
@@ -429,7 +432,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool,
       }
       result, err := expression.Evaluate(parameters)
      if err != nil {
-        return nil, err
+        return nil, queryResult, err
      }
      if r, ok := result.(float64); ok {
        if math.IsNaN(r) || math.IsInf(r, 0 ){
@@ -445,25 +448,20 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool,
     }
     targetMetricData = append(targetMetricData, targetData)
   }
-  return targetMetricData, nil
+  return targetMetricData, queryResult, nil
 }
 
 //CheckCondition check whether rule conditions triggered or not
-//if triggered returns an array of ConditionResult
+//if triggered returns a ConditionResult
 //sort conditions by severity desc before check, and then if condition is true, then continue check another group
 func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){
-  queryResult, err := engine.ExecuteQuery(rule, nil)
+  var resultItems []alerting.ConditionResultItem
+  targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, false, nil)
   conditionResult := &alerting.ConditionResult{
     QueryResult: queryResult,
   }
   if err != nil {
     return conditionResult, err
   }
-
-  var resultItems []alerting.ConditionResultItem
-  targetMetricData, err := engine.GetTargetMetricData(rule, false, nil)
-  if err != nil {
-    return nil, err
-  }
   for idx, targetData := range targetMetricData {
     if idx == 0 {
       sort.Slice(rule.Conditions.Items, func(i, j int) bool {
@@ -488,6 +486,10 @@
       }
       triggerCount := 0
       for i := 0; i < dataLength; i++ {
+        //clear nil value
+        if targetData.Data[dataKey][i][1] == nil {
+          continue
+        }
         if r, ok := targetData.Data[dataKey][i][1].(float64); ok {
           if math.IsNaN(r){
             continue
@@ -497,7 +499,7 @@
           "result": targetData.Data[dataKey][i][1],
         })
         if err != nil {
-          return nil, err
+          return conditionResult, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err)
         }
         if evaluateResult == true {
          triggerCount += 1
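Each bucket value is checked with a govaluate expression whose single parameter is result; nil buckets are now skipped instead of aborting the run, and evaluation failures surface as a rule-scoped error instead of a nil ConditionResult. A standalone sketch of that per-bucket check (the expression text is illustrative — the real one is built from the condition item's operator and values — and the import path assumes the Knetic govaluate module the engine already uses):

    package main

    import (
        "fmt"

        "github.com/Knetic/govaluate"
    )

    func main() {
        expr, err := govaluate.NewEvaluableExpression("result > 80")
        if err != nil {
            panic(err)
        }
        buckets := [][]interface{}{{int64(1652234400), 95.0}, {int64(1652234460), nil}}
        triggerCount := 0
        for _, point := range buckets {
            if point[1] == nil { // mirrors the new nil guard in CheckCondition
                continue
            }
            matched, err := expr.Evaluate(map[string]interface{}{"result": point[1]})
            if err != nil {
                panic(err)
            }
            if matched == true {
                triggerCount++
            }
        }
        fmt.Println(triggerCount) // 1; compared against the item's MinimumPeriodMatch afterwards
    }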
@@ -549,14 +551,18 @@
     }
   }
   if alertItem != nil {
-    for _, actionResult := range alertItem.ActionExecutionResults {
-      if actionResult.Error != "" {
-        alertItem.Error = actionResult.Error
+    if err != nil{
+      alertItem.State = alerting.AlertStateError
+      alertItem.Error = err.Error()
+    }else {
+      for _, actionResult := range alertItem.ActionExecutionResults {
+        if actionResult.Error != "" {
+          alertItem.Error = actionResult.Error
+          alertItem.State = alerting.AlertStateError
+        }
       }
     }
-    if alertItem.Error != ""{
-      alertItem.State = alerting.AlertStateError
-    }
+
     err = orm.Save(alertItem)
     if err != nil {
       log.Error(err)
@@ -601,25 +607,15 @@
     }
     return nil
   }else{
+    alertItem.State = alerting.AlertStateAlerting
     paramsCtx = newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Unix())
     var (
       severity = conditionResults[0].ConditionItem.Severity
-      tplBytes []byte
-      message string
-      title string
     )
-    tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx)
+    err = attachTitleMessageToCtx(rule, paramsCtx)
     if err != nil {
-      return fmt.Errorf("resolve content template error: %w", err)
+      return err
     }
-    message = string(tplBytes)
-    paramsCtx[alerting2.ParamMessage] = message
-    tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx)
-    if err != nil {
-      return fmt.Errorf("resolve title template error: %w", err)
-    }
-    title = string(tplBytes)
-    paramsCtx[alerting2.ParamTitle] = title
     for _, conditionResult := range conditionResults {
       if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] {
         severity = conditionResult.ConditionItem.Severity
@@ -627,9 +623,8 @@
       }
     }
     alertItem.Severity = severity
-    alertItem.Message = message
-    alertItem.Title = title
-    alertItem.State = alerting.AlertStateAlerting
+    alertItem.Message = paramsCtx[alerting2.ParamMessage].(string)
+    alertItem.Title = paramsCtx[alerting2.ParamTitle].(string)
     if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered {
       msg := &alerting.AlertMessage{
         RuleID: rule.ID,
@@ -638,16 +633,16 @@
         ID: util.GetUUID(),
         Status: alerting.MessageStateAlerting,
         Severity: severity,
-        Title: title,
-        Message: message,
+        Title: alertItem.Title,
+        Message: alertItem.Message,
       }
       err = saveAlertMessage(msg)
       if err != nil {
         return fmt.Errorf("save alert message error: %w", err)
       }
     }else{
-      alertMessage.Title = title
-      alertMessage.Message = message
+      alertMessage.Title = alertItem.Title
+      alertMessage.Message = alertItem.Message
       err = saveAlertMessage(alertMessage)
       if err != nil {
         return fmt.Errorf("save alert message error: %w", err)
@@ -736,11 +731,37 @@
   return nil
 }
+func attachTitleMessageToCtx(rule *alerting.Rule, paramsCtx map[string]interface{}) error{
+  var (
+    tplBytes []byte
+    err error
+  )
+  tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx)
+  if err != nil {
+    return fmt.Errorf("resolve message template error: %w", err)
+  }
+  paramsCtx[alerting2.ParamMessage] = string(tplBytes)
+  tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx)
+  if err != nil {
+    return fmt.Errorf("resolve title template error: %w", err)
+  }
+  paramsCtx[alerting2.ParamTitle] = string(tplBytes)
+  return nil
+}
+
 func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, eventID string, eventTimestamp interface{} ) map[string]interface{}{
-  var conditionParams []util.MapStr
-  for _, resultItem := range checkResults.ResultItems {
+  var (
+    conditionParams []util.MapStr
+    firstGroupValue string
+    firstThreshold string
+  )
+  for i, resultItem := range checkResults.ResultItems {
+    if i == 0 {
+      firstGroupValue = strings.Join(resultItem.GroupValues, ",")
+      firstThreshold = strings.Join(resultItem.ConditionItem.Values, ",")
+    }
     conditionParams = append(conditionParams, util.MapStr{
-      alerting2.ParamPresetValue: resultItem.ConditionItem.Values,
+      alerting2.ParamThreshold: resultItem.ConditionItem.Values,
       alerting2.Severity: resultItem.ConditionItem.Severity,
       alerting2.ParamGroupValues: resultItem.GroupValues,
       alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp,
@@ -755,6 +776,8 @@
     alerting2.ParamEventID: eventID,
     alerting2.ParamTimestamp: eventTimestamp,
     alerting2.ParamResults: conditionParams,
+    "first_group_value": firstGroupValue,
+    "first_threshold": firstThreshold,
   }
   return paramsCtx
 }
@@ -766,6 +789,10 @@
   }
   var actionResults []alerting.ActionExecutionResult
   paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Unix())
+  err = attachTitleMessageToCtx(rule, paramsCtx)
+  if err != nil {
+    return nil, err
+  }
   if len(rule.Channels.Normal) > 0 {
     actionResults, _ = performChannels(rule.Channels.Normal, paramsCtx)
   }else if len(rule.Channels.Escalation) > 0{
@@ -920,7 +947,7 @@
   }
 }
-func getLastAlertMessageFromES(ruleID string, message *alerting.AlertMessage) error {
+func getLastAlertMessageFromES(ruleID string) (*alerting.AlertMessage, error) {
   queryDsl := util.MapStr{
     "size": 1,
     "sort": []util.MapStr{
@@ -943,13 +970,15 @@
   }
   err, searchResult := orm.Search(alerting.AlertMessage{}, &q )
   if err != nil {
-    return err
+    return nil, err
   }
   if len(searchResult.Result) == 0 {
-    return nil
+    return nil, nil
   }
   messageBytes := util.MustToJSONBytes(searchResult.Result[0])
-  return util.FromJSONBytes(messageBytes, message)
+  message := &alerting.AlertMessage{}
+  err = util.FromJSONBytes(messageBytes, message)
+  return message, err
 }
 
 func getLastAlertMessage(ruleID string, duration time.Duration) (*alerting.AlertMessage, error ){
@@ -957,19 +986,19 @@
   if err != nil {
     return nil, err
   }
-  if messageBytes == nil {
-    return nil, nil
-  }
   message := &alerting.AlertMessage{}
-  err = util.FromJSONBytes(messageBytes, message)
-  if err != nil {
-    return nil, err
+  if messageBytes != nil {
+
+    err = util.FromJSONBytes(messageBytes, message)
+    if err != nil {
+      return nil, err
+    }
+    if time.Now().Sub(message.Updated) <= duration {
+      return message, nil
+    }
   }
-  if time.Now().Sub(message.Updated) > duration {
-    err = getLastAlertMessageFromES(ruleID, message)
-    return message, err
-  }
-  return message, nil
+  message, err = getLastAlertMessageFromES(ruleID)
+  return message, err
 }
 
 func saveAlertMessageToES(message *alerting.AlertMessage) error {
diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go
index 82d68215..6a0eccb2 100644
--- a/service/alerting/elasticsearch/engine_test.go
+++ b/service/alerting/elasticsearch/engine_test.go
@@ -55,7 +55,6 @@ func TestEngine( t *testing.T) {
     Metrics: alerting.Metric{
       PeriodInterval: "1m",
-      MaxPeriods: 15,
       Items: []alerting.MetricItem{
         {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
         {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
@@ -204,7 +203,6 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
     Metrics: alerting.Metric{
       PeriodInterval: "1m",
-      MaxPeriods: 15,
       Items: []alerting.MetricItem{
         {Name: "a", Field: "payload.elasticsearch.index_stats.total.search.query_total", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},
         {Name: "b", Field: "payload.elasticsearch.index_stats.total.search.query_time_in_millis", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},
diff --git a/service/alerting/engine.go b/service/alerting/engine.go
index 5f3c548b..a6a4debd 100644
--- a/service/alerting/engine.go
+++ b/service/alerting/engine.go
@@ -17,7 +17,7 @@ type Engine interface {
   CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error)
   GenerateTask(rule *alerting.Rule) func(ctx context.Context)
   Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error)
-  GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error)
+  GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, *alerting.QueryResult, error)
 }
 
 var (
diff --git a/service/alerting/parameter.go b/service/alerting/parameter.go
index 48c5d226..25ba5fcc 100644
--- a/service/alerting/parameter.go
+++ b/service/alerting/parameter.go
@@ -21,7 +21,7 @@ func GetTemplateParameters() []ParameterMeta {
     {ParamTitle, "string", "", "xxx cpu used 95%", nil},
     {ParamMessage, "string", "", "disk used 90%", nil},
     {ParamResults, "array", "", "", []ParameterMeta{
-      {ParamPresetValue, "array", "", "[\"90\"]", nil},
+      {ParamThreshold, "array", "", "[\"90\"]", nil},
       {Severity, "string", "", "error", nil},
       {ParamGroupValues, "array", "", "[\"cluster-xxx\", \"node-xxx\"]", nil},
       {ParamIssueTimestamp, "date", "", "2022-05-11T11:50:55+08:00", nil},
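Taken together with the ParamThreshold rename above, newParameterCtx now gives templates a flat view of the first triggered group. A sketch of the context it produces, using the constants defined in constants.go (values are illustrative, and the rule/resource fields it already carried are omitted here):

    paramsCtx := map[string]interface{}{
        alerting2.ParamEventID:   "event-uuid", // hypothetical event id
        alerting2.ParamTimestamp: 1652234455,
        alerting2.ParamResults: []util.MapStr{
            {
                alerting2.ParamThreshold:   []string{"90"}, // was preset_value before this change
                alerting2.Severity:         "error",
                alerting2.ParamGroupValues: []string{"cluster-xxx", "node-xxx"},
            },
        },
        "first_group_value": "cluster-xxx,node-xxx", // GroupValues of the first result, comma-joined
        "first_threshold":   "90",                   // Values of the first condition item, comma-joined
    }

Message and title templates can therefore reference first_group_value and first_threshold directly instead of indexing into the results array.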