Merge pull request 'alert_optimize' (#135) from alert_optimize into master

This commit is contained in:
silenceqi 2023-08-01 15:13:33 +08:00
commit 350549507b
19 changed files with 904 additions and 87 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
_ "expvar"
"infini.sh/console/plugin/api/email"
_ "time/tzdata"
log "github.com/cihub/seelog"
@ -135,6 +136,7 @@ func main() {
orm.RegisterSchemaWithIndexName(task1.Task{}, "task")
orm.RegisterSchemaWithIndexName(model.Layout{}, "layout")
orm.RegisterSchemaWithIndexName(model.Notification{}, "notification")
orm.RegisterSchemaWithIndexName(model.EmailServer{}, "email-server")
api.RegisterSchema()
if global.Env().SetupRequired() {
@ -150,6 +152,13 @@ func main() {
}
return err
})
task1.RunWithinGroup("initialize_email_server", func(ctx context.Context) error {
err := email.InitEmailServer()
if err != nil {
log.Errorf("init email server error: %v", err)
}
return err
})
}
if !global.Env().SetupRequired() {

View File

@ -4,7 +4,9 @@
package alerting
import "infini.sh/framework/core/orm"
import (
"infini.sh/framework/core/orm"
)
type Channel struct {
orm.ORMObjectBase
@ -13,6 +15,9 @@ type Channel struct {
Priority int `json:"priority,omitempty"`
Webhook *CustomWebhook `json:"webhook,omitempty" elastic_mapping:"webhook:{type:object}"`
SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"`
SubType string `json:"sub_type" elastic_mapping:"sub_type:{type:keyword,copy_to:search_text}"`
Email *Email `json:"email,omitempty" elastic_mapping:"email:{type:object}"`
Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:boolean}"`
}

View File

@ -13,9 +13,9 @@ import (
type Metric struct {
insight.Metric
Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80
Title string `json:"title"` //text template
Message string `json:"message"` // text template
Title string `json:"title,omitempty"` //text template
Message string `json:"message,omitempty"` // text template
Expression string `json:"expression,omitempty" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80
}

View File

@ -18,7 +18,9 @@ type Rule struct {
Resource Resource `json:"resource" elastic_mapping:"resource:{type:object}"`
Metrics Metric `json:"metrics" elastic_mapping:"metrics:{type:object}"`
Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"`
Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"`
Channels *NotificationConfig `json:"channels,omitempty" elastic_mapping:"channels:{type:object}"`
NotificationConfig *NotificationConfig `json:"notification_config,omitempty" elastic_mapping:"notification_config:{type:object}"`
RecoveryNotificationConfig *RecoveryNotificationConfig `json:"recovery_notification_config,omitempty" elastic_mapping:"recovery_notification_config:{type:object}"`
Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"`
LastNotificationTime time.Time `json:"-" elastic_mapping:"last_notification_time:{type:date}"`
LastTermStartTime time.Time `json:"-"` //标识最近一轮告警的开始时间
@ -54,9 +56,24 @@ func (rule *Rule) GetOrInitExpression() (string, error){
rule.Expression = strings.ReplaceAll(sb.String(), "result", metricExp)
return rule.Expression, nil
}
//GetNotificationConfig for adapter old version config
func (rule *Rule) GetNotificationConfig() *NotificationConfig {
if rule.NotificationConfig != nil {
return rule.NotificationConfig
}
return rule.Channels
}
func (rule *Rule) GetNotificationTitleAndMessage() (string, string) {
if rule.NotificationConfig != nil {
return rule.NotificationConfig.Title, rule.NotificationConfig.Message
}
return rule.Metrics.Title, rule.Metrics.Message
}
type RuleChannel struct {
type NotificationConfig struct {
Enabled bool `json:"enabled"`
Title string `json:"title,omitempty"` //text template
Message string `json:"message,omitempty"` // text template
Normal []Channel `json:"normal,omitempty"`
Escalation []Channel `json:"escalation,omitempty"`
ThrottlePeriod string `json:"throttle_period,omitempty"` //沉默周期
@ -65,6 +82,15 @@ type RuleChannel struct {
EscalationEnabled bool `json:"escalation_enabled,omitempty"`
}
type RecoveryNotificationConfig struct {
Enabled bool `json:"enabled"` // channel enabled
Title string `json:"title"` //text template
Message string `json:"message"` // text template
AcceptTimeRange TimeRange `json:"accept_time_range,omitempty"`
Normal []Channel `json:"normal,omitempty"`
EventEnabled bool `json:"event_enabled"`
}
type MessageTemplate struct{
Type string `json:"type"`
Source string `json:"source"`

View File

@ -6,6 +6,7 @@ package alerting
import (
"fmt"
"infini.sh/console/model/insight"
"infini.sh/framework/core/util"
"net/http"
"testing"
@ -60,22 +61,25 @@ func TestCreateRule( t *testing.T) {
//},
Metrics: Metric{
PeriodInterval: "1m",
Items: []MetricItem{
{Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
{Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
Metric: insight.Metric{
Groups: []insight.MetricGroupItem{{"metadata.labels.cluster_id", 10}, {"metadata.labels.node_id", 10}},
Items: []insight.MetricItem{
{Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min" },
{Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max"},
},
BucketSize: "1m",
Formula: "a/b*100",
},
//Expression: "min(fs.free_in_bytes)/max(fs.total_in_bytes)*100",
},
Conditions: Condition{
Operator: "any",
Items: []ConditionItem{
{MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"76"}, Priority: "error", Message: "磁盘可用率小于10%"},
{MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"76"}, Priority: "error"},
},
},
Channels: RuleChannel{
Channels: NotificationConfig{
Normal: []Channel{
{Name: "钉钉", Type: ChannelWebhook, Webhook: &CustomWebhook{
HeaderParams: map[string]string{

View File

@ -10,3 +10,14 @@ type CustomWebhook struct {
URL string `json:"url,omitempty"`
Body string `json:"body" elastic_mapping:"body:{type:text}"`
}
type Email struct {
ServerID string `json:"server_id" elastic_mapping:"server_id:{type:keyword}"`
Recipients struct {
To []string `json:"to,omitempty" elastic_mapping:"to:{type:keyword}"`
CC []string `json:"cc,omitempty" elastic_mapping:"cc:{type:keyword}"`
BCC []string `json:"bcc,omitempty" elastic_mapping:"bcc:{type:keyword}"`
} `json:"recipients" elastic_mapping:"recipients:{type:object}"`
Subject string `json:"subject" elastic_mapping:"subject:{type:text}"`
Body string `json:"body" elastic_mapping:"body:{type:text}"`
}

35
model/email_server.go Normal file
View File

@ -0,0 +1,35 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package model
import (
"fmt"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
)
type EmailServer struct {
orm.ORMObjectBase
Name string `json:"name" elastic_mapping:"name:{type:text}"`
Host string `json:"host" elastic_mapping:"host:{type:keyword}"`
Port int `json:"port" elastic_mapping:"port:{type:keyword}"`
TLS bool `json:"tls" elastic_mapping:"tls:{type:keyword}"`
Auth *elastic.BasicAuth `json:"auth" elastic_mapping:"auth:{type:object}"`
Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:boolean}"`
CredentialID string `json:"credential_id" elastic_mapping:"credential_id:{type:keyword}"`
}
func (serv *EmailServer) Validate(requireName bool) error {
if serv.Host == "" {
return fmt.Errorf("host can not be empty")
}
if serv.Port <= 0 {
return fmt.Errorf("invalid port [%d]", serv.Port)
}
if requireName && serv.Name == "" {
return fmt.Errorf("name can not be empty")
}
return nil
}

View File

@ -33,6 +33,7 @@ func (alert *AlertAPI) Init() {
api.HandleAPIMethod(api.DELETE, "/alerting/channel", alert.RequirePermission(alert.deleteChannel, enum.PermissionAlertChannelWrite))
api.HandleAPIMethod(api.PUT, "/alerting/channel/:channel_id", alert.RequirePermission(alert.updateChannel, enum.PermissionAlertChannelWrite))
api.HandleAPIMethod(api.GET, "/alerting/channel/_search", alert.RequirePermission(alert.searchChannel, enum.PermissionAlertChannelRead))
api.HandleAPIMethod(api.POST, "/alerting/channel/test", alert.RequirePermission(alert.testChannel, enum.PermissionAlertChannelWrite))
api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.RequirePermission(alert.searchAlert, enum.PermissionAlertHistoryRead))
api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.RequirePermission(alert.getAlert, enum.PermissionAlertHistoryRead))

View File

@ -6,9 +6,11 @@ package alerting
import (
"fmt"
"infini.sh/console/service/alerting/common"
"net/http"
"strconv"
"strings"
"time"
log "github.com/cihub/seelog"
"infini.sh/console/model/alerting"
@ -33,10 +35,7 @@ func (h *AlertAPI) createChannel(w http.ResponseWriter, req *http.Request, ps ht
return
}
h.WriteJSON(w, util.MapStr{
"_id": obj.ID,
"result": "created",
}, 200)
h.WriteCreatedOKJSON(w, obj.ID)
}
@ -60,11 +59,7 @@ func (h *AlertAPI) getChannel(w http.ResponseWriter, req *http.Request, ps httpr
return
}
h.WriteJSON(w, util.MapStr{
"found": true,
"_id": id,
"_source": obj,
}, 200)
h.WriteGetOKJSON(w, id, obj)
}
func (h *AlertAPI) updateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -101,10 +96,7 @@ func (h *AlertAPI) updateChannel(w http.ResponseWriter, req *http.Request, ps ht
return
}
h.WriteJSON(w, util.MapStr{
"_id": obj.ID,
"result": "updated",
}, 200)
h.WriteUpdatedOKJSON(w, id)
}
func (h *AlertAPI) deleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -179,3 +171,43 @@ func (h *AlertAPI) searchChannel(w http.ResponseWriter, req *http.Request, ps ht
}
h.Write(w, res.Raw)
}
func (h *AlertAPI) testChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
obj := alerting.Channel{}
err := h.DecodeJSON(req, &obj)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
ctx := map[string]interface{}{
"title": "test title",
"message": "test message",
"rule_id": util.GetUUID(),
"rule_name": "test rule",
"resource_id": util.GetUUID(),
"resource_name": "test resource",
"event_id": util.GetUUID(),
"timestamp": time.Now().UnixMilli(),
"first_group_value": "first group value",
"first_threshold": "90",
"priority": "critical",
"results": []util.MapStr{
{"threshold": "90",
"priority": "critical",
"group_values": []string{"first group value", "second group value"},
"issue_timestamp": time.Now().UnixMilli()-500,
"result_value": 90,
"relation_values": util.MapStr{"a": 100, "b": 90},
},
},
}
_, err, _ = common.PerformChannel(&obj, ctx)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
h.WriteAckOKJSON(w)
}

View File

@ -135,6 +135,20 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
// adapter version smaller than 1.6.0
if obj.Channels != nil && obj.NotificationConfig == nil {
obj.NotificationConfig = obj.Channels
for i := range obj.NotificationConfig.Normal {
obj.NotificationConfig.Normal[i].Enabled = true
}
for i := range obj.NotificationConfig.Escalation {
obj.NotificationConfig.Escalation[i].Enabled = true
}
}
if obj.NotificationConfig != nil && obj.NotificationConfig.Message == "" && obj.Metrics.Message != "" {
obj.NotificationConfig.Message = obj.Metrics.Message
obj.NotificationConfig.Title = obj.Metrics.Title
}
alertAPI.WriteJSON(w, util.MapStr{
"found": true,

30
plugin/api/email/api.go Normal file
View File

@ -0,0 +1,30 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package email
import (
"infini.sh/console/plugin/api/email/common"
"infini.sh/framework/core/api"
)
type EmailAPI struct {
api.Handler
}
func InitAPI() {
email := EmailAPI{}
api.HandleAPIMethod(api.POST, "/email/server/_test", email.testEmailServer)
api.HandleAPIMethod(api.GET, "/email/server/:email_server_id", email.getEmailServer)
api.HandleAPIMethod(api.POST, "/email/server", email.createEmailServer)
api.HandleAPIMethod(api.PUT, "/email/server/:email_server_id", email.updateEmailServer)
api.HandleAPIMethod(api.DELETE, "/email/server/:email_server_id", email.deleteEmailServer)
api.HandleAPIMethod(api.GET, "/email/server/_search", email.searchEmailServer)
}
func InitEmailServer() error {
if !common.CheckEmailPipelineExists() {
return common.RefreshEmailServer()
}
return nil
}

View File

@ -0,0 +1,36 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package common
import (
"infini.sh/console/model"
"infini.sh/framework/core/credential"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
)
func GetBasicAuth(srv *model.EmailServer) (basicAuth elastic.BasicAuth, err error) {
if srv.Auth != nil && srv.Auth.Username != "" {
basicAuth = *srv.Auth
return
}
if srv.CredentialID != "" {
cred := credential.Credential{}
cred.ID = srv.CredentialID
_, err = orm.Get(&cred)
if err != nil {
return
}
var dv interface{}
dv, err = cred.Decode()
if err != nil {
return
}
if auth, ok := dv.(elastic.BasicAuth); ok {
basicAuth = auth
}
}
return
}

View File

@ -0,0 +1,134 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package common
import (
"infini.sh/console/model"
"infini.sh/framework/core/global"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"os"
"path"
"gopkg.in/yaml.v2"
)
const emailServerConfigFile = "send_email.yml"
func RefreshEmailServer() error {
q := orm.Query{
Size: 10,
}
q.Conds = orm.And(orm.Eq("enabled", true))
err, result := orm.Search(model.EmailServer{}, &q )
if err != nil {
return err
}
if len(result.Result) == 0 {
return StopEmailServer()
}
servers := make([]model.EmailServer,0, len(result.Result))
for _, row := range result.Result {
emailServer := model.EmailServer{}
buf := util.MustToJSONBytes(row)
util.MustFromJSONBytes(buf, &emailServer)
err = emailServer.Validate(false)
if err != nil {
return err
}
auth, err := GetBasicAuth(&emailServer)
if err != nil {
return err
}
emailServer.Auth = &auth
servers = append(servers, emailServer)
}
pipeCfgStr := GeneratePipelineConfig(servers)
cfgDir := global.Env().GetConfigDir()
sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile)
_, err = util.FilePutContent(sendEmailCfgFile, pipeCfgStr)
return err
}
func StopEmailServer() error {
cfgDir := global.Env().GetConfigDir()
sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile)
if util.FilesExists(sendEmailCfgFile) {
return os.RemoveAll(sendEmailCfgFile)
}
return nil
}
func CheckEmailPipelineExists() bool {
cfgDir := global.Env().GetConfigDir()
sendEmailCfgFile := path.Join(cfgDir, emailServerConfigFile)
return util.FilesExists(sendEmailCfgFile)
}
func GeneratePipelineConfig(servers []model.EmailServer) string {
if len(servers) == 0 {
return ""
}
smtpServers := map[string]util.MapStr{}
for _, srv := range servers {
smtpServers[srv.ID] = util.MapStr{
"server": util.MapStr{
"host": srv.Host,
"port": srv.Port,
"tls": srv.TLS,
},
"auth": util.MapStr{
"username": srv.Auth.Username,
"password": srv.Auth.Password,
},
}
}
pipelineCfg := util.MapStr{
"pipeline": []util.MapStr{
{
"name": "send_email_service",
"auto_start": true,
"keep_running": true,
"retry_delay_in_ms": 5000,
"processor": []util.MapStr{
{
"consumer": util.MapStr{
"consumer": util.MapStr{
"fetch_max_messages": 1,
},
"max_worker_size": 200,
"num_of_slices": 1,
"idle_timeout_in_seconds": 30,
"queue_selector": util.MapStr{
"keys": []string{"email_messages"},
},
"processor": []util.MapStr{
{
"smtp": util.MapStr{
"idle_timeout_in_seconds": 1,
"servers": smtpServers,
"templates": util.MapStr{
"raw": util.MapStr{
"content_type": "text/plain",
"subject": "$[[subject]]",
"body": "$[[body]]",
},
},
},
},
},
},
},
},
},
},
}
buf, err := yaml.Marshal(pipelineCfg)
if err != nil {
panic(err)
}
return string(buf)
}

370
plugin/api/email/server.go Normal file
View File

@ -0,0 +1,370 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package email
import (
"bytes"
"crypto/tls"
"fmt"
log "github.com/cihub/seelog"
"infini.sh/console/model"
"infini.sh/console/model/alerting"
"infini.sh/console/plugin/api/email/common"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/credential"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"net/http"
"src/github.com/buger/jsonparser"
"src/github.com/gopkg.in/gomail.v2"
"strconv"
"time"
)
func (h *EmailAPI) createEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var obj = &model.EmailServer{}
err := h.DecodeJSON(req, obj)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
q := util.MapStr{
"size": 1,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"host": util.MapStr{
"value": obj.Host,
},
},
},
{
"term": util.MapStr{
"port": util.MapStr{
"value": obj.Port,
},
},
},
},
},
},
}
query := orm.Query{
RawQuery: util.MustToJSONBytes(q),
}
err, result := orm.Search(obj, &query)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if len(result.Result) > 0 {
h.WriteError(w, fmt.Sprintf("email server [%s:%d] already exists", obj.Host, obj.Port), http.StatusInternalServerError)
return
}
if obj.CredentialID == "" && obj.Auth != nil && obj.Auth.Username != ""{
credentialID, err := saveBasicAuthToCredential(obj)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
obj.CredentialID = credentialID
}
obj.Auth = nil
err = orm.Create(&orm.Context{
Refresh: "wait_for",
}, obj)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if obj.Enabled {
err = common.RefreshEmailServer()
if err != nil {
log.Error(err)
}
}
h.WriteCreatedOKJSON(w, obj.ID)
}
func saveBasicAuthToCredential(srv *model.EmailServer)(string, error){
if srv == nil {
return "", fmt.Errorf("param email config can not be empty")
}
cred := credential.Credential{
Name: srv.Name,
Type: credential.BasicAuth,
Tags: []string{"Email"},
Payload: map[string]interface{}{
"basic_auth": map[string]interface{}{
"username": srv.Auth.Username,
"password": srv.Auth.Password,
},
},
}
cred.ID = util.GetUUID()
err := cred.Encode()
if err != nil {
return "", err
}
err = orm.Create(nil, &cred)
if err != nil {
return "", err
}
return cred.ID, nil
}
func (h *EmailAPI) getEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("email_server_id")
obj := model.EmailServer{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
}, http.StatusNotFound)
return
}
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
h.WriteGetOKJSON(w, id, obj)
}
func (h *EmailAPI) updateEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("email_server_id")
obj := model.EmailServer{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"result": "not_found",
}, http.StatusNotFound)
return
}
id = obj.ID
create := obj.Created
newObj := model.EmailServer{}
err = h.DecodeJSON(req, &newObj)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if !newObj.Enabled && obj.Enabled {
if err = checkEmailServerReferenced(&obj); err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
}
if newObj.Auth != nil && newObj.CredentialID == "" {
credentialID, err := saveBasicAuthToCredential(&newObj)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
newObj.CredentialID = credentialID
newObj.Auth = nil
}
//protect
newObj.ID = id
newObj.Created = create
err = orm.Update(&orm.Context{
Refresh: "wait_for",
}, &newObj)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
err = common.RefreshEmailServer()
if err != nil {
log.Error(err)
}
h.WriteUpdatedOKJSON(w, obj.ID)
}
func (h *EmailAPI) deleteEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("email_server_id")
obj := model.EmailServer{}
obj.ID = id
exists, err := orm.Get(&obj)
if !exists || err != nil {
h.WriteJSON(w, util.MapStr{
"_id": id,
"result": "not_found",
}, http.StatusNotFound)
return
}
if err = checkEmailServerReferenced(&obj); err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
err = orm.Delete(&orm.Context{
Refresh: "wait_for",
}, &obj)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
if obj.Enabled {
err = common.RefreshEmailServer()
if err != nil {
log.Error(err)
}
}
h.WriteDeletedOKJSON(w, obj.ID)
}
func checkEmailServerReferenced(srv *model.EmailServer) error {
q := &orm.Query{
Size: 1,
}
q.Conds = orm.And(orm.Eq("email.server_id", srv.ID))
err, result := orm.Search(alerting.Channel{}, q)
if err != nil {
return err
}
if len(result.Result) > 0 {
var chName interface{} = ""
if m, ok := result.Result[0].(map[string]interface{}); ok {
chName = m["name"]
}
return fmt.Errorf("email server used by channel [%s]", chName)
}
return nil
}
func (h *EmailAPI) searchEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
var (
strSize = h.GetParameterOrDefault(req, "size", "20")
strFrom = h.GetParameterOrDefault(req, "from", "0")
strEnabled = h.GetParameterOrDefault(req, "enabled", "true")
)
size, _ := strconv.Atoi(strSize)
if size <= 0 {
size = 20
}
from, _ := strconv.Atoi(strFrom)
if from < 0 {
from = 0
}
q := orm.Query{
From: from,
Size: size,
}
if strEnabled == "true" {
q.Conds = orm.And(orm.Eq("enabled", true))
}else if strEnabled == "false" {
q.Conds = orm.And(orm.Eq("enabled", false))
}
err, res := orm.Search(&model.EmailServer{}, &q)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
//remove password field
hitsBuf := bytes.Buffer{}
hitsBuf.Write([]byte("["))
jsonparser.ArrayEach(res.Raw, func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
value = jsonparser.Delete(value, "_source", "auth", "password")
hitsBuf.Write(value)
hitsBuf.Write([]byte(","))
}, "hits", "hits")
buf := hitsBuf.Bytes()
if buf[len(buf)-1] == ',' {
buf[len(buf)-1] = ']'
}else{
hitsBuf.Write([]byte("]"))
}
res.Raw, err = jsonparser.Set(res.Raw, hitsBuf.Bytes(), "hits", "hits")
if err != nil {
log.Error(err.Error())
h.ErrorInternalServer(w, err.Error())
return
}
h.Write(w, res.Raw)
}
func (h *EmailAPI) testEmailServer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
reqBody := &struct {
SendTo []string `json:"send_to"`
model.EmailServer
}{}
err := h.DecodeJSON(req, reqBody)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if len(reqBody.SendTo) == 0 {
h.WriteError(w, "receiver email address can not be empty", http.StatusInternalServerError)
return
}
if err = reqBody.Validate(false); err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if reqBody.CredentialID != "" {
auth, err := common.GetBasicAuth(&reqBody.EmailServer)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
reqBody.Auth = &auth
}
if reqBody.Auth == nil {
h.WriteError(w, "auth info required", http.StatusInternalServerError)
return
}
message := gomail.NewMessage()
message.SetHeader("From", reqBody.Auth.Username)
message.SetHeader("To", reqBody.SendTo...)
message.SetHeader("Subject", "test email")
message.SetBody("text/plain", "This is just a test email, do not reply!")
d := gomail.NewDialerWithTimeout(reqBody.Host, reqBody.Port, reqBody.Auth.Username, reqBody.Auth.Password, 3*time.Second)
d.TLSConfig = &tls.Config{InsecureSkipVerify: true}
d.SSL = reqBody.TLS
err = d.DialAndSend(message)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
h.WriteAckOKJSON(w)
}

View File

@ -1,6 +1,7 @@
package api
import (
"infini.sh/console/plugin/api/email"
"infini.sh/console/plugin/api/license"
"path"
@ -75,4 +76,5 @@ func Init(cfg *config.AppConfig) {
notification.InitAPI()
license.InitAPI()
email.InitAPI()
}

View File

@ -0,0 +1,35 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package action
import (
"infini.sh/console/model/alerting"
"infini.sh/framework/core/queue"
"infini.sh/framework/core/util"
)
type EmailAction struct {
Data *alerting.Email
Subject string
Body string
}
const EmailQueueName = "email_messages"
func (act *EmailAction) Execute()([]byte, error){
queueCfg := queue.GetOrInitConfig(EmailQueueName)
emailMsg := util.MapStr{
"server_id": act.Data.ServerID,
"email": act.Data.Recipients.To,
"template": "raw",
"variables": util.MapStr{
"subject": act.Subject,
"body": act.Body,
},
}
emailMsgBytes := util.MustToJSONBytes(emailMsg)
err := queue.Push(queueCfg, emailMsgBytes)
return nil, err
}

View File

@ -0,0 +1,88 @@
/* Copyright © INFINI Ltd. All rights reserved.
* Web: https://infinilabs.com
* Email: hello#infini.ltd */
package common
import (
"bytes"
"fmt"
"infini.sh/console/model/alerting"
"infini.sh/console/service/alerting/action"
"infini.sh/console/service/alerting/funcs"
"infini.sh/framework/core/orm"
"text/template"
)
func PerformChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error, []byte) {
if channel == nil {
return nil, fmt.Errorf("empty channel"), nil
}
var (
act action.Action
message []byte
err error
)
switch channel.Type {
case alerting.ChannelWebhook:
message, err = ResolveMessage(channel.Webhook.Body, ctx)
if err != nil {
return nil, err, message
}
wh := *channel.Webhook
urlBytes, err := ResolveMessage(wh.URL, ctx)
if err != nil {
return nil, err, message
}
wh.URL = string(urlBytes)
act = &action.WebhookAction{
Data: &wh,
Message: string(message),
}
case alerting.ChannelEmail:
message, err = ResolveMessage(channel.Email.Body, ctx)
if err != nil {
return nil, err, message
}
subjectBytes, err := ResolveMessage(channel.Email.Subject, ctx)
if err != nil {
return nil, err, nil
}
act = &action.EmailAction{
Data: channel.Email,
Subject: string(subjectBytes),
Body: string(message),
}
default:
return nil, fmt.Errorf("unsupported action type: %s", channel.Type), message
}
executeResult, err := act.Execute()
return executeResult, err, message
}
func ResolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){
msg := messageTemplate
tmpl, err := template.New("alert-message").Funcs(funcs.GenericFuncMap()).Parse(msg)
if err !=nil {
return nil, fmt.Errorf("parse message temlate error: %w", err)
}
msgBuffer := &bytes.Buffer{}
err = tmpl.Execute(msgBuffer, ctx)
return msgBuffer.Bytes(), err
}
func RetrieveChannel(ch *alerting.Channel) (*alerting.Channel, error) {
if ch == nil {
return nil, fmt.Errorf("empty channel")
}
enabled := ch.Enabled
if ch.ID != "" {
_, err := orm.Get(ch)
if err != nil {
return nil, err
}
ch.Enabled = enabled
}
return ch, nil
}

View File

@ -5,7 +5,6 @@
package elasticsearch
import (
"bytes"
"context"
"fmt"
"github.com/Knetic/govaluate"
@ -13,8 +12,7 @@ import (
"infini.sh/console/model"
"infini.sh/console/model/alerting"
alerting2 "infini.sh/console/service/alerting"
"infini.sh/console/service/alerting/action"
"infini.sh/console/service/alerting/funcs"
"infini.sh/console/service/alerting/common"
"infini.sh/framework/core/elastic"
"infini.sh/console/model/insight"
"infini.sh/framework/core/kv"
@ -25,7 +23,6 @@ import (
"sort"
"strconv"
"strings"
"text/template"
"time"
)
@ -679,6 +676,21 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
if err != nil {
return fmt.Errorf("save alert message error: %w", err)
}
// todo add recover notification to inner system message
// send recover message to channel
recoverCfg := rule.RecoveryNotificationConfig
if recoverCfg != nil && recoverCfg.EventEnabled && recoverCfg.Enabled {
paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{
alerting2.ParamEventID: alertItem.ID,
alerting2.ParamTimestamp: alertItem.Created.Unix(),
})
err = attachTitleMessageToCtx(recoverCfg.Title, recoverCfg.Message, paramsCtx)
if err != nil {
return err
}
actionResults, _ := performChannels(recoverCfg.Normal, paramsCtx)
alertItem.ActionExecutionResults = actionResults
}
}
return nil
}
@ -698,7 +710,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
})
alertItem.Priority = priority
err = attachTitleMessageToCtx(rule, paramsCtx)
title, message := rule.GetNotificationTitleAndMessage()
err = attachTitleMessageToCtx(title, message, paramsCtx)
if err != nil {
return err
}
@ -707,7 +720,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered {
msg := &alerting.AlertMessage{
RuleID: rule.ID,
Created: time.Now(),
Created: alertItem.Created,
Updated: time.Now(),
ID: util.GetUUID(),
ResourceID: rule.Resource.ID,
@ -756,12 +769,13 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
return nil
}
// if channel is not enabled return
if !rule.Channels.Enabled {
notifyCfg := rule.GetNotificationConfig()
if notifyCfg == nil || !notifyCfg.Enabled {
return nil
}
if rule.Channels.AcceptTimeRange.Include(time.Now()) {
periodDuration, err := time.ParseDuration(rule.Channels.ThrottlePeriod)
if notifyCfg.AcceptTimeRange.Include(time.Now()) {
periodDuration, err := time.ParseDuration(notifyCfg.ThrottlePeriod)
if err != nil {
alertItem.Error = err.Error()
return err
@ -787,7 +801,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
}
if alertMessage == nil || period > periodDuration {
actionResults, errCount := performChannels(rule.Channels.Normal, paramsCtx)
actionResults, errCount := performChannels(notifyCfg.Normal, paramsCtx)
alertItem.ActionExecutionResults = actionResults
//change and save last notification time in local kv store when action error count equals zero
if errCount == 0 {
@ -798,8 +812,8 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
}
}
if rule.Channels.EscalationEnabled {
throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod)
if notifyCfg.EscalationEnabled {
throttlePeriod, err := time.ParseDuration(notifyCfg.EscalationThrottlePeriod)
if err != nil {
return err
}
@ -819,7 +833,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
}
}
if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration {
actionResults, errCount := performChannels(rule.Channels.Escalation, paramsCtx)
actionResults, errCount := performChannels(notifyCfg.Escalation, paramsCtx)
alertItem.ActionExecutionResults = actionResults
//todo init last escalation time when create task (by last alert item is escalated)
if errCount == 0 {
@ -836,17 +850,17 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
return nil
}
func attachTitleMessageToCtx(rule *alerting.Rule, paramsCtx map[string]interface{}) error{
func attachTitleMessageToCtx(title, message string, paramsCtx map[string]interface{}) error{
var (
tplBytes []byte
err error
)
tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx)
tplBytes, err = common.ResolveMessage(message, paramsCtx)
if err != nil {
return fmt.Errorf("resolve message template error: %w", err)
}
paramsCtx[alerting2.ParamMessage] = string(tplBytes)
tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx)
tplBytes, err = common.ResolveMessage(title, paramsCtx)
if err != nil {
return fmt.Errorf("resolve title template error: %w", err)
}
@ -927,7 +941,8 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul
alerting2.ParamEventID: util.GetUUID(),
alerting2.ParamTimestamp: time.Now().Unix(),
} )
err = attachTitleMessageToCtx(rule, paramsCtx)
title, message := rule.GetNotificationTitleAndMessage()
err = attachTitleMessageToCtx(title, message, paramsCtx)
if err != nil {
return nil, err
}
@ -945,7 +960,15 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([
var errCount int
var actionResults []alerting.ActionExecutionResult
for _, channel := range channels {
resBytes, err, messageBytes := performChannel(&channel, ctx)
if !channel.Enabled {
continue
}
_, err := common.RetrieveChannel(&channel)
if err != nil {
log.Error(err)
continue
}
resBytes, err, messageBytes := common.PerformChannel(&channel, ctx)
var errStr string
if err != nil {
errCount++
@ -963,46 +986,8 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([
return actionResults, errCount
}
func resolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){
msg := messageTemplate
tmpl, err := template.New("alert-message").Funcs(funcs.GenericFuncMap()).Parse(msg)
if err !=nil {
return nil, fmt.Errorf("parse message temlate error: %w", err)
}
msgBuffer := &bytes.Buffer{}
err = tmpl.Execute(msgBuffer, ctx)
return msgBuffer.Bytes(), err
}
func performChannel(channel *alerting.Channel, ctx map[string]interface{}) ([]byte, error, []byte) {
var (
act action.Action
message []byte
err error
)
switch channel.Type {
case alerting.ChannelWebhook:
message, err = resolveMessage(channel.Webhook.Body, ctx)
if err != nil {
return nil, err, message
}
wh := *channel.Webhook
urlBytes, err := resolveMessage(wh.URL, ctx)
if err != nil {
return nil, err, message
}
wh.URL = string(urlBytes)
act = &action.WebhookAction{
Data: &wh,
Message: string(message),
}
default:
return nil, fmt.Errorf("unsupported action type: %s", channel.Type), message
}
executeResult, err := act.Execute()
return executeResult, err, message
}
func (engine *Engine) GenerateTask(rule alerting.Rule) func(ctx context.Context) {
return func(ctx context.Context) {
defer func() {

View File

@ -70,7 +70,7 @@ func TestEngine( t *testing.T) {
},
},
Channels: alerting.RuleChannel{
Channels: alerting.NotificationConfig{
Normal: []alerting.Channel{
{Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
HeaderParams: map[string]string{
@ -156,7 +156,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
// },
// },
//
// Channels: alerting.RuleChannel{
// Channels: alerting.NotificationConfig{
// Normal: []alerting.Channel{
// {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
// HeaderParams: map[string]string{
@ -216,7 +216,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
},
},
Channels: alerting.RuleChannel{
Channels: alerting.NotificationConfig{
Normal: []alerting.Channel{
{Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
HeaderParams: map[string]string{