From 35b5706767ae9d414820113d3ce361d099d56adc Mon Sep 17 00:00:00 2001 From: liugq Date: Fri, 21 Jul 2023 18:24:39 +0800 Subject: [PATCH] support single email service with multi email server --- plugin/api/email/common/pipeline.go | 137 +++++++++++++++------------- plugin/api/email/server.go | 28 ++++-- service/alerting/action/email.go | 5 +- 3 files changed, 94 insertions(+), 76 deletions(-) diff --git a/plugin/api/email/common/pipeline.go b/plugin/api/email/common/pipeline.go index c6cf773a..81fcf931 100644 --- a/plugin/api/email/common/pipeline.go +++ b/plugin/api/email/common/pipeline.go @@ -5,13 +5,13 @@ package common import ( - "fmt" "infini.sh/console/model" "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "os" "path" + "gopkg.in/yaml.v2" ) const emailServerConfigFile = "send_email.yml" @@ -28,7 +28,7 @@ func RefreshEmailServer() error { //todo delete email server config file return nil } - servers := map[string]model.EmailServer{} + servers := make([]model.EmailServer,0, len(result.Result)) for _, row := range result.Result { emailServer := model.EmailServer{} buf := util.MustToJSONBytes(row) @@ -37,79 +37,86 @@ func RefreshEmailServer() error { if err != nil { return err } - servers[emailServer.ID] = emailServer + servers = append(servers, emailServer) } - pipeCfgStr := GeneratePipelineConfig(serv) - //cfg, err := yaml.NewConfig([]byte(pipeCfgStr)) - //if err != nil { - // return fmt.Errorf("new config error: %w", err) - //} + pipeCfgStr := GeneratePipelineConfig(servers) cfgDir := global.Env().GetConfigDir() sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) - _, err := util.FilePutContent(sendEmailCfgFile, pipeCfgStr) + _, err = util.FilePutContent(sendEmailCfgFile, pipeCfgStr) return err - //pipeCfg := pipeline.PipelineConfigV2{} - //err = cfg.Unpack(&pipeCfg) - //if err != nil { - // return fmt.Errorf("unpack pipeline config error: %w", err) - //} - //v := global.Lookup("pipeline_module") - //var ( - // pipeM *pipeline2.PipeModule - // ok bool - //) - //if pipeM, ok = v.(*pipeline2.PipeModule); !ok { - // return fmt.Errorf("can not find pipeline module") - //} - //err = pipeM.CreatePipeline(pipeCfg, true) - //if err != nil { - // return fmt.Errorf("create email server pipeline error: %w", err) - //} return nil } -func StopEmailServer(serv *model.EmailServer) error { +func StopEmailServer() error { cfgDir := global.Env().GetConfigDir() - sendEmailCfgFile := path.Join(cfgDir, "send_email.yml") + sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) return os.RemoveAll(sendEmailCfgFile) } -func getEmailServerTaskID(serv *model.EmailServer) string { - return fmt.Sprintf("send_email_%s", serv.ID) -} -func GeneratePipelineConfig(serv *model.EmailServer) string { - pipelineTpl := `name: %s -auto_start: true -keep_running: true -retry_delay_in_ms: 5000 -processor: - - consumer: - consumer: - fetch_max_messages: 1 - max_worker_size: 200 - num_of_slices: 1 - idle_timeout_in_seconds: 30 - queue_selector: - keys: - - %s - processor: - - smtp: - idle_timeout_in_seconds: 1 - server: - host: "%s" - port: %d - tls: %v - auth: - username: "%s" - password: "%s" - recipients: - cc: [] - templates: - raw: - content_type: "text/plain" - subject: "$[[subject]]" - body: "$[[body]]"` - pipelineCfg := fmt.Sprintf(pipelineTpl, getEmailServerTaskID(serv), serv.ID, serv.Host, serv.Port, serv.TLS, serv.Auth.Username, serv.Auth.Password) - return pipelineCfg +func GeneratePipelineConfig(servers []model.EmailServer) string { + if len(servers) == 0 { + return "" + } + smtpServers := map[string]util.MapStr{} + for _, srv := range servers { + smtpServers[srv.ID] = util.MapStr{ + "server": util.MapStr{ + "host": srv.Host, + "port": srv.Port, + "tls": srv.TLS, + }, + "auth": util.MapStr{ + "username": srv.Auth.Username, + "password": srv.Auth.Password, + }, + } + } + + pipelineCfg := util.MapStr{ + "pipeline": []util.MapStr{ + { + "name": "send_email_service", + "auto_start": true, + "keep_running": true, + "retry_delay_in_ms": 5000, + "processor": []util.MapStr{ + { + "consumer": util.MapStr{ + "consumer": util.MapStr{ + "fetch_max_messages": 1, + }, + "max_worker_size": 200, + "num_of_slices": 1, + "idle_timeout_in_seconds": 30, + "queue_selector": util.MapStr{ + "keys": []string{"email_messages"}, + }, + "processor": []util.MapStr{ + { + "smtp": util.MapStr{ + "idle_timeout_in_seconds": 1, + "servers": smtpServers, + "templates": util.MapStr{ + "raw": util.MapStr{ + "content_type": "text/plain", + "subject": "$[[subject]]", + "body": "$[[body]]", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + buf, err := yaml.Marshal(pipelineCfg) + if err != nil { + panic(err) + } + return string(buf) } diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go index 62cc4a4e..4cee6854 100644 --- a/plugin/api/email/server.go +++ b/plugin/api/email/server.go @@ -73,7 +73,7 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p return } if obj.Enabled { - err = common.StartEmailServer(obj) + err = common.RefreshEmailServer() if err != nil { log.Error(err) } @@ -140,12 +140,20 @@ func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, p //protect obj.ID = id obj.Created = create - err = orm.Update(nil, &obj) + err = orm.Update(&orm.Context{ + Refresh: "wait_for", + }, &obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) return } + if obj.Enabled { + err = common.RefreshEmailServer() + if err != nil { + log.Error(err) + } + } h.WriteJSON(w, util.MapStr{ "_id": obj.ID, @@ -168,19 +176,21 @@ func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, p return } //todo check whether referenced - if obj.Enabled { - err = common.StopEmailServer(&obj) - if err != nil { - log.Error(err) - } - } - err = orm.Delete(nil, &obj) + err = orm.Delete(&orm.Context{ + Refresh: "wait_for", + }, &obj) if err != nil { h.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) return } + if obj.Enabled { + err = common.RefreshEmailServer() + if err != nil { + log.Error(err) + } + } h.WriteJSON(w, util.MapStr{ "_id": obj.ID, diff --git a/service/alerting/action/email.go b/service/alerting/action/email.go index 74469633..b7ad9647 100644 --- a/service/alerting/action/email.go +++ b/service/alerting/action/email.go @@ -16,11 +16,12 @@ type EmailAction struct { Body string } -//const EmailQueueName = "alert_email_messages" +const EmailQueueName = "email_messages" func (act *EmailAction) Execute()([]byte, error){ - queueCfg := queue.GetOrInitConfig(act.Data.ServerID) + queueCfg := queue.GetOrInitConfig(EmailQueueName) emailMsg := util.MapStr{ + "server_id": act.Data.ServerID, "email": act.Data.Recipients.To, "template": "raw", "variables": util.MapStr{