From a5e036d62ac67b51e7c00bb7eed8fe2543745fd1 Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 12 Jul 2023 16:28:31 +0800 Subject: [PATCH] add email server api --- model/email_server.go | 36 ++++ plugin/api/email/api.go | 56 ++++++ plugin/api/email/common/pipeline.go | 97 +++++++++++ plugin/api/email/server.go | 255 ++++++++++++++++++++++++++++ 4 files changed, 444 insertions(+) create mode 100644 model/email_server.go create mode 100644 plugin/api/email/api.go create mode 100644 plugin/api/email/common/pipeline.go create mode 100644 plugin/api/email/server.go diff --git a/model/email_server.go b/model/email_server.go new file mode 100644 index 00000000..e9636d1b --- /dev/null +++ b/model/email_server.go @@ -0,0 +1,36 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package model + +import ( + "fmt" + "infini.sh/framework/core/orm" +) + +type EmailServer struct { + orm.ORMObjectBase + Name string `json:"name" elastic_mapping:"name:{type:text}"` + Host string `json:"host" elastic_mapping:"host:{type:keyword}"` + Port int `json:"port" elastic_mapping:"port:{type:keyword}"` + TLS bool `json:"tls" elastic_mapping:"tls:{type:keyword}"` + Auth struct { + Username string `json:"username" elastic_mapping:"username:{type:keyword}"` + Password string `json:"password" elastic_mapping:"password:{type:keyword}"` + } `json:"auth" elastic_mapping:"auth:{type:object}"` + Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:boolean}"` +} + +func (serv *EmailServer) Validate(requireName bool) error { + if serv.Host == "" { + return fmt.Errorf("host can not be empty") + } + if serv.Port <= 0 { + return fmt.Errorf("invalid port [%d]", serv.Port) + } + if requireName && serv.Name == "" { + return fmt.Errorf("name can not be empty") + } + return nil +} \ No newline at end of file diff --git a/plugin/api/email/api.go b/plugin/api/email/api.go new file mode 100644 index 00000000..36403c43 --- /dev/null +++ b/plugin/api/email/api.go @@ -0,0 +1,56 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package email + +import ( + "infini.sh/console/model" + "infini.sh/console/plugin/api/email/common" + "infini.sh/framework/core/api" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + log "src/github.com/cihub/seelog" +) + +type EmailAPI struct { + api.Handler +} +func InitAPI() { + email := EmailAPI{} + api.HandleAPIMethod(api.POST, "/email/server/_test", email.testEmailServer) + api.HandleAPIMethod(api.GET, "/email/server/:email_server_id", email.getEmailServer) + api.HandleAPIMethod(api.POST, "/email/server", email.createEmailServer) + api.HandleAPIMethod(api.PUT, "/email/server/:email_server_id", email.updateEmailServer) + api.HandleAPIMethod(api.DELETE, "/email/server/:email_server_id", email.deleteEmailServer) + api.HandleAPIMethod(api.GET, "/email/server/_search", email.searchEmailServer) +} + +func InitEmailServer() error { + q := orm.Query{ + Size: 10, + } + q.Conds = orm.And(orm.Eq("enabled", true)) + err, result := orm.Search(model.EmailServer{}, &q ) + if err != nil { + return err + } + if len(result.Result) == 0 { + return nil + } + for _, row := range result.Result { + emailServer := model.EmailServer{} + buf := util.MustToJSONBytes(row) + util.MustFromJSONBytes(buf, &emailServer) + err = emailServer.Validate(false) + if err != nil { + log.Error(err) + continue + } + err = common.StartEmailServer(&emailServer) + if err != nil { + log.Error(err) + } + } + return nil +} \ No newline at end of file diff --git a/plugin/api/email/common/pipeline.go b/plugin/api/email/common/pipeline.go new file mode 100644 index 00000000..48fe5697 --- /dev/null +++ b/plugin/api/email/common/pipeline.go @@ -0,0 +1,97 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package common + +import ( + "fmt" + "infini.sh/console/model" + "infini.sh/framework/core/global" + "infini.sh/framework/core/pipeline" + "infini.sh/framework/lib/go-ucfg/yaml" + pipeline2 "infini.sh/framework/modules/pipeline" +) + +func StartEmailServer(serv *model.EmailServer) error { + pipeCfgStr := GeneratePipelineConfig(serv) + cfg, err := yaml.NewConfig([]byte(pipeCfgStr)) + if err != nil { + return fmt.Errorf("new config error: %w", err) + } + pipeCfg := pipeline.PipelineConfigV2{} + err = cfg.Unpack(&pipeCfg) + if err != nil { + return fmt.Errorf("unpack pipeline config error: %w", err) + } + v := global.Lookup("pipeline_module") + var ( + pipeM *pipeline2.PipeModule + ok bool + ) + if pipeM, ok = v.(*pipeline2.PipeModule); !ok { + return fmt.Errorf("can not find pipeline module") + } + err = pipeM.CreatePipeline(pipeCfg, true) + if err != nil { + return fmt.Errorf("create email server pipeline error: %w", err) + } + return nil +} + +func StopEmailServer(serv *model.EmailServer) error { + v := global.Lookup("pipeline_module") + var ( + pipeM *pipeline2.PipeModule + ok bool + ) + if pipeM, ok = v.(*pipeline2.PipeModule); !ok { + return fmt.Errorf("can not find pipeline module") + } + emailServerTaskID := getEmailServerTaskID(serv) + exists := pipeM.StopTask(emailServerTaskID) + if exists { + pipeM.DeleteTask(emailServerTaskID) + } + return nil +} + +func getEmailServerTaskID(serv *model.EmailServer) string { + return fmt.Sprintf("send_email_%s", serv.ID) +} + +func GeneratePipelineConfig(serv *model.EmailServer) string { + pipelineTpl := `name: %s +auto_start: true +keep_running: true +retry_delay_in_ms: 5000 +processor: + - consumer: + consumer: + fetch_max_messages: 1 + max_worker_size: 200 + num_of_slices: 1 + idle_timeout_in_seconds: 30 + queue_selector: + keys: + - %s + processor: + - smtp: + idle_timeout_in_seconds: 1 + server: + host: "%s" + port: %d + tls: %v + auth: + username: "%s" + password: "%s" + recipients: + cc: [] + templates: + raw: + content_type: "text/plain" + subject: "$[[subject]]" + body: "$[[body]]"` + pipelineCfg := fmt.Sprintf(pipelineTpl, getEmailServerTaskID(serv), serv.ID, serv.Host, serv.Port, serv.TLS, serv.Auth.Username, serv.Auth.Password) + return pipelineCfg +} diff --git a/plugin/api/email/server.go b/plugin/api/email/server.go new file mode 100644 index 00000000..3aab8136 --- /dev/null +++ b/plugin/api/email/server.go @@ -0,0 +1,255 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package email + +import ( + "crypto/tls" + "fmt" + log "github.com/cihub/seelog" + "infini.sh/console/model" + "infini.sh/console/plugin/api/email/common" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + "src/github.com/gopkg.in/gomail.v2" + "strconv" + "time" +) + +func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var obj = &model.EmailServer{} + err := h.DecodeJSON(req, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + q := util.MapStr{ + "size": 1, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "host": util.MapStr{ + "value": obj.Host, + }, + }, + }, + { + "term": util.MapStr{ + "port": util.MapStr{ + "value": obj.Port, + }, + }, + }, + }, + }, + }, + } + query := orm.Query{ + RawQuery: util.MustToJSONBytes(q), + } + err, result := orm.Search(obj, &query) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if len(result.Result) > 0 { + h.WriteError(w, fmt.Sprintf("email server [%s:%d] already exists", obj.Host, obj.Port), http.StatusInternalServerError) + return + } + + err = orm.Create(nil, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if obj.Enabled { + err = common.StartEmailServer(obj) + if err != nil { + log.Error(err) + } + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "created", + }, 200) + +} + +func (h *EmailAPI) getEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("email_server_id") + + obj := model.EmailServer{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "found": true, + "_id": id, + "_source": obj, + }, 200) +} + +func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("email_server_id") + obj := model.EmailServer{} + + obj.ID = id + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + + id = obj.ID + create := obj.Created + obj = model.EmailServer{} + err = h.DecodeJSON(req, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + //protect + obj.ID = id + obj.Created = create + err = orm.Update(nil, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "updated", + }, 200) +} + +func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("email_server_id") + + obj := model.EmailServer{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + //todo check whether referenced + if obj.Enabled { + err = common.StopEmailServer(&obj) + if err != nil { + log.Error(err) + } + } + + err = orm.Delete(nil, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "deleted", + }, 200) +} + +func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + var ( + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + ) + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + q := orm.Query{ + From: from, + Size: size, + } + q.Conds = orm.And(orm.Eq("enabled", true)) + + err, res := orm.Search(&model.EmailServer{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.Write(w, res.Raw) +} + +func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + reqBody := &struct { + SendTo []string `json:"send_to"` + model.EmailServer + }{} + err := h.DecodeJSON(req, reqBody) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(reqBody.SendTo) == 0 { + h.WriteError(w, "receiver email address can not be empty", http.StatusInternalServerError) + return + } + if err = reqBody.Validate(false); err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + message := gomail.NewMessage() + message.SetHeader("From", reqBody.Auth.Username) + message.SetHeader("To", reqBody.SendTo...) + message.SetHeader("Subject", "test email") + + message.SetBody("text/plain", "This is just a test email, do not reply!") + d := gomail.NewDialerWithTimeout(reqBody.Host, reqBody.Port, reqBody.Auth.Username, reqBody.Auth.Password, 3*time.Second) + d.TLSConfig = &tls.Config{InsecureSkipVerify: true} + d.SSL = reqBody.TLS + + err = d.DialAndSend(message) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteAckOKJSON(w) +}