From fbcd324113a6d8a19fd4b8114ad13954a22fb0e1 Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 14 Apr 2022 17:26:55 +0800 Subject: [PATCH 1/6] update alert api --- plugin/api/alerting/api.go | 9 + plugin/api/alerting/channel.go | 169 ++++++++++++++++++ plugin/api/alerting/rule.go | 63 ++++++- service/alerting/elasticsearch/engine.go | 72 +++++++- service/alerting/elasticsearch/engine_test.go | 79 +++++++- 5 files changed, 376 insertions(+), 16 deletions(-) create mode 100644 plugin/api/alerting/channel.go diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index ac6e367a..4cf3e591 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -18,5 +18,14 @@ func init() { api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule) api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule) api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule) + + api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.getChannel) + api.HandleAPIMethod(api.POST, "/alerting/channel", alert.createChannel) + api.HandleAPIMethod(api.DELETE, "/alerting/channel/:channel_id", alert.deleteChannel) + api.HandleAPIMethod(api.PUT, "/alerting/channel/:channel_id", alert.updateChannel) + api.HandleAPIMethod(api.GET, "/alerting/channel/_search", alert.searchChannel) + + //just for test + //api.HandleAPIMethod(api.GET, "/alerting/rule/test", alert.testRule) } diff --git a/plugin/api/alerting/channel.go b/plugin/api/alerting/channel.go new file mode 100644 index 00000000..c372954d --- /dev/null +++ b/plugin/api/alerting/channel.go @@ -0,0 +1,169 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import ( + "fmt" + "infini.sh/console/model/alerting" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + log "src/github.com/cihub/seelog" + "strconv" + "strings" +) + +func (h *AlertAPI) createChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var obj = &alerting.Channel{} + err := h.DecodeJSON(req, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + err = orm.Create(obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "created", + }, 200) + +} + +func (h *AlertAPI) getChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("channel_id") + + obj := alerting.Channel{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "found": true, + "_id": id, + "_source": obj, + }, 200) +} + +func (h *AlertAPI) updateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("channel_id") + obj := alerting.Channel{} + + obj.ID = id + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + + id = obj.ID + create := obj.Created + obj = alerting.Channel{} + err = h.DecodeJSON(req, &obj) + if err != nil { + 
h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + //protect + obj.ID = id + obj.Created = create + err = orm.Update(&obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "updated", + }, 200) +} + +func (h *AlertAPI) deleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("channel_id") + + obj := alerting.Channel{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + + err = orm.Delete(&obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "deleted", + }, 200) +} + +func (h *AlertAPI) searchChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + var ( + keyword = h.GetParameterOrDefault(req, "keyword", "") + queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + mustBuilder = &strings.Builder{} + ) + if keyword != "" { + mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword)) + } + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + q := orm.Query{} + queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from) + q.RawQuery = []byte(queryDSL) + + err, res := orm.Search(&alerting.Channel{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.Write(w, res.Raw) +} diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index ae3b1e4d..bb3fde1b 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -234,6 +234,7 @@ func (alertAPI *AlertAPI) searchRule(w http.ResponseWriter, req *http.Request, p log.Error(err) return } + w.Write(searchResult.Raw) } @@ -253,4 +254,64 @@ func checkResourceExists(rule *alerting.Rule) (bool, error) { default: return false, fmt.Errorf("unsupport resource type: %s", rule.Resource.Type) } -} \ No newline at end of file +} + +//func (alertAPI *AlertAPI) testRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { +// rule := alerting.Rule{ +// ID: util.GetUUID(), +// Created: time.Now(), +// Updated: time.Now(), +// Enabled: true, +// Resource: alerting.Resource{ +// ID: "c8i18llath2blrusdjng", +// Type: "elasticsearch", +// Objects: []string{".infini_metrics*"}, +// TimeField: "timestamp", +// RawFilter: map[string]interface{}{ +// "bool": util.MapStr{ +// "must": []util.MapStr{}, +// }, +// }, +// }, +// +// Metrics: alerting.Metric{ +// PeriodInterval: "1m", +// MaxPeriods: 15, +// Items: []alerting.MetricItem{ +// {Name: "a", Field: "payload.elasticsearch.node_stats.os.cpu.percent", Statistic: "p99", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, +// }, +// }, +// Conditions: alerting.Condition{ +// Operator: "any", +// Items: []alerting.ConditionItem{ +// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"}, +// }, +// }, +// +// Channels: alerting.RuleChannel{ +// Normal: []alerting.Channel{ +// {Name: "钉钉", Type: 
alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ +// HeaderParams: map[string]string{ +// "Content-Type": "application/json", +// }, +// Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, +// Method: http.MethodPost, +// URL: "https://oapi.dingtalk.com/robot/send?access_token=XXXXXX", +// }}, +// }, +// ThrottlePeriod: "1h", +// AcceptTimeRange: alerting.TimeRange{ +// Start: "8:00", +// End: "21:00", +// }, +// EscalationEnabled: true, +// EscalationThrottlePeriod: "30m", +// }, +// } +// eng := alerting2.GetEngine(rule.Resource.Type) +// data, err := eng.ExecuteQuery(&rule) +// if err != nil { +// log.Error(err) +// } +// alertAPI.WriteJSON(w, data, http.StatusOK) +//} \ No newline at end of file diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 99a37d6d..649f7e90 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -19,6 +19,7 @@ import ( "infini.sh/framework/core/util" "io" "sort" + "strconv" "strings" "time" ) @@ -38,11 +39,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { } basicAggs := util.MapStr{} for _, metricItem := range rule.Metrics.Items { - basicAggs[metricItem.Name] = util.MapStr{ - metricItem.Statistic: util.MapStr{ - "field": metricItem.Field, - }, - } + basicAggs[metricItem.Name] = engine.generateAgg(&metricItem) } timeAggs := util.MapStr{ "date_histogram": util.MapStr{ @@ -92,6 +89,37 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { "aggs": rootAggs, }, nil } +func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{ + var ( + aggType = "value_count" + field = metricItem.Field + ) + if field == "" || field == "*" { + field = "_id" + } + var percent = 0.0 + switch metricItem.Statistic { + case "max", "min", "sum", "avg": + aggType = metricItem.Statistic + case "count", "value_count": + aggType = "value_count" + case "medium": + aggType = "median_absolute_deviation" + case "p99", "p95","p90","p80","p50": + aggType = "percentiles" + percentStr := strings.TrimPrefix(metricItem.Statistic, "p") + percent, _ = strconv.ParseFloat(percentStr, 32) + } + aggValue := util.MapStr{ + "field": field, + } + if aggType == "percentiles" { + aggValue["percents"] = []interface{}{percent} + } + return util.MapStr{ + aggType: aggValue, + } +} func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interface{}, error) { query := map[string]interface{}{} @@ -141,9 +169,15 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interfa } } }else{ - query["bool"] = util.MapStr{ - "must": []interface{}{ - timeQuery, + must := []interface{}{ + timeQuery, + } + if _, ok := query["match_all"]; !ok { + must = append(must, query) + } + query = util.MapStr{ + "bool": util.MapStr{ + "must": must, }, } } @@ -465,7 +499,27 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti continue } if vm, ok := v.(map[string]interface{}); ok { - md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], vm["value"]}) + if metricVal, ok := vm["value"]; ok { + md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal}) + }else{ + //percentiles agg type + switch vm["values"].(type) { + case []interface{}: + for _, val := range vm["values"].([]interface{}) { + if valM, ok := val.(map[string]interface{}); ok { + md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], valM["value"]}) + } + 
break + } + case map[string]interface{}: + for _, val := range vm["values"].(map[string]interface{}) { + md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], val}) + break + } + } + + } + } } diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go index d8409974..7db1ec2c 100644 --- a/service/alerting/elasticsearch/engine_test.go +++ b/service/alerting/elasticsearch/engine_test.go @@ -7,7 +7,6 @@ package elasticsearch import ( "fmt" "infini.sh/console/model/alerting" - "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" "sort" @@ -17,11 +16,9 @@ import ( func TestEngine( t *testing.T) { rule := alerting.Rule{ - ORMObjectBase: orm.ORMObjectBase{ - ID: util.GetUUID(), - Created: time.Now(), - Updated: time.Now(), - }, + ID: util.GetUUID(), + Created: time.Now(), + Updated: time.Now(), Enabled: true, Resource: alerting.Resource{ ID: "c8i18llath2blrusdjng", @@ -117,3 +114,73 @@ func TestEngine( t *testing.T) { //fmt.Println(util.MustToJSON(filter)) } + +func TestGenerateAgg(t *testing.T) { + eng := &Engine{} + agg := eng.generateAgg(&alerting.MetricItem{ + Name: "a", + Field: "cpu.percent", + Statistic: "p99", + }) + fmt.Println(util.MustToJSON(agg)) +} + +func TestGeneratePercentilesAggQuery(t *testing.T) { + rule := alerting.Rule{ + ID: util.GetUUID(), + Created: time.Now(), + Updated: time.Now(), + Enabled: true, + Resource: alerting.Resource{ + ID: "c8i18llath2blrusdjng", + Type: "elasticsearch", + Objects: []string{".infini_metrics*"}, + TimeField: "timestamp", + RawFilter: map[string]interface{}{ + "match_all": util.MapStr{ + + }, + }, + }, + + Metrics: alerting.Metric{ + PeriodInterval: "1m", + MaxPeriods: 15, + Items: []alerting.MetricItem{ + {Name: "a", Field: "payload.elasticsearch.node_stats.os.cpu.percent", Statistic: "p99", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, + }, + }, + Conditions: alerting.Condition{ + Operator: "any", + Items: []alerting.ConditionItem{ + {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"}, + }, + }, + + Channels: alerting.RuleChannel{ + Normal: []alerting.Channel{ + {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ + HeaderParams: map[string]string{ + "Content-Type": "application/json", + }, + Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, + Method: http.MethodPost, + URL: "https://oapi.dingtalk.com/robot/send?access_token=XXXXXX", + }}, + }, + ThrottlePeriod: "1h", + AcceptTimeRange: alerting.TimeRange{ + Start: "8:00", + End: "21:00", + }, + EscalationEnabled: true, + EscalationThrottlePeriod: "30m", + }, + } + eng := &Engine{} + q, err := eng.GenerateQuery(&rule) + if err != nil { + t.Fatal(err) + } + fmt.Println(util.MustToJSON(q)) +} From 97f65f432cfe76c3d79dfe6031765b53ea2bb5ce Mon Sep 17 00:00:00 2001 From: medcl Date: Sat, 16 Apr 2022 13:07:16 +0800 Subject: [PATCH 2/6] enable availability check by default --- console.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/console.yml b/console.yml index b7062b00..c0fe998d 100644 --- a/console.yml +++ b/console.yml @@ -31,7 +31,7 @@ elastic: enabled: true interval: 30s availability_check: - enabled: false + enabled: true interval: 60s metadata_refresh: enabled: true @@ -100,4 +100,4 @@ pipeline: consumer: group: activity when: - cluster_available: [ "default" ] \ No newline at end of file + cluster_available: [ "default" ] From 
e1d85bc4f3139b1b52713bf944af4567c0f83237 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 18 Apr 2022 09:42:44 +0800 Subject: [PATCH 3/6] update alerting api --- model/alerting/alert.go | 16 +- model/alerting/filter.go | 4 + model/alerting/filter_query.go | 8 + model/alerting/resource.go | 15 +- model/alerting/rule.go | 3 + plugin/api/alerting/alert.go | 106 +++++++++ plugin/api/alerting/api.go | 10 +- plugin/api/alerting/rule.go | 107 +++++++++ plugin/api/init.go | 5 + service/alerting/elasticsearch/engine.go | 220 ++++++++++++++++-- service/alerting/elasticsearch/engine_test.go | 35 ++- 11 files changed, 499 insertions(+), 30 deletions(-) create mode 100644 plugin/api/alerting/alert.go diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 515550e1..efd93b03 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -12,17 +12,21 @@ type Alert struct { ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"` Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"` Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"` - RuleID string `json:"rule_id"` - ClusterID string `json:"cluster_id"` - Expression string `json:"expression"` - Objects []string `json:"objects"` - Severity string `json:"severity"` - Content string `json:"content"` + RuleID string `json:"rule_id" elastic_mapping:"rule_id: { type: keyword }"` + ResourceID string `json:"resource_id" elastic_mapping:"resource_id: { type: keyword }"` + ResourceName string `json:"resource_name" elastic_mapping:"resource_name: { type: keyword }"` + Expression string `json:"expression" elastic_mapping:"expression: { type: keyword, copy_to:search_text }"` + Objects []string `json:"objects" elastic_mapping:"objects: { type:keyword,copy_to:search_text }"` + Severity string `json:"severity" elastic_mapping:"severity: { type: keyword }"` + Content string `json:"content" elastic_mapping:"context: { type: keyword, copy_to:search_text }"` AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"` Users []string `json:"users,omitempty"` State string `json:"state"` Error string `json:"error,omitempty"` + IsNotified bool `json:"is_notified" elastic_mapping:"is_notified: { type: boolean }"` //标识本次检测是否发送了告警通知 + IsEscalated bool `json:"is_escalated" elastic_mapping:"is_escalated: { type: boolean }"` //标识本次检测是否发送了升级告警通知 + SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` } type ActionExecutionResult struct { diff --git a/model/alerting/filter.go b/model/alerting/filter.go index 6ae07519..f419c3f0 100644 --- a/model/alerting/filter.go +++ b/model/alerting/filter.go @@ -10,3 +10,7 @@ type Filter struct { Not []FilterQuery `json:"not,omitempty"` //MinimumShouldMatch int `json:"minimum_should_match"` } + +func (f Filter) IsEmpty() bool { + return len(f.And) == 0 && len(f.Or) == 0 && len(f.Not) == 0 +} \ No newline at end of file diff --git a/model/alerting/filter_query.go b/model/alerting/filter_query.go index ece64139..30a4ab67 100644 --- a/model/alerting/filter_query.go +++ b/model/alerting/filter_query.go @@ -12,3 +12,11 @@ type FilterQuery struct { Or []FilterQuery `json:"or,omitempty"` Not []FilterQuery `json:"not,omitempty"` } + +func (fq FilterQuery) IsComplex() bool { + return len(fq.And) > 0 || len(fq.Or) > 0 || len(fq.Not) > 0 +} + +func (f FilterQuery) 
IsEmpty() bool { + return !f.IsComplex() && f.Operator == "" +} \ No newline at end of file diff --git a/model/alerting/resource.go b/model/alerting/resource.go index 33764bde..6a126b67 100644 --- a/model/alerting/resource.go +++ b/model/alerting/resource.go @@ -4,12 +4,25 @@ package alerting +import ( + "fmt" +) + type Resource struct { ID string `json:"resource_id" elastic_mapping:"resource_id:{type:keyword}"` + Name string `json:"resource_name" elastic_mapping:"resource_name:{type:keyword}"` Type string `json:"type" elastic_mapping:"type:{type:keyword}"` Objects []string `json:"objects" elastic_mapping:"objects:{type:keyword,copy_to:search_text}"` - Filter Filter `json:"filter,omitempty" elastic_mapping:"-"` + Filter FilterQuery `json:"filter,omitempty" elastic_mapping:"-"` RawFilter map[string]interface{} `json:"raw_filter,omitempty"` TimeField string `json:"time_field,omitempty" elastic_mapping:"id:{type:keyword}"` Context Context `json:"context"` } + +func (r Resource) Validate() error{ + if r.TimeField == "" { + return fmt.Errorf("TimeField can not be empty") + } + return nil +} + diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 4da263dc..8e5339c6 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -19,6 +19,9 @@ type Rule struct { Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"` Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` + LastNotificationTime time.Time `json:"-" elastic_mapping:"last_notification_time:{type:date}"` + LastTermStartTime time.Time `json:"-"` //标识最近一轮告警的开始时间 + LastEscalationTime time.Time `json:"-"` //标识最近一轮告警的开始时间 SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` } diff --git a/plugin/api/alerting/alert.go b/plugin/api/alerting/alert.go new file mode 100644 index 00000000..63195d3a --- /dev/null +++ b/plugin/api/alerting/alert.go @@ -0,0 +1,106 @@ +/* Copyright © INFINI Ltd. All rights reserved. 
+ * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import ( + "fmt" + "infini.sh/console/model/alerting" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + log "src/github.com/cihub/seelog" + "strconv" + "strings" +) + +func (h *AlertAPI) getAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("alert_id") + + obj := alerting.Alert{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "found": true, + "_id": id, + "_source": obj, + }, 200) +} + +func (h *AlertAPI) acknowledgeAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + body := struct { + AlertIDs []string `json:"alert_ids"` + }{} + queryDsl := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "_id": body.AlertIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['state'] = '%s'", alerting.AlertStateAcknowledge), + }, + } + err := orm.UpdateBy(alerting.Alert{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "ids": body.AlertIDs, + "result": "updated", + }, 200) +} + + +func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + var ( + keyword = h.GetParameterOrDefault(req, "keyword", "") + queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + mustBuilder = &strings.Builder{} + ) + if keyword != "" { + mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword)) + } + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + q := orm.Query{} + queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from) + q.RawQuery = []byte(queryDSL) + + err, res := orm.Search(&alerting.Alert{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.Write(w, res.Raw) +} diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 4cf3e591..77fa1d64 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -4,20 +4,24 @@ package alerting -import "infini.sh/framework/core/api" +import ( + "infini.sh/console/config" + "infini.sh/framework/core/api" +) type AlertAPI struct { api.Handler + Config *config.AppConfig } -func init() { - alert:=AlertAPI{} +func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.getRule) api.HandleAPIMethod(api.POST, "/alerting/rule", alert.createRule) api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule) api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule) api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule) + api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.fetchAlertInfos) api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.getChannel) api.HandleAPIMethod(api.POST, "/alerting/channel", alert.createChannel) 
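Note on the filter model introduced in this patch: Resource.Filter is now a FilterQuery, which nests boolean groups through its And, Or and Not fields and is rendered into an Elasticsearch bool query by ConvertFilterQueryToDsl in the engine changes further down. A minimal sketch of composing such a filter — the field names and values below are purely illustrative, not taken from these patches:

package main

import (
	"fmt"

	"infini.sh/console/model/alerting"
)

func main() {
	// Illustrative only: category equals "elasticsearch" AND NOT (timestamp > now-15m).
	// Leaf nodes carry Field/Operator/Values; nested And/Or/Not clauses mark a node as complex.
	fq := alerting.FilterQuery{
		And: []alerting.FilterQuery{
			{Field: "metadata.category", Operator: "equals", Values: []string{"elasticsearch"}},
			{Not: []alerting.FilterQuery{
				{Field: "timestamp", Operator: "gt", Values: []string{"now-15m"}},
			}},
		},
	}
	fmt.Println(fq.IsComplex()) // true: the root node carries nested And clauses
	fmt.Println(fq.IsEmpty())   // false
	// The engine converts this shape into {"bool":{"must":[{"term":...},{"bool":{"must_not":[{"range":...}]}}]}};
	// see ConvertFilterQueryToDsl and TestConvertFilterQuery later in this series.
}
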
diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index bb3fde1b..a03f81ca 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -238,6 +238,110 @@ func (alertAPI *AlertAPI) searchRule(w http.ResponseWriter, req *http.Request, p w.Write(searchResult.Raw) } +func (alertAPI *AlertAPI) getRuleAlertNumbers(ruleIDs []string) ( map[string]interface{},error) { + esClient := elastic.GetClient(alertAPI.Config.Elasticsearch) + queryDsl := util.MapStr{ + "size": 0, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "rule_id": ruleIDs, + }, + }, + { + "terms": util.MapStr{ + "state": []string{alerting.AlertStateError, alerting.AlertStateActive}, + }, + }, + }, + }, + }, + "aggs": util.MapStr{ + "terms_rule_id": util.MapStr{ + "terms": util.MapStr{ + "field": "rule_id", + }, + }, + }, + } + + searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) + if err != nil { + return nil, err + } + + ruleAlertNumbers := map[string]interface{}{} + if termRules, ok := searchRes.Aggregations["terms_rule_id"]; ok { + for _, bk := range termRules.Buckets { + key := bk["key"].(string) + ruleAlertNumbers[key] = bk["doc_count"] + } + } + return ruleAlertNumbers, nil +} + +func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ruleIDs = []string{} + alertAPI.DecodeJSON(req, &ruleIDs) + + if len(ruleIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + esClient := elastic.GetClient(alertAPI.Config.Elasticsearch) + queryDsl := util.MapStr{ + "_source": []string{"state", "rule_id"}, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "rule_id", + }, + "query": util.MapStr{ + "terms": util.MapStr{ + "rule_id": ruleIDs, + }, + }, + } + + searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + if len(searchRes.Hits.Hits) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + aletNumbers, err := alertAPI.getRuleAlertNumbers(ruleIDs) + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + + latestAlertInfos := map[string]util.MapStr{} + for _, hit := range searchRes.Hits.Hits { + if ruleID, ok := hit.Source["rule_id"].(string); ok { + latestAlertInfos[ruleID] = util.MapStr{ + "status": hit.Source["state"], + "alert_count": aletNumbers[ruleID], + } + } + + } + alertAPI.WriteJSON(w, latestAlertInfos, http.StatusOK) +} + func checkResourceExists(rule *alerting.Rule) (bool, error) { if rule.Resource.ID == "" { return false, fmt.Errorf("resource id can not be empty") @@ -250,6 +354,9 @@ func checkResourceExists(rule *alerting.Rule) (bool, error) { if err != nil { return false, err } + if rule.Resource.Name == "" { + rule.Resource.Name = obj.Name + } return ok && obj.Name != "", nil default: return false, fmt.Errorf("unsupport resource type: %s", rule.Resource.Type) diff --git a/plugin/api/init.go b/plugin/api/init.go index 9e6d3a70..c7f323aa 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -2,6 +2,7 @@ package api import ( "infini.sh/console/config" + "infini.sh/console/plugin/api/alerting" 
"infini.sh/console/plugin/api/index_management" "infini.sh/framework/core/api" "path" @@ -53,5 +54,9 @@ func Init(cfg *config.AppConfig) { // } // }, //}) + alertAPI := alerting.AlertAPI{ + Config: cfg, + } + alertAPI.Init() } diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 649f7e90..f2096e72 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -27,7 +27,12 @@ import ( type Engine struct { } - +//GenerateQuery generate a final elasticsearch query dsl object +//when RawFilter of rule is not empty, priority use it, otherwise to covert from Filter of rule (todo) +//auto generate time filter query and then attach to final query +//auto generate elasticsearch aggregations by metrics of rule +//group of metric item converted to terms aggregation and TimeField of rule converted to date_histogram aggregation +//convert statistic of metric item to elasticsearch aggregation func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { filter, err := engine.GenerateRawFilter(rule) if err != nil { @@ -89,6 +94,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { "aggs": rootAggs, }, nil } +//generateAgg convert statistic of metric item to elasticsearch aggregation func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{ var ( aggType = "value_count" @@ -121,10 +127,117 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{ } } +func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[string]interface{}, error){ + if !fq.IsComplex(){ + q := map[string]interface{}{} + if len(fq.Values) == 0 { + return nil, fmt.Errorf("values should not be empty") + } + //equals/gte/gt/lt/lte/in/match/regexp/wildcard/range/prefix/suffix/contain/ + switch fq.Operator { + case "equals": + q["term"] = util.MapStr{ + fq.Field: util.MapStr{ + "value": fq.Values[0], + }, + } + case "in": + q["terms"] = util.MapStr{ + fq.Field: fq.Values, + } + case "match": + q[fq.Operator] = util.MapStr{ + fq.Field: fq.Values[0], + } + case "gte", "gt", "lt", "lte": + q["range"] = util.MapStr{ + fq.Field: util.MapStr{ + fq.Operator: fq.Values[0], + }, + } + case "range": + if len(fq.Values) != 2 { + return nil, fmt.Errorf("values length of range query must be 2, but got %d", len(fq.Values)) + } + q["range"] = util.MapStr{ + fq.Field: util.MapStr{ + "gte": fq.Values[0], + "lte": fq.Values[1], + }, + } + case "prefix": + q["prefix"] = util.MapStr{ + fq.Field: fq.Values[0], + } + case "regexp", "wildcard": + q[fq.Operator] = util.MapStr{ + fq.Field: util.MapStr{ + "value": fq.Values[0], + }, + } + default: + return nil, fmt.Errorf("unsupport query operator %s", fq.Operator) + } + return q, nil + } + if fq.Or != nil && fq.And != nil { + return nil, fmt.Errorf("filter format error: or, and bool operation in same level") + } + if fq.Or != nil && fq.Not != nil { + return nil, fmt.Errorf("filter format error: or, not bool operation in same level") + } + if fq.And != nil && fq.Not != nil { + return nil, fmt.Errorf("filter format error: and, not bool operation in same level") + } + var ( + boolOperator string + filterQueries []alerting.FilterQuery + ) + + if len(fq.Not) >0 { + boolOperator = "must_not" + filterQueries = fq.Not + + }else if len(fq.Or) > 0 { + boolOperator = "should" + filterQueries = fq.Or + }else { + boolOperator = "must" + filterQueries = fq.And + } + var subQueries []interface{} + for _, subQ := range 
filterQueries { + subQuery, err := engine.ConvertFilterQueryToDsl(&subQ) + if err != nil { + return nil, err + } + subQueries = append(subQueries, subQuery) + } + boolQuery := util.MapStr{ + boolOperator: subQueries, + } + if boolOperator == "should" { + boolQuery["minimum_should_match"] = 1 + } + resultQuery := map[string]interface{}{ + "bool": boolQuery, + } + + return resultQuery, nil +} + func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interface{}, error) { query := map[string]interface{}{} + var err error if rule.Resource.RawFilter != nil { query = rule.Resource.RawFilter + }else{ + if !rule.Resource.Filter.IsEmpty(){ + query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter) + if err != nil { + return nil, err + } + } } intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval) if err != nil { @@ -208,6 +321,9 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e collectMetricData(searchResult["aggregations"], "", &metricData) return metricData, nil } +//CheckCondition check whether rule conditions triggered or not +//if triggered returns an array of ConditionResult +//sort conditions by severity desc before check , and then if condition is true, then continue check another group func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error){ metricData, err := engine.ExecuteQuery(rule) if err != nil { @@ -325,7 +441,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return err } lastAlertItem := alerting.Alert{} - err = getLastAlert(rule.ID, rule.Resource.ID, &lastAlertItem) + err = getLastAlert(rule.ID, &lastAlertItem) if err != nil { return err } @@ -349,11 +465,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if len(conditionResults) == 0 { if lastAlertItem.State != alerting.AlertStateNormal && lastAlertItem.ID != "" { alertItem = &alerting.Alert{ - ID: util.GetUUID(), - Created: time.Now(), - Updated: time.Now(), + ID: util.GetUUID(), + Created: time.Now(), + Updated: time.Now(), RuleID: rule.ID, - ClusterID: rule.Resource.ID, + ResourceID: rule.Resource.ID, Expression: rule.Metrics.Expression, Objects: rule.Resource.Objects, Severity: "info", @@ -363,6 +479,9 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } return nil }else{ + if lastAlertItem.State == "" || lastAlertItem.State == alerting.AlertStateNormal { + rule.LastTermStartTime = time.Now() + } log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID ) var ( severity = conditionResults[0].ConditionItem.Severity @@ -379,7 +498,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { Created: time.Now(), Updated: time.Now(), RuleID: rule.ID, - ClusterID: rule.Resource.ID, + ResourceID: rule.Resource.ID, + ResourceName: rule.Resource.Name, Expression: rule.Metrics.Expression, Objects: rule.Resource.Objects, Severity: severity, @@ -394,20 +514,36 @@ func (engine *Engine) Do(rule *alerting.Rule) error { alertItem.Error = err.Error() return err } + period := time.Now().Sub(rule.LastNotificationTime) + //log.Error(lastAlertItem.ID, period, periodDuration) - if time.Now().Sub(lastAlertItem.Created) > periodDuration { + if lastAlertItem.ID == "" || period > periodDuration { actionResults := performChannels(rule.Channels.Normal, conditionResults) alertItem.ActionExecutionResults = actionResults - + //todo init last notification time when create task (by last alert item is notified) + rule.LastNotificationTime = time.Now() + alertItem.IsNotified = true } - if 
rule.Channels.EscalationEnabled && lastAlertItem.State != alerting.AlertStateNormal { - periodDuration, err = time.ParseDuration(rule.Channels.EscalationThrottlePeriod) + isAck, err := hasAcknowledgedRule(rule.ID, rule.LastTermStartTime) + if err != nil { + alertItem.Error = err.Error() + return err + } + if rule.Channels.EscalationEnabled && lastAlertItem.ID !="" && !isAck { + throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod) if err != nil { return err } - if time.Now().Sub(lastAlertItem.Created) > periodDuration { - actionResults := performChannels(rule.Channels.Escalation, conditionResults) - alertItem.ActionExecutionResults = actionResults + //todo init last term start time when create task (by last alert item of state normal) + if time.Now().Sub(rule.LastTermStartTime) > throttlePeriod { + if time.Now().Sub(rule.LastEscalationTime) > periodDuration { + actionResults := performChannels(rule.Channels.Escalation, conditionResults) + alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, actionResults...) + //todo init last escalation time when create task (by last alert item is escalated) + rule.LastEscalationTime = time.Now() + alertItem.IsEscalated = true + } + } } } @@ -555,8 +691,7 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti } } -func getLastAlert(ruleID, clusterID string, alertItem *alerting.Alert) error { - esClient := elastic.GetClient(clusterID) +func getLastAlert(ruleID string, alertItem *alerting.Alert) error { queryDsl := util.MapStr{ "size": 1, "sort": []util.MapStr{ @@ -574,13 +709,60 @@ func getLastAlert(ruleID, clusterID string, alertItem *alerting.Alert) error { }, }, } - searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alertItem), util.MustToJSONBytes(queryDsl) ) + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDsl), + } + err, searchResult := orm.Search(alertItem, &q ) if err != nil { return err } - if len(searchRes.Hits.Hits) == 0 { + if len(searchResult.Result) == 0 { return nil } - alertBytes := util.MustToJSONBytes(searchRes.Hits.Hits[0].Source) + alertBytes := util.MustToJSONBytes(searchResult.Result[0]) return util.FromJSONBytes(alertBytes, alertItem) +} + +func hasAcknowledgedRule(ruleID string, startTime time.Time) (bool, error){ + queryDsl := util.MapStr{ + "size": 1, + "query": util.MapStr{ + "bool": util.MapStr{ + "must":[]util.MapStr{ + { + "term": util.MapStr{ + "rule_id": util.MapStr{ + "value": ruleID, + }, + }, + }, + { + "term": util.MapStr{ + "state": alerting.AlertStateAcknowledge, + }, + }, + { + "range": util.MapStr{ + "created": util.MapStr{ + "gte": startTime, + }, + }, + }, + }, + + }, + }, + } + q := orm.Query{ + WildcardIndex: true, + RawQuery: util.MustToJSONBytes(queryDsl), + } + err, searchResult := orm.Search(alerting.Alert{}, &q ) + if err != nil { + return false, err + } + if len(searchResult.Result) == 0 { + return false, nil + } + return true, nil } \ No newline at end of file diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go index 7db1ec2c..d9e3e609 100644 --- a/service/alerting/elasticsearch/engine_test.go +++ b/service/alerting/elasticsearch/engine_test.go @@ -25,7 +25,7 @@ func TestEngine( t *testing.T) { Type: "elasticsearch", Objects: []string{".infini_metrics*"}, TimeField: "timestamp", - Filter: alerting.Filter{ + Filter: alerting.FilterQuery{ And: []alerting.FilterQuery{ {Field: "timestamp", Operator: "gte", Values: []string{"now-15m"}}, //{Field: 
"payload.elasticsearch.cluster_health.status", Operator: "equals", Values: []string{"red"}}, @@ -184,3 +184,36 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { } fmt.Println(util.MustToJSON(q)) } + +func TestConvertFilterQuery(t *testing.T) { + fq := alerting.FilterQuery{ + And: []alerting.FilterQuery{ + { + Field: "metadata.category", + Values: []string{"elasticsearch"}, + Operator: "equals", + }, + { + Field: "metadata.name", + Values: []string{"index_stats", "node_stats"}, + Operator: "in", + }, + { + Not: []alerting.FilterQuery{ + { + Field: "timestamp", + Operator: "gt", + Values: []string{"2022-04-16T16:16:39.168605+08:00"}, + }, + }, + }, + }, + } + + eng := &Engine{} + q, err := eng.ConvertFilterQueryToDsl(&fq) + if err != nil { + t.Fatal(err) + } + fmt.Println(util.MustToJSON(q)) +} \ No newline at end of file From 6c75b5aebafeeeebd07073f5a03d192785f4330d Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 18 Apr 2022 10:08:42 +0800 Subject: [PATCH 4/6] update error handle --- service/alerting/elasticsearch/engine.go | 43 ++++++++++++++++++------ 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index f2096e72..7b8945ad 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -312,6 +312,9 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e if err != nil { return nil, err } + if searchRes.StatusCode != 200 { + return nil, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body)) + } searchResult := map[string]interface{}{} err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult) if err != nil { @@ -435,18 +438,26 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe return conditionResults, nil } func (engine *Engine) Do(rule *alerting.Rule) error { - log.Tracef("start check condition of rule %s", rule.ID) - conditionResults, err := engine.CheckCondition(rule) - if err != nil { - return err - } - lastAlertItem := alerting.Alert{} - err = getLastAlert(rule.ID, &lastAlertItem) - if err != nil { - return err - } - var alertItem *alerting.Alert + + var ( + alertItem *alerting.Alert + err error + ) defer func() { + if err != nil && alertItem == nil { + alertItem = &alerting.Alert{ + ID: util.GetUUID(), + Created: time.Now(), + Updated: time.Now(), + RuleID: rule.ID, + ResourceID: rule.Resource.ID, + ResourceName: rule.Resource.Name, + Expression: rule.Metrics.Expression, + Objects: rule.Resource.Objects, + State: alerting.AlertStateError, + Error: err.Error(), + } + } if alertItem != nil { for _, actionResult := range alertItem.ActionExecutionResults { if actionResult.Error != "" { @@ -462,6 +473,16 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } }() + log.Tracef("start check condition of rule %s", rule.ID) + conditionResults, err := engine.CheckCondition(rule) + if err != nil { + return err + } + lastAlertItem := alerting.Alert{} + err = getLastAlert(rule.ID, &lastAlertItem) + if err != nil { + return err + } if len(conditionResults) == 0 { if lastAlertItem.State != alerting.AlertStateNormal && lastAlertItem.ID != "" { alertItem = &alerting.Alert{ From 162cf120c92e0a948249e650fd571d8077c076cc Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 19 Apr 2022 18:38:13 +0800 Subject: [PATCH 5/6] update alert api --- main.go | 9 +- model/alerting/alert.go | 3 +- model/alerting/condition.go | 4 + model/alerting/metric.go | 6 + model/alerting/rule.go | 4 +- 
plugin/api/alerting/api.go | 1 + plugin/api/alerting/rule.go | 25 ++- service/alerting/elasticsearch/engine.go | 155 +++++++++++------- service/alerting/elasticsearch/engine_test.go | 77 ++++++++- service/alerting/engine.go | 4 +- 10 files changed, 214 insertions(+), 74 deletions(-) diff --git a/main.go b/main.go index 92cca1dd..dbd69cd1 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "infini.sh/console/model/alerting" "infini.sh/console/model/gateway" _ "infini.sh/console/plugin" + alerting2 "infini.sh/console/service/alerting" "infini.sh/framework" "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" @@ -27,6 +28,7 @@ import ( _ "infini.sh/framework/plugins" api2 "infini.sh/gateway/api" _ "infini.sh/gateway/proxy" + log "src/github.com/cihub/seelog" ) var appConfig *config.AppConfig @@ -130,7 +132,12 @@ func main() { api.RegisterSchema() - + go func() { + err := alerting2.InitTasks() + if err != nil { + log.Errorf("init alerting task error: %v", err) + } + }() }, nil) { app.Run() } diff --git a/model/alerting/alert.go b/model/alerting/alert.go index efd93b03..2358d1ab 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -26,11 +26,12 @@ type Alert struct { Error string `json:"error,omitempty"` IsNotified bool `json:"is_notified" elastic_mapping:"is_notified: { type: boolean }"` //标识本次检测是否发送了告警通知 IsEscalated bool `json:"is_escalated" elastic_mapping:"is_escalated: { type: boolean }"` //标识本次检测是否发送了升级告警通知 + Conditions Condition `json:"condition"` + ConditionResult *ConditionResult `json:"condition_result,omitempty" elastic_mapping:"condition_result: { type: object,enabled:false }"` SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` } type ActionExecutionResult struct { - //ActionId string `json:"action_id"` LastExecutionTime int `json:"last_execution_time"` Error string `json:"error"` Result string `json:"result"` diff --git a/model/alerting/condition.go b/model/alerting/condition.go index 6e0d3cea..8614f037 100644 --- a/model/alerting/condition.go +++ b/model/alerting/condition.go @@ -19,6 +19,10 @@ type ConditionItem struct { } type ConditionResult struct { + ResultItems []ConditionResultItem `json:"result_items"` + QueryResult *QueryResult `json:"query_result"` +} +type ConditionResultItem struct { GroupValues []string `json:"group_values"` ConditionItem *ConditionItem `json:"condition_item"` } diff --git a/model/alerting/metric.go b/model/alerting/metric.go index dd6ce249..411d43c3 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -48,6 +48,12 @@ type MetricItem struct { Group []string `json:"group"` //bucket group } +type QueryResult struct { + Query string `json:"query"` + Raw string `json:"raw"` + MetricData []MetricData `json:"metric_data"` +} + type MetricData struct { GroupValues []string `json:"group_values"` Data map[string][]TimeMetricData `json:"data"` diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 8e5339c6..d8267bb3 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -27,10 +27,10 @@ type Rule struct { type RuleChannel struct { Normal []Channel `json:"normal"` - Escalation []Channel `json:"escalation"` + Escalation []Channel `json:"escalation,omitempty"` ThrottlePeriod string `json:"throttle_period"` //沉默周期 AcceptTimeRange TimeRange `json:"accept_time_range"` - EscalationThrottlePeriod string `json:"escalation_throttle_period"` + EscalationThrottlePeriod string 
`json:"escalation_throttle_period,omitempty"` EscalationEnabled bool `json:"escalation_enabled"` } diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 77fa1d64..d09ddcb4 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -31,5 +31,6 @@ func (alert *AlertAPI) Init() { //just for test //api.HandleAPIMethod(api.GET, "/alerting/rule/test", alert.testRule) + } diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index a03f81ca..8ea1678c 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -109,10 +109,10 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("rule_id") - obj := alerting.Rule{} + obj := &alerting.Rule{} obj.ID = id - exists, err := orm.Get(&obj) + exists, err := orm.Get(obj) if !exists || err != nil { alertAPI.WriteJSON(w, util.MapStr{ "_id": id, @@ -123,8 +123,8 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p id = obj.ID create := obj.Created - obj = alerting.Rule{} - err = alertAPI.DecodeJSON(req, &obj) + obj = &alerting.Rule{} + err = alertAPI.DecodeJSON(req, obj) if err != nil { alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -135,7 +135,13 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p obj.ID = id obj.Created = create obj.Updated = time.Now() - err = orm.Update(&obj) + err = obj.Metrics.RefreshExpression() + if err != nil { + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + err = orm.Update(obj) if err != nil { alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -143,6 +149,13 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p } if obj.Enabled { + exists, err = checkResourceExists(obj) + if err != nil || !exists { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } //update task task.StopTask(id) eng := alerting2.GetEngine(obj.Resource.Type) @@ -150,7 +163,7 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p ID: obj.ID, Interval: obj.Schedule.Interval, Description: obj.Metrics.Expression, - Task: eng.GenerateTask(&obj), + Task: eng.GenerateTask(obj), } task.RegisterScheduleTask(ruleTask) task.StartTask(ruleTask.ID) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 7b8945ad..b7ccde98 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -18,6 +18,8 @@ import ( "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "io" + "math" + "runtime/debug" "sort" "strconv" "strings" @@ -44,7 +46,10 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { } basicAggs := util.MapStr{} for _, metricItem := range rule.Metrics.Items { - basicAggs[metricItem.Name] = engine.generateAgg(&metricItem) + metricAggs := engine.generateAgg(&metricItem) + if err = util.MergeFields(basicAggs, metricAggs, true); err != nil { + return nil, err + } } timeAggs := util.MapStr{ "date_histogram": util.MapStr{ @@ -95,7 +100,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { }, nil } //generateAgg convert statistic of metric item to elasticsearch aggregation -func (engine *Engine) 
generateAgg(metricItem *alerting.MetricItem) interface{}{ +func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) map[string]interface{}{ var ( aggType = "value_count" field = metricItem.Field @@ -104,11 +109,15 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{ field = "_id" } var percent = 0.0 + var isPipeline = false switch metricItem.Statistic { case "max", "min", "sum", "avg": aggType = metricItem.Statistic case "count", "value_count": aggType = "value_count" + case "rate": + aggType = "max" + isPipeline = true case "medium": aggType = "median_absolute_deviation" case "p99", "p95","p90","p80","p50": @@ -122,9 +131,22 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) interface{}{ if aggType == "percentiles" { aggValue["percents"] = []interface{}{percent} } - return util.MapStr{ - aggType: aggValue, + aggs := util.MapStr{ + metricItem.Name: util.MapStr{ + aggType: aggValue, + }, } + if !isPipeline{ + return aggs + } + pipelineAggID := util.GetUUID() + aggs[pipelineAggID] = aggs[metricItem.Name] + aggs[metricItem.Name] = util.MapStr{ + "derivative": util.MapStr{ + "buckets_path": pipelineAggID, + }, + } + return aggs } func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[string]interface{}, error){ @@ -285,8 +307,10 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interfa must := []interface{}{ timeQuery, } - if _, ok := query["match_all"]; !ok { - must = append(must, query) + if len(query) > 0 { + if _, ok = query["match_all"]; !ok { + must = append(must, query) + } } query = util.MapStr{ "bool": util.MapStr{ @@ -297,8 +321,9 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interfa return query, nil } -func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, error){ +func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error){ esClient := elastic.GetClient(rule.Resource.ID) + queryResult := &alerting.QueryResult{} indexName := strings.Join(rule.Resource.Objects, ",") queryDsl, err := engine.GenerateQuery(rule) if err != nil { @@ -308,6 +333,7 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e if err != nil { return nil, err } + queryResult.Query = string(queryDslBytes) searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes) if err != nil { return nil, err @@ -315,6 +341,7 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e if searchRes.StatusCode != 200 { return nil, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body)) } + queryResult.Raw = string(searchRes.RawResult.Body) searchResult := map[string]interface{}{} err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult) if err != nil { @@ -322,18 +349,24 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e } metricData := []alerting.MetricData{} collectMetricData(searchResult["aggregations"], "", &metricData) - return metricData, nil + queryResult.MetricData = metricData + return queryResult, nil } //CheckCondition check whether rule conditions triggered or not //if triggered returns an array of ConditionResult //sort conditions by severity desc before check , and then if condition is true, then continue check another group -func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error){ - metricData, err := engine.ExecuteQuery(rule) - if err != nil { - return nil, err +func (engine 
*Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){ + queryResult, err := engine.ExecuteQuery(rule) + conditionResult := &alerting.ConditionResult{ + QueryResult: queryResult, } - var conditionResults []alerting.ConditionResult - for _, md := range metricData { + if err != nil { + return conditionResult, err + } + + var resultItems []alerting.ConditionResultItem + var targetMetricData []alerting.MetricData + for _, md := range queryResult.MetricData { var targetData alerting.MetricData if len(rule.Metrics.Items) == 1 { targetData = md @@ -344,7 +377,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe } expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula) if err != nil { - return nil, err + return conditionResult, err } dataLength := 0 for _, v := range md.Data { @@ -357,8 +390,14 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe } var timestamp interface{} for k, v := range md.Data { + if len(k) == 20 { + continue + } //drop nil value bucket - if v[i][1] == nil { + if len(v[i]) < 2 { + continue DataLoop + } + if _, ok := v[i][1].(float64); !ok { continue DataLoop } parameters[k] = v[i][1] @@ -366,11 +405,18 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe } result, err := expression.Evaluate(parameters) if err != nil { - return nil, err + return conditionResult, err } + if r, ok := result.(float64); ok { + if math.IsNaN(r){ + continue + } + } + targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, result}) } } + targetMetricData = append(targetMetricData, targetData) sort.Slice(rule.Conditions.Items, func(i, j int) bool { return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity] }) @@ -379,7 +425,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe conditionExpression := "" valueLength := len(cond.Values) if valueLength == 0 { - return nil, fmt.Errorf("condition values: %v should not be empty", cond.Values) + return conditionResult, fmt.Errorf("condition values: %v should not be empty", cond.Values) } switch cond.Operator { case "equals": @@ -394,15 +440,15 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe conditionExpression = fmt.Sprintf("result < %v", cond.Values[0]) case "range": if valueLength != 2 { - return nil, fmt.Errorf("length of %s condition values should be 2", cond.Operator) + return conditionResult, fmt.Errorf("length of %s condition values should be 2", cond.Operator) } conditionExpression = fmt.Sprintf("result >= %v && result <= %v", cond.Values[0], cond.Values[1]) default: - return nil, fmt.Errorf("unsupport condition operator: %s", cond.Operator) + return conditionResult, fmt.Errorf("unsupport condition operator: %s", cond.Operator) } expression, err := govaluate.NewEvaluableExpression(conditionExpression) if err != nil { - return nil, err + return conditionResult, err } dataLength := 0 dataKey := "" @@ -412,20 +458,20 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe } triggerCount := 0 for i := 0; i < dataLength; i++ { - conditionResult, err := expression.Evaluate(map[string]interface{}{ + evaluateResult, err := expression.Evaluate(map[string]interface{}{ "result": targetData.Data[dataKey][i][1], }) if err != nil { return nil, err } - if conditionResult == true { + if evaluateResult == true { triggerCount += 1 }else { 
 					triggerCount = 0
 				}
 				if triggerCount >= cond.MinimumPeriodMatch {
 					log.Debugf("triggered condition %v, groups: %v\n", cond, targetData.GroupValues)
-					conditionResults = append(conditionResults, alerting.ConditionResult{
+					resultItems = append(resultItems, alerting.ConditionResultItem{
 						GroupValues: targetData.GroupValues,
 						ConditionItem: &cond,
 					})
@@ -435,7 +481,9 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
 		}
 	}
-	return conditionResults, nil
+	conditionResult.QueryResult.MetricData = targetMetricData
+	conditionResult.ResultItems = resultItems
+	return conditionResult, nil
 }
 
 func (engine *Engine) Do(rule *alerting.Rule) error {
@@ -474,7 +522,18 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
 		}
 	}()
 	log.Tracef("start check condition of rule %s", rule.ID)
-	conditionResults, err := engine.CheckCondition(rule)
+	checkResults, err := engine.CheckCondition(rule)
+	alertItem = &alerting.Alert{
+		ID: util.GetUUID(),
+		Created: time.Now(),
+		Updated: time.Now(),
+		RuleID: rule.ID,
+		ResourceID: rule.Resource.ID,
+		Expression: rule.Metrics.Expression,
+		Objects: rule.Resource.Objects,
+		ConditionResult: checkResults,
+		Conditions: rule.Conditions,
+	}
 	if err != nil {
 		return err
 	}
@@ -483,21 +542,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
 	if err != nil {
 		return err
 	}
+	conditionResults := checkResults.ResultItems
 	if len(conditionResults) == 0 {
-		if lastAlertItem.State != alerting.AlertStateNormal && lastAlertItem.ID != "" {
-			alertItem = &alerting.Alert{
-				ID: util.GetUUID(),
-				Created: time.Now(),
-				Updated: time.Now(),
-				RuleID: rule.ID,
-				ResourceID: rule.Resource.ID,
-				Expression: rule.Metrics.Expression,
-				Objects: rule.Resource.Objects,
-				Severity: "info",
-				Content: "",
-				State: alerting.AlertStateNormal,
-			}
-		}
+		alertItem.Severity = "info"
+		alertItem.Content = ""
+		alertItem.State = alerting.AlertStateNormal
 		return nil
 	}else{
 		if lastAlertItem.State == "" || lastAlertItem.State == alerting.AlertStateNormal {
@@ -511,22 +560,12 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
 		for _, conditionResult := range conditionResults {
 			if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] {
 				severity = conditionResult.ConditionItem.Severity
+				content = conditionResult.ConditionItem.Message
 			}
-			content += conditionResult.ConditionItem.Message + ";"
-		}
-		alertItem = &alerting.Alert{
-			ID: util.GetUUID(),
-			Created: time.Now(),
-			Updated: time.Now(),
-			RuleID: rule.ID,
-			ResourceID: rule.Resource.ID,
-			ResourceName: rule.Resource.Name,
-			Expression: rule.Metrics.Expression,
-			Objects: rule.Resource.Objects,
-			Severity: severity,
-			Content: content,
-			State: alerting.AlertStateActive,
 		}
+		alertItem.Severity = severity
+		alertItem.Content = content
+		alertItem.State = alerting.AlertStateActive
 	}
 
 	if rule.Channels.AcceptTimeRange.Include(time.Now()) {
@@ -571,7 +610,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
 	return nil
 }
 
-func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResult) []alerting.ActionExecutionResult {
+func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResultItem) []alerting.ActionExecutionResult {
 	var message string
 	for _, conditionResult := range conditionResults {
 		message += fmt.Sprintf("severity: %s\t message:%s\t groups:%v\t timestamp: %v;", conditionResult.ConditionItem.Severity, conditionResult.ConditionItem.Message, conditionResult.GroupValues, time.Now())
@@ -632,6 +671,12 @@ func performChannel(channel *alerting.Channel, ctx []byte) ([]byte, error) {
 }
 func (engine *Engine) GenerateTask(rule *alerting.Rule) func(ctx context.Context) {
 	return func(ctx context.Context) {
+		defer func() {
+			if err := recover(); err != nil {
+				log.Error(err)
+				debug.PrintStack()
+			}
+		}()
 		err := engine.Do(rule)
 		if err != nil {
 			log.Error(err)
diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go
index d9e3e609..492a4324 100644
--- a/service/alerting/elasticsearch/engine_test.go
+++ b/service/alerting/elasticsearch/engine_test.go
@@ -126,6 +126,57 @@ func TestGenerateAgg(t *testing.T) {
 }
 
 func TestGeneratePercentilesAggQuery(t *testing.T) {
+	//rule := alerting.Rule{
+	//	ID: util.GetUUID(),
+	//	Created: time.Now(),
+	//	Updated: time.Now(),
+	//	Enabled: true,
+	//	Resource: alerting.Resource{
+	//		ID: "c8i18llath2blrusdjng",
+	//		Type: "elasticsearch",
+	//		Objects: []string{".infini_metrics*"},
+	//		TimeField: "timestamp",
+	//		RawFilter: map[string]interface{}{
+	//			"match_all": util.MapStr{
+	//
+	//			},
+	//		},
+	//	},
+	//
+	//	Metrics: alerting.Metric{
+	//		PeriodInterval: "1m",
+	//		MaxPeriods: 15,
+	//		Items: []alerting.MetricItem{
+	//			{Name: "a", Field: "payload.elasticsearch.node_stats.os.cpu.percent", Statistic: "p99", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
+	//		},
+	//	},
+	//	Conditions: alerting.Condition{
+	//		Operator: "any",
+	//		Items: []alerting.ConditionItem{
+	//			{MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"},
+	//		},
+	//	},
+	//
+	//	Channels: alerting.RuleChannel{
+	//		Normal: []alerting.Channel{
+	//			{Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
+	//				HeaderParams: map[string]string{
+	//					"Content-Type": "application/json",
+	//				},
+	//				Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
+	//				Method: http.MethodPost,
+	//				URL: "https://oapi.dingtalk.com/robot/send?access_token=XXXXXX",
+	//			}},
+	//		},
+	//		ThrottlePeriod: "1h",
+	//		AcceptTimeRange: alerting.TimeRange{
+	//			Start: "8:00",
+	//			End: "21:00",
+	//		},
+	//		EscalationEnabled: true,
+	//		EscalationThrottlePeriod: "30m",
+	//	},
+	//}
 	rule := alerting.Rule{
 		ID: util.GetUUID(),
 		Created: time.Now(),
@@ -137,8 +188,16 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
 			Objects: []string{".infini_metrics*"},
 			TimeField: "timestamp",
 			RawFilter: map[string]interface{}{
-				"match_all": util.MapStr{
-
+				"bool": map[string]interface{}{
+					"must": []interface{}{
+						util.MapStr{
+							"term": util.MapStr{
+								"metadata.name": util.MapStr{
+									"value": "index_stats",
+								},
+							},
+						},
+					},
 				},
 			},
 		},
@@ -147,13 +206,15 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
 			PeriodInterval: "1m",
 			MaxPeriods: 15,
 			Items: []alerting.MetricItem{
-				{Name: "a", Field: "payload.elasticsearch.node_stats.os.cpu.percent", Statistic: "p99", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
+				{Name: "a", Field: "payload.elasticsearch.index_stats.total.search.query_total", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},
+				{Name: "b", Field: "payload.elasticsearch.index_stats.total.search.query_time_in_millis", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},
 			},
+			Formula: "b/a",
 		},
 		Conditions: alerting.Condition{
 			Operator: "any",
 			Items: []alerting.ConditionItem{
-				{MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"},
+				{MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"10"}, Severity: "warning", Message: "搜索延迟大于10ms"},
 			},
 		},
 
@@ -170,7 +231,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
 			},
 			ThrottlePeriod: "1h",
 			AcceptTimeRange: alerting.TimeRange{
-				Start: "8:00",
+				Start: "08:00",
 				End: "21:00",
 			},
 			EscalationEnabled: true,
@@ -209,11 +270,13 @@ func TestConvertFilterQuery(t *testing.T) {
 			},
 		},
 	}
-
+	var targetDsl = `{"bool":{"must":[{"term":{"metadata.category":{"value":"elasticsearch"}}},{"terms":{"metadata.name":["index_stats","node_stats"]}},{"bool":{"must_not":[{"range":{"timestamp":{"gt":"2022-04-16T16:16:39.168605+08:00"}}}]}}]}}`
 	eng := &Engine{}
 	q, err := eng.ConvertFilterQueryToDsl(&fq)
 	if err != nil {
 		t.Fatal(err)
 	}
-	fmt.Println(util.MustToJSON(q))
+	if dsl := util.MustToJSON(q); dsl != targetDsl {
+		t.Errorf("expect dsl %s but got %s", targetDsl, dsl)
+	}
 }
\ No newline at end of file
diff --git a/service/alerting/engine.go b/service/alerting/engine.go
index 2b7c6a5b..5151b7d5 100644
--- a/service/alerting/engine.go
+++ b/service/alerting/engine.go
@@ -13,8 +13,8 @@ import (
 
 type Engine interface {
 	GenerateQuery(rule *alerting.Rule) (interface{}, error)
-	ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, error)
-	CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error)
+	ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error)
+	CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error)
 	GenerateTask(rule *alerting.Rule) func(ctx context.Context)
 }
 

From 147d3946999558bc21eb42c75b1bfebaed8c5bd6 Mon Sep 17 00:00:00 2001
From: liugq
Date: Tue, 19 Apr 2022 18:38:27 +0800
Subject: [PATCH 6/6] update alert api

---
 service/alerting/init.go | 97 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 97 insertions(+)
 create mode 100644 service/alerting/init.go

diff --git a/service/alerting/init.go b/service/alerting/init.go
new file mode 100644
index 00000000..eed05844
--- /dev/null
+++ b/service/alerting/init.go
@@ -0,0 +1,97 @@
+/* Copyright © INFINI Ltd. All rights reserved.
+ * web: https://infinilabs.com
+ * mail: hello#infini.ltd */
+
+package alerting
+
+import (
+	"infini.sh/console/model/alerting"
+	"infini.sh/framework/core/orm"
+	"infini.sh/framework/core/task"
+	"infini.sh/framework/core/util"
+	log "src/github.com/cihub/seelog"
+	"time"
+)
+
+func InitTasks() error {
+	//fetch alerting rules from es
+	q := orm.Query{
+		Size: 10000,
+		WildcardIndex: true,
+	}
+	q.Conds = orm.And(orm.Eq("enabled", true))
+	err, result := orm.Search(alerting.Rule{}, &q)
+	if err != nil {
+		return err
+	}
+	for _, ruleM := range result.Result {
+		rb := util.MustToJSONBytes(ruleM)
+		rule := &alerting.Rule{}
+		err = util.FromJSONBytes(rb, rule)
+		if err != nil {
+			return err
+		}
+		if !rule.Enabled {
+			continue
+		}
+		eng := GetEngine(rule.Resource.Type)
+		task.RegisterScheduleTask(task.ScheduleTask{
+			ID: rule.ID,
+			Interval: rule.Schedule.Interval,
+			Description: rule.Metrics.Expression,
+			Task: eng.GenerateTask(rule),
+		})
+		task.StartTask(rule.ID)
+	}
+	return nil
+}
+
+func getRuleLastTermTime() (map[string]time.Time, error) {
+	query := util.MapStr{
+		"_source": "created",
+		"sort": []util.MapStr{
+			{
+				"created": util.MapStr{
+					"order": "desc",
+				},
+			},
+		},
+		"collapse": util.MapStr{
+			"field": "rule_id",
+		},
+		"query": util.MapStr{
+			"term": util.MapStr{
+				"state": util.MapStr{
+					"value": "normal",
+				},
+			},
+		},
+	}
+	q := &orm.Query{
+		RawQuery: util.MustToJSONBytes(query),
+		Size: 1000,
+		WildcardIndex: true,
+	}
+	err, result := orm.Search(alerting.Alert{}, q)
+	if err != nil {
+		return nil, err
+	}
+
+	times := map[string]time.Time{}
+	obj := &ruleTime{}
+	for _, item := range result.Result {
+		itemBytes := util.MustToJSONBytes(item)
+		err = util.FromJSONBytes(itemBytes, obj)
+		if err != nil {
+			log.Error(err)
+			continue
+		}
+		times[obj.RuleID] = obj.Created
+	}
+	return times, nil
+}
+
+type ruleTime struct {
+	Created time.Time `json:"created"`
+	RuleID string `json:"rule_id"`
+}
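
For context, InitTasks only loads the enabled rules and registers their per-rule schedule tasks; a caller still has to invoke it when the alerting service boots, and getRuleLastTermTime has no caller in this series yet. Below is a minimal sketch of that startup wiring, assuming a module wrapper named AlertingModule with a Start hook and the import path infini.sh/console/service/alerting; both names are illustrative assumptions, not part of the patches.

package module

import (
	alerting "infini.sh/console/service/alerting"
)

// AlertingModule is a hypothetical wrapper used only to show where
// InitTasks could be invoked; the real console module may differ.
type AlertingModule struct{}

// Start loads the enabled rules from Elasticsearch and registers their
// schedule tasks (see service/alerting/init.go from patch 6/6).
func (m *AlertingModule) Start() error {
	return alerting.InitTasks()
}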