From d7578a2743f953c7f3d4d7fc8bdbd69a94b73f1b Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 16 Aug 2023 10:02:48 +0800 Subject: [PATCH 01/11] add tags, category field --- model/alerting/alert.go | 2 + model/alerting/rule.go | 2 + plugin/api/alerting/message.go | 99 ++++++++++++++++++++---- service/alerting/elasticsearch/engine.go | 2 + 4 files changed, 90 insertions(+), 15 deletions(-) diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 6e21f572..2b23e95f 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -70,6 +70,8 @@ type AlertMessage struct { IgnoredUser string `json:"ignored_user,omitempty" elastic_mapping:"ignored_user: { type: keyword,copy_to:search_text }"` Priority string `json:"priority" elastic_mapping:"priority: { type: keyword }"` SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` + Category string `json:"category,omitempty" elastic_mapping:"category: { type: keyword,copy_to:search_text }"` + Tags []string `json:"tags,omitempty" elastic_mapping:"tags: { type: keyword,copy_to:search_text }"` } /* diff --git a/model/alerting/rule.go b/model/alerting/rule.go index e930d64f..9ba01324 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -31,6 +31,8 @@ type Rule struct { Name string `json:"name" elastic_mapping:"name: { type: keyword }"` Id string `json:"id" elastic_mapping:"id: { type: keyword }"` } `json:"creator" elastic_mapping:"creator:{type:object}"` + Category string `json:"category,omitempty" elastic_mapping:"category: { type: keyword,copy_to:search_text }"` + Tags []string `json:"tags,omitempty" elastic_mapping:"tags: { type: keyword,copy_to:search_text }"` } func (rule *Rule) GetOrInitExpression() (string, error){ diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index 59b23644..69bf53f0 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -25,6 +25,7 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, body := struct { Messages []alerting.AlertMessage `json:"messages"` IgnoredReason string `json:"ignored_reason"` + IsReset bool `json:"is_reset"` }{} err := h.DecodeJSON(req, &body) if err != nil { @@ -41,29 +42,44 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, messageIDs = append(messageIDs, msg.ID) } currentUser := h.GetCurrentUser(req) + must := []util.MapStr{ + { + "terms": util.MapStr{ + "_id": messageIDs, + }, + }, + } + var source string + if body.IsReset { + must = append(must, util.MapStr{ + "term": util.MapStr{ + "status": util.MapStr{ + "value": alerting.MessageStateIgnored, + }, + }, + }) + source = fmt.Sprintf("ctx._source['status'] = '%s'", alerting.MessageStateAlerting) + }else { + must = append(must, util.MapStr{ + "term": util.MapStr{ + "status": util.MapStr{ + "value": alerting.MessageStateAlerting, + }, + }, + }) + source = fmt.Sprintf("ctx._source['status'] = '%s';ctx._source['ignored_time']='%s';ctx._source['ignored_reason']='%s';ctx._source['ignored_user']='%s'", alerting.MessageStateIgnored, time.Now().Format(time.RFC3339Nano), body.IgnoredReason, currentUser) + } queryDsl := util.MapStr{ "query": util.MapStr{ "bool": util.MapStr{ - "must": []util.MapStr{ - { - "terms": util.MapStr{ - "_id": messageIDs, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": alerting.MessageStateAlerting, - }, - }, - }, - }, + "must": must, }, }, "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s';ctx._source['ignored_time']='%s';ctx._source['ignored_reason']='%s';ctx._source['ignored_user']='%s'", alerting.MessageStateIgnored, time.Now().Format(time.RFC3339Nano), body.IgnoredReason, currentUser), + "source": source, }, } + log.Info(util.MustToJSON(queryDsl)) err = orm.UpdateBy(alerting.AlertMessage{}, util.MustToJSONBytes(queryDsl)) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -160,10 +176,53 @@ func (h *AlertAPI) getAlertMessageStats(w http.ResponseWriter, req *http.Request return } statusCounts[alerting.MessageStateIgnored] = countRes.Count + + queryDsl = util.MapStr{ + "size": 0, + "aggs": util.MapStr{ + "terms_by_category": util.MapStr{ + "terms": util.MapStr{ + "field": "category", + "size": 100, + }, + }, + "terms_by_tags": util.MapStr{ + "terms": util.MapStr{ + "field": "tags", + "size": 100, + }, + }, + }, + } + searchRes, err = esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(queryDsl) ) + if err != nil { + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + categories := []string{} + if termsAgg, ok := searchRes.Aggregations["terms_by_category"]; ok { + for _, bk := range termsAgg.Buckets { + if cate, ok := bk["key"].(string); ok { + categories = append(categories, cate) + } + } + } + tags := []string{} + if termsAgg, ok := searchRes.Aggregations["terms_by_tags"]; ok { + for _, bk := range termsAgg.Buckets { + if tag, ok := bk["key"].(string); ok { + tags = append(tags, tag) + } + } + } h.WriteJSON(w, util.MapStr{ "alert": util.MapStr{ "current": statusCounts, }, + "categories": categories, + "tags": tags, }, http.StatusOK) } @@ -182,6 +241,8 @@ func (h *AlertAPI) searchAlertMessage(w http.ResponseWriter, req *http.Request, max = h.GetParameterOrDefault(req, "max", "now") mustBuilder = &strings.Builder{} sortBuilder = strings.Builder{} + category = h.GetParameterOrDefault(req, "category", "") + tags = h.GetParameterOrDefault(req, "tags", "") ) mustBuilder.WriteString(fmt.Sprintf(`{"range":{"created":{"gte":"%s", "lte": "%s"}}}`, min, max)) if ruleID != "" { @@ -213,6 +274,14 @@ func (h *AlertAPI) searchAlertMessage(w http.ResponseWriter, req *http.Request, mustBuilder.WriteString(",") mustBuilder.WriteString(fmt.Sprintf(`{"term":{"priority":{"value":"%s"}}}`, priority)) } + if category != "" { + mustBuilder.WriteString(",") + mustBuilder.WriteString(fmt.Sprintf(`{"term":{"category":{"value":"%s"}}}`, category)) + } + if tags != "" { + mustBuilder.WriteString(",") + mustBuilder.WriteString(fmt.Sprintf(`{"term":{"tags":{"value":"%s"}}}`, tags)) + } size, _ := strconv.Atoi(strSize) if size <= 0 { size = 20 diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index a981c7e9..18348591 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -745,6 +745,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { Priority: priority, Title: alertItem.Title, Message: alertItem.Message, + Tags: rule.Tags, + Category: rule.Category, } err = saveAlertMessage(msg) if err != nil { From 47733e63c8613261e4189a38c51759d9d67080dc Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 17 Aug 2023 10:17:30 +0800 Subject: [PATCH 02/11] calc hit condition --- plugin/api/alerting/message.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index 69bf53f0..b30368bb 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -79,7 +79,6 @@ func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, "source": source, }, } - log.Info(util.MustToJSON(queryDsl)) err = orm.UpdateBy(alerting.AlertMessage{}, util.MustToJSONBytes(queryDsl)) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -355,8 +354,12 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps return } metricExpression, _ := rule.Metrics.GenerateExpression() + var hitCondition string for i, cond := range rule.Conditions.Items { expression, _ := cond.GenerateConditionExpression() + if cond.Priority == message.Priority { + hitCondition = strings.ReplaceAll(expression, "result", "") + } rule.Conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression) } var duration time.Duration @@ -384,6 +387,8 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps "ignored_reason": message.IgnoredReason, "ignored_user": message.IgnoredUser, "status": message.Status, + "expression": rule.Metrics.Expression, + "hit_condition": hitCondition, } h.WriteJSON(w, detailObj, http.StatusOK) } \ No newline at end of file From b349c66b142c51188274b580e7126f8545b39eec Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 17 Aug 2023 11:10:55 +0800 Subject: [PATCH 03/11] add get widget api --- main.go | 1 + model/insight/widget.go | 41 ++++++++++++++++++++++++++++++++++++ plugin/api/insight/api.go | 1 + plugin/api/insight/widget.go | 35 ++++++++++++++++++++++++++++++ 4 files changed, 78 insertions(+) create mode 100644 model/insight/widget.go create mode 100644 plugin/api/insight/widget.go diff --git a/main.go b/main.go index 9f47c0e7..e1c7b700 100644 --- a/main.go +++ b/main.go @@ -133,6 +133,7 @@ func main() { orm.RegisterSchemaWithIndexName(alerting.Channel{}, "channel") orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") + orm.RegisterSchemaWithIndexName(insight.Widget{}, "widget") orm.RegisterSchemaWithIndexName(task1.Task{}, "task") orm.RegisterSchemaWithIndexName(model.Layout{}, "layout") orm.RegisterSchemaWithIndexName(model.Notification{}, "notification") diff --git a/model/insight/widget.go b/model/insight/widget.go new file mode 100644 index 00000000..0cb77858 --- /dev/null +++ b/model/insight/widget.go @@ -0,0 +1,41 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package insight + +import "infini.sh/framework/core/orm" + +type Widget struct { + orm.ORMObjectBase + Formatter string `json:"formatter" elastic_mapping:"formatter: { type: keyword }"` + Series [] WidgetSeriesItem `json:"series" elastic_mapping:"series: { type: object,enabled:false }"` + Title string `json:"title" elastic_mapping:"title: { type: text }"` +} + +type WidgetSeriesItem struct { + Metric WidgetMetric `json:"metric"` + Queries WidgetQuery `json:"queries"` + Type string `json:"type"` +} + +type WidgetQuery struct { + ClusterId string `json:"cluster_id"` + Indices []string `json:"indices"` + Query string `json:"query"` + TimeField string `json:"time_field"` +} +type WidgetMetric struct { + BucketSize string `json:"bucket_size"` + FormatType string `json:"format_type"` + Formula string `json:"formula"` + Groups []struct { + Field string `json:"field"` + Limit int `json:"limit"` + } `json:"groups"` + Items []struct { + Field string `json:"field"` + Name string `json:"name"` + Statistic string `json:"statistic"` + } `json:"items"` +} diff --git a/plugin/api/insight/api.go b/plugin/api/insight/api.go index 8ae2a96e..c20b6b1f 100644 --- a/plugin/api/insight/api.go +++ b/plugin/api/insight/api.go @@ -28,4 +28,5 @@ func InitAPI() { api.HandleAPIMethod(api.DELETE, "/insight/dashboard/:dashboard_id", insight.deleteDashboard) api.HandleAPIMethod(api.GET, "/insight/dashboard/_search", insight.searchDashboard) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/map_label/_render", insight.renderMapLabelTemplate) + api.HandleAPIMethod(api.GET, "/insight/widget/:widget_id", insight.getWidget) } diff --git a/plugin/api/insight/widget.go b/plugin/api/insight/widget.go new file mode 100644 index 00000000..8ef34858 --- /dev/null +++ b/plugin/api/insight/widget.go @@ -0,0 +1,35 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package insight + +import ( + "infini.sh/console/model/insight" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" +) + +func (h *InsightAPI) getWidget(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("widget_id") + + obj := insight.Widget{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + + h.WriteJSON(w, util.MapStr{ + "found": true, + "_id": id, + "_source": obj, + }, 200) +} From 14f391b58b64604d6f72704cab0245e59818763c Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 17 Aug 2023 11:35:35 +0800 Subject: [PATCH 04/11] update widget struct --- model/insight/widget.go | 30 +----------------------------- plugin/api/insight/api.go | 1 + plugin/api/insight/widget.go | 26 +++++++++++++++++++++----- 3 files changed, 23 insertions(+), 34 deletions(-) diff --git a/model/insight/widget.go b/model/insight/widget.go index 0cb77858..e004cc3f 100644 --- a/model/insight/widget.go +++ b/model/insight/widget.go @@ -8,34 +8,6 @@ import "infini.sh/framework/core/orm" type Widget struct { orm.ORMObjectBase - Formatter string `json:"formatter" elastic_mapping:"formatter: { type: keyword }"` - Series [] WidgetSeriesItem `json:"series" elastic_mapping:"series: { type: object,enabled:false }"` Title string `json:"title" elastic_mapping:"title: { type: text }"` -} - -type WidgetSeriesItem struct { - Metric WidgetMetric `json:"metric"` - Queries WidgetQuery `json:"queries"` - Type string `json:"type"` -} - -type WidgetQuery struct { - ClusterId string `json:"cluster_id"` - Indices []string `json:"indices"` - Query string `json:"query"` - TimeField string `json:"time_field"` -} -type WidgetMetric struct { - BucketSize string `json:"bucket_size"` - FormatType string `json:"format_type"` - Formula string `json:"formula"` - Groups []struct { - Field string `json:"field"` - Limit int `json:"limit"` - } `json:"groups"` - Items []struct { - Field string `json:"field"` - Name string `json:"name"` - Statistic string `json:"statistic"` - } `json:"items"` + Config interface{}`json:"config" elastic_mapping:"config: { type: object,enabled:false }"` } diff --git a/plugin/api/insight/api.go b/plugin/api/insight/api.go index c20b6b1f..27b232be 100644 --- a/plugin/api/insight/api.go +++ b/plugin/api/insight/api.go @@ -29,4 +29,5 @@ func InitAPI() { api.HandleAPIMethod(api.GET, "/insight/dashboard/_search", insight.searchDashboard) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/map_label/_render", insight.renderMapLabelTemplate) api.HandleAPIMethod(api.GET, "/insight/widget/:widget_id", insight.getWidget) + api.HandleAPIMethod(api.POST, "/insight/widget", insight.RequireLogin(insight.createWidget)) } diff --git a/plugin/api/insight/widget.go b/plugin/api/insight/widget.go index 8ef34858..1bc60e70 100644 --- a/plugin/api/insight/widget.go +++ b/plugin/api/insight/widget.go @@ -5,6 +5,7 @@ package insight import ( + log "github.com/cihub/seelog" "infini.sh/console/model/insight" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/orm" @@ -12,6 +13,25 @@ import ( "net/http" ) +func (h *InsightAPI) createWidget(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var obj = &insight.Widget{} + err := h.DecodeJSON(req, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + err = orm.Create(nil, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteCreatedOKJSON(w, obj.ID) + +} + func (h *InsightAPI) getWidget(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("widget_id") @@ -27,9 +47,5 @@ func (h *InsightAPI) getWidget(w http.ResponseWriter, req *http.Request, ps http return } - h.WriteJSON(w, util.MapStr{ - "found": true, - "_id": id, - "_source": obj, - }, 200) + h.WriteGetOKJSON(w, id, obj) } From 9e6a1019fe0f81398327fda3f6b0c20570d110b7 Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 18 Aug 2023 16:01:28 +0800 Subject: [PATCH 05/11] add batch operation api --- model/alerting/alert.go | 4 +- plugin/api/alerting/api.go | 3 + plugin/api/alerting/message.go | 189 ++++++++++++++++++++ plugin/api/alerting/rule.go | 210 +++++++++++++++++++++-- plugin/api/data/export.go | 20 ++- plugin/api/data/model.go | 1 + service/alerting/elasticsearch/engine.go | 6 +- 7 files changed, 405 insertions(+), 28 deletions(-) diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 2b23e95f..8a989fed 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -22,7 +22,9 @@ type Alert struct { Title string `json:"title" elastic_mapping:"title: { type: keyword }"` Message string `json:"message" elastic_mapping:"context: { type: keyword, copy_to:search_text }"` AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` - ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"` + ActionExecutionResults []ActionExecutionResult `json:"action_execution_results,omitempty"` + RecoverActionResults []ActionExecutionResult `json:"recover_action_results,omitempty"` + EscalationActionResults []ActionExecutionResult `json:"escalation_action_results,omitempty"` Users []string `json:"users,omitempty"` State string `json:"state"` Error string `json:"error,omitempty"` diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 995ccd00..eca3a292 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -27,6 +27,8 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.POST, "/alerting/rule/:rule_id/_enable", alert.RequirePermission(alert.enableRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/metric", alert.RequirePermission(alert.getMetricData, enum.PermissionAlertRuleRead)) api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/info", alert.RequirePermission(alert.getRuleDetail, enum.PermissionAlertRuleRead, enum.PermissionAlertMessageRead)) + api.HandleAPIMethod(api.POST, "/alerting/rule/_enable", alert.RequirePermission(alert.batchEnableRule, enum.PermissionAlertRuleWrite)) + api.HandleAPIMethod(api.POST, "/alerting/rule/_disable", alert.RequirePermission(alert.batchDisableRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.RequirePermission(alert.getChannel, enum.PermissionAlertChannelRead)) api.HandleAPIMethod(api.POST, "/alerting/channel", alert.RequirePermission(alert.createChannel, enum.PermissionAlertChannelWrite)) @@ -43,6 +45,7 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.POST, "/alerting/message/_ignore", alert.RequirePermission(alert.ignoreAlertMessage, enum.PermissionAlertMessageWrite)) api.HandleAPIMethod(api.GET, "/alerting/message/_stats", alert.RequirePermission(alert.getAlertMessageStats, enum.PermissionAlertMessageRead)) api.HandleAPIMethod(api.GET, "/alerting/message/:message_id", alert.RequirePermission(alert.getAlertMessage, enum.PermissionAlertMessageRead)) + api.HandleAPIMethod(api.GET, "/alerting/message/:message_id/notification", alert.getMessageNotificationInfo) //just for test diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index b30368bb..c583b645 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -6,6 +6,7 @@ package alerting import ( "fmt" + "github.com/buger/jsonparser" log "github.com/cihub/seelog" "infini.sh/console/model/alerting" alerting2 "infini.sh/console/service/alerting" @@ -391,4 +392,192 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps "hit_condition": hitCondition, } h.WriteJSON(w, detailObj, http.StatusOK) +} + +func (h *AlertAPI) getMessageNotificationInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + message := &alerting.AlertMessage{ + ID: ps.ByName("message_id"), + } + exists, err := orm.Get(message) + if !exists || err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "_id": message.ID, + "found": false, + }, http.StatusNotFound) + return + } + rule := &alerting.Rule{ + ID: message.RuleID, + } + exists, err = orm.Get(rule) + if !exists || err != nil { + log.Error(err) + h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError) + return + } + notificationInfo := util.MapStr{} + if rule.NotificationConfig == nil && rule.RecoveryNotificationConfig == nil { + notificationInfo["is_empty"] = true + h.WriteJSON(w, notificationInfo, http.StatusOK) + return + } + stats, err := getMessageNotificationStats(message) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if rule.NotificationConfig != nil { + notificationInfo["alerting"] = util.MapStr{ + "accept_time_range": rule.NotificationConfig.AcceptTimeRange, + "throttle_period": rule.NotificationConfig.ThrottlePeriod, + "escalation_enabled": rule.NotificationConfig.EscalationEnabled, + "escalation_throttle_period": rule.NotificationConfig.EscalationThrottlePeriod, + "normal_stats": stats["normal"], + "escalation_stats": stats["escalation"], + } + } + if rule.RecoveryNotificationConfig != nil { + notificationInfo["recovery"] = util.MapStr{ + "stats": stats["recovery"], + } + } + h.WriteJSON(w, notificationInfo, http.StatusOK) +} + +func getMessageNotificationStats(msg *alerting.AlertMessage )(util.MapStr, error){ + rangeQ := util.MapStr{ + "gte": msg.Created.UnixMilli(), + } + if msg.Status == alerting.MessageStateRecovered { + rangeQ["lte"] = msg.Updated.UnixMilli() + } + aggs := util.MapStr{ + "grp_normal_channel": util.MapStr{ + "terms": util.MapStr{ + "field": "action_execution_results.channel_type", + "size": 20, + }, + "aggs": util.MapStr{ + "top": util.MapStr{ + "top_hits": util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "_source": util.MapStr{ + "includes": []string{"created", "action_execution_results.channel_name"}, + }, + "size": 1, + }, + }, + }, + }, + "grp_escalation_channel": util.MapStr{ + "terms": util.MapStr{ + "field": "escalation_action_results.channel_type", + "size": 20, + }, + "aggs": util.MapStr{ + "top": util.MapStr{ + "top_hits": util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "_source": util.MapStr{ + "includes": []string{"created", "escalation_action_results.channel_name"}, + }, + "size": 1, + }, + }, + }, + }, + } + if msg.Status == alerting.MessageStateRecovered { + aggs["grp_recover_channel"] = util.MapStr{ + "terms": util.MapStr{ + "field": "recover_action_results.channel_type", + "size": 20, + }, + "aggs": util.MapStr{ + "top": util.MapStr{ + "top_hits": util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "_source": util.MapStr{ + "includes": []string{"created", "recover_action_results.channel_name"}, + }, + "size": 1, + }, + }, + }, + } + } + query := util.MapStr{ + "size": 0, + "aggs": aggs, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "range": util.MapStr{ + "created": rangeQ, + }, + }, + { + "term": util.MapStr{ + "rule_id": util.MapStr{ + "value": msg.RuleID, + }, + }, + }, + }, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(alerting.Alert{}, &q) + if err != nil { + return nil, err + } + + var normalStats = extractStatsFromRaw(result.Raw, "grp_normal_channel", "action_execution_results") + var escalationStats = extractStatsFromRaw(result.Raw, "grp_escalation_channel", "escalation_action_results") + stats := util.MapStr{ + "normal": normalStats, + "escalation": escalationStats, + } + if msg.Status == alerting.MessageStateRecovered { + recoverStats := extractStatsFromRaw(result.Raw, "grp_recover_channel", "recover_action_results") + stats["recovery"] = recoverStats + } + + return stats, nil +} +func extractStatsFromRaw(searchRawRes []byte, grpKey string, actionKey string) []util.MapStr { + var stats []util.MapStr + jsonparser.ArrayEach(searchRawRes, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { + statsItem := util.MapStr{} + statsItem["channel_type"], _ = jsonparser.GetString(value, "key") + statsItem["count"], _ = jsonparser.GetInt(value, "doc_count") + statsItem["channel_name"], _ = jsonparser.GetString(value, "top", "hits","hits", "[0]", "_source",actionKey, "[0]", "channel_name") + statsItem["last_time"], _ = jsonparser.GetString(value, "top", "hits","hits", "[0]", "_source","created") + stats = append(stats, statsItem) + }, "aggregations", grpKey, "buckets") + return stats } \ No newline at end of file diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index e9b18129..ea4745d0 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -592,9 +592,7 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) if err != nil { log.Error(err) - alertAPI.WriteJSON(w, util.MapStr{ - "error": err.Error(), - }, http.StatusInternalServerError) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) return } if len(searchRes.Hits.Hits) == 0 { @@ -609,6 +607,50 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque "status": hit.Source["state"], } } + } + queryDsl = util.MapStr{ + "_source": []string{"created", "rule_id"}, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "collapse": util.MapStr{ + "field": "rule_id", + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "rule_id": ruleIDs, + }, + }, + { + "term": util.MapStr{ + "state": util.MapStr{ + "value": alerting.AlertStateAlerting, + }, + }, + }, + }, + }, + }, + } + searchRes, err = esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for _, hit := range searchRes.Hits.Hits { + if ruleID, ok := hit.Source["rule_id"].(string); ok { + if _, ok = latestAlertInfos[ruleID]; ok { + latestAlertInfos[ruleID]["last_notification_time"] = hit.Source["created"] + } + } } alertAPI.WriteJSON(w, latestAlertInfos, http.StatusOK) @@ -636,20 +678,9 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p return } if reqObj.Enabled { - eng := alerting2.GetEngine(obj.Resource.Type) - ruleTask := task.ScheduleTask{ - ID: obj.ID, - Interval: obj.Schedule.Interval, - Description: obj.Metrics.Expression, - Task: eng.GenerateTask(obj), - } - task.DeleteTask(ruleTask.ID) - clearKV(ruleTask.ID) - task.RegisterScheduleTask(ruleTask) - task.StartTask(ruleTask.ID) - }else{ - task.DeleteTask(id) - clearKV(id) + enableRule(&obj) + } else { + disableRule(&obj) } obj.Enabled = reqObj.Enabled err = orm.Save(nil, obj) @@ -660,10 +691,28 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p } alertAPI.WriteJSON(w, util.MapStr{ "result": "updated", - "_id": id, + "_id": id, }, http.StatusOK) } +func enableRule(obj *alerting.Rule) { + eng := alerting2.GetEngine(obj.Resource.Type) + ruleTask := task.ScheduleTask{ + ID: obj.ID, + Interval: obj.Schedule.Interval, + Description: obj.Metrics.Expression, + Task: eng.GenerateTask(*obj), + } + task.DeleteTask(ruleTask.ID) + clearKV(ruleTask.ID) + task.RegisterScheduleTask(ruleTask) + task.StartTask(ruleTask.ID) +} +func disableRule(obj *alerting.Rule) { + task.DeleteTask(obj.ID) + clearKV(obj.ID) +} + func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { typ := alertAPI.GetParameterOrDefault(req, "type", "notification") rule := alerting.Rule{} @@ -917,4 +966,129 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) }) } return &metricItem,queryResult, nil +} + +func (alertAPI *AlertAPI) batchEnableRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ruleIDs = []string{} + err := alertAPI.DecodeJSON(req, &ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(ruleIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + rules, err := getRulesByID(ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var newIDs []string + for _, rule := range rules { + if !rule.Enabled { + enableRule(&rule) + newIDs = append(newIDs, rule.ID) + } + } + if len(newIDs) > 0 { + q := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": newIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['enabled'] = %v", true), + }, + } + err = orm.UpdateBy(alerting.Rule{}, util.MustToJSONBytes(q)) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + alertAPI.WriteAckOKJSON(w) +} + +func (alertAPI *AlertAPI) batchDisableRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ruleIDs = []string{} + err := alertAPI.DecodeJSON(req, &ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(ruleIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + rules, err := getRulesByID(ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + log.Info(rules) + var newIDs []string + for _, rule := range rules { + if rule.Enabled { + disableRule(&rule) + newIDs = append(newIDs, rule.ID) + } + } + if len(newIDs) > 0 { + q := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": newIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['enabled'] = %v", false), + }, + } + log.Info(util.MustToJSON(q)) + err = orm.UpdateBy(alerting.Rule{}, util.MustToJSONBytes(q)) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + alertAPI.WriteAckOKJSON(w) +} + +func getRulesByID(ruleIDs []string) ([]alerting.Rule, error){ + if len(ruleIDs) == 0 { + return nil, nil + } + query := util.MapStr{ + "size": len(ruleIDs), + "query": util.MapStr{ + "terms": util.MapStr{ + "id": ruleIDs, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(alerting.Rule{}, q) + if err != nil { + return nil, err + } + var rules []alerting.Rule + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + rule := alerting.Rule{} + util.MustFromJSONBytes(buf, &rule) + rules = append(rules, rule) + } + return rules, nil } \ No newline at end of file diff --git a/plugin/api/data/export.go b/plugin/api/data/export.go index dea5b5d0..36f415f8 100644 --- a/plugin/api/data/export.go +++ b/plugin/api/data/export.go @@ -25,7 +25,7 @@ func (h *DataAPI) exportData(w http.ResponseWriter, req *http.Request, ps httpro } var resBody []ExportData for _, meta := range reqBody.Metadatas { - result, err := getExportData(meta.Type) + result, err := getExportData(meta) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -86,9 +86,9 @@ func indexExportData(eds []ExportData) error { return nil } -func getExportData(typ string) (*orm.Result, error) { +func getExportData(meta ExportMetadata) (*orm.Result, error) { var obj interface{} - switch typ { + switch meta.Type { case DataTypeAlertChannel: obj = alerting.Channel{} case DataTypeAlertRule: @@ -96,11 +96,19 @@ func getExportData(typ string) (*orm.Result, error) { case DataTypeAlertEmailServer: obj = model.EmailServer{} default: - return nil, fmt.Errorf("unkonw data type: %s", typ) + return nil, fmt.Errorf("unkonw data type: %s", meta.Type) } - err, result := orm.Search(obj, &orm.Query{ + q := &orm.Query{ Size: 1000, - }) + } + if meta.Filter != nil { + query := util.MapStr{ + "size": 1000, + "query": meta.Filter, + } + q.RawQuery = util.MustToJSONBytes(query) + } + err, result := orm.Search(obj, q) if err != nil { return nil, err } diff --git a/plugin/api/data/model.go b/plugin/api/data/model.go index c87eae0f..1635285f 100644 --- a/plugin/api/data/model.go +++ b/plugin/api/data/model.go @@ -10,6 +10,7 @@ type ExportDataRequest struct { type ExportMetadata struct { Type string `json:"type"` + Filter interface{} `json:"filter,omitempty"` } type ExportData struct { diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 18348591..038baba0 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -699,7 +699,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return err } actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx, false) - alertItem.ActionExecutionResults = actionResults + alertItem.RecoverActionResults = actionResults } } return nil @@ -852,7 +852,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration { actionResults, errCount := performChannels(notifyCfg.Escalation, paramsCtx, false) - alertItem.ActionExecutionResults = actionResults + alertItem.EscalationActionResults = actionResults //todo init last escalation time when create task (by last alert item is escalated) if errCount == 0 { rule.LastEscalationTime = time.Now() @@ -1063,7 +1063,7 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}, ra Error: errStr, Message: string(messageBytes), ExecutionTime: int(time.Now().UnixNano()/1e6), - ChannelType: channel.Type, + ChannelType: channel.SubType, ChannelName: channel.Name, }) } From 2a1df9743e358d3c8091803b6b2718ffec329cec Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 18 Aug 2023 17:51:18 +0800 Subject: [PATCH 06/11] add rule batch delete api --- plugin/api/alerting/api.go | 1 + plugin/api/alerting/rule.go | 67 ++++++++++++++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index eca3a292..f09660d3 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -19,6 +19,7 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.POST, "/alerting/rule", alert.RequirePermission(alert.createRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.POST, "/alerting/rule/test", alert.RequireLogin(alert.sendTestMessage)) api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.RequirePermission(alert.deleteRule, enum.PermissionAlertRuleWrite)) + api.HandleAPIMethod(api.DELETE, "/alerting/rule", alert.RequirePermission(alert.batchDeleteRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.RequirePermission(alert.updateRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.RequirePermission(alert.searchRule, enum.PermissionAlertRuleRead)) api.HandleAPIMethod(api.GET, "/alerting/stats", alert.RequirePermission(alert.getAlertStats, enum.PermissionAlertHistoryRead)) diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index ea4745d0..a9352c32 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -418,7 +418,9 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p delDsl := util.MapStr{ "query": util.MapStr{ "term": util.MapStr{ - "rule_id": id, + "rule_id": util.MapStr{ + "value": id, + }, }, }, } @@ -437,6 +439,69 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p }, 200) } +func (alertAPI *AlertAPI) batchDeleteRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ruleIDs = []string{} + err := alertAPI.DecodeJSON(req, &ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(ruleIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + rules, err := getRulesByID(ruleIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + var newIDs []string + for _, rule := range rules { + saveAlertActivity("alerting_rule_change", "delete", util.MapStr{ + "cluster_id": rule.Resource.ID, + "rule_id": rule.ID, + "cluster_name": rule.Resource.Name, + },nil, &rule) + task.DeleteTask(rule.ID) + clearKV(rule.ID) + newIDs = append(newIDs, rule.ID) + } + if len(newIDs) > 0 { + q := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": newIDs, + }, + }, + } + err = orm.DeleteBy(alerting.Rule{}, util.MustToJSONBytes(q)) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + delDsl := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "rule_id": newIDs, + }, + }, + } + err = orm.DeleteBy(alerting.AlertMessage{}, util.MustToJSONBytes(delDsl)) + if err != nil { + log.Error(err) + } + err = orm.DeleteBy(alerting.Alert{}, util.MustToJSONBytes(delDsl)) + if err != nil { + log.Error(err) + } + } + alertAPI.WriteAckOKJSON(w) +} + func (alertAPI *AlertAPI) searchRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var ( keyword = alertAPI.GetParameterOrDefault(req, "keyword", "") From 174126fa10be34f7a90d6e775714df266b4613ab Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 21 Aug 2023 18:03:23 +0800 Subject: [PATCH 07/11] fix wrong channel name --- plugin/api/alerting/message.go | 14 ++++++++++---- service/alerting/elasticsearch/engine.go | 15 ++++++++------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go index c583b645..3a8d790a 100644 --- a/plugin/api/alerting/message.go +++ b/plugin/api/alerting/message.go @@ -470,7 +470,7 @@ func getMessageNotificationStats(msg *alerting.AlertMessage )(util.MapStr, error }, }, "_source": util.MapStr{ - "includes": []string{"created", "action_execution_results.channel_name"}, + "includes": []string{"created", "action_execution_results.channel_name", "action_execution_results.channel_type"}, }, "size": 1, }, @@ -493,7 +493,7 @@ func getMessageNotificationStats(msg *alerting.AlertMessage )(util.MapStr, error }, }, "_source": util.MapStr{ - "includes": []string{"created", "escalation_action_results.channel_name"}, + "includes": []string{"created", "escalation_action_results.channel_name", "escalation_action_results.channel_type"}, }, "size": 1, }, @@ -518,7 +518,7 @@ func getMessageNotificationStats(msg *alerting.AlertMessage )(util.MapStr, error }, }, "_source": util.MapStr{ - "includes": []string{"created", "recover_action_results.channel_name"}, + "includes": []string{"created", "recover_action_results.channel_name", "recover_action_results.channel_type"}, }, "size": 1, }, @@ -575,7 +575,13 @@ func extractStatsFromRaw(searchRawRes []byte, grpKey string, actionKey string) [ statsItem := util.MapStr{} statsItem["channel_type"], _ = jsonparser.GetString(value, "key") statsItem["count"], _ = jsonparser.GetInt(value, "doc_count") - statsItem["channel_name"], _ = jsonparser.GetString(value, "top", "hits","hits", "[0]", "_source",actionKey, "[0]", "channel_name") + jsonparser.ArrayEach(value, func(v []byte, dataType jsonparser.ValueType, offset int, err error) { + ck, _ := jsonparser.GetString(v, "channel_type") + cn, _ := jsonparser.GetString(v, "channel_name") + if ck == statsItem["channel_type"] { + statsItem["channel_name"] = cn + } + }, "top", "hits","hits", "[0]", "_source",actionKey) statsItem["last_time"], _ = jsonparser.GetString(value, "top", "hits","hits", "[0]", "_source","created") stats = append(stats, statsItem) }, "aggregations", grpKey, "buckets") diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 038baba0..7a0c1239 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -822,6 +822,9 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if alertMessage == nil || period > periodDuration { actionResults, _ := performChannels(notifyCfg.Normal, paramsCtx, false) + if rule.ID == "builtin-calaqnh7h710dpnp2bm8" { + log.Info(actionResults) + } alertItem.ActionExecutionResults = actionResults //change and save last notification time in local kv store when action error count equals zero rule.LastNotificationTime = time.Now() @@ -851,15 +854,13 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration { - actionResults, errCount := performChannels(notifyCfg.Escalation, paramsCtx, false) + actionResults, _ := performChannels(notifyCfg.Escalation, paramsCtx, false) alertItem.EscalationActionResults = actionResults //todo init last escalation time when create task (by last alert item is escalated) - if errCount == 0 { - rule.LastEscalationTime = time.Now() - alertItem.IsEscalated = true - strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339) - kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime)) - } + rule.LastEscalationTime = time.Now() + alertItem.IsEscalated = true + strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339) + kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime)) } } From 522a5361a283ad9fc50e88ca10a01e71fe21a2b8 Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 22 Aug 2023 09:04:17 +0800 Subject: [PATCH 08/11] add reference channel id to alerting history --- model/alerting/alert.go | 1 + service/alerting/elasticsearch/engine.go | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 8a989fed..fc8bf8b1 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -42,6 +42,7 @@ type ActionExecutionResult struct { Message string `json:"message"` ChannelName string `json:"channel_name"` ChannelType string `json:"channel_type"` + ChannelID string `json:"channel_id"` } const ( diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 7a0c1239..3fdab54b 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -822,9 +822,6 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if alertMessage == nil || period > periodDuration { actionResults, _ := performChannels(notifyCfg.Normal, paramsCtx, false) - if rule.ID == "builtin-calaqnh7h710dpnp2bm8" { - log.Info(actionResults) - } alertItem.ActionExecutionResults = actionResults //change and save last notification time in local kv store when action error count equals zero rule.LastNotificationTime = time.Now() @@ -1066,6 +1063,7 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}, ra ExecutionTime: int(time.Now().UnixNano()/1e6), ChannelType: channel.SubType, ChannelName: channel.Name, + ChannelID: channel.ID, }) } return actionResults, errCount From e9ac60b217d10fc7899569f5a2d5b382bb280df2 Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 22 Aug 2023 16:28:52 +0800 Subject: [PATCH 09/11] clear last notification time after alerting recovered --- service/alerting/elasticsearch/engine.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 3fdab54b..a6cfc647 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -700,6 +700,9 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx, false) alertItem.RecoverActionResults = actionResults + //clear history notification time + _ = kv.DeleteKey(alerting2.KVLastNotificationTime, []byte(rule.ID)) + _ = kv.DeleteKey(alerting2.KVLastEscalationTime, []byte(rule.ID)) } } return nil From f767ffa39a79b068b19345b38b0cc602451cef32 Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 25 Aug 2023 16:27:26 +0800 Subject: [PATCH 10/11] return notification info in rule detail api --- plugin/api/alerting/rule.go | 96 +++++++++++++++++++++++++++++++++---- 1 file changed, 87 insertions(+), 9 deletions(-) diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index a9352c32..0382f2c0 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -190,7 +190,6 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request return } queryDSL := util.MapStr{ - "_source": "state", "size": 1, "sort": []util.MapStr{ { @@ -200,9 +199,22 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request }, }, "query": util.MapStr{ - "term": util.MapStr{ - "rule_id": util.MapStr{ - "value": obj.ID, + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "rule_id": util.MapStr{ + "value": obj.ID, + }, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": alerting.MessageStateAlerting, + }, + }, + }, }, }, }, @@ -211,18 +223,74 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request WildcardIndex: true, RawQuery: util.MustToJSONBytes(queryDSL), } - err, result := orm.Search(alerting.Alert{}, q) + err, result := orm.Search(alerting.AlertMessage{}, q) if err != nil { log.Error(err) - alertAPI.WriteJSON(w, util.MapStr{ - "error": err.Error(), - }, http.StatusInternalServerError) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) return } var state interface{} = "N/A" + var alertingMessageItem interface{} if len(result.Result) > 0 { + alertingMessageItem = result.Result[0] if resultM, ok := result.Result[0].(map[string]interface{}); ok { - state = resultM["state"] + state = resultM["status"] + } + } + var channelIDs []interface{} + if obj.NotificationConfig != nil { + for _, ch := range obj.NotificationConfig.Normal { + channelIDs = append(channelIDs, ch.ID) + } + for _, ch := range obj.NotificationConfig.Escalation { + channelIDs = append(channelIDs, ch.ID) + } + } + if obj.RecoveryNotificationConfig != nil { + for _, ch := range obj.RecoveryNotificationConfig.Normal { + channelIDs = append(channelIDs, ch.ID) + } + } + q = &orm.Query{ + Size: len(channelIDs), + } + q.Conds = append(q.Conds, orm.In("id", channelIDs)) + err, result = orm.Search(alerting.Channel{}, q) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + chm := map[string]alerting.Channel{} + for _, row := range result.Result { + buf := util.MustToJSONBytes(row) + ch := alerting.Channel{} + util.MustFromJSONBytes(buf, &ch) + chm[ch.ID] = ch + } + if obj.NotificationConfig != nil { + for i, ch := range obj.NotificationConfig.Normal { + if v, ok := chm[ch.ID]; ok { + obj.NotificationConfig.Normal[i].Enabled = v.Enabled && ch.Enabled + obj.NotificationConfig.Normal[i].Type = v.SubType + obj.NotificationConfig.Normal[i].Name = v.Name + } + } + for i, ch := range obj.NotificationConfig.Escalation { + if v, ok := chm[ch.ID]; ok { + obj.NotificationConfig.Escalation[i].Enabled = v.Enabled && ch.Enabled + obj.NotificationConfig.Escalation[i].Type = v.SubType + obj.NotificationConfig.Escalation[i].Name = v.Name + } + } + } + if obj.RecoveryNotificationConfig != nil { + for i, ch := range obj.RecoveryNotificationConfig.Normal { + if v, ok := chm[ch.ID]; ok { + obj.RecoveryNotificationConfig.Normal[i].Enabled = v.Enabled && ch.Enabled + obj.RecoveryNotificationConfig.Normal[i].Type = v.SubType + obj.RecoveryNotificationConfig.Normal[i].Name = v.Name + } } } @@ -231,12 +299,22 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request "resource_name": obj.Resource.Name, "resource_id": obj.Resource.ID, "resource_objects": obj.Resource.Objects, + "resource_time_field": obj.Resource.TimeField, + "resource_raw_filter": obj.Resource.RawFilter, + "metrics": obj.Metrics, "bucket_size": obj.Metrics.BucketSize, //统计周期 "updated": obj.Updated, "conditions": obj.Conditions, "message_count": alertNumbers[obj.ID], //所有关联告警消息数(包括已恢复的) "state": state, "enabled": obj.Enabled, + "created": obj.Created, + "creator": obj.Creator, + "tags": obj.Tags, + "alerting_message": alertingMessageItem, + "expression": obj.Metrics.Expression, + "notification_config": obj.NotificationConfig, + "recovery_notification_config": obj.RecoveryNotificationConfig, } alertAPI.WriteJSON(w, detailObj, 200) From 6bbce21258239709d3698d25a4f398a02e296af9 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 28 Aug 2023 16:20:42 +0800 Subject: [PATCH 11/11] add batch enable channel api --- plugin/api/alerting/api.go | 3 ++ plugin/api/alerting/channel.go | 61 ++++++++++++++++++++++++++++++++++ plugin/api/alerting/rule.go | 56 ++++++++++++++++++++++++++++++- 3 files changed, 119 insertions(+), 1 deletion(-) diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index f09660d3..87d80eda 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -30,6 +30,7 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/info", alert.RequirePermission(alert.getRuleDetail, enum.PermissionAlertRuleRead, enum.PermissionAlertMessageRead)) api.HandleAPIMethod(api.POST, "/alerting/rule/_enable", alert.RequirePermission(alert.batchEnableRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.POST, "/alerting/rule/_disable", alert.RequirePermission(alert.batchDisableRule, enum.PermissionAlertRuleWrite)) + api.HandleAPIMethod(api.GET, "/alerting/rule/_search_values", alert.RequirePermission(alert.searchFieldValues, enum.PermissionAlertRuleRead)) api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.RequirePermission(alert.getChannel, enum.PermissionAlertChannelRead)) api.HandleAPIMethod(api.POST, "/alerting/channel", alert.RequirePermission(alert.createChannel, enum.PermissionAlertChannelWrite)) @@ -37,6 +38,8 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.PUT, "/alerting/channel/:channel_id", alert.RequirePermission(alert.updateChannel, enum.PermissionAlertChannelWrite)) api.HandleAPIMethod(api.GET, "/alerting/channel/_search", alert.RequirePermission(alert.searchChannel, enum.PermissionAlertChannelRead)) api.HandleAPIMethod(api.POST, "/alerting/channel/test", alert.RequirePermission(alert.testChannel, enum.PermissionAlertChannelWrite)) + api.HandleAPIMethod(api.POST, "/alerting/channel/_enable", alert.RequirePermission(alert.batchEnableChannel, enum.PermissionAlertChannelWrite)) + api.HandleAPIMethod(api.POST, "/alerting/channel/_disable", alert.RequirePermission(alert.batchDisableChannel, enum.PermissionAlertChannelWrite)) api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.RequirePermission(alert.searchAlert, enum.PermissionAlertHistoryRead)) api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.RequirePermission(alert.getAlert, enum.PermissionAlertHistoryRead)) diff --git a/plugin/api/alerting/channel.go b/plugin/api/alerting/channel.go index 6999098b..fb1de867 100644 --- a/plugin/api/alerting/channel.go +++ b/plugin/api/alerting/channel.go @@ -315,4 +315,65 @@ func (h *AlertAPI) testChannel(w http.ResponseWriter, req *http.Request, ps http return } h.WriteAckOKJSON(w) +} + +func (alertAPI *AlertAPI) batchEnableChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var channelIDs = []string{} + err := alertAPI.DecodeJSON(req, &channelIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(channelIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + if len(channelIDs) > 0 { + err = setChannelEnabled(true, channelIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + alertAPI.WriteAckOKJSON(w) +} + +func (alertAPI *AlertAPI) batchDisableChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var channelIDs = []string{} + err := alertAPI.DecodeJSON(req, &channelIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(channelIDs) == 0 { + alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + if len(channelIDs) > 0 { + err = setChannelEnabled(false, channelIDs) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + } + alertAPI.WriteAckOKJSON(w) +} + +func setChannelEnabled(enabled bool, channelIDs []string) error { + q := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": channelIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['enabled'] = %v", enabled), + }, + } + err := orm.UpdateBy(alerting.Channel{}, util.MustToJSONBytes(q)) + return err } \ No newline at end of file diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index 0382f2c0..ba206702 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -1177,7 +1177,6 @@ func (alertAPI *AlertAPI) batchDisableRule(w http.ResponseWriter, req *http.Requ alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) return } - log.Info(rules) var newIDs []string for _, rule := range rules { if rule.Enabled { @@ -1207,6 +1206,61 @@ func (alertAPI *AlertAPI) batchDisableRule(w http.ResponseWriter, req *http.Requ alertAPI.WriteAckOKJSON(w) } +func (alertAPI *AlertAPI) searchFieldValues(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var keyword = alertAPI.GetParameterOrDefault(req, "keyword", "") + var field = alertAPI.GetParameterOrDefault(req, "field", "category") + items , err := searchListItems(field, keyword, 20) + if err != nil { + log.Error(err) + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + alertAPI.WriteJSON(w, items, http.StatusOK) +} + +func searchListItems(field, keyword string, size int) ([]string, error){ + query := util.MapStr{ + "size": 0, + "aggs": util.MapStr{ + "items": util.MapStr{ + "terms": util.MapStr{ + "field": field, + "size": size, + }, + }, + }, + } + if v := strings.TrimSpace(keyword); v != ""{ + query["query"]= util.MapStr{ + "query_string": util.MapStr{ + "default_field": field, + "query": fmt.Sprintf("*%s*", v), + }, + } + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(alerting.Rule{}, &q) + if err != nil { + return nil, err + } + searchRes := elastic.SearchResponse{} + err = util.FromJSONBytes(result.Raw, &searchRes) + if err != nil { + return nil, err + } + items := []string{} + for _, bk := range searchRes.Aggregations["items"].Buckets { + if v, ok := bk["key"].(string); ok { + if strings.Contains(v, keyword){ + items = append(items, v) + } + } + } + return items, nil +} + func getRulesByID(ruleIDs []string) ([]alerting.Rule, error){ if len(ruleIDs) == 0 { return nil, nil