From 0f4bb211d525a57b18eb53a408cb462dc398bd14 Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 6 Jul 2023 14:20:08 +0800 Subject: [PATCH 01/22] add alert recovery notification --- model/alerting/destination.go | 2 + model/alerting/metric.go | 6 +- model/alerting/rule.go | 33 +++++++++-- model/alerting/rule_test.go | 2 +- model/alerting/webhook.go | 11 ++++ service/alerting/action/email.go | 34 +++++++++++ service/alerting/elasticsearch/engine.go | 59 +++++++++++++++---- service/alerting/elasticsearch/engine_test.go | 6 +- 8 files changed, 129 insertions(+), 24 deletions(-) create mode 100644 service/alerting/action/email.go diff --git a/model/alerting/destination.go b/model/alerting/destination.go index 1f3e321c..25d6b3ea 100644 --- a/model/alerting/destination.go +++ b/model/alerting/destination.go @@ -13,6 +13,8 @@ type Channel struct { Priority int `json:"priority,omitempty"` Webhook *CustomWebhook `json:"webhook,omitempty" elastic_mapping:"webhook:{type:object}"` SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` + SubType string `json:"sub_type" elastic_mapping:"sub_type:{type:keyword,copy_to:search_text}"` + Email *Email `json:"email,omitempty" elastic_mapping:"email:{type:object}"` } diff --git a/model/alerting/metric.go b/model/alerting/metric.go index d9737677..018922d5 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -13,9 +13,9 @@ import ( type Metric struct { insight.Metric - Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80 - Title string `json:"title"` //text template - Message string `json:"message"` // text template + Title string `json:"title,omitempty"` //text template + Message string `json:"message,omitempty"` // text template + Expression string `json:"expression,omitempty" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80 } diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 119f2b79..43cdc3bb 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -17,9 +17,11 @@ type Rule struct { Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:keyword}"` Resource Resource `json:"resource" elastic_mapping:"resource:{type:object}"` Metrics Metric `json:"metrics" elastic_mapping:"metrics:{type:object}"` - Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` - Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"` - Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` + Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` + Channels NotificationConfig `json:"channels,omitempty" elastic_mapping:"channels:{type:object}"` + NotificationConfig *NotificationConfig `json:"notification_config,omitempty" elastic_mapping:"notification_config:{type:object}"` + RecoveryNotificationConfig *RecoveryNotificationConfig `json:"recovery_notification_config,omitempty" elastic_mapping:"recovery_notification_config:{type:object}"` + Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` LastNotificationTime time.Time `json:"-" elastic_mapping:"last_notification_time:{type:date}"` LastTermStartTime time.Time `json:"-"` //标识最近一轮告警的开始时间 LastEscalationTime time.Time `json:"-"` //标识最近一次告警升级发送通知的时间 @@ -54,9 +56,24 @@ func (rule *Rule) GetOrInitExpression() (string, error){ rule.Expression = strings.ReplaceAll(sb.String(), "result", metricExp) return rule.Expression, nil } +//GetNotificationConfig for adapter old version config +func (rule *Rule) GetNotificationConfig() *NotificationConfig { + if rule.NotificationConfig != nil { + return rule.NotificationConfig + } + return &rule.Channels +} +func (rule *Rule) GetNotificationTitleAndMessage() (string, string) { + if rule.NotificationConfig != nil { + return rule.NotificationConfig.Title, rule.NotificationConfig.Message + } + return rule.Metrics.Title, rule.Metrics.Message +} -type RuleChannel struct { +type NotificationConfig struct { Enabled bool `json:"enabled"` + Title string `json:"title,omitempty"` //text template + Message string `json:"message,omitempty"` // text template Normal []Channel `json:"normal,omitempty"` Escalation []Channel `json:"escalation,omitempty"` ThrottlePeriod string `json:"throttle_period,omitempty"` //沉默周期 @@ -65,6 +82,14 @@ type RuleChannel struct { EscalationEnabled bool `json:"escalation_enabled,omitempty"` } +type RecoveryNotificationConfig struct { + Enabled bool `json:"enabled"` + Title string `json:"title"` //text template + Message string `json:"message"` // text template + AcceptTimeRange TimeRange `json:"accept_time_range,omitempty"` + Channels []Channel `json:"channels,omitempty"` +} + type MessageTemplate struct{ Type string `json:"type"` Source string `json:"source"` diff --git a/model/alerting/rule_test.go b/model/alerting/rule_test.go index 968b22c5..a59c008f 100644 --- a/model/alerting/rule_test.go +++ b/model/alerting/rule_test.go @@ -75,7 +75,7 @@ func TestCreateRule( t *testing.T) { }, }, - Channels: RuleChannel{ + Channels: NotificationConfig{ Normal: []Channel{ {Name: "钉钉", Type: ChannelWebhook, Webhook: &CustomWebhook{ HeaderParams: map[string]string{ diff --git a/model/alerting/webhook.go b/model/alerting/webhook.go index 70117faf..b1b5e2df 100644 --- a/model/alerting/webhook.go +++ b/model/alerting/webhook.go @@ -10,3 +10,14 @@ type CustomWebhook struct { URL string `json:"url,omitempty"` Body string `json:"body" elastic_mapping:"body:{type:text}"` } + +type Email struct { + ServerID string `json:"server_id"` + Recipients struct { + To []string `json:"to" elastic_mapping:"to:{type:keyword}"` + CC []string `json:"cc" elastic_mapping:"cc:{type:keyword}"` + BCC []string `json:"bcc" elastic_mapping:"bcc:{type:keyword}"` + } `json:"recipients" elastic_mapping:"recipients:{type:object}"` + Subject string `json:"subject" elastic_mapping:"subject:{type:text}"` + Body string `json:"body" elastic_mapping:"body:{type:text}"` +} \ No newline at end of file diff --git a/service/alerting/action/email.go b/service/alerting/action/email.go new file mode 100644 index 00000000..f60e0012 --- /dev/null +++ b/service/alerting/action/email.go @@ -0,0 +1,34 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package action + +import ( + "infini.sh/console/model/alerting" + "infini.sh/framework/core/queue" + "infini.sh/framework/core/util" +) + +type EmailAction struct { + Data *alerting.Email + Subject string + Body string +} + +const EmailQueueName = "alert_email_messages" + +func (act *EmailAction) Execute()([]byte, error){ + queueCfg := queue.GetOrInitConfig(EmailQueueName) + emailMsg := util.MapStr{ + "email": act.Data.Recipients.To, + "template": "raw", + "variables": util.MapStr{ + "subject": act.Subject, + "body": act.Body, + }, + } + emailMsgBytes := util.MustToJSONBytes(emailMsg) + err := queue.Push(queueCfg, emailMsgBytes) + return nil, err +} \ No newline at end of file diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 2666938b..cf463a5f 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -679,6 +679,22 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if err != nil { return fmt.Errorf("save alert message error: %w", err) } + // send recover message to channel + recoverCfg := rule.RecoveryNotificationConfig + if recoverCfg != nil && recoverCfg.Enabled { + if recoverCfg.AcceptTimeRange.Include(time.Now()) { + paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{ + alerting2.ParamEventID: alertItem.ID, + alerting2.ParamTimestamp: alertItem.Created.Unix(), + }) + err = attachTitleMessageToCtx(recoverCfg.Title, recoverCfg.Message, paramsCtx) + if err != nil { + return err + } + actionResults, _ := performChannels(recoverCfg.Channels, paramsCtx) + alertItem.ActionExecutionResults = actionResults + } + } } return nil } @@ -698,7 +714,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { }) alertItem.Priority = priority - err = attachTitleMessageToCtx(rule, paramsCtx) + title, message := rule.GetNotificationTitleAndMessage() + err = attachTitleMessageToCtx(title, message, paramsCtx) if err != nil { return err } @@ -707,7 +724,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered { msg := &alerting.AlertMessage{ RuleID: rule.ID, - Created: time.Now(), + Created: alertItem.Created, Updated: time.Now(), ID: util.GetUUID(), ResourceID: rule.Resource.ID, @@ -756,12 +773,13 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } // if channel is not enabled return - if !rule.Channels.Enabled { + notifyCfg := rule.GetNotificationConfig() + if notifyCfg == nil || !notifyCfg.Enabled { return nil } - if rule.Channels.AcceptTimeRange.Include(time.Now()) { - periodDuration, err := time.ParseDuration(rule.Channels.ThrottlePeriod) + if notifyCfg.AcceptTimeRange.Include(time.Now()) { + periodDuration, err := time.ParseDuration(notifyCfg.ThrottlePeriod) if err != nil { alertItem.Error = err.Error() return err @@ -787,7 +805,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } if alertMessage == nil || period > periodDuration { - actionResults, errCount := performChannels(rule.Channels.Normal, paramsCtx) + actionResults, errCount := performChannels(notifyCfg.Normal, paramsCtx) alertItem.ActionExecutionResults = actionResults //change and save last notification time in local kv store when action error count equals zero if errCount == 0 { @@ -798,8 +816,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } - if rule.Channels.EscalationEnabled { - throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod) + if notifyCfg.EscalationEnabled { + throttlePeriod, err := time.ParseDuration(notifyCfg.EscalationThrottlePeriod) if err != nil { return err } @@ -819,7 +837,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration { - actionResults, errCount := performChannels(rule.Channels.Escalation, paramsCtx) + actionResults, errCount := performChannels(notifyCfg.Escalation, paramsCtx) alertItem.ActionExecutionResults = actionResults //todo init last escalation time when create task (by last alert item is escalated) if errCount == 0 { @@ -836,17 +854,17 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } -func attachTitleMessageToCtx(rule *alerting.Rule, paramsCtx map[string]interface{}) error{ +func attachTitleMessageToCtx(title, message string, paramsCtx map[string]interface{}) error{ var ( tplBytes []byte err error ) - tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx) + tplBytes, err = resolveMessage(message, paramsCtx) if err != nil { return fmt.Errorf("resolve message template error: %w", err) } paramsCtx[alerting2.ParamMessage] = string(tplBytes) - tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx) + tplBytes, err = resolveMessage(title, paramsCtx) if err != nil { return fmt.Errorf("resolve title template error: %w", err) } @@ -927,7 +945,8 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul alerting2.ParamEventID: util.GetUUID(), alerting2.ParamTimestamp: time.Now().Unix(), } ) - err = attachTitleMessageToCtx(rule, paramsCtx) + title, message := rule.GetNotificationTitleAndMessage() + err = attachTitleMessageToCtx(title, message, paramsCtx) if err != nil { return nil, err } @@ -997,6 +1016,20 @@ func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]by Data: &wh, Message: string(message), } + case alerting.ChannelEmail: + message, err = resolveMessage(channel.Email.Body, ctx) + if err != nil { + return nil, err, message + } + subjectBytes, err := resolveMessage(channel.Email.Subject, ctx) + if err != nil { + return nil, err, nil + } + act = &action.EmailAction{ + Data: channel.Email, + Subject: string(subjectBytes), + Body: string(message), + } default: return nil, fmt.Errorf("unsupported action type: %s", channel.Type), message } diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go index 31620061..31411f97 100644 --- a/service/alerting/elasticsearch/engine_test.go +++ b/service/alerting/elasticsearch/engine_test.go @@ -70,7 +70,7 @@ func TestEngine( t *testing.T) { }, }, - Channels: alerting.RuleChannel{ + Channels: alerting.NotificationConfig{ Normal: []alerting.Channel{ {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{ @@ -156,7 +156,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { // }, // }, // - // Channels: alerting.RuleChannel{ + // Channels: alerting.NotificationConfig{ // Normal: []alerting.Channel{ // {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ // HeaderParams: map[string]string{ @@ -216,7 +216,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { }, }, - Channels: alerting.RuleChannel{ + Channels: alerting.NotificationConfig{ Normal: []alerting.Channel{ {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{ From 63cacee1a740409a0190e9e673ba83242edc5f41 Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 12 Jul 2023 14:55:42 +0800 Subject: [PATCH 02/22] add email channel --- main.go | 13 ++++++++++++- model/alerting/webhook.go | 8 ++++---- plugin/api/init.go | 2 ++ service/alerting/action/email.go | 4 ++-- 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index 822e6bf6..6e6ad75e 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" _ "expvar" + "infini.sh/console/plugin/api/email" _ "time/tzdata" log "github.com/cihub/seelog" @@ -65,7 +66,9 @@ func main() { modules = append(modules, &elastic2.ElasticModule{}) modules = append(modules, &queue2.DiskQueue{}) modules = append(modules, &redis.RedisModule{}) - modules = append(modules, &pipeline.PipeModule{}) + pipeM := &pipeline.PipeModule{} + global.Register("pipeline_module", pipeM) + modules = append(modules, pipeM) modules = append(modules, &task.TaskModule{}) modules = append(modules, &metrics.MetricsModule{}) modules = append(modules, &security.Module{}) @@ -135,6 +138,7 @@ func main() { orm.RegisterSchemaWithIndexName(task1.Task{}, "task") orm.RegisterSchemaWithIndexName(model.Layout{}, "layout") orm.RegisterSchemaWithIndexName(model.Notification{}, "notification") + orm.RegisterSchemaWithIndexName(model.EmailServer{}, "email-server") api.RegisterSchema() if global.Env().SetupRequired() { @@ -150,6 +154,13 @@ func main() { } return err }) + task1.RunWithinGroup("initialize_email_server", func(ctx context.Context) error { + err := email.InitEmailServer() + if err != nil { + log.Errorf("init email server error: %v", err) + } + return err + }) } if !global.Env().SetupRequired() { diff --git a/model/alerting/webhook.go b/model/alerting/webhook.go index b1b5e2df..a0ddc046 100644 --- a/model/alerting/webhook.go +++ b/model/alerting/webhook.go @@ -12,11 +12,11 @@ type CustomWebhook struct { } type Email struct { - ServerID string `json:"server_id"` + ServerID string `json:"server_id" elastic_mapping:"server_id:{type:keyword}"` Recipients struct { - To []string `json:"to" elastic_mapping:"to:{type:keyword}"` - CC []string `json:"cc" elastic_mapping:"cc:{type:keyword}"` - BCC []string `json:"bcc" elastic_mapping:"bcc:{type:keyword}"` + To []string `json:"to,omitempty" elastic_mapping:"to:{type:keyword}"` + CC []string `json:"cc,omitempty" elastic_mapping:"cc:{type:keyword}"` + BCC []string `json:"bcc,omitempty" elastic_mapping:"bcc:{type:keyword}"` } `json:"recipients" elastic_mapping:"recipients:{type:object}"` Subject string `json:"subject" elastic_mapping:"subject:{type:text}"` Body string `json:"body" elastic_mapping:"body:{type:text}"` diff --git a/plugin/api/init.go b/plugin/api/init.go index 79162e3c..cbafd8b3 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -1,6 +1,7 @@ package api import ( + "infini.sh/console/plugin/api/email" "infini.sh/console/plugin/api/license" "path" @@ -75,4 +76,5 @@ func Init(cfg *config.AppConfig) { notification.InitAPI() license.InitAPI() + email.InitAPI() } diff --git a/service/alerting/action/email.go b/service/alerting/action/email.go index f60e0012..74469633 100644 --- a/service/alerting/action/email.go +++ b/service/alerting/action/email.go @@ -16,10 +16,10 @@ type EmailAction struct { Body string } -const EmailQueueName = "alert_email_messages" +//const EmailQueueName = "alert_email_messages" func (act *EmailAction) Execute()([]byte, error){ - queueCfg := queue.GetOrInitConfig(EmailQueueName) + queueCfg := queue.GetOrInitConfig(act.Data.ServerID) emailMsg := util.MapStr{ "email": act.Data.Recipients.To, "template": "raw", From a5e036d62ac67b51e7c00bb7eed8fe2543745fd1 Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 12 Jul 2023 16:28:31 +0800 Subject: [PATCH 03/22] add email server api --- model/email_server.go | 36 ++++ plugin/api/email/api.go | 56 ++++++ plugin/api/email/common/pipeline.go | 97 +++++++++++ plugin/api/email/server.go | 255 ++++++++++++++++++++++++++++ 4 files changed, 444 insertions(+) create mode 100644 model/email_server.go create mode 100644 plugin/api/email/api.go create mode 100644 plugin/api/email/common/pipeline.go create mode 100644 plugin/api/email/server.go diff --git a/model/email_server.go b/model/email_server.go new file mode 100644 index 00000000..e9636d1b --- /dev/null +++ b/model/email_server.go @@ -0,0 +1,36 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package model + +import ( + "fmt" + "infini.sh/framework/core/orm" +) + +type EmailServer struct { + orm.ORMObjectBase + Name string `json:"name" elastic_mapping:"name:{type:text}"` + Host string `json:"host" elastic_mapping:"host:{type:keyword}"` + Port int `json:"port" elastic_mapping:"port:{type:keyword}"` + TLS bool `json:"tls" elastic_mapping:"tls:{type:keyword}"` + Auth struct { + Username string `json:"username" elastic_mapping:"username:{type:keyword}"` + Password string `json:"password" elastic_mapping:"password:{type:keyword}"` + } `json:"auth" elastic_mapping:"auth:{type:object}"` + Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:boolean}"` +} + +func (serv *EmailServer) Validate(requireName bool) error { + if serv.Host == "" { + return fmt.Errorf("host can not be empty") + } + if serv.Port <= 0 { + return fmt.Errorf("invalid port [%d]", serv.Port) + } + if requireName && serv.Name == "" { + return fmt.Errorf("name can not be empty") + } + return nil +} \ No newline at end of file diff --git a/plugin/api/email/api.go b/plugin/api/email/api.go new file mode 100644 index 00000000..36403c43 --- /dev/null +++ b/plugin/api/email/api.go @@ -0,0 +1,56 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package email + +import ( + "infini.sh/console/model" + "infini.sh/console/plugin/api/email/common" + "infini.sh/framework/core/api" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + log "src/github.com/cihub/seelog" +) + +type EmailAPI struct { + api.Handler +} +func InitAPI() { + email := EmailAPI{} + api.HandleAPIMethod(api.POST, "/email/server/_test", email.testEmailServer) + api.HandleAPIMethod(api.GET, "/email/server/:email_server_id", email.getEmailServer) + api.HandleAPIMethod(api.POST, "/email/server", email.createEmailServer) + api.HandleAPIMethod(api.PUT, "/email/server/:email_server_id", email.updateEmailServer) + api.HandleAPIMethod(api.DELETE, "/email/server/:email_server_id", email.deleteEmailServer) + api.HandleAPIMethod(api.GET, "/email/server/_search", email.searchEmailServer) +} + +func InitEmailServer() error { + q := orm.Query{ + Size: 10, + } + q.Conds = orm.And(orm.Eq("enabled", true)) + err, result := orm.Search(model.EmailServer{}, &q ) + if err != nil { + return err + } + if len(result.Result) == 0 { + return nil + } + for _, row := range result.Result { + emailServer := model.EmailServer{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &emailServer) + err = emailServer.Validate(false) + if err != nil { + log.Error(err) + continue + } + err = common.StartEmailServer(&emailServer) + if err != nil { + log.Error(err) + } + } + return nil +} \ No newline at end of file diff --git a/plugin/api/email/common/pipeline.go b/plugin/api/email/common/pipeline.go new file mode 100644 index 00000000..48fe5697 --- /dev/null +++ b/plugin/api/email/common/pipeline.go @@ -0,0 +1,97 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "fmt" + "infini.sh/console/model" + "infini.sh/framework/core/global" + "infini.sh/framework/core/pipeline" + "infini.sh/framework/lib/go-ucfg/yaml" + pipeline2 "infini.sh/framework/modules/pipeline" +) + +func StartEmailServer(serv *model.EmailServer) error { + pipeCfgStr := GeneratePipelineConfig(serv) + cfg, err := yaml.NewConfig([]byte(pipeCfgStr)) + if err != nil { + return fmt.Errorf("new config error: %w", err) + } + pipeCfg := pipeline.PipelineConfigV2{} + err = cfg.Unpack(&pipeCfg) + if err != nil { + return fmt.Errorf("unpack pipeline config error: %w", err) + } + v := global.Lookup("pipeline_module") + var ( + pipeM *pipeline2.PipeModule + ok bool + ) + if pipeM, ok = v.(*pipeline2.PipeModule); !ok { + return fmt.Errorf("can not find pipeline module") + } + err = pipeM.CreatePipeline(pipeCfg, true) + if err != nil { + return fmt.Errorf("create email server pipeline error: %w", err) + } + return nil +} + +func StopEmailServer(serv *model.EmailServer) error { + v := global.Lookup("pipeline_module") + var ( + pipeM *pipeline2.PipeModule + ok bool + ) + if pipeM, ok = v.(*pipeline2.PipeModule); !ok { + return fmt.Errorf("can not find pipeline module") + } + emailServerTaskID := getEmailServerTaskID(serv) + exists := pipeM.StopTask(emailServerTaskID) + if exists { + pipeM.DeleteTask(emailServerTaskID) + } + return nil +} + +func getEmailServerTaskID(serv *model.EmailServer) string { + return fmt.Sprintf("send_email_%s", serv.ID) +} + +func GeneratePipelineConfig(serv *model.EmailServer) string { + pipelineTpl := `name: %s +auto_start: true +keep_running: true +retry_delay_in_ms: 5000 +processor: + - consumer: + consumer: + fetch_max_messages: 1 + max_worker_size: 200 + num_of_slices: 1 + idle_timeout_in_seconds: 30 + queue_selector: + keys: + - %s + processor: + - smtp: + idle_timeout_in_seconds: 1 + server: + host: "%s" + port: %d + tls: %v + auth: + username: "%s" + password: "%s" + recipients: + cc: [] + templates: + raw: + content_type: "text/plain" + subject: "$[[subject]]" + body: "$[[body]]"` + pipelineCfg := fmt.Sprintf(pipelineTpl, getEmailServerTaskID(serv), serv.ID, serv.Host, serv.Port, serv.TLS, serv.Auth.Username, serv.Auth.Password) + return pipelineCfg +} diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go new file mode 100644 index 00000000..3aab8136 --- /dev/null +++ b/plugin/api/email/server.go @@ -0,0 +1,255 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package email + +import ( + "crypto/tls" + "fmt" + log "github.com/cihub/seelog" + "infini.sh/console/model" + "infini.sh/console/plugin/api/email/common" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + "src/github.com/gopkg.in/gomail.v2" + "strconv" + "time" +) + +func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var obj = &model.EmailServer{} + err := h.DecodeJSON(req, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + q := util.MapStr{ + "size": 1, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "host": util.MapStr{ + "value": obj.Host, + }, + }, + }, + { + "term": util.MapStr{ + "port": util.MapStr{ + "value": obj.Port, + }, + }, + }, + }, + }, + }, + } + query := orm.Query{ + RawQuery: util.MustToJSONBytes(q), + } + err, result := orm.Search(obj, &query) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if len(result.Result) > 0 { + h.WriteError(w, fmt.Sprintf("email server [%s:%d] already exists", obj.Host, obj.Port), http.StatusInternalServerError) + return + } + + err = orm.Create(nil, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if obj.Enabled { + err = common.StartEmailServer(obj) + if err != nil { + log.Error(err) + } + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "created", + }, 200) + +} + +func (h *EmailAPI) getEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("email_server_id") + + obj := model.EmailServer{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "found": true, + "_id": id, + "_source": obj, + }, 200) +} + +func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("email_server_id") + obj := model.EmailServer{} + + obj.ID = id + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + + id = obj.ID + create := obj.Created + obj = model.EmailServer{} + err = h.DecodeJSON(req, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + //protect + obj.ID = id + obj.Created = create + err = orm.Update(nil, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "updated", + }, 200) +} + +func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("email_server_id") + + obj := model.EmailServer{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + //todo check whether referenced + if obj.Enabled { + err = common.StopEmailServer(&obj) + if err != nil { + log.Error(err) + } + } + + err = orm.Delete(nil, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "deleted", + }, 200) +} + +func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + var ( + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + ) + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + q := orm.Query{ + From: from, + Size: size, + } + q.Conds = orm.And(orm.Eq("enabled", true)) + + err, res := orm.Search(&model.EmailServer{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.Write(w, res.Raw) +} + +func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + reqBody := &struct { + SendTo []string `json:"send_to"` + model.EmailServer + }{} + err := h.DecodeJSON(req, reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(reqBody.SendTo) == 0 { + h.WriteError(w, "receiver email address can not be empty", http.StatusInternalServerError) + return + } + if err = reqBody.Validate(false); err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + message := gomail.NewMessage() + message.SetHeader("From", reqBody.Auth.Username) + message.SetHeader("To", reqBody.SendTo...) + message.SetHeader("Subject", "test email") + + message.SetBody("text/plain", "This is just a test email, do not reply!") + d := gomail.NewDialerWithTimeout(reqBody.Host, reqBody.Port, reqBody.Auth.Username, reqBody.Auth.Password, 3*time.Second) + d.TLSConfig = &tls.Config{InsecureSkipVerify: true} + d.SSL = reqBody.TLS + + err = d.DialAndSend(message) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteAckOKJSON(w) +} From f1cb35e57e082472d1a71958123b5000735ba4a0 Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 20 Jul 2023 12:26:58 +0800 Subject: [PATCH 04/22] clear call pipeline module --- main.go | 4 +- plugin/api/email/common/pipeline.go | 69 +++++++++++++---------------- 2 files changed, 32 insertions(+), 41 deletions(-) diff --git a/main.go b/main.go index 6e6ad75e..9f47c0e7 100644 --- a/main.go +++ b/main.go @@ -66,9 +66,7 @@ func main() { modules = append(modules, &elastic2.ElasticModule{}) modules = append(modules, &queue2.DiskQueue{}) modules = append(modules, &redis.RedisModule{}) - pipeM := &pipeline.PipeModule{} - global.Register("pipeline_module", pipeM) - modules = append(modules, pipeM) + modules = append(modules, &pipeline.PipeModule{}) modules = append(modules, &task.TaskModule{}) modules = append(modules, &metrics.MetricsModule{}) modules = append(modules, &security.Module{}) diff --git a/plugin/api/email/common/pipeline.go b/plugin/api/email/common/pipeline.go index 48fe5697..0d1c1a05 100644 --- a/plugin/api/email/common/pipeline.go +++ b/plugin/api/email/common/pipeline.go @@ -8,52 +8,45 @@ import ( "fmt" "infini.sh/console/model" "infini.sh/framework/core/global" - "infini.sh/framework/core/pipeline" - "infini.sh/framework/lib/go-ucfg/yaml" - pipeline2 "infini.sh/framework/modules/pipeline" + "infini.sh/framework/core/util" + "os" + "path" ) func StartEmailServer(serv *model.EmailServer) error { pipeCfgStr := GeneratePipelineConfig(serv) - cfg, err := yaml.NewConfig([]byte(pipeCfgStr)) - if err != nil { - return fmt.Errorf("new config error: %w", err) - } - pipeCfg := pipeline.PipelineConfigV2{} - err = cfg.Unpack(&pipeCfg) - if err != nil { - return fmt.Errorf("unpack pipeline config error: %w", err) - } - v := global.Lookup("pipeline_module") - var ( - pipeM *pipeline2.PipeModule - ok bool - ) - if pipeM, ok = v.(*pipeline2.PipeModule); !ok { - return fmt.Errorf("can not find pipeline module") - } - err = pipeM.CreatePipeline(pipeCfg, true) - if err != nil { - return fmt.Errorf("create email server pipeline error: %w", err) - } + //cfg, err := yaml.NewConfig([]byte(pipeCfgStr)) + //if err != nil { + // return fmt.Errorf("new config error: %w", err) + //} + cfgDir := global.Env().GetConfigDir() + sendEmailCfgFile := path.Join(cfgDir, "send_email.yml") + _, err := util.FilePutContent(sendEmailCfgFile, pipeCfgStr) + return err + //pipeCfg := pipeline.PipelineConfigV2{} + //err = cfg.Unpack(&pipeCfg) + //if err != nil { + // return fmt.Errorf("unpack pipeline config error: %w", err) + //} + //v := global.Lookup("pipeline_module") + //var ( + // pipeM *pipeline2.PipeModule + // ok bool + //) + //if pipeM, ok = v.(*pipeline2.PipeModule); !ok { + // return fmt.Errorf("can not find pipeline module") + //} + //err = pipeM.CreatePipeline(pipeCfg, true) + //if err != nil { + // return fmt.Errorf("create email server pipeline error: %w", err) + //} return nil } func StopEmailServer(serv *model.EmailServer) error { - v := global.Lookup("pipeline_module") - var ( - pipeM *pipeline2.PipeModule - ok bool - ) - if pipeM, ok = v.(*pipeline2.PipeModule); !ok { - return fmt.Errorf("can not find pipeline module") - } - emailServerTaskID := getEmailServerTaskID(serv) - exists := pipeM.StopTask(emailServerTaskID) - if exists { - pipeM.DeleteTask(emailServerTaskID) - } - return nil + cfgDir := global.Env().GetConfigDir() + sendEmailCfgFile := path.Join(cfgDir, "send_email.yml") + return os.RemoveAll(sendEmailCfgFile) } func getEmailServerTaskID(serv *model.EmailServer) string { From 4ffd8de0a202a9abc0c564e52301b7ff2f4d3ada Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 21 Jul 2023 16:17:16 +0800 Subject: [PATCH 05/22] fake commit (uncomplete) --- main.go | 8 ---- plugin/api/email/api.go | 61 +++++++++++++---------------- plugin/api/email/common/pipeline.go | 29 +++++++++++++- plugin/api/email/server.go | 4 +- 4 files changed, 58 insertions(+), 44 deletions(-) diff --git a/main.go b/main.go index 9f47c0e7..2d2a1f4a 100644 --- a/main.go +++ b/main.go @@ -4,7 +4,6 @@ import ( "context" "errors" _ "expvar" - "infini.sh/console/plugin/api/email" _ "time/tzdata" log "github.com/cihub/seelog" @@ -152,13 +151,6 @@ func main() { } return err }) - task1.RunWithinGroup("initialize_email_server", func(ctx context.Context) error { - err := email.InitEmailServer() - if err != nil { - log.Errorf("init email server error: %v", err) - } - return err - }) } if !global.Env().SetupRequired() { diff --git a/plugin/api/email/api.go b/plugin/api/email/api.go index 36403c43..da7da481 100644 --- a/plugin/api/email/api.go +++ b/plugin/api/email/api.go @@ -5,12 +5,7 @@ package email import ( - "infini.sh/console/model" - "infini.sh/console/plugin/api/email/common" "infini.sh/framework/core/api" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - log "src/github.com/cihub/seelog" ) type EmailAPI struct { @@ -26,31 +21,31 @@ func InitAPI() { api.HandleAPIMethod(api.GET, "/email/server/_search", email.searchEmailServer) } -func InitEmailServer() error { - q := orm.Query{ - Size: 10, - } - q.Conds = orm.And(orm.Eq("enabled", true)) - err, result := orm.Search(model.EmailServer{}, &q ) - if err != nil { - return err - } - if len(result.Result) == 0 { - return nil - } - for _, row := range result.Result { - emailServer := model.EmailServer{} - buf := util.MustToJSONBytes(row) - util.MustFromJSONBytes(buf, &emailServer) - err = emailServer.Validate(false) - if err != nil { - log.Error(err) - continue - } - err = common.StartEmailServer(&emailServer) - if err != nil { - log.Error(err) - } - } - return nil -} \ No newline at end of file +//func InitEmailServer() error { +// q := orm.Query{ +// Size: 10, +// } +// q.Conds = orm.And(orm.Eq("enabled", true)) +// err, result := orm.Search(model.EmailServer{}, &q ) +// if err != nil { +// return err +// } +// if len(result.Result) == 0 { +// return nil +// } +// for _, row := range result.Result { +// emailServer := model.EmailServer{} +// buf := util.MustToJSONBytes(row) +// util.MustFromJSONBytes(buf, &emailServer) +// err = emailServer.Validate(false) +// if err != nil { +// log.Error(err) +// continue +// } +// err = common.StartEmailServer(&emailServer) +// if err != nil { +// log.Error(err) +// } +// } +// return nil +//} \ No newline at end of file diff --git a/plugin/api/email/common/pipeline.go b/plugin/api/email/common/pipeline.go index 0d1c1a05..c6cf773a 100644 --- a/plugin/api/email/common/pipeline.go +++ b/plugin/api/email/common/pipeline.go @@ -8,19 +8,44 @@ import ( "fmt" "infini.sh/console/model" "infini.sh/framework/core/global" + "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "os" "path" ) -func StartEmailServer(serv *model.EmailServer) error { +const emailServerConfigFile = "send_email.yml" +func RefreshEmailServer() error { + q := orm.Query{ + Size: 10, + } + q.Conds = orm.And(orm.Eq("enabled", true)) + err, result := orm.Search(model.EmailServer{}, &q ) + if err != nil { + return err + } + if len(result.Result) == 0 { + //todo delete email server config file + return nil + } + servers := map[string]model.EmailServer{} + for _, row := range result.Result { + emailServer := model.EmailServer{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &emailServer) + err = emailServer.Validate(false) + if err != nil { + return err + } + servers[emailServer.ID] = emailServer + } pipeCfgStr := GeneratePipelineConfig(serv) //cfg, err := yaml.NewConfig([]byte(pipeCfgStr)) //if err != nil { // return fmt.Errorf("new config error: %w", err) //} cfgDir := global.Env().GetConfigDir() - sendEmailCfgFile := path.Join(cfgDir, "send_email.yml") + sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) _, err := util.FilePutContent(sendEmailCfgFile, pipeCfgStr) return err //pipeCfg := pipeline.PipelineConfigV2{} diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index 3aab8136..62cc4a4e 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -64,7 +64,9 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p return } - err = orm.Create(nil, obj) + err = orm.Create(&orm.Context{ + Refresh: "wait_for", + }, obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) From 35b5706767ae9d414820113d3ce361d099d56adc Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 21 Jul 2023 18:24:39 +0800 Subject: [PATCH 06/22] support single email service with multi email server --- plugin/api/email/common/pipeline.go | 137 +++++++++++++++------------- plugin/api/email/server.go | 28 ++++-- service/alerting/action/email.go | 5 +- 3 files changed, 94 insertions(+), 76 deletions(-) diff --git a/plugin/api/email/common/pipeline.go b/plugin/api/email/common/pipeline.go index c6cf773a..81fcf931 100644 --- a/plugin/api/email/common/pipeline.go +++ b/plugin/api/email/common/pipeline.go @@ -5,13 +5,13 @@ package common import ( - "fmt" "infini.sh/console/model" "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "os" "path" + "gopkg.in/yaml.v2" ) const emailServerConfigFile = "send_email.yml" @@ -28,7 +28,7 @@ func RefreshEmailServer() error { //todo delete email server config file return nil } - servers := map[string]model.EmailServer{} + servers := make([]model.EmailServer,0, len(result.Result)) for _, row := range result.Result { emailServer := model.EmailServer{} buf := util.MustToJSONBytes(row) @@ -37,79 +37,86 @@ func RefreshEmailServer() error { if err != nil { return err } - servers[emailServer.ID] = emailServer + servers = append(servers, emailServer) } - pipeCfgStr := GeneratePipelineConfig(serv) - //cfg, err := yaml.NewConfig([]byte(pipeCfgStr)) - //if err != nil { - // return fmt.Errorf("new config error: %w", err) - //} + pipeCfgStr := GeneratePipelineConfig(servers) cfgDir := global.Env().GetConfigDir() sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) - _, err := util.FilePutContent(sendEmailCfgFile, pipeCfgStr) + _, err = util.FilePutContent(sendEmailCfgFile, pipeCfgStr) return err - //pipeCfg := pipeline.PipelineConfigV2{} - //err = cfg.Unpack(&pipeCfg) - //if err != nil { - // return fmt.Errorf("unpack pipeline config error: %w", err) - //} - //v := global.Lookup("pipeline_module") - //var ( - // pipeM *pipeline2.PipeModule - // ok bool - //) - //if pipeM, ok = v.(*pipeline2.PipeModule); !ok { - // return fmt.Errorf("can not find pipeline module") - //} - //err = pipeM.CreatePipeline(pipeCfg, true) - //if err != nil { - // return fmt.Errorf("create email server pipeline error: %w", err) - //} return nil } -func StopEmailServer(serv *model.EmailServer) error { +func StopEmailServer() error { cfgDir := global.Env().GetConfigDir() - sendEmailCfgFile := path.Join(cfgDir, "send_email.yml") + sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) return os.RemoveAll(sendEmailCfgFile) } -func getEmailServerTaskID(serv *model.EmailServer) string { - return fmt.Sprintf("send_email_%s", serv.ID) -} -func GeneratePipelineConfig(serv *model.EmailServer) string { - pipelineTpl := `name: %s -auto_start: true -keep_running: true -retry_delay_in_ms: 5000 -processor: - - consumer: - consumer: - fetch_max_messages: 1 - max_worker_size: 200 - num_of_slices: 1 - idle_timeout_in_seconds: 30 - queue_selector: - keys: - - %s - processor: - - smtp: - idle_timeout_in_seconds: 1 - server: - host: "%s" - port: %d - tls: %v - auth: - username: "%s" - password: "%s" - recipients: - cc: [] - templates: - raw: - content_type: "text/plain" - subject: "$[[subject]]" - body: "$[[body]]"` - pipelineCfg := fmt.Sprintf(pipelineTpl, getEmailServerTaskID(serv), serv.ID, serv.Host, serv.Port, serv.TLS, serv.Auth.Username, serv.Auth.Password) - return pipelineCfg +func GeneratePipelineConfig(servers []model.EmailServer) string { + if len(servers) == 0 { + return "" + } + smtpServers := map[string]util.MapStr{} + for _, srv := range servers { + smtpServers[srv.ID] = util.MapStr{ + "server": util.MapStr{ + "host": srv.Host, + "port": srv.Port, + "tls": srv.TLS, + }, + "auth": util.MapStr{ + "username": srv.Auth.Username, + "password": srv.Auth.Password, + }, + } + } + + pipelineCfg := util.MapStr{ + "pipeline": []util.MapStr{ + { + "name": "send_email_service", + "auto_start": true, + "keep_running": true, + "retry_delay_in_ms": 5000, + "processor": []util.MapStr{ + { + "consumer": util.MapStr{ + "consumer": util.MapStr{ + "fetch_max_messages": 1, + }, + "max_worker_size": 200, + "num_of_slices": 1, + "idle_timeout_in_seconds": 30, + "queue_selector": util.MapStr{ + "keys": []string{"email_messages"}, + }, + "processor": []util.MapStr{ + { + "smtp": util.MapStr{ + "idle_timeout_in_seconds": 1, + "servers": smtpServers, + "templates": util.MapStr{ + "raw": util.MapStr{ + "content_type": "text/plain", + "subject": "$[[subject]]", + "body": "$[[body]]", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + buf, err := yaml.Marshal(pipelineCfg) + if err != nil { + panic(err) + } + return string(buf) } diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index 62cc4a4e..4cee6854 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -73,7 +73,7 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p return } if obj.Enabled { - err = common.StartEmailServer(obj) + err = common.RefreshEmailServer() if err != nil { log.Error(err) } @@ -140,12 +140,20 @@ func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, p //protect obj.ID = id obj.Created = create - err = orm.Update(nil, &obj) + err = orm.Update(&orm.Context{ + Refresh: "wait_for", + }, &obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) return } + if obj.Enabled { + err = common.RefreshEmailServer() + if err != nil { + log.Error(err) + } + } h.WriteJSON(w, util.MapStr{ "_id": obj.ID, @@ -168,19 +176,21 @@ func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, p return } //todo check whether referenced - if obj.Enabled { - err = common.StopEmailServer(&obj) - if err != nil { - log.Error(err) - } - } - err = orm.Delete(nil, &obj) + err = orm.Delete(&orm.Context{ + Refresh: "wait_for", + }, &obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) return } + if obj.Enabled { + err = common.RefreshEmailServer() + if err != nil { + log.Error(err) + } + } h.WriteJSON(w, util.MapStr{ "_id": obj.ID, diff --git a/service/alerting/action/email.go b/service/alerting/action/email.go index 74469633..b7ad9647 100644 --- a/service/alerting/action/email.go +++ b/service/alerting/action/email.go @@ -16,11 +16,12 @@ type EmailAction struct { Body string } -//const EmailQueueName = "alert_email_messages" +const EmailQueueName = "email_messages" func (act *EmailAction) Execute()([]byte, error){ - queueCfg := queue.GetOrInitConfig(act.Data.ServerID) + queueCfg := queue.GetOrInitConfig(EmailQueueName) emailMsg := util.MapStr{ + "server_id": act.Data.ServerID, "email": act.Data.Recipients.To, "template": "raw", "variables": util.MapStr{ From 7f36831998be1680ddb1e98d31b7b3737ed0dacf Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 25 Jul 2023 10:55:17 +0800 Subject: [PATCH 07/22] support reference channel --- model/alerting/destination.go | 4 +++- model/alerting/rule_test.go | 16 ++++++++++------ plugin/api/email/server.go | 7 ++++++- service/alerting/elasticsearch/engine.go | 4 ++++ service/alerting/elasticsearch/helper.go | 24 ++++++++++++++++++++++++ 5 files changed, 47 insertions(+), 8 deletions(-) create mode 100644 service/alerting/elasticsearch/helper.go diff --git a/model/alerting/destination.go b/model/alerting/destination.go index 25d6b3ea..dee045ce 100644 --- a/model/alerting/destination.go +++ b/model/alerting/destination.go @@ -4,7 +4,9 @@ package alerting -import "infini.sh/framework/core/orm" +import ( + "infini.sh/framework/core/orm" +) type Channel struct { orm.ORMObjectBase diff --git a/model/alerting/rule_test.go b/model/alerting/rule_test.go index a59c008f..0a6f2ea1 100644 --- a/model/alerting/rule_test.go +++ b/model/alerting/rule_test.go @@ -6,6 +6,7 @@ package alerting import ( "fmt" + "infini.sh/console/model/insight" "infini.sh/framework/core/util" "net/http" "testing" @@ -60,18 +61,21 @@ func TestCreateRule( t *testing.T) { //}, Metrics: Metric{ - PeriodInterval: "1m", - Items: []MetricItem{ - {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, - {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, + Metric: insight.Metric{ + Groups: []insight.MetricGroupItem{{"metadata.labels.cluster_id", 10}, {"metadata.labels.node_id", 10}}, + Items: []insight.MetricItem{ + {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min" }, + {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max"}, + }, + BucketSize: "1m", + Formula: "a/b*100", }, - Formula: "a/b*100", //Expression: "min(fs.free_in_bytes)/max(fs.total_in_bytes)*100", }, Conditions: Condition{ Operator: "any", Items: []ConditionItem{ - {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"76"}, Priority: "error", Message: "磁盘可用率小于10%"}, + {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"76"}, Priority: "error"}, }, }, diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index 4cee6854..f20c2ee4 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -203,6 +203,7 @@ func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, p var ( strSize = h.GetParameterOrDefault(req, "size", "20") strFrom = h.GetParameterOrDefault(req, "from", "0") + strEnabled = h.GetParameterOrDefault(req, "enabled", "true") ) size, _ := strconv.Atoi(strSize) if size <= 0 { @@ -217,7 +218,11 @@ func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, p From: from, Size: size, } - q.Conds = orm.And(orm.Eq("enabled", true)) + if strEnabled == "true" { + q.Conds = orm.And(orm.Eq("enabled", true)) + }else if strEnabled == "false" { + q.Conds = orm.And(orm.Eq("enabled", false)) + } err, res := orm.Search(&model.EmailServer{}, &q) if err != nil { diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index cf463a5f..b20a5f37 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -999,6 +999,10 @@ func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]by message []byte err error ) + channel, err = RetrieveChannel(channel) + if err != nil { + return nil, err, nil + } switch channel.Type { case alerting.ChannelWebhook: diff --git a/service/alerting/elasticsearch/helper.go b/service/alerting/elasticsearch/helper.go new file mode 100644 index 00000000..f19ada63 --- /dev/null +++ b/service/alerting/elasticsearch/helper.go @@ -0,0 +1,24 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package elasticsearch + +import ( + "fmt" + "infini.sh/console/model/alerting" + "infini.sh/framework/core/orm" +) + +func RetrieveChannel(ch *alerting.Channel) (*alerting.Channel, error) { + if ch == nil { + return nil, fmt.Errorf("empty channel") + } + if ch.ID != "" { + _, err := orm.Get(ch) + if err != nil { + return nil, err + } + } + return ch, nil +} From d08c430356b578befc74adce2e8b9ef20d228654 Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 26 Jul 2023 15:15:08 +0800 Subject: [PATCH 08/22] init email server pipeline when start --- main.go | 8 ++++++++ plugin/api/email/common/pipeline.go | 15 +++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 2d2a1f4a..9f47c0e7 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" _ "expvar" + "infini.sh/console/plugin/api/email" _ "time/tzdata" log "github.com/cihub/seelog" @@ -151,6 +152,13 @@ func main() { } return err }) + task1.RunWithinGroup("initialize_email_server", func(ctx context.Context) error { + err := email.InitEmailServer() + if err != nil { + log.Errorf("init email server error: %v", err) + } + return err + }) } if !global.Env().SetupRequired() { diff --git a/plugin/api/email/common/pipeline.go b/plugin/api/email/common/pipeline.go index 81fcf931..20d98b42 100644 --- a/plugin/api/email/common/pipeline.go +++ b/plugin/api/email/common/pipeline.go @@ -25,8 +25,7 @@ func RefreshEmailServer() error { return err } if len(result.Result) == 0 { - //todo delete email server config file - return nil + return StopEmailServer() } servers := make([]model.EmailServer,0, len(result.Result)) for _, row := range result.Result { @@ -44,13 +43,21 @@ func RefreshEmailServer() error { sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) _, err = util.FilePutContent(sendEmailCfgFile, pipeCfgStr) return err - return nil } func StopEmailServer() error { cfgDir := global.Env().GetConfigDir() sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) - return os.RemoveAll(sendEmailCfgFile) + if util.FilesExists(sendEmailCfgFile) { + return os.RemoveAll(sendEmailCfgFile) + } + return nil +} + +func CheckEmailPipelineExists() bool { + cfgDir := global.Env().GetConfigDir() + sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) + return util.FilesExists(sendEmailCfgFile) } From 4223bd052db348e025960a6b6d18603409d463e5 Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 26 Jul 2023 15:16:16 +0800 Subject: [PATCH 09/22] refresh email server pipeline when config change --- plugin/api/email/api.go | 35 +++++------------------- plugin/api/email/server.go | 8 ++---- service/alerting/elasticsearch/engine.go | 20 ++++++-------- 3 files changed, 19 insertions(+), 44 deletions(-) diff --git a/plugin/api/email/api.go b/plugin/api/email/api.go index da7da481..809de227 100644 --- a/plugin/api/email/api.go +++ b/plugin/api/email/api.go @@ -5,6 +5,7 @@ package email import ( + "infini.sh/console/plugin/api/email/common" "infini.sh/framework/core/api" ) @@ -21,31 +22,9 @@ func InitAPI() { api.HandleAPIMethod(api.GET, "/email/server/_search", email.searchEmailServer) } -//func InitEmailServer() error { -// q := orm.Query{ -// Size: 10, -// } -// q.Conds = orm.And(orm.Eq("enabled", true)) -// err, result := orm.Search(model.EmailServer{}, &q ) -// if err != nil { -// return err -// } -// if len(result.Result) == 0 { -// return nil -// } -// for _, row := range result.Result { -// emailServer := model.EmailServer{} -// buf := util.MustToJSONBytes(row) -// util.MustFromJSONBytes(buf, &emailServer) -// err = emailServer.Validate(false) -// if err != nil { -// log.Error(err) -// continue -// } -// err = common.StartEmailServer(&emailServer) -// if err != nil { -// log.Error(err) -// } -// } -// return nil -//} \ No newline at end of file +func InitEmailServer() error { + if !common.CheckEmailPipelineExists() { + return common.RefreshEmailServer() + } + return nil +} \ No newline at end of file diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index f20c2ee4..26525172 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -148,11 +148,9 @@ func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, p log.Error(err) return } - if obj.Enabled { - err = common.RefreshEmailServer() - if err != nil { - log.Error(err) - } + err = common.RefreshEmailServer() + if err != nil { + log.Error(err) } h.WriteJSON(w, util.MapStr{ diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index b20a5f37..52a3882b 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -682,18 +682,16 @@ func (engine *Engine) Do(rule *alerting.Rule) error { // send recover message to channel recoverCfg := rule.RecoveryNotificationConfig if recoverCfg != nil && recoverCfg.Enabled { - if recoverCfg.AcceptTimeRange.Include(time.Now()) { - paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{ - alerting2.ParamEventID: alertItem.ID, - alerting2.ParamTimestamp: alertItem.Created.Unix(), - }) - err = attachTitleMessageToCtx(recoverCfg.Title, recoverCfg.Message, paramsCtx) - if err != nil { - return err - } - actionResults, _ := performChannels(recoverCfg.Channels, paramsCtx) - alertItem.ActionExecutionResults = actionResults + paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{ + alerting2.ParamEventID: alertItem.ID, + alerting2.ParamTimestamp: alertItem.Created.Unix(), + }) + err = attachTitleMessageToCtx(recoverCfg.Title, recoverCfg.Message, paramsCtx) + if err != nil { + return err } + actionResults, _ := performChannels(recoverCfg.Channels, paramsCtx) + alertItem.ActionExecutionResults = actionResults } } return nil From 5f2569d9996bf50a55c1ec6736e77ad1319d601e Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 26 Jul 2023 16:41:15 +0800 Subject: [PATCH 10/22] check whether email server referenced by channel --- plugin/api/email/server.go | 43 ++++++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index 26525172..4ac585d4 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -9,6 +9,7 @@ import ( "fmt" log "github.com/cihub/seelog" "infini.sh/console/model" + "infini.sh/console/model/alerting" "infini.sh/console/plugin/api/email/common" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/orm" @@ -129,20 +130,27 @@ func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, p id = obj.ID create := obj.Created - obj = model.EmailServer{} - err = h.DecodeJSON(req, &obj) + newObj := model.EmailServer{} + err = h.DecodeJSON(req, &newObj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) return } + if !newObj.Enabled && obj.Enabled { + if err = checkEmailServerReferenced(&obj); err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + } //protect - obj.ID = id - obj.Created = create + newObj.ID = id + newObj.Created = create err = orm.Update(&orm.Context{ Refresh: "wait_for", - }, &obj) + }, &newObj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) @@ -173,7 +181,11 @@ func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, p }, http.StatusNotFound) return } - //todo check whether referenced + if err = checkEmailServerReferenced(&obj); err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } err = orm.Delete(&orm.Context{ Refresh: "wait_for", @@ -196,6 +208,25 @@ func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, p }, 200) } +func checkEmailServerReferenced(srv *model.EmailServer) error { + q := &orm.Query{ + Size: 1, + } + q.Conds = orm.And(orm.Eq("email.server_id", srv.ID)) + err, result := orm.Search(alerting.Channel{}, q) + if err != nil { + return err + } + if len(result.Result) > 0 { + var chName interface{} = "" + if m, ok := result.Result[0].(map[string]interface{}); ok { + chName = m["name"] + } + return fmt.Errorf("email server used by channel [%s]", chName) + } + return nil +} + func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var ( From d6d1013e0a869f1e8ca590f502792f25aceb3a5e Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 26 Jul 2023 17:54:42 +0800 Subject: [PATCH 11/22] add test api to send channel message --- plugin/api/alerting/api.go | 1 + plugin/api/alerting/channel.go | 58 ++++++++++++---- plugin/api/email/server.go | 21 ++---- service/alerting/common/helper.go | 87 ++++++++++++++++++++++++ service/alerting/elasticsearch/engine.go | 67 ++---------------- service/alerting/elasticsearch/helper.go | 24 ------- 6 files changed, 141 insertions(+), 117 deletions(-) create mode 100644 service/alerting/common/helper.go delete mode 100644 service/alerting/elasticsearch/helper.go diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 6ce66dea..995ccd00 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -33,6 +33,7 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.DELETE, "/alerting/channel", alert.RequirePermission(alert.deleteChannel, enum.PermissionAlertChannelWrite)) api.HandleAPIMethod(api.PUT, "/alerting/channel/:channel_id", alert.RequirePermission(alert.updateChannel, enum.PermissionAlertChannelWrite)) api.HandleAPIMethod(api.GET, "/alerting/channel/_search", alert.RequirePermission(alert.searchChannel, enum.PermissionAlertChannelRead)) + api.HandleAPIMethod(api.POST, "/alerting/channel/test", alert.RequirePermission(alert.testChannel, enum.PermissionAlertChannelWrite)) api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.RequirePermission(alert.searchAlert, enum.PermissionAlertHistoryRead)) api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.RequirePermission(alert.getAlert, enum.PermissionAlertHistoryRead)) diff --git a/plugin/api/alerting/channel.go b/plugin/api/alerting/channel.go index a7a3dc6d..792c44c2 100644 --- a/plugin/api/alerting/channel.go +++ b/plugin/api/alerting/channel.go @@ -6,9 +6,11 @@ package alerting import ( "fmt" + "infini.sh/console/service/alerting/common" "net/http" "strconv" "strings" + "time" log "github.com/cihub/seelog" "infini.sh/console/model/alerting" @@ -33,10 +35,7 @@ func (h *AlertAPI) createChannel(w http.ResponseWriter, req *http.Request, ps ht return } - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "created", - }, 200) + h.WriteCreatedOKJSON(w, obj.ID) } @@ -60,11 +59,7 @@ func (h *AlertAPI) getChannel(w http.ResponseWriter, req *http.Request, ps httpr return } - h.WriteJSON(w, util.MapStr{ - "found": true, - "_id": id, - "_source": obj, - }, 200) + h.WriteGetOKJSON(w, id, obj) } func (h *AlertAPI) updateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -101,10 +96,7 @@ func (h *AlertAPI) updateChannel(w http.ResponseWriter, req *http.Request, ps ht return } - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "updated", - }, 200) + h.WriteUpdatedOKJSON(w, id) } func (h *AlertAPI) deleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -179,3 +171,43 @@ func (h *AlertAPI) searchChannel(w http.ResponseWriter, req *http.Request, ps ht } h.Write(w, res.Raw) } + +func (h *AlertAPI) testChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + obj := alerting.Channel{} + err := h.DecodeJSON(req, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + ctx := map[string]interface{}{ + "title": "test title", + "message": "test message", + "rule_id": util.GetUUID(), + "rule_name": "test rule", + "resource_id": util.GetUUID(), + "resource_name": "test resource", + "event_id": util.GetUUID(), + "timestamp": time.Now().UnixMilli(), + "first_group_value": "first group value", + "first_threshold": "90", + "priority": "critical", + "results": []util.MapStr{ + {"threshold": "90", + "priority": "critical", + "group_values": []string{"first group value", "second group value"}, + "issue_timestamp": time.Now().UnixMilli()-500, + "result_value": 90, + "relation_values": util.MapStr{"a": 100, "b": 90}, + }, + }, + + } + _, err, _ = common.PerformChannel(&obj, ctx) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + h.WriteAckOKJSON(w) +} \ No newline at end of file diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index 4ac585d4..1797a312 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -80,10 +80,7 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p } } - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "created", - }, 200) + h.WriteCreatedOKJSON(w, obj.ID) } @@ -107,11 +104,7 @@ func (h *EmailAPI) getEmailServer(w http.ResponseWriter, req *http.Request, ps h return } - h.WriteJSON(w, util.MapStr{ - "found": true, - "_id": id, - "_source": obj, - }, 200) + h.WriteGetOKJSON(w, id, obj) } func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -161,10 +154,7 @@ func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, p log.Error(err) } - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "updated", - }, 200) + h.WriteUpdatedOKJSON(w, obj.ID) } func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -202,10 +192,7 @@ func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, p } } - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "deleted", - }, 200) + h.WriteDeletedOKJSON(w, obj.ID) } func checkEmailServerReferenced(srv *model.EmailServer) error { diff --git a/service/alerting/common/helper.go b/service/alerting/common/helper.go new file mode 100644 index 00000000..fd8150b5 --- /dev/null +++ b/service/alerting/common/helper.go @@ -0,0 +1,87 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "bytes" + "fmt" + "infini.sh/console/model/alerting" + "infini.sh/console/service/alerting/action" + "infini.sh/console/service/alerting/funcs" + "infini.sh/framework/core/orm" + "text/template" +) + +func PerformChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error, []byte) { + var ( + act action.Action + message []byte + err error + ) + channel, err = RetrieveChannel(channel) + if err != nil { + return nil, err, nil + } + switch channel.Type { + + case alerting.ChannelWebhook: + message, err = ResolveMessage(channel.Webhook.Body, ctx) + if err != nil { + return nil, err, message + } + wh := *channel.Webhook + urlBytes, err := ResolveMessage(wh.URL, ctx) + if err != nil { + return nil, err, message + } + wh.URL = string(urlBytes) + act = &action.WebhookAction{ + Data: &wh, + Message: string(message), + } + case alerting.ChannelEmail: + message, err = ResolveMessage(channel.Email.Body, ctx) + if err != nil { + return nil, err, message + } + subjectBytes, err := ResolveMessage(channel.Email.Subject, ctx) + if err != nil { + return nil, err, nil + } + act = &action.EmailAction{ + Data: channel.Email, + Subject: string(subjectBytes), + Body: string(message), + } + default: + return nil, fmt.Errorf("unsupported action type: %s", channel.Type), message + } + executeResult, err := act.Execute() + return executeResult, err, message +} + +func ResolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){ + msg := messageTemplate + tmpl, err := template.New("alert-message").Funcs(funcs.GenericFuncMap()).Parse(msg) + if err !=nil { + return nil, fmt.Errorf("parse message temlate error: %w", err) + } + msgBuffer := &bytes.Buffer{} + err = tmpl.Execute(msgBuffer, ctx) + return msgBuffer.Bytes(), err +} + +func RetrieveChannel(ch *alerting.Channel) (*alerting.Channel, error) { + if ch == nil { + return nil, fmt.Errorf("empty channel") + } + if ch.ID != "" { + _, err := orm.Get(ch) + if err != nil { + return nil, err + } + } + return ch, nil +} \ No newline at end of file diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 52a3882b..36394ebf 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -5,7 +5,6 @@ package elasticsearch import ( - "bytes" "context" "fmt" "github.com/Knetic/govaluate" @@ -13,8 +12,7 @@ import ( "infini.sh/console/model" "infini.sh/console/model/alerting" alerting2 "infini.sh/console/service/alerting" - "infini.sh/console/service/alerting/action" - "infini.sh/console/service/alerting/funcs" + "infini.sh/console/service/alerting/common" "infini.sh/framework/core/elastic" "infini.sh/console/model/insight" "infini.sh/framework/core/kv" @@ -25,7 +23,6 @@ import ( "sort" "strconv" "strings" - "text/template" "time" ) @@ -857,12 +854,12 @@ func attachTitleMessageToCtx(title, message string, paramsCtx map[string]interfa tplBytes []byte err error ) - tplBytes, err = resolveMessage(message, paramsCtx) + tplBytes, err = common.ResolveMessage(message, paramsCtx) if err != nil { return fmt.Errorf("resolve message template error: %w", err) } paramsCtx[alerting2.ParamMessage] = string(tplBytes) - tplBytes, err = resolveMessage(title, paramsCtx) + tplBytes, err = common.ResolveMessage(title, paramsCtx) if err != nil { return fmt.Errorf("resolve title template error: %w", err) } @@ -962,7 +959,7 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([ var errCount int var actionResults []alerting.ActionExecutionResult for _, channel := range channels { - resBytes, err, messageBytes := performChannel(&channel, ctx) + resBytes, err, messageBytes := common.PerformChannel(&channel, ctx) var errStr string if err != nil { errCount++ @@ -980,64 +977,8 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([ return actionResults, errCount } -func resolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){ - msg := messageTemplate - tmpl, err := template.New("alert-message").Funcs(funcs.GenericFuncMap()).Parse(msg) - if err !=nil { - return nil, fmt.Errorf("parse message temlate error: %w", err) - } - msgBuffer := &bytes.Buffer{} - err = tmpl.Execute(msgBuffer, ctx) - return msgBuffer.Bytes(), err -} -func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error, []byte) { - var ( - act action.Action - message []byte - err error - ) - channel, err = RetrieveChannel(channel) - if err != nil { - return nil, err, nil - } - switch channel.Type { - case alerting.ChannelWebhook: - message, err = resolveMessage(channel.Webhook.Body, ctx) - if err != nil { - return nil, err, message - } - wh := *channel.Webhook - urlBytes, err := resolveMessage(wh.URL, ctx) - if err != nil { - return nil, err, message - } - wh.URL = string(urlBytes) - act = &action.WebhookAction{ - Data: &wh, - Message: string(message), - } - case alerting.ChannelEmail: - message, err = resolveMessage(channel.Email.Body, ctx) - if err != nil { - return nil, err, message - } - subjectBytes, err := resolveMessage(channel.Email.Subject, ctx) - if err != nil { - return nil, err, nil - } - act = &action.EmailAction{ - Data: channel.Email, - Subject: string(subjectBytes), - Body: string(message), - } - default: - return nil, fmt.Errorf("unsupported action type: %s", channel.Type), message - } - executeResult, err := act.Execute() - return executeResult, err, message -} func (engine *Engine) GenerateTask(rule alerting.Rule) func(ctx context.Context) { return func(ctx context.Context) { defer func() { diff --git a/service/alerting/elasticsearch/helper.go b/service/alerting/elasticsearch/helper.go deleted file mode 100644 index f19ada63..00000000 --- a/service/alerting/elasticsearch/helper.go +++ /dev/null @@ -1,24 +0,0 @@ -/* Copyright © INFINI Ltd. All rights reserved. - * Web: https://infinilabs.com - * Email: hello#infini.ltd */ - -package elasticsearch - -import ( - "fmt" - "infini.sh/console/model/alerting" - "infini.sh/framework/core/orm" -) - -func RetrieveChannel(ch *alerting.Channel) (*alerting.Channel, error) { - if ch == nil { - return nil, fmt.Errorf("empty channel") - } - if ch.ID != "" { - _, err := orm.Get(ch) - if err != nil { - return nil, err - } - } - return ch, nil -} From e6d08bc128e8444a56390f8a3451739d908447b5 Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 27 Jul 2023 10:29:02 +0800 Subject: [PATCH 12/22] rename recovery_notification_config.channels to recovery_notification_config.normal --- model/alerting/rule.go | 6 +++--- service/alerting/elasticsearch/engine.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 43cdc3bb..45c0f0d3 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -18,7 +18,7 @@ type Rule struct { Resource Resource `json:"resource" elastic_mapping:"resource:{type:object}"` Metrics Metric `json:"metrics" elastic_mapping:"metrics:{type:object}"` Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` - Channels NotificationConfig `json:"channels,omitempty" elastic_mapping:"channels:{type:object}"` + Channels *NotificationConfig `json:"channels,omitempty" elastic_mapping:"channels:{type:object}"` NotificationConfig *NotificationConfig `json:"notification_config,omitempty" elastic_mapping:"notification_config:{type:object}"` RecoveryNotificationConfig *RecoveryNotificationConfig `json:"recovery_notification_config,omitempty" elastic_mapping:"recovery_notification_config:{type:object}"` Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` @@ -61,7 +61,7 @@ func (rule *Rule) GetNotificationConfig() *NotificationConfig { if rule.NotificationConfig != nil { return rule.NotificationConfig } - return &rule.Channels + return rule.Channels } func (rule *Rule) GetNotificationTitleAndMessage() (string, string) { if rule.NotificationConfig != nil { @@ -87,7 +87,7 @@ type RecoveryNotificationConfig struct { Title string `json:"title"` //text template Message string `json:"message"` // text template AcceptTimeRange TimeRange `json:"accept_time_range,omitempty"` - Channels []Channel `json:"channels,omitempty"` + Normal []Channel `json:"normal,omitempty"` } type MessageTemplate struct{ diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 36394ebf..13585831 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -687,7 +687,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if err != nil { return err } - actionResults, _ := performChannels(recoverCfg.Channels, paramsCtx) + actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx) alertItem.ActionExecutionResults = actionResults } } From 3f5beb7c9fdd30aabd291892f76fc978fea9290b Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 27 Jul 2023 15:15:15 +0800 Subject: [PATCH 13/22] add enabled field to channel --- model/alerting/destination.go | 1 + service/alerting/elasticsearch/engine.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/model/alerting/destination.go b/model/alerting/destination.go index dee045ce..7b4a9f74 100644 --- a/model/alerting/destination.go +++ b/model/alerting/destination.go @@ -17,6 +17,7 @@ type Channel struct { SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` SubType string `json:"sub_type" elastic_mapping:"sub_type:{type:keyword,copy_to:search_text}"` Email *Email `json:"email,omitempty" elastic_mapping:"email:{type:object}"` + Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:boolean}"` } diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 13585831..0bf1b0b4 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -959,6 +959,9 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([ var errCount int var actionResults []alerting.ActionExecutionResult for _, channel := range channels { + if !channel.Enabled { + continue + } resBytes, err, messageBytes := common.PerformChannel(&channel, ctx) var errStr string if err != nil { From 1ce263990326304165eb1a747247579ae737467a Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 27 Jul 2023 16:22:09 +0800 Subject: [PATCH 14/22] add event_enabled field to recovery_notification_config --- model/alerting/rule.go | 3 ++- service/alerting/elasticsearch/engine.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/model/alerting/rule.go b/model/alerting/rule.go index 45c0f0d3..e930d64f 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -83,11 +83,12 @@ type NotificationConfig struct { } type RecoveryNotificationConfig struct { - Enabled bool `json:"enabled"` + Enabled bool `json:"enabled"` // channel enabled Title string `json:"title"` //text template Message string `json:"message"` // text template AcceptTimeRange TimeRange `json:"accept_time_range,omitempty"` Normal []Channel `json:"normal,omitempty"` + EventEnabled bool `json:"event_enabled"` } type MessageTemplate struct{ diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 0bf1b0b4..9c314e9a 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -676,9 +676,10 @@ func (engine *Engine) Do(rule *alerting.Rule) error { if err != nil { return fmt.Errorf("save alert message error: %w", err) } + // todo add recover notification to inner system message // send recover message to channel recoverCfg := rule.RecoveryNotificationConfig - if recoverCfg != nil && recoverCfg.Enabled { + if recoverCfg != nil && recoverCfg.EventEnabled && recoverCfg.Enabled { paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{ alerting2.ParamEventID: alertItem.ID, alerting2.ParamTimestamp: alertItem.Created.Unix(), From 3709f5683dcdf6b6a70c3958f333aef4156f9f14 Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 28 Jul 2023 11:43:46 +0800 Subject: [PATCH 15/22] remove auth.password in email server search api --- plugin/api/email/server.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index 1797a312..48cfc8d6 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -5,6 +5,7 @@ package email import ( + "bytes" "crypto/tls" "fmt" log "github.com/cihub/seelog" @@ -15,6 +16,7 @@ import ( "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" + "src/github.com/buger/jsonparser" "src/github.com/gopkg.in/gomail.v2" "strconv" "time" @@ -137,6 +139,9 @@ func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, p return } } + if obj.Auth.Password != "" && newObj.Auth.Password == "" && obj.Auth.Username == newObj.Auth.Username { + newObj.Auth.Password = obj.Auth.Password + } //protect newObj.ID = id @@ -246,6 +251,26 @@ func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, p h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + //remove password field + hitsBuf := bytes.Buffer{} + hitsBuf.Write([]byte("[")) + jsonparser.ArrayEach(res.Raw, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { + value = jsonparser.Delete(value, "_source", "auth", "password") + hitsBuf.Write(value) + hitsBuf.Write([]byte(",")) + }, "hits", "hits") + buf := hitsBuf.Bytes() + if buf[len(buf)-1] == ',' { + buf[len(buf)-1] = ']' + }else{ + hitsBuf.Write([]byte("]")) + } + res.Raw, err = jsonparser.Set(res.Raw, hitsBuf.Bytes(), "hits", "hits") + if err != nil { + log.Error(err.Error()) + h.ErrorInternalServer(w, err.Error()) + return + } h.Write(w, res.Raw) } From ce98378a7c53d49d358dfd3a5633bd21c3d1ffee Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 28 Jul 2023 16:02:54 +0800 Subject: [PATCH 16/22] test email server with old password when password is empty and username equals --- plugin/api/email/server.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index 48cfc8d6..b6a961bc 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -293,6 +293,18 @@ func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + if reqBody.Auth.Password == "" && reqBody.ID != "" { + obj := model.EmailServer{} + obj.ID = reqBody.ID + _, err := orm.Get(&obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if reqBody.Auth.Username == obj.Auth.Username { + reqBody.Auth.Password = obj.Auth.Password + } + } message := gomail.NewMessage() message.SetHeader("From", reqBody.Auth.Username) message.SetHeader("To", reqBody.SendTo...) From d42c05c8fba33592a0a4da7d0608d4e32e647615 Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 28 Jul 2023 17:06:07 +0800 Subject: [PATCH 17/22] adapter old alert rule data struct --- plugin/api/alerting/rule.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index 6afa8d15..b0a260c5 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -135,6 +135,14 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) return } + // adapter version smaller than 1.6.0 + if obj.Channels != nil && obj.NotificationConfig == nil { + obj.NotificationConfig = obj.Channels + } + if obj.NotificationConfig != nil && obj.NotificationConfig.Message == "" && obj.Metrics.Message != "" { + obj.NotificationConfig.Message = obj.Metrics.Message + obj.NotificationConfig.Title = obj.Metrics.Title + } alertAPI.WriteJSON(w, util.MapStr{ "found": true, From fcf01793356e34fee950f6290fcaf9a989781211 Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 28 Jul 2023 18:16:46 +0800 Subject: [PATCH 18/22] default to enabled channel of old rule data --- plugin/api/alerting/rule.go | 6 ++++++ service/alerting/common/helper.go | 9 +++++---- service/alerting/elasticsearch/engine.go | 5 +++++ 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index b0a260c5..db302f1d 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -138,6 +138,12 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h // adapter version smaller than 1.6.0 if obj.Channels != nil && obj.NotificationConfig == nil { obj.NotificationConfig = obj.Channels + for i := range obj.NotificationConfig.Normal { + obj.NotificationConfig.Normal[i].Enabled = true + } + for i := range obj.NotificationConfig.Escalation { + obj.NotificationConfig.Escalation[i].Enabled = true + } } if obj.NotificationConfig != nil && obj.NotificationConfig.Message == "" && obj.Metrics.Message != "" { obj.NotificationConfig.Message = obj.Metrics.Message diff --git a/service/alerting/common/helper.go b/service/alerting/common/helper.go index fd8150b5..faf5e344 100644 --- a/service/alerting/common/helper.go +++ b/service/alerting/common/helper.go @@ -15,15 +15,14 @@ import ( ) func PerformChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error, []byte) { + if channel == nil { + return nil, fmt.Errorf("empty channel"), nil + } var ( act action.Action message []byte err error ) - channel, err = RetrieveChannel(channel) - if err != nil { - return nil, err, nil - } switch channel.Type { case alerting.ChannelWebhook: @@ -77,11 +76,13 @@ func RetrieveChannel(ch *alerting.Channel) (*alerting.Channel, error) { if ch == nil { return nil, fmt.Errorf("empty channel") } + enabled := ch.Enabled if ch.ID != "" { _, err := orm.Get(ch) if err != nil { return nil, err } + ch.Enabled = enabled } return ch, nil } \ No newline at end of file diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 9c314e9a..74ee4bc7 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -963,6 +963,11 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([ if !channel.Enabled { continue } + _, err := common.RetrieveChannel(&channel) + if err != nil { + log.Error(err) + continue + } resBytes, err, messageBytes := common.PerformChannel(&channel, ctx) var errStr string if err != nil { From 0965256ecc0081a75aa0b0994c2a57f151953fc1 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 31 Jul 2023 15:12:58 +0800 Subject: [PATCH 19/22] use credential to store email server auth info --- model/email_server.go | 7 +++---- plugin/api/email/common/pipeline.go | 5 +++++ plugin/api/email/server.go | 10 +++------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/model/email_server.go b/model/email_server.go index e9636d1b..6c15545d 100644 --- a/model/email_server.go +++ b/model/email_server.go @@ -6,6 +6,7 @@ package model import ( "fmt" + "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" ) @@ -15,11 +16,9 @@ type EmailServer struct { Host string `json:"host" elastic_mapping:"host:{type:keyword}"` Port int `json:"port" elastic_mapping:"port:{type:keyword}"` TLS bool `json:"tls" elastic_mapping:"tls:{type:keyword}"` - Auth struct { - Username string `json:"username" elastic_mapping:"username:{type:keyword}"` - Password string `json:"password" elastic_mapping:"password:{type:keyword}"` - } `json:"auth" elastic_mapping:"auth:{type:object}"` + Auth *elastic.BasicAuth `json:"auth" elastic_mapping:"auth:{type:object}"` Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:boolean}"` + CredentialID string `json:"credential_id" elastic_mapping:"credential_id:{type:keyword}"` } func (serv *EmailServer) Validate(requireName bool) error { diff --git a/plugin/api/email/common/pipeline.go b/plugin/api/email/common/pipeline.go index 20d98b42..a756b9eb 100644 --- a/plugin/api/email/common/pipeline.go +++ b/plugin/api/email/common/pipeline.go @@ -36,6 +36,11 @@ func RefreshEmailServer() error { if err != nil { return err } + auth, err := GetBasicAuth(&emailServer) + if err != nil { + return err + } + emailServer.Auth = &auth servers = append(servers, emailServer) } pipeCfgStr := GeneratePipelineConfig(servers) diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index b6a961bc..d9afe53f 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -293,17 +293,13 @@ func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - if reqBody.Auth.Password == "" && reqBody.ID != "" { - obj := model.EmailServer{} - obj.ID = reqBody.ID - _, err := orm.Get(&obj) + if reqBody.Auth.Password == "" && reqBody.CredentialID != "" { + auth, err := common.GetBasicAuth(&reqBody.EmailServer) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - if reqBody.Auth.Username == obj.Auth.Username { - reqBody.Auth.Password = obj.Auth.Password - } + reqBody.Auth = &auth } message := gomail.NewMessage() message.SetHeader("From", reqBody.Auth.Username) From c17a734b174817bb1725c8f412038d3c48459c28 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 31 Jul 2023 15:46:41 +0800 Subject: [PATCH 20/22] add get auth info --- plugin/api/email/common/auth.go | 36 +++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 plugin/api/email/common/auth.go diff --git a/plugin/api/email/common/auth.go b/plugin/api/email/common/auth.go new file mode 100644 index 00000000..a5c78301 --- /dev/null +++ b/plugin/api/email/common/auth.go @@ -0,0 +1,36 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "infini.sh/console/model" + "infini.sh/framework/core/credential" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" +) + +func GetBasicAuth(srv *model.EmailServer) (basicAuth elastic.BasicAuth, err error) { + if srv.Auth != nil && srv.Auth.Username != "" { + basicAuth = *srv.Auth + return + } + if srv.CredentialID != "" { + cred := credential.Credential{} + cred.ID = srv.CredentialID + _, err = orm.Get(&cred) + if err != nil { + return + } + var dv interface{} + dv, err = cred.Decode() + if err != nil { + return + } + if auth, ok := dv.(elastic.BasicAuth); ok { + basicAuth = auth + } + } + return +} From 3de262f02671c2da67feca39cc615a9209f70cca Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 31 Jul 2023 16:40:58 +0800 Subject: [PATCH 21/22] handle create email server with manual input auth info --- plugin/api/email/server.go | 49 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index d9afe53f..19dfa539 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -13,6 +13,7 @@ import ( "infini.sh/console/model/alerting" "infini.sh/console/plugin/api/email/common" httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/credential" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "net/http" @@ -66,6 +67,16 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p h.WriteError(w, fmt.Sprintf("email server [%s:%d] already exists", obj.Host, obj.Port), http.StatusInternalServerError) return } + if obj.CredentialID == "" && obj.Auth != nil && obj.Auth.Username != ""{ + credentialID, err := saveBasicAuthToCredential(obj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + obj.CredentialID = credentialID + } + obj.Auth = nil err = orm.Create(&orm.Context{ Refresh: "wait_for", @@ -86,6 +97,33 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p } +func saveBasicAuthToCredential(srv *model.EmailServer)(string, error){ + if srv == nil { + return "", fmt.Errorf("param email config can not be empty") + } + cred := credential.Credential{ + Name: srv.Name, + Type: credential.BasicAuth, + Tags: []string{"Email"}, + Payload: map[string]interface{}{ + "basic_auth": map[string]interface{}{ + "username": srv.Auth.Username, + "password": srv.Auth.Password, + }, + }, + } + cred.ID = util.GetUUID() + err := cred.Encode() + if err != nil { + return "", err + } + err = orm.Create(nil, &cred) + if err != nil { + return "", err + } + return cred.ID, nil +} + func (h *EmailAPI) getEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("email_server_id") @@ -139,8 +177,15 @@ func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, p return } } - if obj.Auth.Password != "" && newObj.Auth.Password == "" && obj.Auth.Username == newObj.Auth.Username { - newObj.Auth.Password = obj.Auth.Password + if newObj.Auth != nil && newObj.CredentialID == "" { + credentialID, err := saveBasicAuthToCredential(&newObj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + newObj.CredentialID = credentialID + newObj.Auth = nil } //protect From c99fce0ae67957405deae204598d998310377d20 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 31 Jul 2023 17:31:40 +0800 Subject: [PATCH 22/22] fix get auth info --- plugin/api/email/server.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index 19dfa539..4daf334f 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -338,7 +338,7 @@ func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - if reqBody.Auth.Password == "" && reqBody.CredentialID != "" { + if reqBody.CredentialID != "" { auth, err := common.GetBasicAuth(&reqBody.EmailServer) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -346,6 +346,10 @@ func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps } reqBody.Auth = &auth } + if reqBody.Auth == nil { + h.WriteError(w, "auth info required", http.StatusInternalServerError) + return + } message := gomail.NewMessage() message.SetHeader("From", reqBody.Auth.Username) message.SetHeader("To", reqBody.SendTo...)