From 9f9ed6894f627701603ba82dd5f3b6b26bfd3acf Mon Sep 17 00:00:00 2001
From: liugq
Date: Mon, 23 May 2022 11:43:50 +0800
Subject: [PATCH] update alerting api

---
 model/alerting/metric.go                      |   1 -
 model/alerting/rule_test.go                   |   1 -
 plugin/api/alerting/message.go                |   2 +-
 plugin/api/alerting/rule.go                   | 132 +++++++++++++-----
 service/alerting/elasticsearch/engine_test.go |   2 -
 5 files changed, 99 insertions(+), 39 deletions(-)

diff --git a/model/alerting/metric.go b/model/alerting/metric.go
index 5f8fd0cb..86f7979f 100644
--- a/model/alerting/metric.go
+++ b/model/alerting/metric.go
@@ -11,7 +11,6 @@ import (
 
 type Metric struct {
 	PeriodInterval string `json:"period_interval"`
-	MaxPeriods int `json:"max_periods"`
 	Items []MetricItem `json:"items"`
 	Formula string `json:"formula,omitempty"`
 	Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80
diff --git a/model/alerting/rule_test.go b/model/alerting/rule_test.go
index 93e1e257..1f1b4e43 100644
--- a/model/alerting/rule_test.go
+++ b/model/alerting/rule_test.go
@@ -61,7 +61,6 @@ func TestCreateRule( t *testing.T) {
 
 		Metrics: Metric{
 			PeriodInterval: "1m",
-			MaxPeriods: 15,
 			Items: []MetricItem{
 				{Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
 				{Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go
index e5bbe3b3..5ec3a730 100644
--- a/plugin/api/alerting/message.go
+++ b/plugin/api/alerting/message.go
@@ -215,7 +215,7 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps
 	exists, err = orm.Get(rule)
 	if !exists || err != nil {
 		log.Error(err)
-		h.WriteError(w, fmt.Sprintf("rule[%s] not found", rule.ID), http.StatusInternalServerError)
+		h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError)
 		return
 	}
 	metricExpression, _ := rule.Metrics.GenerateExpression()
diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go
index 13dd2fa8..f0ecb1b8 100644
--- a/plugin/api/alerting/rule.go
+++ b/plugin/api/alerting/rule.go
@@ -7,13 +7,16 @@ package alerting
 import (
 	"fmt"
 	log "github.com/cihub/seelog"
+	"github.com/r3labs/diff/v2"
 	"infini.sh/console/model/alerting"
 	alerting2 "infini.sh/console/service/alerting"
 	_ "infini.sh/console/service/alerting/elasticsearch"
 	httprouter "infini.sh/framework/core/api/router"
 	"infini.sh/framework/core/elastic"
+	"infini.sh/framework/core/event"
 	"infini.sh/framework/core/kv"
 	"infini.sh/framework/core/orm"
+	"infini.sh/framework/core/queue"
 	"infini.sh/framework/core/task"
 	"infini.sh/framework/core/util"
 	"infini.sh/framework/modules/elastic/api"
@@ -55,7 +58,6 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p
 	ids = append(ids, rule.ID)
 	rule.Created = time.Now()
 	rule.Updated = time.Now()
-	rule.Metrics.MaxPeriods = 15
 	if rule.Schedule.Interval == ""{
 		rule.Schedule.Interval = "1m"
 	}
@@ -68,6 +70,11 @@
 		}, http.StatusInternalServerError)
 		return
 	}
+	saveAlertActivity("alerting_rule_change", "create", util.MapStr{
+		"cluster_id": rule.Resource.ID,
+		"rule_id": rule.ID,
+		"cluster_name": rule.Resource.Name,
+	},nil, &rule)
 	eng := alerting2.GetEngine(rule.Resource.Type)
 	if rule.Enabled {
 		ruleTask := task.ScheduleTask{
@@ -189,12 +196,55 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request
 }
 
+func saveActivity(activityInfo *event.Activity){
+	queueConfig := queue.GetOrInitConfig("platform##activities")
+	if queueConfig.Labels == nil {
+		queueConfig.Labels = map[string]interface{}{
+			"type": "platform",
+			"name": "activity",
+			"category": "elasticsearch",
+			"activity": true,
+		}
+	}
+	err := queue.Push(queueConfig, util.MustToJSONBytes(event.Event{
+		Timestamp: time.Now(),
+		Metadata: event.EventMetadata{
+			Category: "elasticsearch",
+			Name: "activity",
+		},
+		Fields: util.MapStr{
+			"activity": activityInfo,
+		}}))
+	if err != nil {
+		log.Error(err)
+	}
+}
+
+func saveAlertActivity(name, typ string, labels map[string]interface{}, changelog diff.Changelog, oldState interface{}){
+	activityInfo := &event.Activity{
+		ID: util.GetUUID(),
+		Timestamp: time.Now(),
+		Metadata: event.ActivityMetadata{
+			Category: "elasticsearch",
+			Group: "platform",
+			Name: name,
+			Type: typ,
+			Labels: labels,
+		},
+		Changelog: changelog,
+		Fields: util.MapStr{
+			"rule": oldState,
+		},
+	}
+	saveActivity(activityInfo)
+}
+
 func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	id := ps.MustGetParameter("rule_id")
-	obj := &alerting.Rule{}
+	oldRule := &alerting.Rule{}
 
-	obj.ID = id
-	exists, err := orm.Get(obj)
+	oldRule.ID = id
+	exists, err := orm.Get(oldRule)
 	if !exists || err != nil {
 		log.Error(err)
 		alertAPI.WriteJSON(w, util.MapStr{
@@ -204,35 +254,46 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
 		return
 	}
 
-	id = obj.ID
-	create := obj.Created
-	obj = &alerting.Rule{}
-	err = alertAPI.DecodeJSON(req, obj)
+	id = oldRule.ID
+	create := oldRule.Created
+	rule := &alerting.Rule{
+	}
+	err = alertAPI.DecodeJSON(req, rule)
 	if err != nil {
 		alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
 		log.Error(err)
 		return
 	}
+	rule.Metrics.Expression, err = rule.Metrics.GenerateExpression()
+	if err != nil {
+		alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
+		log.Error(err)
+		return
+	}
+	changeLog, err := util.DiffTwoObject(oldRule, rule)
+	if err != nil {
+		log.Error(err)
+	}
 	//protect
-	obj.ID = id
-	obj.Created = create
-	obj.Updated = time.Now()
-	obj.Metrics.Expression, err = obj.Metrics.GenerateExpression()
-	if err != nil {
-		alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
-		log.Error(err)
-		return
-	}
-	err = orm.Update(obj)
-	if err != nil {
-		alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
-		log.Error(err)
-		return
-	}
+	rule.ID = id
+	rule.Created = create
+	rule.Updated = time.Now()
 
-	if obj.Enabled {
-		exists, err = checkResourceExists(obj)
+	err = orm.Update(rule)
+	if err != nil {
+		alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
+		log.Error(err)
+		return
+	}
+	saveAlertActivity("alerting_rule_change", "update", util.MapStr{
+		"cluster_id": rule.Resource.ID,
+		"rule_id": rule.ID,
+		"cluster_name": rule.Resource.Name,
+	},changeLog, oldRule)
+
+	if rule.Enabled {
+		exists, err = checkResourceExists(rule)
 		if err != nil || !exists {
 			log.Error(err)
 			alertAPI.WriteJSON(w, util.MapStr{
@@ -242,22 +303,22 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
 			return
 		}
 		//update task
 		task.StopTask(id)
-		eng := alerting2.GetEngine(obj.Resource.Type)
+		eng := alerting2.GetEngine(rule.Resource.Type)
 		ruleTask := task.ScheduleTask{
-			ID: obj.ID,
-			Interval: obj.Schedule.Interval,
-			Description: obj.Metrics.Expression,
-			Task: eng.GenerateTask(obj),
+			ID: rule.ID,
+			Interval: rule.Schedule.Interval,
+			Description: rule.Metrics.Expression,
+			Task: eng.GenerateTask(rule),
 		}
 		task.RegisterScheduleTask(ruleTask)
 		task.StartTask(ruleTask.ID)
 	}else{
 		task.DeleteTask(id)
 	}
-	clearKV(obj.ID)
+	clearKV(rule.ID)
 	alertAPI.WriteJSON(w, util.MapStr{
-		"_id": obj.ID,
+		"_id": rule.ID,
 		"result": "updated",
 	}, 200)
 }
@@ -289,6 +350,11 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p
 		log.Error(err)
 		return
 	}
+	saveAlertActivity("alerting_rule_change", "delete", util.MapStr{
+		"cluster_id": obj.Resource.ID,
+		"rule_id": obj.ID,
+		"cluster_name": obj.Resource.Name,
+	},nil, &obj)
 	task.DeleteTask(obj.ID)
 	clearKV(obj.ID)
 
@@ -654,8 +720,6 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam)
 		if len(md.Data) == 0 {
 			continue
 		}
-		//filteredMetricData = append(filteredMetricData, md)
-
 		targetData := md.Data["result"]
 		if len(rule.Metrics.Items) == 1 {
 			for k, _ := range md.Data {
diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go
index 82d68215..6a0eccb2 100644
--- a/service/alerting/elasticsearch/engine_test.go
+++ b/service/alerting/elasticsearch/engine_test.go
@@ -55,7 +55,6 @@ func TestEngine( t *testing.T) {
 
 		Metrics: alerting.Metric{
 			PeriodInterval: "1m",
-			MaxPeriods: 15,
 			Items: []alerting.MetricItem{
 				{Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
 				{Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
@@ -204,7 +203,6 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
 
 		Metrics: alerting.Metric{
 			PeriodInterval: "1m",
-			MaxPeriods: 15,
 			Items: []alerting.MetricItem{
 				{Name: "a", Field: "payload.elasticsearch.index_stats.total.search.query_total", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},
 				{Name: "b", Field: "payload.elasticsearch.index_stats.total.search.query_time_in_millis", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},