From 0f4bb211d525a57b18eb53a408cb462dc398bd14 Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 6 Jul 2023 14:20:08 +0800 Subject: [PATCH] add alert recovery notification --- model/alerting/destination.go | 2 + model/alerting/metric.go | 6 +- model/alerting/rule.go | 33 +++++++++-- model/alerting/rule_test.go | 2 +- model/alerting/webhook.go | 11 ++++ service/alerting/action/email.go | 34 +++++++++++ service/alerting/elasticsearch/engine.go | 59 +++++++++++++++---- service/alerting/elasticsearch/engine_test.go | 6 +- 8 files changed, 129 insertions(+), 24 deletions(-) create mode 100644 service/alerting/action/email.go diff --git a/model/alerting/destination.go b/model/alerting/destination.go index 1f3e321c..25d6b3ea 100644 --- a/model/alerting/destination.go +++ b/model/alerting/destination.go @@ -13,6 +13,8 @@ type Channel struct { Priority int `json:"priority,omitempty"` Webhook *CustomWebhook `json:"webhook,omitempty" elastic_mapping:"webhook:{type:object}"` SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` + SubType string `json:"sub_type" elastic_mapping:"sub_type:{type:keyword,copy_to:search_text}"` + Email *Email `json:"email,omitempty" elastic_mapping:"email:{type:object}"` } diff --git a/model/alerting/metric.go b/model/alerting/metric.go index d9737677..018922d5 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -13,9 +13,9 @@ import ( type Metric struct { insight.Metric - Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80 - Title string `json:"title"` //text template - Message string `json:"message"` // text template + Title string `json:"title,omitempty"` //text template + Message string `json:"message,omitempty"` // text template + Expression string `json:"expression,omitempty" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80 } diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 119f2b79..43cdc3bb 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -17,9 +17,11 @@ type Rule struct { Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:keyword}"` Resource Resource `json:"resource" elastic_mapping:"resource:{type:object}"` Metrics Metric `json:"metrics" elastic_mapping:"metrics:{type:object}"` - Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` - Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"` - Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` + Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` + Channels NotificationConfig `json:"channels,omitempty" elastic_mapping:"channels:{type:object}"` + NotificationConfig *NotificationConfig `json:"notification_config,omitempty" elastic_mapping:"notification_config:{type:object}"` + RecoveryNotificationConfig *RecoveryNotificationConfig `json:"recovery_notification_config,omitempty" elastic_mapping:"recovery_notification_config:{type:object}"` + Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` LastNotificationTime time.Time `json:"-" elastic_mapping:"last_notification_time:{type:date}"` LastTermStartTime time.Time `json:"-"` //标识最近一轮告警的开始时间 LastEscalationTime time.Time `json:"-"` //标识最近一次告警升级发送通知的时间 @@ -54,9 +56,24 @@ func (rule *Rule) GetOrInitExpression() (string, error){ rule.Expression = strings.ReplaceAll(sb.String(), "result", metricExp) return rule.Expression, nil } +//GetNotificationConfig for adapter old version config +func (rule *Rule) GetNotificationConfig() *NotificationConfig { + if rule.NotificationConfig != nil { + return rule.NotificationConfig + } + return &rule.Channels +} +func (rule *Rule) GetNotificationTitleAndMessage() (string, string) { + if rule.NotificationConfig != nil { + return rule.NotificationConfig.Title, rule.NotificationConfig.Message + } + return rule.Metrics.Title, rule.Metrics.Message +} -type RuleChannel struct { +type NotificationConfig struct { Enabled bool `json:"enabled"` + Title string `json:"title,omitempty"` //text template + Message string `json:"message,omitempty"` // text template Normal []Channel `json:"normal,omitempty"` Escalation []Channel `json:"escalation,omitempty"` ThrottlePeriod string `json:"throttle_period,omitempty"` //沉默周期 @@ -65,6 +82,14 @@ type RuleChannel struct { EscalationEnabled bool `json:"escalation_enabled,omitempty"` } +type RecoveryNotificationConfig struct { + Enabled bool `json:"enabled"` + Title string `json:"title"` //text template + Message string `json:"message"` // text template + AcceptTimeRange TimeRange `json:"accept_time_range,omitempty"` + Channels []Channel `json:"channels,omitempty"` +} + type MessageTemplate struct{ Type string `json:"type"` Source string `json:"source"` diff --git a/model/alerting/rule_test.go b/model/alerting/rule_test.go index 968b22c5..a59c008f 100644 --- a/model/alerting/rule_test.go +++ b/model/alerting/rule_test.go @@ -75,7 +75,7 @@ func TestCreateRule( t *testing.T) { }, }, - Channels: RuleChannel{ + Channels: NotificationConfig{ Normal: []Channel{ {Name: "钉钉", Type: ChannelWebhook, Webhook: &CustomWebhook{ HeaderParams: map[string]string{ diff --git a/model/alerting/webhook.go b/model/alerting/webhook.go index 70117faf..b1b5e2df 100644 --- a/model/alerting/webhook.go +++ b/model/alerting/webhook.go @@ -10,3 +10,14 @@ type CustomWebhook struct { URL string `json:"url,omitempty"` Body string `json:"body" elastic_mapping:"body:{type:text}"` } + +type Email struct { + ServerID string `json:"server_id"` + Recipients struct { + To []string `json:"to" elastic_mapping:"to:{type:keyword}"` + CC []string `json:"cc" elastic_mapping:"cc:{type:keyword}"` + BCC []string `json:"bcc" elastic_mapping:"bcc:{type:keyword}"` + } `json:"recipients" elastic_mapping:"recipients:{type:object}"` + Subject string `json:"subject" elastic_mapping:"subject:{type:text}"` + Body string `json:"body" elastic_mapping:"body:{type:text}"` +} \ No newline at end of file diff --git a/service/alerting/action/email.go b/service/alerting/action/email.go new file mode 100644 index 00000000..f60e0012 --- /dev/null +++ b/service/alerting/action/email.go @@ -0,0 +1,34 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package action + +import ( + "infini.sh/console/model/alerting" + "infini.sh/framework/core/queue" + "infini.sh/framework/core/util" +) + +type EmailAction struct { + Data *alerting.Email + Subject string + Body string +} + +const EmailQueueName = "alert_email_messages" + +func (act *EmailAction) Execute()([]byte, error){ + queueCfg := queue.GetOrInitConfig(EmailQueueName) + emailMsg := util.MapStr{ + "email": act.Data.Recipients.To, + "template": "raw", + "variables": util.MapStr{ + "subject": act.Subject, + "body": act.Body, + }, + } + emailMsgBytes := util.MustToJSONBytes(emailMsg) + err := queue.Push(queueCfg, emailMsgBytes) + return nil, err +} \ No newline at end of file diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 2666938b..cf463a5f 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -679,6 +679,22 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if err != nil { return fmt.Errorf("save alert message error: %w", err) } + // send recover message to channel + recoverCfg := rule.RecoveryNotificationConfig + if recoverCfg != nil && recoverCfg.Enabled { + if recoverCfg.AcceptTimeRange.Include(time.Now()) { + paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{ + alerting2.ParamEventID: alertItem.ID, + alerting2.ParamTimestamp: alertItem.Created.Unix(), + }) + err = attachTitleMessageToCtx(recoverCfg.Title, recoverCfg.Message, paramsCtx) + if err != nil { + return err + } + actionResults, _ := performChannels(recoverCfg.Channels, paramsCtx) + alertItem.ActionExecutionResults = actionResults + } + } } return nil } @@ -698,7 +714,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { }) alertItem.Priority = priority - err = attachTitleMessageToCtx(rule, paramsCtx) + title, message := rule.GetNotificationTitleAndMessage() + err = attachTitleMessageToCtx(title, message, paramsCtx) if err != nil { return err } @@ -707,7 +724,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered { msg := &alerting.AlertMessage{ RuleID: rule.ID, - Created: time.Now(), + Created: alertItem.Created, Updated: time.Now(), ID: util.GetUUID(), ResourceID: rule.Resource.ID, @@ -756,12 +773,13 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } // if channel is not enabled return - if !rule.Channels.Enabled { + notifyCfg := rule.GetNotificationConfig() + if notifyCfg == nil || !notifyCfg.Enabled { return nil } - if rule.Channels.AcceptTimeRange.Include(time.Now()) { - periodDuration, err := time.ParseDuration(rule.Channels.ThrottlePeriod) + if notifyCfg.AcceptTimeRange.Include(time.Now()) { + periodDuration, err := time.ParseDuration(notifyCfg.ThrottlePeriod) if err != nil { alertItem.Error = err.Error() return err @@ -787,7 +805,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } if alertMessage == nil || period > periodDuration { - actionResults, errCount := performChannels(rule.Channels.Normal, paramsCtx) + actionResults, errCount := performChannels(notifyCfg.Normal, paramsCtx) alertItem.ActionExecutionResults = actionResults //change and save last notification time in local kv store when action error count equals zero if errCount == 0 { @@ -798,8 +816,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } - if rule.Channels.EscalationEnabled { - throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod) + if notifyCfg.EscalationEnabled { + throttlePeriod, err := time.ParseDuration(notifyCfg.EscalationThrottlePeriod) if err != nil { return err } @@ -819,7 +837,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration { - actionResults, errCount := performChannels(rule.Channels.Escalation, paramsCtx) + actionResults, errCount := performChannels(notifyCfg.Escalation, paramsCtx) alertItem.ActionExecutionResults = actionResults //todo init last escalation time when create task (by last alert item is escalated) if errCount == 0 { @@ -836,17 +854,17 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } -func attachTitleMessageToCtx(rule *alerting.Rule, paramsCtx map[string]interface{}) error{ +func attachTitleMessageToCtx(title, message string, paramsCtx map[string]interface{}) error{ var ( tplBytes []byte err error ) - tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx) + tplBytes, err = resolveMessage(message, paramsCtx) if err != nil { return fmt.Errorf("resolve message template error: %w", err) } paramsCtx[alerting2.ParamMessage] = string(tplBytes) - tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx) + tplBytes, err = resolveMessage(title, paramsCtx) if err != nil { return fmt.Errorf("resolve title template error: %w", err) } @@ -927,7 +945,8 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul alerting2.ParamEventID: util.GetUUID(), alerting2.ParamTimestamp: time.Now().Unix(), } ) - err = attachTitleMessageToCtx(rule, paramsCtx) + title, message := rule.GetNotificationTitleAndMessage() + err = attachTitleMessageToCtx(title, message, paramsCtx) if err != nil { return nil, err } @@ -997,6 +1016,20 @@ func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]by Data: &wh, Message: string(message), } + case alerting.ChannelEmail: + message, err = resolveMessage(channel.Email.Body, ctx) + if err != nil { + return nil, err, message + } + subjectBytes, err := resolveMessage(channel.Email.Subject, ctx) + if err != nil { + return nil, err, nil + } + act = &action.EmailAction{ + Data: channel.Email, + Subject: string(subjectBytes), + Body: string(message), + } default: return nil, fmt.Errorf("unsupported action type: %s", channel.Type), message } diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go index 31620061..31411f97 100644 --- a/service/alerting/elasticsearch/engine_test.go +++ b/service/alerting/elasticsearch/engine_test.go @@ -70,7 +70,7 @@ func TestEngine( t *testing.T) { }, }, - Channels: alerting.RuleChannel{ + Channels: alerting.NotificationConfig{ Normal: []alerting.Channel{ {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{ @@ -156,7 +156,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { // }, // }, // - // Channels: alerting.RuleChannel{ + // Channels: alerting.NotificationConfig{ // Normal: []alerting.Channel{ // {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ // HeaderParams: map[string]string{ @@ -216,7 +216,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { }, }, - Channels: alerting.RuleChannel{ + Channels: alerting.NotificationConfig{ Normal: []alerting.Channel{ {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{