diff --git a/main.go b/main.go index 822e6bf6..9f47c0e7 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" _ "expvar" + "infini.sh/console/plugin/api/email" _ "time/tzdata" log "github.com/cihub/seelog" @@ -135,6 +136,7 @@ func main() { orm.RegisterSchemaWithIndexName(task1.Task{}, "task") orm.RegisterSchemaWithIndexName(model.Layout{}, "layout") orm.RegisterSchemaWithIndexName(model.Notification{}, "notification") + orm.RegisterSchemaWithIndexName(model.EmailServer{}, "email-server") api.RegisterSchema() if global.Env().SetupRequired() { @@ -150,6 +152,13 @@ func main() { } return err }) + task1.RunWithinGroup("initialize_email_server", func(ctx context.Context) error { + err := email.InitEmailServer() + if err != nil { + log.Errorf("init email server error: %v", err) + } + return err + }) } if !global.Env().SetupRequired() { diff --git a/model/alerting/destination.go b/model/alerting/destination.go index 1f3e321c..7b4a9f74 100644 --- a/model/alerting/destination.go +++ b/model/alerting/destination.go @@ -4,7 +4,9 @@ package alerting -import "infini.sh/framework/core/orm" +import ( + "infini.sh/framework/core/orm" +) type Channel struct { orm.ORMObjectBase @@ -13,6 +15,9 @@ type Channel struct { Priority int `json:"priority,omitempty"` Webhook *CustomWebhook `json:"webhook,omitempty" elastic_mapping:"webhook:{type:object}"` SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` + SubType string `json:"sub_type" elastic_mapping:"sub_type:{type:keyword,copy_to:search_text}"` + Email *Email `json:"email,omitempty" elastic_mapping:"email:{type:object}"` + Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:boolean}"` } diff --git a/model/alerting/metric.go b/model/alerting/metric.go index d9737677..018922d5 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -13,9 +13,9 @@ import ( type Metric struct { insight.Metric - Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80 - Title string `json:"title"` //text template - Message string `json:"message"` // text template + Title string `json:"title,omitempty"` //text template + Message string `json:"message,omitempty"` // text template + Expression string `json:"expression,omitempty" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80 } diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 119f2b79..e930d64f 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -17,9 +17,11 @@ type Rule struct { Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:keyword}"` Resource Resource `json:"resource" elastic_mapping:"resource:{type:object}"` Metrics Metric `json:"metrics" elastic_mapping:"metrics:{type:object}"` - Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` - Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"` - Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` + Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` + Channels *NotificationConfig `json:"channels,omitempty" elastic_mapping:"channels:{type:object}"` + NotificationConfig *NotificationConfig `json:"notification_config,omitempty" elastic_mapping:"notification_config:{type:object}"` + RecoveryNotificationConfig *RecoveryNotificationConfig `json:"recovery_notification_config,omitempty" elastic_mapping:"recovery_notification_config:{type:object}"` + Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` LastNotificationTime time.Time `json:"-" elastic_mapping:"last_notification_time:{type:date}"` LastTermStartTime time.Time `json:"-"` //标识最近一轮告警的开始时间 LastEscalationTime time.Time `json:"-"` //标识最近一次告警升级发送通知的时间 @@ -54,9 +56,24 @@ func (rule *Rule) GetOrInitExpression() (string, error){ rule.Expression = strings.ReplaceAll(sb.String(), "result", metricExp) return rule.Expression, nil } +//GetNotificationConfig for adapter old version config +func (rule *Rule) GetNotificationConfig() *NotificationConfig { + if rule.NotificationConfig != nil { + return rule.NotificationConfig + } + return rule.Channels +} +func (rule *Rule) GetNotificationTitleAndMessage() (string, string) { + if rule.NotificationConfig != nil { + return rule.NotificationConfig.Title, rule.NotificationConfig.Message + } + return rule.Metrics.Title, rule.Metrics.Message +} -type RuleChannel struct { +type NotificationConfig struct { Enabled bool `json:"enabled"` + Title string `json:"title,omitempty"` //text template + Message string `json:"message,omitempty"` // text template Normal []Channel `json:"normal,omitempty"` Escalation []Channel `json:"escalation,omitempty"` ThrottlePeriod string `json:"throttle_period,omitempty"` //沉默周期 @@ -65,6 +82,15 @@ type RuleChannel struct { EscalationEnabled bool `json:"escalation_enabled,omitempty"` } +type RecoveryNotificationConfig struct { + Enabled bool `json:"enabled"` // channel enabled + Title string `json:"title"` //text template + Message string `json:"message"` // text template + AcceptTimeRange TimeRange `json:"accept_time_range,omitempty"` + Normal []Channel `json:"normal,omitempty"` + EventEnabled bool `json:"event_enabled"` +} + type MessageTemplate struct{ Type string `json:"type"` Source string `json:"source"` diff --git a/model/alerting/rule_test.go b/model/alerting/rule_test.go index 968b22c5..0a6f2ea1 100644 --- a/model/alerting/rule_test.go +++ b/model/alerting/rule_test.go @@ -6,6 +6,7 @@ package alerting import ( "fmt" + "infini.sh/console/model/insight" "infini.sh/framework/core/util" "net/http" "testing" @@ -60,22 +61,25 @@ func TestCreateRule( t *testing.T) { //}, Metrics: Metric{ - PeriodInterval: "1m", - Items: []MetricItem{ - {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, - {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, + Metric: insight.Metric{ + Groups: []insight.MetricGroupItem{{"metadata.labels.cluster_id", 10}, {"metadata.labels.node_id", 10}}, + Items: []insight.MetricItem{ + {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min" }, + {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max"}, + }, + BucketSize: "1m", + Formula: "a/b*100", }, - Formula: "a/b*100", //Expression: "min(fs.free_in_bytes)/max(fs.total_in_bytes)*100", }, Conditions: Condition{ Operator: "any", Items: []ConditionItem{ - {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"76"}, Priority: "error", Message: "磁盘可用率小于10%"}, + {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"76"}, Priority: "error"}, }, }, - Channels: RuleChannel{ + Channels: NotificationConfig{ Normal: []Channel{ {Name: "钉钉", Type: ChannelWebhook, Webhook: &CustomWebhook{ HeaderParams: map[string]string{ diff --git a/model/alerting/webhook.go b/model/alerting/webhook.go index 70117faf..a0ddc046 100644 --- a/model/alerting/webhook.go +++ b/model/alerting/webhook.go @@ -10,3 +10,14 @@ type CustomWebhook struct { URL string `json:"url,omitempty"` Body string `json:"body" elastic_mapping:"body:{type:text}"` } + +type Email struct { + ServerID string `json:"server_id" elastic_mapping:"server_id:{type:keyword}"` + Recipients struct { + To []string `json:"to,omitempty" elastic_mapping:"to:{type:keyword}"` + CC []string `json:"cc,omitempty" elastic_mapping:"cc:{type:keyword}"` + BCC []string `json:"bcc,omitempty" elastic_mapping:"bcc:{type:keyword}"` + } `json:"recipients" elastic_mapping:"recipients:{type:object}"` + Subject string `json:"subject" elastic_mapping:"subject:{type:text}"` + Body string `json:"body" elastic_mapping:"body:{type:text}"` +} \ No newline at end of file diff --git a/model/email_server.go b/model/email_server.go new file mode 100644 index 00000000..6c15545d --- /dev/null +++ b/model/email_server.go @@ -0,0 +1,35 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package model + +import ( + "fmt" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" +) + +type EmailServer struct { + orm.ORMObjectBase + Name string `json:"name" elastic_mapping:"name:{type:text}"` + Host string `json:"host" elastic_mapping:"host:{type:keyword}"` + Port int `json:"port" elastic_mapping:"port:{type:keyword}"` + TLS bool `json:"tls" elastic_mapping:"tls:{type:keyword}"` + Auth *elastic.BasicAuth `json:"auth" elastic_mapping:"auth:{type:object}"` + Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:boolean}"` + CredentialID string `json:"credential_id" elastic_mapping:"credential_id:{type:keyword}"` +} + +func (serv *EmailServer) Validate(requireName bool) error { + if serv.Host == "" { + return fmt.Errorf("host can not be empty") + } + if serv.Port <= 0 { + return fmt.Errorf("invalid port [%d]", serv.Port) + } + if requireName && serv.Name == "" { + return fmt.Errorf("name can not be empty") + } + return nil +} \ No newline at end of file diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 6ce66dea..995ccd00 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -33,6 +33,7 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.DELETE, "/alerting/channel", alert.RequirePermission(alert.deleteChannel, enum.PermissionAlertChannelWrite)) api.HandleAPIMethod(api.PUT, "/alerting/channel/:channel_id", alert.RequirePermission(alert.updateChannel, enum.PermissionAlertChannelWrite)) api.HandleAPIMethod(api.GET, "/alerting/channel/_search", alert.RequirePermission(alert.searchChannel, enum.PermissionAlertChannelRead)) + api.HandleAPIMethod(api.POST, "/alerting/channel/test", alert.RequirePermission(alert.testChannel, enum.PermissionAlertChannelWrite)) api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.RequirePermission(alert.searchAlert, enum.PermissionAlertHistoryRead)) api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.RequirePermission(alert.getAlert, enum.PermissionAlertHistoryRead)) diff --git a/plugin/api/alerting/channel.go b/plugin/api/alerting/channel.go index a7a3dc6d..792c44c2 100644 --- a/plugin/api/alerting/channel.go +++ b/plugin/api/alerting/channel.go @@ -6,9 +6,11 @@ package alerting import ( "fmt" + "infini.sh/console/service/alerting/common" "net/http" "strconv" "strings" + "time" log "github.com/cihub/seelog" "infini.sh/console/model/alerting" @@ -33,10 +35,7 @@ func (h *AlertAPI) createChannel(w http.ResponseWriter, req *http.Request, ps ht return } - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "created", - }, 200) + h.WriteCreatedOKJSON(w, obj.ID) } @@ -60,11 +59,7 @@ func (h *AlertAPI) getChannel(w http.ResponseWriter, req *http.Request, ps httpr return } - h.WriteJSON(w, util.MapStr{ - "found": true, - "_id": id, - "_source": obj, - }, 200) + h.WriteGetOKJSON(w, id, obj) } func (h *AlertAPI) updateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -101,10 +96,7 @@ func (h *AlertAPI) updateChannel(w http.ResponseWriter, req *http.Request, ps ht return } - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "updated", - }, 200) + h.WriteUpdatedOKJSON(w, id) } func (h *AlertAPI) deleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -179,3 +171,43 @@ func (h *AlertAPI) searchChannel(w http.ResponseWriter, req *http.Request, ps ht } h.Write(w, res.Raw) } + +func (h *AlertAPI) testChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + obj := alerting.Channel{} + err := h.DecodeJSON(req, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + ctx := map[string]interface{}{ + "title": "test title", + "message": "test message", + "rule_id": util.GetUUID(), + "rule_name": "test rule", + "resource_id": util.GetUUID(), + "resource_name": "test resource", + "event_id": util.GetUUID(), + "timestamp": time.Now().UnixMilli(), + "first_group_value": "first group value", + "first_threshold": "90", + "priority": "critical", + "results": []util.MapStr{ + {"threshold": "90", + "priority": "critical", + "group_values": []string{"first group value", "second group value"}, + "issue_timestamp": time.Now().UnixMilli()-500, + "result_value": 90, + "relation_values": util.MapStr{"a": 100, "b": 90}, + }, + }, + + } + _, err, _ = common.PerformChannel(&obj, ctx) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + h.WriteAckOKJSON(w) +} \ No newline at end of file diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index 6afa8d15..db302f1d 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -135,6 +135,20 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) return } + // adapter version smaller than 1.6.0 + if obj.Channels != nil && obj.NotificationConfig == nil { + obj.NotificationConfig = obj.Channels + for i := range obj.NotificationConfig.Normal { + obj.NotificationConfig.Normal[i].Enabled = true + } + for i := range obj.NotificationConfig.Escalation { + obj.NotificationConfig.Escalation[i].Enabled = true + } + } + if obj.NotificationConfig != nil && obj.NotificationConfig.Message == "" && obj.Metrics.Message != "" { + obj.NotificationConfig.Message = obj.Metrics.Message + obj.NotificationConfig.Title = obj.Metrics.Title + } alertAPI.WriteJSON(w, util.MapStr{ "found": true, diff --git a/plugin/api/email/api.go b/plugin/api/email/api.go new file mode 100644 index 00000000..809de227 --- /dev/null +++ b/plugin/api/email/api.go @@ -0,0 +1,30 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package email + +import ( + "infini.sh/console/plugin/api/email/common" + "infini.sh/framework/core/api" +) + +type EmailAPI struct { + api.Handler +} +func InitAPI() { + email := EmailAPI{} + api.HandleAPIMethod(api.POST, "/email/server/_test", email.testEmailServer) + api.HandleAPIMethod(api.GET, "/email/server/:email_server_id", email.getEmailServer) + api.HandleAPIMethod(api.POST, "/email/server", email.createEmailServer) + api.HandleAPIMethod(api.PUT, "/email/server/:email_server_id", email.updateEmailServer) + api.HandleAPIMethod(api.DELETE, "/email/server/:email_server_id", email.deleteEmailServer) + api.HandleAPIMethod(api.GET, "/email/server/_search", email.searchEmailServer) +} + +func InitEmailServer() error { + if !common.CheckEmailPipelineExists() { + return common.RefreshEmailServer() + } + return nil +} \ No newline at end of file diff --git a/plugin/api/email/common/auth.go b/plugin/api/email/common/auth.go new file mode 100644 index 00000000..a5c78301 --- /dev/null +++ b/plugin/api/email/common/auth.go @@ -0,0 +1,36 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "infini.sh/console/model" + "infini.sh/framework/core/credential" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" +) + +func GetBasicAuth(srv *model.EmailServer) (basicAuth elastic.BasicAuth, err error) { + if srv.Auth != nil && srv.Auth.Username != "" { + basicAuth = *srv.Auth + return + } + if srv.CredentialID != "" { + cred := credential.Credential{} + cred.ID = srv.CredentialID + _, err = orm.Get(&cred) + if err != nil { + return + } + var dv interface{} + dv, err = cred.Decode() + if err != nil { + return + } + if auth, ok := dv.(elastic.BasicAuth); ok { + basicAuth = auth + } + } + return +} diff --git a/plugin/api/email/common/pipeline.go b/plugin/api/email/common/pipeline.go new file mode 100644 index 00000000..a756b9eb --- /dev/null +++ b/plugin/api/email/common/pipeline.go @@ -0,0 +1,134 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "infini.sh/console/model" + "infini.sh/framework/core/global" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "os" + "path" + "gopkg.in/yaml.v2" +) + +const emailServerConfigFile = "send_email.yml" +func RefreshEmailServer() error { + q := orm.Query{ + Size: 10, + } + q.Conds = orm.And(orm.Eq("enabled", true)) + err, result := orm.Search(model.EmailServer{}, &q ) + if err != nil { + return err + } + if len(result.Result) == 0 { + return StopEmailServer() + } + servers := make([]model.EmailServer,0, len(result.Result)) + for _, row := range result.Result { + emailServer := model.EmailServer{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &emailServer) + err = emailServer.Validate(false) + if err != nil { + return err + } + auth, err := GetBasicAuth(&emailServer) + if err != nil { + return err + } + emailServer.Auth = &auth + servers = append(servers, emailServer) + } + pipeCfgStr := GeneratePipelineConfig(servers) + cfgDir := global.Env().GetConfigDir() + sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) + _, err = util.FilePutContent(sendEmailCfgFile, pipeCfgStr) + return err +} + +func StopEmailServer() error { + cfgDir := global.Env().GetConfigDir() + sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) + if util.FilesExists(sendEmailCfgFile) { + return os.RemoveAll(sendEmailCfgFile) + } + return nil +} + +func CheckEmailPipelineExists() bool { + cfgDir := global.Env().GetConfigDir() + sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) + return util.FilesExists(sendEmailCfgFile) +} + + +func GeneratePipelineConfig(servers []model.EmailServer) string { + if len(servers) == 0 { + return "" + } + smtpServers := map[string]util.MapStr{} + for _, srv := range servers { + smtpServers[srv.ID] = util.MapStr{ + "server": util.MapStr{ + "host": srv.Host, + "port": srv.Port, + "tls": srv.TLS, + }, + "auth": util.MapStr{ + "username": srv.Auth.Username, + "password": srv.Auth.Password, + }, + } + } + + pipelineCfg := util.MapStr{ + "pipeline": []util.MapStr{ + { + "name": "send_email_service", + "auto_start": true, + "keep_running": true, + "retry_delay_in_ms": 5000, + "processor": []util.MapStr{ + { + "consumer": util.MapStr{ + "consumer": util.MapStr{ + "fetch_max_messages": 1, + }, + "max_worker_size": 200, + "num_of_slices": 1, + "idle_timeout_in_seconds": 30, + "queue_selector": util.MapStr{ + "keys": []string{"email_messages"}, + }, + "processor": []util.MapStr{ + { + "smtp": util.MapStr{ + "idle_timeout_in_seconds": 1, + "servers": smtpServers, + "templates": util.MapStr{ + "raw": util.MapStr{ + "content_type": "text/plain", + "subject": "$[[subject]]", + "body": "$[[body]]", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + buf, err := yaml.Marshal(pipelineCfg) + if err != nil { + panic(err) + } + return string(buf) +} diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go new file mode 100644 index 00000000..4daf334f --- /dev/null +++ b/plugin/api/email/server.go @@ -0,0 +1,370 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package email + +import ( + "bytes" + "crypto/tls" + "fmt" + log "github.com/cihub/seelog" + "infini.sh/console/model" + "infini.sh/console/model/alerting" + "infini.sh/console/plugin/api/email/common" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/credential" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + "src/github.com/buger/jsonparser" + "src/github.com/gopkg.in/gomail.v2" + "strconv" + "time" +) + +func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var obj = &model.EmailServer{} + err := h.DecodeJSON(req, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + q := util.MapStr{ + "size": 1, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "host": util.MapStr{ + "value": obj.Host, + }, + }, + }, + { + "term": util.MapStr{ + "port": util.MapStr{ + "value": obj.Port, + }, + }, + }, + }, + }, + }, + } + query := orm.Query{ + RawQuery: util.MustToJSONBytes(q), + } + err, result := orm.Search(obj, &query) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if len(result.Result) > 0 { + h.WriteError(w, fmt.Sprintf("email server [%s:%d] already exists", obj.Host, obj.Port), http.StatusInternalServerError) + return + } + if obj.CredentialID == "" && obj.Auth != nil && obj.Auth.Username != ""{ + credentialID, err := saveBasicAuthToCredential(obj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + obj.CredentialID = credentialID + } + obj.Auth = nil + + err = orm.Create(&orm.Context{ + Refresh: "wait_for", + }, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if obj.Enabled { + err = common.RefreshEmailServer() + if err != nil { + log.Error(err) + } + } + + h.WriteCreatedOKJSON(w, obj.ID) + +} + +func saveBasicAuthToCredential(srv *model.EmailServer)(string, error){ + if srv == nil { + return "", fmt.Errorf("param email config can not be empty") + } + cred := credential.Credential{ + Name: srv.Name, + Type: credential.BasicAuth, + Tags: []string{"Email"}, + Payload: map[string]interface{}{ + "basic_auth": map[string]interface{}{ + "username": srv.Auth.Username, + "password": srv.Auth.Password, + }, + }, + } + cred.ID = util.GetUUID() + err := cred.Encode() + if err != nil { + return "", err + } + err = orm.Create(nil, &cred) + if err != nil { + return "", err + } + return cred.ID, nil +} + +func (h *EmailAPI) getEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("email_server_id") + + obj := model.EmailServer{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteGetOKJSON(w, id, obj) +} + +func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("email_server_id") + obj := model.EmailServer{} + + obj.ID = id + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + + id = obj.ID + create := obj.Created + newObj := model.EmailServer{} + err = h.DecodeJSON(req, &newObj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if !newObj.Enabled && obj.Enabled { + if err = checkEmailServerReferenced(&obj); err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + } + if newObj.Auth != nil && newObj.CredentialID == "" { + credentialID, err := saveBasicAuthToCredential(&newObj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + newObj.CredentialID = credentialID + newObj.Auth = nil + } + + //protect + newObj.ID = id + newObj.Created = create + err = orm.Update(&orm.Context{ + Refresh: "wait_for", + }, &newObj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + err = common.RefreshEmailServer() + if err != nil { + log.Error(err) + } + + h.WriteUpdatedOKJSON(w, obj.ID) +} + +func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("email_server_id") + + obj := model.EmailServer{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + if err = checkEmailServerReferenced(&obj); err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + err = orm.Delete(&orm.Context{ + Refresh: "wait_for", + }, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if obj.Enabled { + err = common.RefreshEmailServer() + if err != nil { + log.Error(err) + } + } + + h.WriteDeletedOKJSON(w, obj.ID) +} + +func checkEmailServerReferenced(srv *model.EmailServer) error { + q := &orm.Query{ + Size: 1, + } + q.Conds = orm.And(orm.Eq("email.server_id", srv.ID)) + err, result := orm.Search(alerting.Channel{}, q) + if err != nil { + return err + } + if len(result.Result) > 0 { + var chName interface{} = "" + if m, ok := result.Result[0].(map[string]interface{}); ok { + chName = m["name"] + } + return fmt.Errorf("email server used by channel [%s]", chName) + } + return nil +} + +func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + var ( + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + strEnabled = h.GetParameterOrDefault(req, "enabled", "true") + ) + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + q := orm.Query{ + From: from, + Size: size, + } + if strEnabled == "true" { + q.Conds = orm.And(orm.Eq("enabled", true)) + }else if strEnabled == "false" { + q.Conds = orm.And(orm.Eq("enabled", false)) + } + + err, res := orm.Search(&model.EmailServer{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + //remove password field + hitsBuf := bytes.Buffer{} + hitsBuf.Write([]byte("[")) + jsonparser.ArrayEach(res.Raw, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { + value = jsonparser.Delete(value, "_source", "auth", "password") + hitsBuf.Write(value) + hitsBuf.Write([]byte(",")) + }, "hits", "hits") + buf := hitsBuf.Bytes() + if buf[len(buf)-1] == ',' { + buf[len(buf)-1] = ']' + }else{ + hitsBuf.Write([]byte("]")) + } + res.Raw, err = jsonparser.Set(res.Raw, hitsBuf.Bytes(), "hits", "hits") + if err != nil { + log.Error(err.Error()) + h.ErrorInternalServer(w, err.Error()) + return + } + h.Write(w, res.Raw) +} + +func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + reqBody := &struct { + SendTo []string `json:"send_to"` + model.EmailServer + }{} + err := h.DecodeJSON(req, reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(reqBody.SendTo) == 0 { + h.WriteError(w, "receiver email address can not be empty", http.StatusInternalServerError) + return + } + if err = reqBody.Validate(false); err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if reqBody.CredentialID != "" { + auth, err := common.GetBasicAuth(&reqBody.EmailServer) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + reqBody.Auth = &auth + } + if reqBody.Auth == nil { + h.WriteError(w, "auth info required", http.StatusInternalServerError) + return + } + message := gomail.NewMessage() + message.SetHeader("From", reqBody.Auth.Username) + message.SetHeader("To", reqBody.SendTo...) + message.SetHeader("Subject", "test email") + + message.SetBody("text/plain", "This is just a test email, do not reply!") + d := gomail.NewDialerWithTimeout(reqBody.Host, reqBody.Port, reqBody.Auth.Username, reqBody.Auth.Password, 3*time.Second) + d.TLSConfig = &tls.Config{InsecureSkipVerify: true} + d.SSL = reqBody.TLS + + err = d.DialAndSend(message) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteAckOKJSON(w) +} diff --git a/plugin/api/init.go b/plugin/api/init.go index 79162e3c..cbafd8b3 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -1,6 +1,7 @@ package api import ( + "infini.sh/console/plugin/api/email" "infini.sh/console/plugin/api/license" "path" @@ -75,4 +76,5 @@ func Init(cfg *config.AppConfig) { notification.InitAPI() license.InitAPI() + email.InitAPI() } diff --git a/service/alerting/action/email.go b/service/alerting/action/email.go new file mode 100644 index 00000000..b7ad9647 --- /dev/null +++ b/service/alerting/action/email.go @@ -0,0 +1,35 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package action + +import ( + "infini.sh/console/model/alerting" + "infini.sh/framework/core/queue" + "infini.sh/framework/core/util" +) + +type EmailAction struct { + Data *alerting.Email + Subject string + Body string +} + +const EmailQueueName = "email_messages" + +func (act *EmailAction) Execute()([]byte, error){ + queueCfg := queue.GetOrInitConfig(EmailQueueName) + emailMsg := util.MapStr{ + "server_id": act.Data.ServerID, + "email": act.Data.Recipients.To, + "template": "raw", + "variables": util.MapStr{ + "subject": act.Subject, + "body": act.Body, + }, + } + emailMsgBytes := util.MustToJSONBytes(emailMsg) + err := queue.Push(queueCfg, emailMsgBytes) + return nil, err +} \ No newline at end of file diff --git a/service/alerting/common/helper.go b/service/alerting/common/helper.go new file mode 100644 index 00000000..faf5e344 --- /dev/null +++ b/service/alerting/common/helper.go @@ -0,0 +1,88 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "bytes" + "fmt" + "infini.sh/console/model/alerting" + "infini.sh/console/service/alerting/action" + "infini.sh/console/service/alerting/funcs" + "infini.sh/framework/core/orm" + "text/template" +) + +func PerformChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error, []byte) { + if channel == nil { + return nil, fmt.Errorf("empty channel"), nil + } + var ( + act action.Action + message []byte + err error + ) + switch channel.Type { + + case alerting.ChannelWebhook: + message, err = ResolveMessage(channel.Webhook.Body, ctx) + if err != nil { + return nil, err, message + } + wh := *channel.Webhook + urlBytes, err := ResolveMessage(wh.URL, ctx) + if err != nil { + return nil, err, message + } + wh.URL = string(urlBytes) + act = &action.WebhookAction{ + Data: &wh, + Message: string(message), + } + case alerting.ChannelEmail: + message, err = ResolveMessage(channel.Email.Body, ctx) + if err != nil { + return nil, err, message + } + subjectBytes, err := ResolveMessage(channel.Email.Subject, ctx) + if err != nil { + return nil, err, nil + } + act = &action.EmailAction{ + Data: channel.Email, + Subject: string(subjectBytes), + Body: string(message), + } + default: + return nil, fmt.Errorf("unsupported action type: %s", channel.Type), message + } + executeResult, err := act.Execute() + return executeResult, err, message +} + +func ResolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){ + msg := messageTemplate + tmpl, err := template.New("alert-message").Funcs(funcs.GenericFuncMap()).Parse(msg) + if err !=nil { + return nil, fmt.Errorf("parse message temlate error: %w", err) + } + msgBuffer := &bytes.Buffer{} + err = tmpl.Execute(msgBuffer, ctx) + return msgBuffer.Bytes(), err +} + +func RetrieveChannel(ch *alerting.Channel) (*alerting.Channel, error) { + if ch == nil { + return nil, fmt.Errorf("empty channel") + } + enabled := ch.Enabled + if ch.ID != "" { + _, err := orm.Get(ch) + if err != nil { + return nil, err + } + ch.Enabled = enabled + } + return ch, nil +} \ No newline at end of file diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 2666938b..74ee4bc7 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -5,7 +5,6 @@ package elasticsearch import ( - "bytes" "context" "fmt" "github.com/Knetic/govaluate" @@ -13,8 +12,7 @@ import ( "infini.sh/console/model" "infini.sh/console/model/alerting" alerting2 "infini.sh/console/service/alerting" - "infini.sh/console/service/alerting/action" - "infini.sh/console/service/alerting/funcs" + "infini.sh/console/service/alerting/common" "infini.sh/framework/core/elastic" "infini.sh/console/model/insight" "infini.sh/framework/core/kv" @@ -25,7 +23,6 @@ import ( "sort" "strconv" "strings" - "text/template" "time" ) @@ -679,6 +676,21 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if err != nil { return fmt.Errorf("save alert message error: %w", err) } + // todo add recover notification to inner system message + // send recover message to channel + recoverCfg := rule.RecoveryNotificationConfig + if recoverCfg != nil && recoverCfg.EventEnabled && recoverCfg.Enabled { + paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{ + alerting2.ParamEventID: alertItem.ID, + alerting2.ParamTimestamp: alertItem.Created.Unix(), + }) + err = attachTitleMessageToCtx(recoverCfg.Title, recoverCfg.Message, paramsCtx) + if err != nil { + return err + } + actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx) + alertItem.ActionExecutionResults = actionResults + } } return nil } @@ -698,7 +710,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { }) alertItem.Priority = priority - err = attachTitleMessageToCtx(rule, paramsCtx) + title, message := rule.GetNotificationTitleAndMessage() + err = attachTitleMessageToCtx(title, message, paramsCtx) if err != nil { return err } @@ -707,7 +720,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered { msg := &alerting.AlertMessage{ RuleID: rule.ID, - Created: time.Now(), + Created: alertItem.Created, Updated: time.Now(), ID: util.GetUUID(), ResourceID: rule.Resource.ID, @@ -756,12 +769,13 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } // if channel is not enabled return - if !rule.Channels.Enabled { + notifyCfg := rule.GetNotificationConfig() + if notifyCfg == nil || !notifyCfg.Enabled { return nil } - if rule.Channels.AcceptTimeRange.Include(time.Now()) { - periodDuration, err := time.ParseDuration(rule.Channels.ThrottlePeriod) + if notifyCfg.AcceptTimeRange.Include(time.Now()) { + periodDuration, err := time.ParseDuration(notifyCfg.ThrottlePeriod) if err != nil { alertItem.Error = err.Error() return err @@ -787,7 +801,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } if alertMessage == nil || period > periodDuration { - actionResults, errCount := performChannels(rule.Channels.Normal, paramsCtx) + actionResults, errCount := performChannels(notifyCfg.Normal, paramsCtx) alertItem.ActionExecutionResults = actionResults //change and save last notification time in local kv store when action error count equals zero if errCount == 0 { @@ -798,8 +812,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } - if rule.Channels.EscalationEnabled { - throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod) + if notifyCfg.EscalationEnabled { + throttlePeriod, err := time.ParseDuration(notifyCfg.EscalationThrottlePeriod) if err != nil { return err } @@ -819,7 +833,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration { - actionResults, errCount := performChannels(rule.Channels.Escalation, paramsCtx) + actionResults, errCount := performChannels(notifyCfg.Escalation, paramsCtx) alertItem.ActionExecutionResults = actionResults //todo init last escalation time when create task (by last alert item is escalated) if errCount == 0 { @@ -836,17 +850,17 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } -func attachTitleMessageToCtx(rule *alerting.Rule, paramsCtx map[string]interface{}) error{ +func attachTitleMessageToCtx(title, message string, paramsCtx map[string]interface{}) error{ var ( tplBytes []byte err error ) - tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx) + tplBytes, err = common.ResolveMessage(message, paramsCtx) if err != nil { return fmt.Errorf("resolve message template error: %w", err) } paramsCtx[alerting2.ParamMessage] = string(tplBytes) - tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx) + tplBytes, err = common.ResolveMessage(title, paramsCtx) if err != nil { return fmt.Errorf("resolve title template error: %w", err) } @@ -927,7 +941,8 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul alerting2.ParamEventID: util.GetUUID(), alerting2.ParamTimestamp: time.Now().Unix(), } ) - err = attachTitleMessageToCtx(rule, paramsCtx) + title, message := rule.GetNotificationTitleAndMessage() + err = attachTitleMessageToCtx(title, message, paramsCtx) if err != nil { return nil, err } @@ -945,7 +960,15 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([ var errCount int var actionResults []alerting.ActionExecutionResult for _, channel := range channels { - resBytes, err, messageBytes := performChannel(&channel, ctx) + if !channel.Enabled { + continue + } + _, err := common.RetrieveChannel(&channel) + if err != nil { + log.Error(err) + continue + } + resBytes, err, messageBytes := common.PerformChannel(&channel, ctx) var errStr string if err != nil { errCount++ @@ -963,46 +986,8 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([ return actionResults, errCount } -func resolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){ - msg := messageTemplate - tmpl, err := template.New("alert-message").Funcs(funcs.GenericFuncMap()).Parse(msg) - if err !=nil { - return nil, fmt.Errorf("parse message temlate error: %w", err) - } - msgBuffer := &bytes.Buffer{} - err = tmpl.Execute(msgBuffer, ctx) - return msgBuffer.Bytes(), err -} -func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error, []byte) { - var ( - act action.Action - message []byte - err error - ) - switch channel.Type { - case alerting.ChannelWebhook: - message, err = resolveMessage(channel.Webhook.Body, ctx) - if err != nil { - return nil, err, message - } - wh := *channel.Webhook - urlBytes, err := resolveMessage(wh.URL, ctx) - if err != nil { - return nil, err, message - } - wh.URL = string(urlBytes) - act = &action.WebhookAction{ - Data: &wh, - Message: string(message), - } - default: - return nil, fmt.Errorf("unsupported action type: %s", channel.Type), message - } - executeResult, err := act.Execute() - return executeResult, err, message -} func (engine *Engine) GenerateTask(rule alerting.Rule) func(ctx context.Context) { return func(ctx context.Context) { defer func() { diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go index 31620061..31411f97 100644 --- a/service/alerting/elasticsearch/engine_test.go +++ b/service/alerting/elasticsearch/engine_test.go @@ -70,7 +70,7 @@ func TestEngine( t *testing.T) { }, }, - Channels: alerting.RuleChannel{ + Channels: alerting.NotificationConfig{ Normal: []alerting.Channel{ {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{ @@ -156,7 +156,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { // }, // }, // - // Channels: alerting.RuleChannel{ + // Channels: alerting.NotificationConfig{ // Normal: []alerting.Channel{ // {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ // HeaderParams: map[string]string{ @@ -216,7 +216,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { }, }, - Channels: alerting.RuleChannel{ + Channels: alerting.NotificationConfig{ Normal: []alerting.Channel{ {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{