From 5ef6bc19659bb0ef445d7197fe902a07d9962185 Mon Sep 17 00:00:00 2001 From: liugq Date: Sun, 22 May 2022 14:50:30 +0800 Subject: [PATCH 01/15] update alert api --- model/alerting/condition.go | 9 +++ plugin/api/alerting/rule.go | 20 +++--- service/alerting/elasticsearch/engine.go | 79 +++++++++++++----------- service/alerting/engine.go | 2 +- 4 files changed, 66 insertions(+), 44 deletions(-) diff --git a/model/alerting/condition.go b/model/alerting/condition.go index 6c2784c4..cec3d075 100644 --- a/model/alerting/condition.go +++ b/model/alerting/condition.go @@ -10,6 +10,15 @@ type Condition struct { Operator string `json:"operator"` Items []ConditionItem `json:"items"` } +func (cond *Condition) GetMinimumPeriodMatch() int{ + var minPeriodMatch = 0 + for _, citem := range cond.Items { + if citem.MinimumPeriodMatch > minPeriodMatch { + minPeriodMatch = citem.MinimumPeriodMatch + } + } + return minPeriodMatch +} type ConditionItem struct { //MetricName string `json:"metric"` diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index ee9cc4e0..479639a9 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -577,7 +577,7 @@ func (alertAPI *AlertAPI) getPreviewMetricData(w http.ResponseWriter, req *http. End: max, BucketSize: fmt.Sprintf("%ds", bucketSize), } - metricItem, err := getRuleMetricData(rule, filterParam) + metricItem, _, err := getRuleMetricData(rule, filterParam) if err != nil { log.Error(err) alertAPI.WriteJSON(w, util.MapStr{ @@ -612,7 +612,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request End: max, BucketSize: fmt.Sprintf("%ds", bucketSize), } - metricItem, err := getRuleMetricData(rule, filterParam) + metricItem, queryResult, err := getRuleMetricData(rule, filterParam) if err != nil { log.Error(err) alertAPI.WriteJSON(w, util.MapStr{ @@ -620,16 +620,20 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request }, http.StatusInternalServerError) return } - alertAPI.WriteJSON(w, util.MapStr{ + resBody := util.MapStr{ "metric": metricItem, - }, http.StatusOK) + } + if alertAPI.GetParameter(req, "debug") == "1" { + resBody["query"] = queryResult.Query + } + alertAPI.WriteJSON(w,resBody, http.StatusOK) } -func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) (*common.MetricItem, error) { +func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) (*common.MetricItem, *alerting.QueryResult, error) { eng := alerting2.GetEngine(rule.Resource.Type) - metricData, err := eng.GetTargetMetricData(rule, true, filterParam) + metricData, queryResult, err := eng.GetTargetMetricData(rule, true, filterParam) if err != nil { - return nil, err + return nil,queryResult, err } //var filteredMetricData []alerting.MetricData //title := rule.Metrics.Formula @@ -696,7 +700,7 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) // } // } //} - return &metricItem, nil + return &metricItem,queryResult, nil } diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 4fac2a4d..6351c9a3 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -41,7 +41,6 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F if err != nil { return nil, err } - //todo generate agg if len(rule.Metrics.Items) == 0 { return nil, fmt.Errorf("metric items should not be empty") } @@ -133,7 +132,7 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) map[string]in case "rate": aggType = "max" isPipeline = true - case "medium": + case "medium": // from es version 6.6 aggType = "median_absolute_deviation" case "p99", "p95","p90","p80","p50": aggType = "percentiles" @@ -304,7 +303,11 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerti }else{ return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.PeriodInterval) } - duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * 15, units)) + bucketCount := rule.Conditions.GetMinimumPeriodMatch() + 1 + if bucketCount <= 0 { + bucketCount = 1 + } + duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * bucketCount, units)) if err != nil { return nil, err } @@ -385,10 +388,10 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.Fi queryResult.MetricData = metricData return queryResult, nil } -func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error){ +func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, *alerting.QueryResult, error){ queryResult, err := engine.ExecuteQuery(rule, filterParam) if err != nil { - return nil, err + return nil, queryResult, err } var targetMetricData []alerting.MetricData for _, md := range queryResult.MetricData { @@ -402,7 +405,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, } expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula) if err != nil { - return nil, err + return nil, queryResult, err } dataLength := 0 for _, v := range md.Data { @@ -429,7 +432,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, } result, err := expression.Evaluate(parameters) if err != nil { - return nil, err + return nil, queryResult, err } if r, ok := result.(float64); ok { if math.IsNaN(r) || math.IsInf(r, 0 ){ @@ -445,25 +448,20 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, } targetMetricData = append(targetMetricData, targetData) } - return targetMetricData, nil + return targetMetricData, queryResult, nil } //CheckCondition check whether rule conditions triggered or not -//if triggered returns an array of ConditionResult +//if triggered returns an ConditionResult //sort conditions by severity desc before check , and then if condition is true, then continue check another group func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){ - queryResult, err := engine.ExecuteQuery(rule, nil) + var resultItems []alerting.ConditionResultItem + targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, false, nil) conditionResult := &alerting.ConditionResult{ QueryResult: queryResult, } if err != nil { return conditionResult, err } - - var resultItems []alerting.ConditionResultItem - targetMetricData, err := engine.GetTargetMetricData(rule, false, nil) - if err != nil { - return nil, err - } for idx, targetData := range targetMetricData { if idx == 0 { sort.Slice(rule.Conditions.Items, func(i, j int) bool { @@ -604,22 +602,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error { paramsCtx = newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Unix()) var ( severity = conditionResults[0].ConditionItem.Severity - tplBytes []byte - message string - title string ) - tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx) + err = attachTitleMessageToCtx(rule, paramsCtx) if err != nil { - return fmt.Errorf("resolve content template error: %w", err) + return err } - message = string(tplBytes) - paramsCtx[alerting2.ParamMessage] = message - tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx) - if err != nil { - return fmt.Errorf("resolve title template error: %w", err) - } - title = string(tplBytes) - paramsCtx[alerting2.ParamTitle] = title for _, conditionResult := range conditionResults { if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] { severity = conditionResult.ConditionItem.Severity @@ -627,8 +614,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } alertItem.Severity = severity - alertItem.Message = message - alertItem.Title = title + alertItem.Message = paramsCtx[alerting2.ParamMessage].(string) + alertItem.Title = paramsCtx[alerting2.ParamTitle].(string) alertItem.State = alerting.AlertStateAlerting if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered { msg := &alerting.AlertMessage{ @@ -638,16 +625,16 @@ func (engine *Engine) Do(rule *alerting.Rule) error { ID: util.GetUUID(), Status: alerting.MessageStateAlerting, Severity: severity, - Title: title, - Message: message, + Title: alertItem.Title, + Message: alertItem.Message, } err = saveAlertMessage(msg) if err != nil { return fmt.Errorf("save alert message error: %w", err) } }else{ - alertMessage.Title = title - alertMessage.Message = message + alertMessage.Title = alertItem.Title + alertMessage.Message = alertItem.Message err = saveAlertMessage(alertMessage) if err != nil { return fmt.Errorf("save alert message error: %w", err) @@ -736,6 +723,24 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } +func attachTitleMessageToCtx(rule *alerting.Rule, paramsCtx map[string]interface{}) error{ + var ( + tplBytes []byte + err error + ) + tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx) + if err != nil { + return fmt.Errorf("resolve message template error: %w", err) + } + paramsCtx[alerting2.ParamMessage] = string(tplBytes) + tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx) + if err != nil { + return fmt.Errorf("resolve title template error: %w", err) + } + paramsCtx[alerting2.ParamTitle] = string(tplBytes) + return nil +} + func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, eventID string, eventTimestamp interface{} ) map[string]interface{}{ var conditionParams []util.MapStr for _, resultItem := range checkResults.ResultItems { @@ -766,6 +771,10 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul } var actionResults []alerting.ActionExecutionResult paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Unix()) + err = attachTitleMessageToCtx(rule, paramsCtx) + if err != nil { + return nil, err + } if len(rule.Channels.Normal) > 0 { actionResults, _ = performChannels(rule.Channels.Normal, paramsCtx) }else if len(rule.Channels.Escalation) > 0{ diff --git a/service/alerting/engine.go b/service/alerting/engine.go index 5f3c548b..a6a4debd 100644 --- a/service/alerting/engine.go +++ b/service/alerting/engine.go @@ -17,7 +17,7 @@ type Engine interface { CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error) GenerateTask(rule *alerting.Rule) func(ctx context.Context) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error) - GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error) + GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, *alerting.QueryResult, error) } var ( From bfd79d751b51d5355c28bca10832423f576f350a Mon Sep 17 00:00:00 2001 From: liugq Date: Sun, 22 May 2022 15:28:26 +0800 Subject: [PATCH 02/15] fixed no metric data when length of metric item equals 1 --- plugin/api/alerting/rule.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index 479639a9..13dd2fa8 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -655,11 +655,19 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) continue } //filteredMetricData = append(filteredMetricData, md) + + targetData := md.Data["result"] + if len(rule.Metrics.Items) == 1 { + for k, _ := range md.Data { + targetData = md.Data[k] + break + } + } if sampleData == nil { - sampleData = md.Data["result"] + sampleData = targetData } metricItem.Lines = append(metricItem.Lines, &common.MetricLine{ - Data: md.Data["result"], + Data: targetData, BucketSize: filterParam.BucketSize, Metric: common.MetricSummary{ Label: strings.Join(md.GroupValues, "-"), From 9f9ed6894f627701603ba82dd5f3b6b26bfd3acf Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 23 May 2022 11:43:50 +0800 Subject: [PATCH 03/15] update alerting api --- model/alerting/metric.go | 1 - model/alerting/rule_test.go | 1 - plugin/api/alerting/message.go | 2 +- plugin/api/alerting/rule.go | 132 +++++++++++++----- service/alerting/elasticsearch/engine_test.go | 2 - 5 files changed, 99 insertions(+), 39 deletions(-) diff --git a/model/alerting/metric.go b/model/alerting/metric.go index 5f8fd0cb..86f7979f 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -11,7 +11,6 @@ import ( type Metric struct { PeriodInterval string `json:"period_interval"` - MaxPeriods int `json:"max_periods"` Items []MetricItem `json:"items"` Formula string `json:"formula,omitempty"` Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80 diff --git a/model/alerting/rule_test.go b/model/alerting/rule_test.go index 93e1e257..1f1b4e43 100644 --- a/model/alerting/rule_test.go +++ b/model/alerting/rule_test.go @@ -61,7 +61,6 @@ func TestCreateRule( t *testing.T) { Metrics: Metric{ PeriodInterval: "1m", - MaxPeriods: 15, Items: []MetricItem{ {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index e5bbe3b3..5ec3a730 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -215,7 +215,7 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps exists, err = orm.Get(rule) if !exists || err != nil { log.Error(err) - h.WriteError(w, fmt.Sprintf("rule[%s] not found", rule.ID), http.StatusInternalServerError) + h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError) return } metricExpression, _ := rule.Metrics.GenerateExpression() diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index 13dd2fa8..f0ecb1b8 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -7,13 +7,16 @@ package alerting import ( "fmt" log "github.com/cihub/seelog" + "github.com/r3labs/diff/v2" "infini.sh/console/model/alerting" alerting2 "infini.sh/console/service/alerting" _ "infini.sh/console/service/alerting/elasticsearch" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/event" "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" + "infini.sh/framework/core/queue" "infini.sh/framework/core/task" "infini.sh/framework/core/util" "infini.sh/framework/modules/elastic/api" @@ -55,7 +58,6 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p ids = append(ids, rule.ID) rule.Created = time.Now() rule.Updated = time.Now() - rule.Metrics.MaxPeriods = 15 if rule.Schedule.Interval == ""{ rule.Schedule.Interval = "1m" } @@ -68,6 +70,11 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p }, http.StatusInternalServerError) return } + saveAlertActivity("alerting_rule_change", "create", util.MapStr{ + "cluster_id": rule.Resource.ID, + "rule_id": rule.ID, + "cluster_name": rule.Resource.Name, + },nil, &rule) eng := alerting2.GetEngine(rule.Resource.Type) if rule.Enabled { ruleTask := task.ScheduleTask{ @@ -189,12 +196,55 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request } +func saveActivity(activityInfo *event.Activity){ + queueConfig := queue.GetOrInitConfig("platform##activities") + if queueConfig.Labels == nil { + queueConfig.Labels = map[string]interface{}{ + "type": "platform", + "name": "activity", + "category": "elasticsearch", + "activity": true, + } + } + err := queue.Push(queueConfig, util.MustToJSONBytes(event.Event{ + Timestamp: time.Now(), + Metadata: event.EventMetadata{ + Category: "elasticsearch", + Name: "activity", + }, + Fields: util.MapStr{ + "activity": activityInfo, + }})) + if err != nil { + log.Error(err) + } +} + +func saveAlertActivity(name, typ string, labels map[string]interface{}, changelog diff.Changelog, oldState interface{}){ + activityInfo := &event.Activity{ + ID: util.GetUUID(), + Timestamp: time.Now(), + Metadata: event.ActivityMetadata{ + Category: "elasticsearch", + Group: "platform", + Name: name, + Type: typ, + Labels: labels, + }, + Changelog: changelog, + Fields: util.MapStr{ + "rule": oldState, + }, + } + saveActivity(activityInfo) +} + func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("rule_id") - obj := &alerting.Rule{} + oldRule := &alerting.Rule{} - obj.ID = id - exists, err := orm.Get(obj) + oldRule.ID = id + exists, err := orm.Get(oldRule) if !exists || err != nil { log.Error(err) alertAPI.WriteJSON(w, util.MapStr{ @@ -204,35 +254,46 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p return } - id = obj.ID - create := obj.Created - obj = &alerting.Rule{} - err = alertAPI.DecodeJSON(req, obj) + id = oldRule.ID + create := oldRule.Created + rule := &alerting.Rule{ + } + err = alertAPI.DecodeJSON(req, rule) if err != nil { alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) return } + rule.Metrics.Expression, err = rule.Metrics.GenerateExpression() + if err != nil { + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + changeLog, err := util.DiffTwoObject(oldRule, rule) + if err != nil { + log.Error(err) + } //protect - obj.ID = id - obj.Created = create - obj.Updated = time.Now() - obj.Metrics.Expression, err = obj.Metrics.GenerateExpression() - if err != nil { - alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - err = orm.Update(obj) - if err != nil { - alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } + rule.ID = id + rule.Created = create + rule.Updated = time.Now() - if obj.Enabled { - exists, err = checkResourceExists(obj) + err = orm.Update(rule) + if err != nil { + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + saveAlertActivity("alerting_rule_change", "update", util.MapStr{ + "cluster_id": rule.Resource.ID, + "rule_id": rule.ID, + "cluster_name": rule.Resource.Name, + },changeLog, oldRule) + + if rule.Enabled { + exists, err = checkResourceExists(rule) if err != nil || !exists { log.Error(err) alertAPI.WriteJSON(w, util.MapStr{ @@ -242,22 +303,22 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p } //update task task.StopTask(id) - eng := alerting2.GetEngine(obj.Resource.Type) + eng := alerting2.GetEngine(rule.Resource.Type) ruleTask := task.ScheduleTask{ - ID: obj.ID, - Interval: obj.Schedule.Interval, - Description: obj.Metrics.Expression, - Task: eng.GenerateTask(obj), + ID: rule.ID, + Interval: rule.Schedule.Interval, + Description: rule.Metrics.Expression, + Task: eng.GenerateTask(rule), } task.RegisterScheduleTask(ruleTask) task.StartTask(ruleTask.ID) }else{ task.DeleteTask(id) } - clearKV(obj.ID) + clearKV(rule.ID) alertAPI.WriteJSON(w, util.MapStr{ - "_id": obj.ID, + "_id": rule.ID, "result": "updated", }, 200) } @@ -289,6 +350,11 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p log.Error(err) return } + saveAlertActivity("alerting_rule_change", "delete", util.MapStr{ + "cluster_id": obj.Resource.ID, + "rule_id": obj.ID, + "cluster_name": obj.Resource.Name, + },nil, &obj) task.DeleteTask(obj.ID) clearKV(obj.ID) @@ -654,8 +720,6 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) if len(md.Data) == 0 { continue } - //filteredMetricData = append(filteredMetricData, md) - targetData := md.Data["result"] if len(rule.Metrics.Items) == 1 { for k, _ := range md.Data { diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go index 82d68215..6a0eccb2 100644 --- a/service/alerting/elasticsearch/engine_test.go +++ b/service/alerting/elasticsearch/engine_test.go @@ -55,7 +55,6 @@ func TestEngine( t *testing.T) { Metrics: alerting.Metric{ PeriodInterval: "1m", - MaxPeriods: 15, Items: []alerting.MetricItem{ {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, @@ -204,7 +203,6 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { Metrics: alerting.Metric{ PeriodInterval: "1m", - MaxPeriods: 15, Items: []alerting.MetricItem{ {Name: "a", Field: "payload.elasticsearch.index_stats.total.search.query_total", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}}, {Name: "b", Field: "payload.elasticsearch.index_stats.total.search.query_time_in_millis", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}}, From 8d614422d9343e4cf743dfde11df41c60d58b058 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 23 May 2022 16:11:24 +0800 Subject: [PATCH 04/15] update alert message api --- plugin/api/alerting/message.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index 5ec3a730..1360f260 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -238,7 +238,7 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps "created": message.Created, "updated": message.Updated, "resource_name": rule.Resource.Name, - "resource_object": rule.Resource.Objects, + "resource_objects": rule.Resource.Objects, "conditions": rule.Conditions, "duration": duration.Milliseconds(), "ignored_time": message.IgnoredTime, From faf3d1be25ced80e5476416c9ba9b20ff86453e2 Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 24 May 2022 17:00:46 +0800 Subject: [PATCH 05/15] add first_group_value --- service/alerting/elasticsearch/engine.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 6351c9a3..543cde8c 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -742,8 +742,16 @@ func attachTitleMessageToCtx(rule *alerting.Rule, paramsCtx map[string]interface } func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, eventID string, eventTimestamp interface{} ) map[string]interface{}{ - var conditionParams []util.MapStr - for _, resultItem := range checkResults.ResultItems { + var ( + conditionParams []util.MapStr + firstGroupValue string + firstPresetValue string + ) + for i, resultItem := range checkResults.ResultItems { + if i == 0 { + firstGroupValue = strings.Join(resultItem.GroupValues, ",") + firstPresetValue = strings.Join(resultItem.ConditionItem.Values, ",") + } conditionParams = append(conditionParams, util.MapStr{ alerting2.ParamPresetValue: resultItem.ConditionItem.Values, alerting2.Severity: resultItem.ConditionItem.Severity, @@ -760,6 +768,8 @@ func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult alerting2.ParamEventID: eventID, alerting2.ParamTimestamp: eventTimestamp, alerting2.ParamResults: conditionParams, + "first_group_value": firstGroupValue, + "first_preset_value": firstPresetValue, } return paramsCtx } From fecb25c71ecacfd0ffd6543151bc45d83c225962 Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 24 May 2022 17:15:52 +0800 Subject: [PATCH 06/15] add first_group_value --- service/alerting/elasticsearch/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 543cde8c..ac1c3b9f 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -599,6 +599,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } return nil }else{ + alertItem.State = alerting.AlertStateAlerting paramsCtx = newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Unix()) var ( severity = conditionResults[0].ConditionItem.Severity @@ -616,7 +617,6 @@ func (engine *Engine) Do(rule *alerting.Rule) error { alertItem.Severity = severity alertItem.Message = paramsCtx[alerting2.ParamMessage].(string) alertItem.Title = paramsCtx[alerting2.ParamTitle].(string) - alertItem.State = alerting.AlertStateAlerting if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered { msg := &alerting.AlertMessage{ RuleID: rule.ID, From 3ce0621ef720f05a4300a2b6161c8cb11e29cacb Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 24 May 2022 20:28:48 +0800 Subject: [PATCH 07/15] rename preset_value to threshold --- service/alerting/constants.go | 10 +++++----- service/alerting/elasticsearch/engine.go | 10 +++++----- service/alerting/parameter.go | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/service/alerting/constants.go b/service/alerting/constants.go index 80860944..c546ca8c 100644 --- a/service/alerting/constants.go +++ b/service/alerting/constants.go @@ -19,11 +19,11 @@ const ( ParamEventID = "event_id" // 检查事件 ID ParamResults = "results" // ParamMessage = "message" //检查消息 自定义(模版渲染) - ParamTitle = "title" - ParamPresetValue = "preset_value" //检查预设值 float64 - ParamResultValue = "result_value" //检查结果 {group_tags:["cluster-xxx", "node-xxx"], check_values:[]} - Severity = "severity" //告警等级 - ParamTimestamp = "timestamp" //事件产生时间戳 + ParamTitle = "title" + ParamThreshold = "threshold" //检查预设值 []string + ParamResultValue = "result_value" //检查结果 {group_tags:["cluster-xxx", "node-xxx"], check_values:[]} + Severity = "severity" //告警等级 + ParamTimestamp = "timestamp" //事件产生时间戳 ParamGroupValues = "group_values" ParamIssueTimestamp = "issue_timestamp" ParamRelationValues = "relation_values" diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index ac1c3b9f..6f73628e 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -745,15 +745,15 @@ func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult var ( conditionParams []util.MapStr firstGroupValue string - firstPresetValue string + firstThreshold string ) for i, resultItem := range checkResults.ResultItems { if i == 0 { firstGroupValue = strings.Join(resultItem.GroupValues, ",") - firstPresetValue = strings.Join(resultItem.ConditionItem.Values, ",") + firstThreshold = strings.Join(resultItem.ConditionItem.Values, ",") } conditionParams = append(conditionParams, util.MapStr{ - alerting2.ParamPresetValue: resultItem.ConditionItem.Values, + alerting2.ParamThreshold: resultItem.ConditionItem.Values, alerting2.Severity: resultItem.ConditionItem.Severity, alerting2.ParamGroupValues: resultItem.GroupValues, alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp, @@ -768,8 +768,8 @@ func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult alerting2.ParamEventID: eventID, alerting2.ParamTimestamp: eventTimestamp, alerting2.ParamResults: conditionParams, - "first_group_value": firstGroupValue, - "first_preset_value": firstPresetValue, + "first_group_value": firstGroupValue, + "first_threshold": firstThreshold, } return paramsCtx } diff --git a/service/alerting/parameter.go b/service/alerting/parameter.go index 48c5d226..25ba5fcc 100644 --- a/service/alerting/parameter.go +++ b/service/alerting/parameter.go @@ -21,7 +21,7 @@ func GetTemplateParameters() []ParameterMeta { {ParamTitle, "string", "", "xxx cpu used 95%", nil}, {ParamMessage, "string", "", "disk used 90%", nil}, {ParamResults, "array", "", "", []ParameterMeta{ - {ParamPresetValue, "array", "", "[\"90\"]", nil}, + {ParamThreshold, "array", "", "[\"90\"]", nil}, {Severity, "string", "", "error", nil}, {ParamGroupValues, "array", "", "[\"cluster-xxx\", \"node-xxx\"]", nil}, {ParamIssueTimestamp, "date", "", "2022-05-11T11:50:55+08:00", nil}, From 61f025be9f1febbfc0442744614b037a948b6d8b Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 25 May 2022 11:19:56 +0800 Subject: [PATCH 08/15] update alerting ignore api --- plugin/api/alerting/message.go | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index 1360f260..69b0f0d4 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -8,8 +8,10 @@ import ( "fmt" log "github.com/cihub/seelog" "infini.sh/console/model/alerting" + alerting2 "infini.sh/console/service/alerting" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" @@ -20,7 +22,7 @@ import ( func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { body := struct { - MessageIDs []string `json:"ids"` + Messages []alerting.AlertMessage `json:"messages"` }{} err := h.DecodeJSON(req, &body) if err != nil { @@ -28,14 +30,31 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, return } - if len(body.MessageIDs) == 0 { + if len(body.Messages) == 0 { h.WriteError(w, "alert ids should not be empty", http.StatusInternalServerError) return } + messageIDs := make([]string, 0, len(body.Messages)) + for _, msg := range body.Messages { + messageIDs = append(messageIDs, msg.ID) + } queryDsl := util.MapStr{ "query": util.MapStr{ - "terms": util.MapStr{ - "_id": body.MessageIDs, + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "_id": messageIDs, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": alerting.MessageStateAlerting, + }, + }, + }, + }, }, }, "script": util.MapStr{ @@ -48,9 +67,14 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, log.Error(err) return } + //delete kv cache + for _, msg := range body.Messages { + _ = kv.DeleteKey(alerting2.KVLastMessageState, []byte(msg.RuleID)) + } + h.WriteJSON(w, util.MapStr{ - "ids": body.MessageIDs, + "ids": messageIDs, "result": "updated", }, 200) } From 0438651474d5d4a7d1876c9da28afd56f488bc7a Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 25 May 2022 12:58:04 +0800 Subject: [PATCH 09/15] update alerting ignore api --- plugin/api/alerting/message.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index 69b0f0d4..fb57fe0b 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -67,9 +67,17 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, log.Error(err) return } - //delete kv cache + //update kv cache for _, msg := range body.Messages { - _ = kv.DeleteKey(alerting2.KVLastMessageState, []byte(msg.RuleID)) + stateBytes, err := kv.GetValue(alerting2.KVLastMessageState, []byte(msg.RuleID)) + if err != nil && stateBytes != nil { + message := &alerting.AlertMessage{} + util.MustFromJSONBytes(stateBytes, message) + if message.Status == alerting.MessageStateAlerting { + message.Status = alerting.MessageStateIgnored + } + kv.AddValue(alerting2.KVLastMessageState, []byte(msg.RuleID), util.MustToJSONBytes(message)) + } } From 2c07a3f535f3587783377cd0fe67b02223950bc3 Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 25 May 2022 13:21:56 +0800 Subject: [PATCH 10/15] update alerting ignore api --- plugin/api/alerting/message.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index fb57fe0b..00d13d7e 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -75,8 +75,8 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, util.MustFromJSONBytes(stateBytes, message) if message.Status == alerting.MessageStateAlerting { message.Status = alerting.MessageStateIgnored + _ = kv.AddValue(alerting2.KVLastMessageState, []byte(msg.RuleID), util.MustToJSONBytes(message)) } - kv.AddValue(alerting2.KVLastMessageState, []byte(msg.RuleID), util.MustToJSONBytes(message)) } } From 8cfea1fc36d565d28858715ee2a1d12b7686bcde Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 25 May 2022 15:42:33 +0800 Subject: [PATCH 11/15] fixed alerting bug --- plugin/api/alerting/message.go | 14 ++------- service/alerting/elasticsearch/engine.go | 38 +++++++++++++----------- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index 00d13d7e..f9296ed1 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -31,7 +31,7 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, } if len(body.Messages) == 0 { - h.WriteError(w, "alert ids should not be empty", http.StatusInternalServerError) + h.WriteError(w, "messages should not be empty", http.StatusInternalServerError) return } messageIDs := make([]string, 0, len(body.Messages)) @@ -67,17 +67,9 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, log.Error(err) return } - //update kv cache + //delete kv cache for _, msg := range body.Messages { - stateBytes, err := kv.GetValue(alerting2.KVLastMessageState, []byte(msg.RuleID)) - if err != nil && stateBytes != nil { - message := &alerting.AlertMessage{} - util.MustFromJSONBytes(stateBytes, message) - if message.Status == alerting.MessageStateAlerting { - message.Status = alerting.MessageStateIgnored - _ = kv.AddValue(alerting2.KVLastMessageState, []byte(msg.RuleID), util.MustToJSONBytes(message)) - } - } + _ = kv.DeleteKey(alerting2.KVLastMessageState, []byte(msg.RuleID)) } diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 6f73628e..b2f84df7 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -547,14 +547,18 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } if alertItem != nil { - for _, actionResult := range alertItem.ActionExecutionResults { - if actionResult.Error != "" { - alertItem.Error = actionResult.Error + if err != nil{ + alertItem.State = alerting.AlertStateError + alertItem.Error = err.Error() + }else { + for _, actionResult := range alertItem.ActionExecutionResults { + if actionResult.Error != "" { + alertItem.Error = actionResult.Error + alertItem.State = alerting.AlertStateError + } } } - if alertItem.Error != ""{ - alertItem.State = alerting.AlertStateError - } + err = orm.Save(alertItem) if err != nil { log.Error(err) @@ -976,19 +980,19 @@ func getLastAlertMessage(ruleID string, duration time.Duration) (*alerting.Alert if err != nil { return nil, err } - if messageBytes == nil { - return nil, nil - } message := &alerting.AlertMessage{} - err = util.FromJSONBytes(messageBytes, message) - if err != nil { - return nil, err + if messageBytes != nil { + + err = util.FromJSONBytes(messageBytes, message) + if err != nil { + return nil, err + } + if time.Now().Sub(message.Updated) <= duration { + return message, nil + } } - if time.Now().Sub(message.Updated) > duration { - err = getLastAlertMessageFromES(ruleID, message) - return message, err - } - return message, nil + err = getLastAlertMessageFromES(ruleID, message) + return message, err } func saveAlertMessageToES(message *alerting.AlertMessage) error { From e1471378ce44a869d3adb56cbdcb49ce3cb72830 Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 25 May 2022 15:48:03 +0800 Subject: [PATCH 12/15] fixed alerting bug --- service/alerting/elasticsearch/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index b2f84df7..df6efca0 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -495,7 +495,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes "result": targetData.Data[dataKey][i][1], }) if err != nil { - return nil, err + return nil, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err) } if evaluateResult == true { triggerCount += 1 From e72f0e940a7bf4817f93212fc4433ca10dfbc544 Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 25 May 2022 16:40:07 +0800 Subject: [PATCH 13/15] fixed alerting bug --- service/alerting/elasticsearch/engine.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index df6efca0..62d1c708 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -943,7 +943,7 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti } } -func getLastAlertMessageFromES(ruleID string, message *alerting.AlertMessage) error { +func getLastAlertMessageFromES(ruleID string) (*alerting.AlertMessage, error) { queryDsl := util.MapStr{ "size": 1, "sort": []util.MapStr{ @@ -966,13 +966,15 @@ func getLastAlertMessageFromES(ruleID string, message *alerting.AlertMessage) e } err, searchResult := orm.Search(alerting.AlertMessage{}, &q ) if err != nil { - return err + return nil, err } if len(searchResult.Result) == 0 { - return nil + return nil, nil } messageBytes := util.MustToJSONBytes(searchResult.Result[0]) - return util.FromJSONBytes(messageBytes, message) + message := &alerting.AlertMessage{} + err = util.FromJSONBytes(messageBytes, message) + return message, err } func getLastAlertMessage(ruleID string, duration time.Duration) (*alerting.AlertMessage, error ){ @@ -991,7 +993,7 @@ func getLastAlertMessage(ruleID string, duration time.Duration) (*alerting.Alert return message, nil } } - err = getLastAlertMessageFromES(ruleID, message) + message, err = getLastAlertMessageFromES(ruleID) return message, err } From 9bb29fba12d5fbfd931d011ebf6c98dde6497d7f Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 25 May 2022 16:48:30 +0800 Subject: [PATCH 14/15] fixed alerting bug --- service/alerting/elasticsearch/engine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 62d1c708..722cd1ee 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -495,7 +495,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes "result": targetData.Data[dataKey][i][1], }) if err != nil { - return nil, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err) + return conditionResult, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err) } if evaluateResult == true { triggerCount += 1 From f39ddacb0f7de417de33ceccfcf2b2f2aa8d4e35 Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 25 May 2022 17:39:35 +0800 Subject: [PATCH 15/15] add rule name --- model/alerting/rule.go | 2 +- service/alerting/elasticsearch/engine.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/model/alerting/rule.go b/model/alerting/rule.go index c6e4c223..e676c61d 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -13,7 +13,7 @@ type Rule struct { ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"` Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"` Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"` - //Name string `json:"name" elastic_mapping:"name:{type:keyword,copy_to:search_text}"` + Name string `json:"name" elastic_mapping:"name:{type:keyword,copy_to:search_text}"` Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:keyword}"` Resource Resource `json:"resource" elastic_mapping:"resource:{type:object}"` Metrics Metric `json:"metrics" elastic_mapping:"metrics:{type:object}"` diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 722cd1ee..b19b4cfb 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -486,6 +486,10 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } triggerCount := 0 for i := 0; i < dataLength; i++ { + //clear nil value + if targetData.Data[dataKey][i][1] == nil { + continue + } if r, ok := targetData.Data[dataKey][i][1].(float64); ok { if math.IsNaN(r){ continue