support single email service with multi email server

This commit is contained in:
liugq 2023-07-21 18:24:39 +08:00
parent 4ffd8de0a2
commit 35b5706767
3 changed files with 94 additions and 76 deletions

View File

@ -5,13 +5,13 @@
package common package common
import ( import (
"fmt"
"infini.sh/console/model" "infini.sh/console/model"
"infini.sh/framework/core/global" "infini.sh/framework/core/global"
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"os" "os"
"path" "path"
"gopkg.in/yaml.v2"
) )
const emailServerConfigFile = "send_email.yml" const emailServerConfigFile = "send_email.yml"
@ -28,7 +28,7 @@ func RefreshEmailServer() error {
//todo delete email server config file //todo delete email server config file
return nil return nil
} }
servers := map[string]model.EmailServer{} servers := make([]model.EmailServer,0, len(result.Result))
for _, row := range result.Result { for _, row := range result.Result {
emailServer := model.EmailServer{} emailServer := model.EmailServer{}
buf := util.MustToJSONBytes(row) buf := util.MustToJSONBytes(row)
@ -37,79 +37,86 @@ func RefreshEmailServer() error {
if err != nil { if err != nil {
return err return err
} }
servers[emailServer.ID] = emailServer servers = append(servers, emailServer)
} }
pipeCfgStr := GeneratePipelineConfig(serv) pipeCfgStr := GeneratePipelineConfig(servers)
//cfg, err := yaml.NewConfig([]byte(pipeCfgStr))
//if err != nil {
// return fmt.Errorf("new config error: %w", err)
//}
cfgDir := global.Env().GetConfigDir() cfgDir := global.Env().GetConfigDir()
sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile) sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile)
_, err := util.FilePutContent(sendEmailCfgFile, pipeCfgStr) _, err = util.FilePutContent(sendEmailCfgFile, pipeCfgStr)
return err return err
//pipeCfg := pipeline.PipelineConfigV2{}
//err = cfg.Unpack(&pipeCfg)
//if err != nil {
// return fmt.Errorf("unpack pipeline config error: %w", err)
//}
//v := global.Lookup("pipeline_module")
//var (
// pipeM *pipeline2.PipeModule
// ok bool
//)
//if pipeM, ok = v.(*pipeline2.PipeModule); !ok {
// return fmt.Errorf("can not find pipeline module")
//}
//err = pipeM.CreatePipeline(pipeCfg, true)
//if err != nil {
// return fmt.Errorf("create email server pipeline error: %w", err)
//}
return nil return nil
} }
func StopEmailServer(serv *model.EmailServer) error { func StopEmailServer() error {
cfgDir := global.Env().GetConfigDir() cfgDir := global.Env().GetConfigDir()
sendEmailCfgFile := path.Join(cfgDir, "send_email.yml") sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile)
return os.RemoveAll(sendEmailCfgFile) return os.RemoveAll(sendEmailCfgFile)
} }
func getEmailServerTaskID(serv *model.EmailServer) string {
return fmt.Sprintf("send_email_%s", serv.ID)
}
func GeneratePipelineConfig(serv *model.EmailServer) string { func GeneratePipelineConfig(servers []model.EmailServer) string {
pipelineTpl := `name: %s if len(servers) == 0 {
auto_start: true return ""
keep_running: true }
retry_delay_in_ms: 5000 smtpServers := map[string]util.MapStr{}
processor: for _, srv := range servers {
- consumer: smtpServers[srv.ID] = util.MapStr{
consumer: "server": util.MapStr{
fetch_max_messages: 1 "host": srv.Host,
max_worker_size: 200 "port": srv.Port,
num_of_slices: 1 "tls": srv.TLS,
idle_timeout_in_seconds: 30 },
queue_selector: "auth": util.MapStr{
keys: "username": srv.Auth.Username,
- %s "password": srv.Auth.Password,
processor: },
- smtp: }
idle_timeout_in_seconds: 1 }
server:
host: "%s" pipelineCfg := util.MapStr{
port: %d "pipeline": []util.MapStr{
tls: %v {
auth: "name": "send_email_service",
username: "%s" "auto_start": true,
password: "%s" "keep_running": true,
recipients: "retry_delay_in_ms": 5000,
cc: [] "processor": []util.MapStr{
templates: {
raw: "consumer": util.MapStr{
content_type: "text/plain" "consumer": util.MapStr{
subject: "$[[subject]]" "fetch_max_messages": 1,
body: "$[[body]]"` },
pipelineCfg := fmt.Sprintf(pipelineTpl, getEmailServerTaskID(serv), serv.ID, serv.Host, serv.Port, serv.TLS, serv.Auth.Username, serv.Auth.Password) "max_worker_size": 200,
return pipelineCfg "num_of_slices": 1,
"idle_timeout_in_seconds": 30,
"queue_selector": util.MapStr{
"keys": []string{"email_messages"},
},
"processor": []util.MapStr{
{
"smtp": util.MapStr{
"idle_timeout_in_seconds": 1,
"servers": smtpServers,
"templates": util.MapStr{
"raw": util.MapStr{
"content_type": "text/plain",
"subject": "$[[subject]]",
"body": "$[[body]]",
},
},
},
},
},
},
},
},
},
},
}
buf, err := yaml.Marshal(pipelineCfg)
if err != nil {
panic(err)
}
return string(buf)
} }

View File

@ -73,7 +73,7 @@ func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, p
return return
} }
if obj.Enabled { if obj.Enabled {
err = common.StartEmailServer(obj) err = common.RefreshEmailServer()
if err != nil { if err != nil {
log.Error(err) log.Error(err)
} }
@ -140,12 +140,20 @@ func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, p
//protect //protect
obj.ID = id obj.ID = id
obj.Created = create obj.Created = create
err = orm.Update(nil, &obj) err = orm.Update(&orm.Context{
Refresh: "wait_for",
}, &obj)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err) log.Error(err)
return return
} }
if obj.Enabled {
err = common.RefreshEmailServer()
if err != nil {
log.Error(err)
}
}
h.WriteJSON(w, util.MapStr{ h.WriteJSON(w, util.MapStr{
"_id": obj.ID, "_id": obj.ID,
@ -168,19 +176,21 @@ func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, p
return return
} }
//todo check whether referenced //todo check whether referenced
if obj.Enabled {
err = common.StopEmailServer(&obj)
if err != nil {
log.Error(err)
}
}
err = orm.Delete(nil, &obj) err = orm.Delete(&orm.Context{
Refresh: "wait_for",
}, &obj)
if err != nil { if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError) h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err) log.Error(err)
return return
} }
if obj.Enabled {
err = common.RefreshEmailServer()
if err != nil {
log.Error(err)
}
}
h.WriteJSON(w, util.MapStr{ h.WriteJSON(w, util.MapStr{
"_id": obj.ID, "_id": obj.ID,

View File

@ -16,11 +16,12 @@ type EmailAction struct {
Body string Body string
} }
//const EmailQueueName = "alert_email_messages" const EmailQueueName = "email_messages"
func (act *EmailAction) Execute()([]byte, error){ func (act *EmailAction) Execute()([]byte, error){
queueCfg := queue.GetOrInitConfig(act.Data.ServerID) queueCfg := queue.GetOrInitConfig(EmailQueueName)
emailMsg := util.MapStr{ emailMsg := util.MapStr{
"server_id": act.Data.ServerID,
"email": act.Data.Recipients.To, "email": act.Data.Recipients.To,
"template": "raw", "template": "raw",
"variables": util.MapStr{ "variables": util.MapStr{