new alerting api

This commit is contained in:
silenceqi 2021-09-25 15:05:07 +08:00
parent 97ff38a96b
commit e22848bd7c
58 changed files with 2360 additions and 414 deletions

View File

@ -7,8 +7,8 @@ import (
"infini.sh/framework/core/ui"
"infini.sh/search-center/api/index_management"
"infini.sh/search-center/config"
"path"
"infini.sh/search-center/service/alerting"
"path"
)
func Init(cfg *config.AppConfig) {
@ -37,36 +37,34 @@ func Init(cfg *config.AppConfig) {
ui.HandleUIMethod(api.DELETE, path.Join(pathPrefix, "index/:index"), handler.HandleDeleteIndexAction)
ui.HandleUIMethod(api.POST, path.Join(pathPrefix, "index/:index"), handler.HandleCreateIndexAction)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/alerts", alerting.GetAlerts)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/monitors/_search", alerting.Search)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_indices", alerting.GetIndices)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_aliases", alerting.GetAliases)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_mappings", alerting.GetMappings)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/_plugins", alerting.GetPlugins)
//new api
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations/email_accounts", alerting.CreateEmailAccount)
ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.UpdateEmailAccount)
ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.DeleteEmailAccount)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations/email_accounts", alerting.GetEmailAccounts)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.GetEmailAccount)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations/email_groups", alerting.CreateEmailGroup)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations/email_groups", alerting.GetEmailGroups)
ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.DeleteEmailGroup)
ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.UpdateEmailGroup)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.GetEmailGroup)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations", alerting.GetDestinations)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations", alerting.CreateDestination)
ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/destinations/:destinationId", alerting.UpdateDestination)
ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/destinations/:destinationId", alerting.DeleteDestination)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/monitors/:monitorID", alerting.GetMonitor)
ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/monitors/:monitorID", alerting.UpdateMonitor)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/monitors", alerting.GetMonitors)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/monitors", alerting.CreateMonitor)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/monitors/_execute", alerting.ExecuteMonitor)
ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/monitors/:monitorID", alerting.DeleteMonitor)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_monitors/:monitorID/_acknowledge/alerts", alerting.AcknowledgeAlerts)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/_settings", alerting.GetSettings)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations", alerting.GetDestinations)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations", alerting.CreateDestination)
ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/destinations/:destinationId", alerting.UpdateDestination)
ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/destinations/:destinationId", alerting.DeleteDestination)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations/email_accounts", alerting.CreateEmailAccount)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations/email_accounts", alerting.GetEmailAccounts)
ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.DeleteEmailAccount)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.GetEmailAccount)
ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.UpdateEmailAccount)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations/email_groups", alerting.CreateEmailGroup)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations/email_groups", alerting.GetEmailGroups)
ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.DeleteEmailGroup)
ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.UpdateEmailGroup)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.GetEmailGroup)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_indices", alerting.GetIndices)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_aliases", alerting.GetAliases)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/monitors/_search", alerting.Search)
ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/alerts", alerting.GetAlerts)
ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_monitors/:monitorID/_acknowledge/alerts", alerting.AcknowledgeAlerts)
task.RegisterScheduleTask(task.ScheduleTask{
Description: "sync reindex task result",

View File

@ -11,6 +11,8 @@ import (
"infini.sh/framework/modules"
"infini.sh/search-center/config"
"infini.sh/search-center/model"
"infini.sh/search-center/model/alerting"
alertSrv "infini.sh/search-center/service/alerting"
)
var appConfig *config.AppConfig
@ -72,6 +74,10 @@ func main() {
orm.RegisterSchemaWithIndexName(model.Dict{}, "dict")
orm.RegisterSchemaWithIndexName(model.Reindex{}, "reindex")
orm.RegisterSchemaWithIndexName(elastic.IndexPattern{}, "view")
orm.RegisterSchemaWithIndexName(alerting.Config{}, "alerting-config")
orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alerting-alerts")
orm.RegisterSchemaWithIndexName(alerting.AlertHistory{}, "alerting-alert-history")
alertSrv.GetScheduler().Start()
})
}

32
model/alerting/alert.go Normal file
View File

@ -0,0 +1,32 @@
package alerting
type Alert struct {
ClusterID string `json:"cluster_id"`
AcknowledgedTime *int64 `json:"acknowledged_time" elastic_mapping:"acknowledged_time:{type:date}"`
ActionExecutionResults []ActionExecutionResult `json:"action_execution_results" elastic_mapping:"action_execution_results"`
AlertHistories []AlertHistory `json:"alert_history" elastic_mapping:"alert_history"`
EndTime *int64 `json:"end_time" elastic_mapping:"end_time:{type:date}"`
ErrorMessage string `json:"error_message" elastic_mapping:"error_message:{type:text}"`
Id string `json:"id" elastic_mapping:"id:{type:keyword}"`
LastNotificationTime int64 `json:"last_notification_time" elastic_mapping:"last_notification_time:{type:date}"`
MonitorId string `json:"monitor_id" elastic_mapping:"monitor_id:{type:keyword}"`
MonitorName string `json:"monitor_name" elastic_mapping:"monitor_name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}"`
Severity string `json:"severity" elastic_mapping:"severity:{type:keyword}"`
StartTime int64 `json:"start_time" elastic_mapping:"start_time:{type:date}"`
State string `json:"state" elastic_mapping:"state:{type:keyword}"`
TriggerId string `json:"trigger_id" elastic_mapping:"trigger_id:{type:keyword}"`
TriggerName string `json:"trigger_name" elastic_mapping:"trigger_name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}"`
}
type ActionExecutionResult struct {
ActionID string `json:"action_id" elastic_mapping:"action_id:{type:keyword}"`
LastExecutionTime int64 `json:"last_execution_time" elastic_mapping:"last_execution_time:{type:date}"`
ThrottledCount int `json:"throttled_count" elastic_mapping:"throttled_count:{type:integer}"`
Error string `json:"error,omitempty"`
Result string `json:"result"`
}
type AlertHistory struct {
Message string `json:"message" elastic_mapping:"message:{type:text}"`
Timestamp int64 `json:"timestamp" elastic_mapping:"timestamp:{type:date}"`
}

19
model/alerting/config.go Normal file
View File

@ -0,0 +1,19 @@
package alerting
//type Config struct {
// ID string `json:"id" index:"id"`
// Type string `json:"type" elastic_mapping:"type:{type:keyword}"`
// Destination Destination `json:"destination,omitempty" elastic_mapping:"destination:{dynamic:false,properties:{custom_webhook:{properties:{header_params:{type:object,enabled:false},host:{type:text},password:{type:text},path:{type:keyword},port:{type:integer},query_params:{type:object,enabled:false},scheme:{type:keyword},url:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},username:{type:text}}},email:{properties:{email_account_id:{type:keyword},recipients:{type:nested,properties:{email:{type:text},email_group_id:{type:keyword},type:{type:keyword}}}}},last_update_time:{type:date,format:strict_date_time||epoch_millis},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schema_version:{type:integer},slack:{properties:{url:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}}},type:{type:keyword},user:{properties:{backend_roles:{type:text,fields:{keyword:{type:keyword}}},custom_attribute_names:{type:text,fields:{keyword:{type:keyword}}},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},roles:{type:text,fields:{keyword:{type:keyword}}}}}}}"`
// EmailAccount EmailAccount `json:"email_account,omitempty" elastic_mapping:"email_account:{properties:{email:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},from:{type:text},host:{type:text},method:{type:text},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},port:{type:integer},schema_version:{type:long}}}"`
// EmailGroup EmailGroup `json:"email_group,omitempty" elastic_mapping:"email_group:{properties:{emails:{type:nested,properties:{email:{type:text}}},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schema_version:{type:long}}}"`
// Monitor Monitor `json:"monitor,omitempty" elastic_mapping:"monitor:{dynamic:false,properties:{enabled:{type:boolean},enabled_time:{type:date,format:strict_date_time||epoch_millis},inputs:{type:nested,properties:{search:{properties:{indices:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},query:{type:object,enabled:false}}}}},last_update_time:{type:date,format:strict_date_time||epoch_millis},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schedule:{properties:{cron:{properties:{expression:{type:text},timezone:{type:keyword}}},period:{properties:{interval:{type:integer},unit:{type:keyword}}}}},schema_version:{type:integer},triggers:{type:nested,properties:{actions:{type:nested,properties:{destination_id:{type:keyword},message_template:{type:object,enabled:false},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},subject_template:{type:object,enabled:false},throttle:{properties:{unit:{type:keyword},value:{type:integer}}},throttle_enabled:{type:boolean}}},condition:{type:object,enabled:false},min_time_between_executions:{type:integer},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}}},type:{type:keyword}}}"`
//}
// Config is the alerting configuration document stored in the
// "alerting-config" index (registered in main via
// orm.RegisterSchemaWithIndexName). Type discriminates which of the
// optional payload fields (Destination / EmailAccount / EmailGroup /
// Monitor) this document carries; the full hand-written mappings that the
// compact tags replace are kept in the commented block above for reference.
type Config struct {
// ID is the Elasticsearch document id.
ID string `json:"id" index:"id"`
Type string `json:"type" elastic_mapping:"type:{type:keyword}"`
Destination Destination `json:"destination,omitempty" elastic_mapping:"destination"`
EmailAccount EmailAccount `json:"email_account,omitempty" elastic_mapping:"email_account"`
EmailGroup EmailGroup `json:"email_group,omitempty" elastic_mapping:"email_group"`
Monitor Monitor `json:"monitor,omitempty" elastic_mapping:"monitor"`
}

View File

@ -0,0 +1,40 @@
package alerting
type Destination struct {
Type string `json:"type" elastic_mapping:"type:{type:keyword}"`
Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"`
SchemaVersion int `json:"schema_version" elastic_mapping:"schema_version:{type:integer}"`
LastUpdateTime int64 `json:"last_update_time" elastic_mapping:"last_update_time:{type:date,format:strict_date_time||epoch_millis}"`
CustomWebhook CustomWebhook `json:"custom_webhook,omitempty" elastic_mapping:"custom_webhook"`
Email EmailDestination `json:"email,omitempty" elastic_mapping:"email"`
Slack Slack `json:"slack,omitempty" elastic_mapping:"slack"`
}
type EmailDestination struct {
EmailAccountID string `json:"email_account_id" elastic_mapping:"email_account_id:{type:keyword}"`
Recipients []Recipient `json:"recipients" elastic_mapping:"recipients"`
}
type Recipient struct {
EmailGroupID string `json:"email_group_id,omitempty" elastic_mapping:"email_group_id:{type:keyword}"`
Email string `json:"email,omitempty" elastic_mapping:"email:{type:text}"`
Type string `json:"type" elastic_mapping:"type:{type:keyword}"`
}
type CustomWebhook struct {
HeaderParams map[string]string `json:"header_params" elastic_mapping:"header_params:{type:object,enabled:false}"`
Host string `json:"host" elastic_mapping:"host:{type:text}"`
Method string `json:"method" elastic_mapping:"host:{type:keyword}"`
Password string `json:"password" elastic_mapping:"password:{type:text}"`
Path string `json:"path" elastic_mapping:"path:{type:keyword}"`
Port int `json:"port" elastic_mapping:"port:{type:integer}"`
QueryParams map[string]string `json:"query_params" elastic_mapping:"query_params:{type:object,enabled:false}"`
Scheme string `json:"scheme" elastic_mapping:"scheme:{type:keyword}"`
URL string `json:"url" elastic_mapping:"url:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}"`
Username string `json:"username" elastic_mapping:"username:{type:text}"`
}
type Slack struct {
URl string `json:"url" elastic_mapping:"url:{type:keyword}"`
}

View File

@ -0,0 +1,12 @@
package alerting
// EmailAccount holds the SMTP account used to send alert notification
// mail. Email is used both as the authentication user and as the sender
// address (see action.EmailAction.Execute).
type EmailAccount struct {
Email string `json:"email" elastic_mapping:"email:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"`
//From string `json:"from" elastic_mapping:"from:{type:text}"`
Host string `json:"host" elastic_mapping:"host:{type:text}"`
// Method selects the transport: "ssl" means implicit TLS (e.g. port 465/994),
// any other value uses plain smtp.SendMail.
Method string `json:"method" elastic_mapping:"method:{type:text}"`
Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"`
Port int `json:"port" elastic_mapping:"port:{type:integer}"`
SchemaVersion int64 `json:"schema_version" elastic_mapping:"schema_version:{type:long}"`
// NOTE(review): the SMTP password is indexed as a plain keyword, i.e.
// stored in Elasticsearch in clear text — consider encrypting at rest or
// at least excluding it from the index.
Password string `json:"password,omitempty" elastic_mapping:"password:{type:keyword}"`
}

View File

@ -0,0 +1,7 @@
package alerting
// EmailGroup is a named list of recipient addresses that an email
// destination can reference by id (see Recipient.EmailGroupID).
type EmailGroup struct {
// Emails is mapped as nested with an "email" text property, i.e. entries
// are expected to look like {"email": "<address>"}.
Emails []map[string]interface{} `json:"emails" elastic_mapping:"emails:{type: nested, properties: {email: {type: text}}}"`
Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"`
SchemaVersion int64 `json:"schema_version" elastic_mapping:"schema_version:{type:long}"`
}

63
model/alerting/monitor.go Normal file
View File

@ -0,0 +1,63 @@
package alerting
type Monitor struct {
Enabled bool `json:"enabled" elastic_mapping:"enabled: {type:boolean}"`
EnabledTime int64 `json:"enabled_time" elastic_mapping:"enabled_time:{type:date,format:strict_date_time||epoch_millis}"`
Inputs []MonitorInput `json:"inputs" elastic_mapping:"inputs"`
LastUpdateTime int64 `json:"last_update_time" elastic_mapping:"last_update_time:{type:date,format:strict_date_time||epoch_millis}"`
Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"`
Schedule Schedule `json:"schedule" elastic_mapping:"schedule"`
SchemaVersion int `json:"schema_version" elastic_mapping:"schema_version:{type:integer}"`
Triggers []Trigger `json:"triggers" elastic_mapping:"triggers"`
Type string `json:"type" elastic_mapping:"type:{type:keyword}`
}
type MonitorInput struct {
Search MonitorSearch `json:"search" elastic_mapping:"search"`
}
type MonitorSearch struct {
Indices []string `json:"indices" elastic_mapping:"indices:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"`
Query map[string]interface{} `json:"query" elastic_mapping:"query:{type:object,enabled:false}}"`
}
type Cron struct {
Expression string `json:"expression" elastic_mapping:"expression:{type:text}"`
Timezone string `json:"timezone" elastic_mapping:"timezone:{type:keyword}"`
}
type Period struct {
Interval int `json:"interval" elastic_mapping:"interval:{type:integer}"`
Unit string `json:"unit" elastic_mapping:"unit:{type:keyword}"`
}
type Schedule struct {
Cron *Cron `json:"cron,omitempty" elastic_mapping:"cron"`
Period *Period `json:"period,omitempty" elastic_mapping:"period"`
}
type Throttle struct {
Unit string `json:"unit" elastic_mapping:"unit:{type:keyword}"`
Value int `json:"value" elastic_mapping:"value:{type:integer}"`
}
type Action struct {
ID string `json:"id"`
DestinationId string `json:"destination_id" elastic_mapping:"destination_id:{type:keyword}"`
MessageTemplate map[string]interface{} `json:"message_template" elastic_mapping:"message_template:{type:object}"`
Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"`
SubjectTemplate map[string]interface{} `json:"subject_template" elastic_mapping:"subject_template:{type:object}"`
ThrottleEnabled bool `json:"throttle_enabled" elastic_mapping:"throttle_enabled:{type:boolean}"`
Throttle *Throttle `json:"throttle,omitempty" elastic_mapping:"throttle"`
}
type Trigger struct {
ID string `json:"id"`
Severity string `json:"severity" elastic_mapping:"severity:{type:keyword}"`
Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"`
Condition map[string]interface{} `json:"condition" elastic_mapping:"condition:{type:object, enable:false}"`
Actions []Action `json:"actions" elastic_mapping:"actions"`
MinTimeBetweenExecutions int `json:"min_time_between_executions" elastic_mapping:"min_time_between_executions:{type:integer}"`
}

1
model/alerting/temp Normal file
View File

@ -0,0 +1 @@
monitor:{dynamic:false,properties:{enabled:{type:boolean},enabled_time:{type:date,format:strict_date_time||epoch_millis},inputs:{type:nested,properties:{search:{properties:{indices:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},query:{type:object,enabled:false}}}}},last_update_time:{type:date,format:strict_date_time||epoch_millis},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schedule:{properties:{cron:{properties:{expression:{type:text},timezone:{type:keyword}}},period:{properties:{interval:{type:integer},unit:{type:keyword}}}}},schema_version:{type:integer},triggers:{type:nested,properties:{actions:{type:nested,properties:{destination_id:{type:keyword},message_template:{type:object,enabled:false},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},subject_template:{type:object,enabled:false},throttle:{properties:{unit:{type:keyword},value:{type:integer}}},throttle_enabled:{type:boolean}}},condition:{type:object,enabled:false},min_time_between_executions:{type:integer},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}}},type:{type:keyword}}}

View File

@ -0,0 +1,7 @@
package action
// Destination/action type identifiers. The values correspond to the
// payload field names on model/alerting.Destination (custom_webhook,
// email, slack).
const (
ACTION_EMAIL = "email"
ACTION_WEBHOOK = "custom_webhook"
ACTION_SLACK = "slack"
)

View File

@ -0,0 +1,86 @@
package action
import (
"crypto/tls"
"fmt"
"infini.sh/search-center/model/alerting"
"net"
"net/smtp"
"strings"
)
// EmailAction sends an alert notification by email through the SMTP
// account configured in Sender.
type EmailAction struct {
	Sender   *alerting.EmailAccount // SMTP account; its Email is also used as the From address
	Message  string                 // mail body
	Subject  string
	Receiver []string // recipient addresses
}

// Execute sends the email. The []byte result is always nil; it exists to
// satisfy the Action interface.
func (act *EmailAction) Execute() ([]byte, error) {
	from := act.Sender.Email
	//Todo add password input in frontend?
	act.Sender.Host = strings.TrimSpace(act.Sender.Host)
	addr := fmt.Sprintf("%s:%d", act.Sender.Host, act.Sender.Port)
	// Fixed: RFC 5322 requires an empty line between the header block and
	// the body — the previous message ran the body straight after the
	// Subject header, so servers parsed the body as malformed headers.
	// A From header is also added, since many servers reject mail without one.
	msg := fmt.Sprintf("From: %s\r\nTo: %s\r\nSubject: %s\r\n\r\n%s\r\n",
		from, strings.Join(act.Receiver, ","), act.Subject, act.Message)
	auth := smtp.PlainAuth("", from, act.Sender.Password, act.Sender.Host)
	if act.Sender.Method == "ssl" {
		// Implicit TLS (e.g. port 465/994); smtp.SendMail only negotiates STARTTLS.
		return nil, SendEmailOverTLS(addr, auth, from, act.Receiver, []byte(msg))
	}
	return nil, smtp.SendMail(addr, auth, from, act.Receiver, []byte(msg))
}
// SendEmailOverTLS delivers msg through an SMTP server that expects an
// implicit-TLS connection (e.g. port 465), as opposed to STARTTLS which
// smtp.SendMail negotiates itself. addr is "host:port"; auth, from, to and
// msg follow the semantics of net/smtp.SendMail.
//
// Fixed relative to the original: the TLS connection is now closed on every
// error path (it used to leak on Auth/Mail/Rcpt/Data failures), and the
// SplitHostPort and Quit errors are no longer silently dropped.
func SendEmailOverTLS(addr string, auth smtp.Auth, from string, to []string, msg []byte) error {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return err
	}
	// NOTE(review): certificate verification is disabled — consider making
	// this configurable rather than always trusting the server.
	tlsConfig := &tls.Config{
		InsecureSkipVerify: true,
		ServerName:         host,
	}
	conn, err := tls.Dial("tcp", addr, tlsConfig)
	if err != nil {
		return err
	}
	c, err := smtp.NewClient(conn, host)
	if err != nil {
		conn.Close() // the client never took ownership of conn
		return err
	}
	// Release the connection on every return path below; after a successful
	// Quit this Close is a harmless no-op error which we deliberately ignore.
	defer c.Close()
	if err = c.Auth(auth); err != nil {
		return err
	}
	// Envelope: sender and recipients.
	if err = c.Mail(from); err != nil {
		return err
	}
	for _, dst := range to {
		if err = c.Rcpt(dst); err != nil {
			return err
		}
	}
	// Message data.
	w, err := c.Data()
	if err != nil {
		return err
	}
	if _, err = w.Write(msg); err != nil {
		return err
	}
	if err = w.Close(); err != nil {
		return err
	}
	return c.Quit()
}

View File

@ -0,0 +1,25 @@
package action
import (
"infini.sh/search-center/model/alerting"
"testing"
)
// TestEmailAction exercises EmailAction.Execute end-to-end against a real
// SMTP server over implicit TLS (Method "ssl", port 994).
// NOTE(review): this is an integration test — it needs outbound network
// access and a real password (the value below is a placeholder), so it
// fails when run as-is; consider gating it behind an environment variable
// or testing.Short().
func TestEmailAction(t *testing.T){
ea := EmailAction{
Sender: &alerting.EmailAccount{
Email: "liugq@infini.ltd",
Host: "smtp.ym.163.com",
Port: 994,
Method: "ssl",
Password: "<your email password>",
},
Message: "hello world",
Subject: "test email",
Receiver: []string{"786027438@qq.com"},
}
_, err := ea.Execute()
if err != nil {
t.Fatal(err)
}
}

View File

@ -0,0 +1,62 @@
package action
import (
"crypto/tls"
"fmt"
"infini.sh/search-center/model/alerting"
"io/ioutil"
"net/http"
"strings"
)
// Action is the common interface implemented by every alert notification
// action (email, webhook, ...). Execute performs the delivery and returns
// the raw response payload, if any.
type Action interface {
Execute() ([]byte, error)
}
// WebhookAction posts Message to the HTTP endpoint described by Data.
type WebhookAction struct {
Data *alerting.CustomWebhook
Message string
}
// actionClient is the shared HTTP client used by all webhook executions.
// NOTE(review): TLS certificate verification is disabled and no timeout is
// configured — a hung endpoint blocks the caller indefinitely; confirm
// whether a timeout should be added.
var actionClient = http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
// Execute posts act.Message to the configured webhook endpoint and returns
// the raw response body.
//
// The target is act.Data.URL; when act.Data.Host is set it takes precedence
// and the URL is rebuilt from scheme/host/port/path plus the query
// parameters. NOTE(review): act.Data.Method is currently ignored — the
// request is always sent as POST; confirm whether other verbs are needed.
func (act *WebhookAction) Execute() ([]byte, error) {
	var reqURL string
	if act.Data.URL != "" {
		reqURL = act.Data.URL
	}
	if act.Data.Host != "" {
		reqURL = fmt.Sprintf("%s://%s:%d/%s", act.Data.Scheme, act.Data.Host, act.Data.Port, act.Data.Path)
		urlBuilder := strings.Builder{}
		urlBuilder.WriteString(reqURL)
		if len(act.Data.QueryParams) > 0 {
			urlBuilder.WriteString("?")
		}
		for k, v := range act.Data.QueryParams {
			urlBuilder.WriteString(k)
			urlBuilder.WriteString("=")
			urlBuilder.WriteString(v)
			urlBuilder.WriteString("&")
		}
		// Fixed: the loop above used to leave a dangling '&' on the URL;
		// strip it so the query string is well-formed.
		reqURL = strings.TrimSuffix(urlBuilder.String(), "&")
	}
	reqBody := strings.NewReader(act.Message)
	req, err := http.NewRequest(http.MethodPost, reqURL, reqBody)
	if err != nil {
		return nil, err
	}
	for key, value := range act.Data.HeaderParams {
		req.Header.Add(key, value)
	}
	res, err := actionClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	return ioutil.ReadAll(res.Body)
}

View File

@ -0,0 +1,25 @@
package action
import (
"infini.sh/search-center/model/alerting"
"net/http"
"testing"
)
// TestWebhookAction sends a real message to a DingTalk robot webhook.
// NOTE(review): the access_token below is a live credential committed to
// the repository — it should be revoked and injected via an environment
// variable instead. The test also requires outbound network access, so it
// will fail in an offline CI environment.
func TestWebhookAction(t *testing.T){
ea := WebhookAction{
Message: `{"msgtype": "text","text": {"content":"通知:我就是我, 是不一样的烟火,Trigger: {{ctx.trigger.name}}"},"at":{"atMobiles":["18692254900"],"isAtAll": false}}`,
Data: &alerting.CustomWebhook{
HeaderParams: map[string]string{
"Content-Type": "application/json",
},
URL: "https://oapi.dingtalk.com/robot/send?access_token=6a5c7c9454ff74537a6de493153b1da68860942d4b0aeb33797cb68b5111b077",
Method: http.MethodPost,
},
}
_, err := ea.Execute()
if err != nil {
t.Fatal(err)
}
}

View File

@ -8,6 +8,8 @@ import (
"fmt"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/search-center/model/alerting"
"io"
"net/http"
"net/url"
@ -23,6 +25,16 @@ func getQueryParam(req *http.Request, key string, or ...string) string {
return val
}
func getAlertIndexName(typ string) string {
switch typ{
case INDEX_ALL_ALERTS:
return fmt.Sprintf("%s,%s", orm.GetIndexName(alerting.AlertHistory{}), orm.GetIndexName(alerting.Alert{}))
case INDEX_ALERT_HISTORY:
return orm.GetIndexName(alerting.AlertHistory{})
}
return orm.GetIndexName(alerting.Alert{})
}
func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){
id := ps.ByName("id")
conf := elastic.GetConfig(id)
@ -41,11 +53,6 @@ func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){
alertState = getQueryParam(req, "alertState", "ALL")
monitorIds = req.URL.Query()["monitorIds"]
params = map[string]string{
"startIndex": from,
"size": size,
"severityLevel": severityLevel,
"alertState": alertState,
"searchString": search,
}
)
@ -68,18 +75,67 @@ func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){
params["sortOrder"] = sortDirection
params["missing"] = "_last"
}
sort := IfaceMap{
params["sortString"]: params["sortOrder"],
}
must := []IfaceMap{
{
"match": IfaceMap{
"cluster_id": id,
},
},
}
if severityLevel != "ALL" {
must = append(must, IfaceMap{
"match": IfaceMap{
"severity": severityLevel,
},
})
}
if alertState != "ALL" {
must = append(must, IfaceMap{
"match": IfaceMap{
"state": alertState,
},
})
}
if len(monitorIds) > 0{
params["monitorId"] = monitorIds[0]
must = append(must, IfaceMap{
"match": IfaceMap{
"monitor_id": monitorIds[0],
},
})
}
if clearSearch := strings.TrimSpace(search); clearSearch != ""{
searches := strings.Split(clearSearch, " ")
clearSearch = strings.Join(searches, "* *")
params["searchString"] = fmt.Sprintf("*%s*", clearSearch)
must = append(must, IfaceMap{
"query_string": IfaceMap{
//"default_field": "destination.name",
"default_operator": "AND",
"query": fmt.Sprintf(`*%s*`, clearSearch),
},
})
}
reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/alerts", conf.Endpoint, API_PREFIX )
res, err := doRequest(reqUrl, http.MethodGet, params, nil)
reqBody := IfaceMap{
"size":size,
"from": from,
"query": IfaceMap{
"bool":IfaceMap{
"must": must,
},
},
"sort": sort,
}
indexName := getAlertIndexName(INDEX_ALL_ALERTS)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, indexName )
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
if err != nil {
writeError(w, err)
return
@ -91,23 +147,25 @@ func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){
writeError(w, err)
return
}
alerts := []IfaceMap{}
rawAlerts := queryValue(alertRes, "alerts", nil)
alerts := []interface{}{}
rawAlerts := queryValue(alertRes, "hits.hits", nil)
if ds, ok := rawAlerts.([]interface{}); ok {
for _, alert := range ds {
if alertItem, ok := alert.(map[string]interface{}); ok {
alertItem["version"] = queryValue(alertItem, "alert_version", "")
if queryValue(alertItem, "id", nil) == nil {
alertItem["id"] = queryValue(alertItem, "alert_id", nil)
//alertItem["version"] = queryValue(alertItem, "alert_version", "")
if alertID, ok := queryValue(alertItem, "_source.id", "").(string); ok && alertID == "" {
if source, ok := alertItem["_source"].(map[string]interface{}); ok {
source["id"] = alertItem["_id"]
}
}
alerts = append(alerts, alertItem)
alerts = append(alerts, alertItem["_source"])
}
}
}
writeJSON(w, IfaceMap{
"ok": true,
"alerts": alerts,
"totalAlerts": queryValue(alertRes, "totalAlerts", 0),
"totalAlerts": queryValue(alertRes, "hits.total.value", 0),
}, http.StatusOK)
}
@ -140,8 +198,8 @@ func decodeJSON(reader io.Reader, obj interface{}) error{
}
func writeJSON(w http.ResponseWriter, data interface{}, statusCode int){
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
w.Header().Set("content-type", "application/json")
buf, _ := json.Marshal(data)
w.Write(buf)
}
@ -212,4 +270,4 @@ func assignTo(dst IfaceMap, src IfaceMap){
for k, v := range src {
dst[k] = v
}
}
}

View File

@ -1,10 +1,17 @@
package alerting
const INFINI_PREFIX = "opendistro"
const EMAIL_ACCOUNT_FIELD = "email_account"
const EMAIL_GROUP_FIELD = "email_group"
const DESTINATION_FIELD = "destination"
const MONITOR_FIELD = "monitor"
const INDEX_ALERT = "ALERT"
const INDEX_ALERT_HISTORY = "ALERT_HISTORY"
const INDEX_ALL_ALERTS = "ALL_ALERTS"
const PLUGIN_NAME = INFINI_PREFIX + "-alerting"
const INDEX_PREFIX = INFINI_PREFIX +"-alerting"
const INDEX_ALL_ALERTS = "."+INDEX_PREFIX +`-alert*`
const API_PREFIX = "_opendistro"
const (
ALERT_ACTIVE = "ACTIVE"
ALERT_COMPLETED = "COMPLETED"
ALERT_DELETED = "DELETED"
ALERT_ACKNOWLEDGED = "ACKNOWLEDGED"
ALERT_ERROR = "ERROR"
)

View File

@ -5,8 +5,13 @@ import (
"fmt"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"infini.sh/search-center/model/alerting"
"net/http"
"runtime/debug"
"strings"
"time"
)
func GetDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -18,7 +23,7 @@ func GetDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Para
}
dstID := ps.ByName("destID")
reqUrl := fmt.Sprintf("%s/_opendistro/_alerting/monitors/%s", conf.Endpoint, dstID)
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), dstID)
res, err := doRequest(reqUrl, http.MethodGet, nil, nil)
if err != nil {
writeError(w, err)
@ -32,74 +37,11 @@ func GetDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Para
}
res.Body.Close()
if _, ok := resBody["monitor"]; !ok {
writeJSON(w, IfaceMap{
"ok": false,
}, http.StatusOK)
return
}
queryDSL := ` {
"size": 0,
"query"": {
"bool": {
"must": {
"term"": {
"monitor_id": "%s",
},
},
},
},
"aggs": {
"active_count": {
"terms": {
"field": "state",
}
},
"24_hour_count": {
"date_range": {
"field": "start_time",
"ranges": [{ "from": "now-24h/h" }]
}
}
}
}`
queryDSL = fmt.Sprintf(queryDSL, id)
reqUrl = fmt.Sprintf("%s/_opendistro/_alerting/monitors/_search", conf.Endpoint)
res, err = doRequest(reqUrl, http.MethodPost, map[string]string{
"index": ".opendistro-alerting-alert*",
}, queryDSL)
if err != nil {
writeError(w, err)
return
}
var searchResBody = IfaceMap{}
err = decodeJSON(res.Body, &searchResBody)
if err != nil {
writeError(w, err)
return
}
dayCount := queryValue(searchResBody, "aggregations.24_hour_count.buckets.0.doc_count", 0)
activeBuckets := queryValue(searchResBody, "aggregations.active_count.buckets",[]interface{}{})
activeCount := 0
if ab, ok := activeBuckets.([]IfaceMap); ok {
for _, curr := range ab {
if curr["key"].(string) == "ACTIVE" {
activeCount = int(curr["doc_count"].(float64))
}
}
}
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody["monitor"],
"activeCount": activeCount,
"dayCount": dayCount,
"version": queryValue(resBody, "_version", nil),
"ifSeqNo": queryValue(resBody, "_seq_no", nil),
"ifPrimaryTerm": queryValue(resBody, "_primary_term", nil),
"destination": queryValue(resBody, "_source.destination", nil),
}, http.StatusOK)
}
func GetDestinations(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -114,65 +56,99 @@ func GetDestinations(w http.ResponseWriter, req *http.Request, ps httprouter.Par
size = getQueryParam(req, "size", "20")
search = getQueryParam(req, "search", "")
sortDirection = getQueryParam(req, "sortDirection", "desc")
sortField = getQueryParam(req, "sortField", "start_time")
typ = getQueryParam(req, "type", "ALL")
sortField = getQueryParam(req, "sortField", "")
typ = getQueryParam(req, "type", "")
)
var params = map[string]string{}
switch (sortField) {
case "name":
params = map[string]string{
"sortString": "destination.name.keyword",
"sortOrder": sortDirection,
}
case "type":
params = map[string]string{
"sortString": "destination.type",
"sortOrder": sortDirection,
}
default:
}
params["startIndex"] = from
params["size"] = size
params["searchString"] = search
params["destinationType"] = typ
if clearSearch := strings.TrimSpace(search); clearSearch != "" {
clearSearch = strings.ReplaceAll(clearSearch, " ", "* *")
params["searchString"] = clearSearch
must := []IfaceMap{
{
"exists": IfaceMap{
"field": DESTINATION_FIELD,
},
},
//{
// "match": IfaceMap{
// "cluster_id": id,
// },
//},
}
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations", conf.Endpoint, API_PREFIX)
res, err := doRequest(reqUrl, http.MethodGet, params, nil)
if clearSearch := strings.TrimSpace(search); clearSearch != "" {
clearSearch = strings.ReplaceAll(clearSearch, " ", "* *")
must = append(must, IfaceMap{
"query_string": IfaceMap{
"default_field": "destination.name",
"default_operator": "AND",
"query": fmt.Sprintf(`*%s*`, clearSearch),
},
})
}
var sort interface{} = IfaceMap{}
switch (sortField) {
case "name":
sort = IfaceMap{ "destination.name.keyword": sortDirection }
case "type":
sort = IfaceMap{ "destination.type": sortDirection }
default:
}
if typ != "" && typ != "ALL" {
must = append(must, IfaceMap{
"match": IfaceMap{
"destination.type": typ,
},
})
}
reqBody := IfaceMap{
"from": from,
"size": size,
"sort": sort,
"query": IfaceMap{
"bool": IfaceMap{
"must": must,
},
},
}
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}))
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
if err != nil {
writeError(w, err)
return
}
var resBody = IfaceMap{}
err = decodeJSON(res.Body, &resBody)
if err != nil {
writeError(w, err)
return
}
rawDests := queryValue(resBody, "destinations", []interface{}{})
totalDestinations := queryValue(resBody, "hits.total.value", 0)
rawHits := queryValue(resBody, "hits.hits", []interface{}{})
dests := []IfaceMap{}
if ds, ok := rawDests.([]interface{}); ok {
for _, dest := range ds {
if destination, ok := dest.(map[string]interface{}); ok {
destination["version"] = queryValue(destination, "schema_version", "")
destination["ifSeqNo"] = queryValue(destination, "seq_no", 0)
destination["ifPrimaryTerm"] = queryValue(destination, "primary_term", 0)
dests = append(dests, destination)
if rh, ok := rawHits.([]interface{}); ok {
for _, hit := range rh {
if destination, ok := hit.(map[string]interface{}); ok {
newItem := IfaceMap{}
newItem["id"] = queryValue(destination, "_id", "")
source := queryValue(destination, "_source."+DESTINATION_FIELD, nil)
if ms, ok := source.(map[string]interface{}); ok {
assignTo(newItem, ms)
}
dests = append(dests, newItem)
}
}
}
writeJSON(w, IfaceMap{
"ok": true,
"destinations": dests,
"totalDestinations": queryValue(resBody, "totalDestinations", 0),
"totalDestinations":totalDestinations,
}, http.StatusOK)
}
@ -184,10 +160,40 @@ func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
return
}
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations", conf.Endpoint, API_PREFIX)
config := getDefaultConfig()
destId := util.GetUUID()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destId)
var destination = &alerting.Destination{}
err := decodeJSON(req.Body, &destination)
if err != nil {
writeError(w, err)
return
}
destination.LastUpdateTime = time.Now().UnixNano()/1e6
var toSaveDest = IfaceMap{
"type": destination.Type,
"name": destination.Name,
"last_update_time": destination.LastUpdateTime,
"schema_version": destination.SchemaVersion,
"id": destId,
}
switch destination.Type {
case "email" :
toSaveDest[destination.Type] = destination.Email
case "custom_webhook":
toSaveDest[destination.Type] = destination.CustomWebhook
case "slack":
toSaveDest[destination.Type] = destination.Slack
default:
writeError(w, errors.New("type unsupported"))
return
}
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh": "wait_for",
}, req.Body)
}, IfaceMap{
"cluster_id": id,
DESTINATION_FIELD: toSaveDest,
})
if err != nil {
writeError(w, err)
return
@ -203,7 +209,11 @@ func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody,
"resp": IfaceMap{
DESTINATION_FIELD: toSaveDest,
"_id": queryValue(resBody, "_id", ""),
"_version": queryValue(resBody, "_version", 0),
},
}, http.StatusOK)
}
@ -217,18 +227,40 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
}
destinationId := ps.ByName("destinationId")
var (
ifSeqNo = getQueryParam(req, "ifSeqNo")
ifPrimaryTerm = getQueryParam(req, "ifPrimaryTerm")
)
//PUT /_opendistro/_alerting/destinations/2g3CsHsB3EDgQAwRGzgS?if_seq_no=15&if_primary_term=2&refresh=wait_for
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/%s", conf.Endpoint, API_PREFIX, destinationId)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId)
var destination = &alerting.Destination{}
err := decodeJSON(req.Body, &destination)
if err != nil {
writeError(w, err)
return
}
destination.LastUpdateTime = time.Now().UnixNano()/1e6
var toSaveDest = IfaceMap{
"type": destination.Type,
"name": destination.Name,
"last_update_time": destination.LastUpdateTime,
"schema_version": destination.SchemaVersion,
"id": destinationId,
}
switch destination.Type {
case "email" :
toSaveDest[destination.Type] = destination.Email
case "custom_webhook":
toSaveDest[destination.Type] = destination.CustomWebhook
case "slack":
toSaveDest[destination.Type] = destination.Slack
default:
writeError(w, errors.New("type unsupported"))
return
}
res, err := doRequest(reqUrl, http.MethodPut, map[string]string{
"refresh": "wait_for",
"if_seq_no": ifSeqNo,
"if_primary_term": ifPrimaryTerm,
}, req.Body)
}, IfaceMap{
"cluster_id": id,
DESTINATION_FIELD: toSaveDest,
})
if err != nil {
writeError(w, err)
return
@ -241,8 +273,6 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
writeError(w, err)
return
}
//TODO error handle: check whether resBody has contains field error
writeJSON(w, IfaceMap{
"ok": true,
"version": queryValue(resBody, "_version", ""),
@ -261,7 +291,8 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
destinationId := ps.ByName("destinationId")
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/%s", conf.Endpoint, API_PREFIX, destinationId)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId)
res, err := doRequest(reqUrl, http.MethodDelete, nil, nil)
if err != nil {
writeError(w, err)
@ -275,7 +306,6 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
writeError(w, err)
return
}
//TODO error handle: check whether resBody has contains field error
resultIfce := queryValue(resBody, "result", "")
var isOk = false
@ -288,18 +318,57 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P
}
// getDefaultConfig returns the Elasticsearch cluster configuration
// registered under the "default" name; used as the storage backend for
// alerting configuration documents.
func getDefaultConfig() *elastic.ElasticsearchConfig {
	defaultCfg := elastic.GetConfig("default")
	return defaultCfg
}
//var (
// ks keystore.WritableKeystore
// ksOnce = &sync.Once{}
//)
//func getKeystore() keystore.WritableKeystore {
// ksOnce.Do(func() {
// tempKS, err := keystore.Factory(nil, "data/search-center/keystore")
// if err != nil {
// panic(err)
// }
// ks, err = keystore.AsWritableKeystore(tempKS)
// if err != nil {
// panic(err)
// }
// })
// return ks
//}
func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
}
}()
id := ps.ByName("id")
conf := elastic.GetConfig(id)
if conf == nil {
writeError(w, errors.New("cluster not found"))
return
}
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_accounts", conf.Endpoint, API_PREFIX)
reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{}))
var emailAccount = &alerting.EmailAccount{}
err := decodeJSON(req.Body, &emailAccount)
if err != nil {
writeError(w, err)
return
}
//var password = emailAccount.Password
//emailAccount.Password = ""
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh": "wait_for",
}, req.Body)
}, IfaceMap{
EMAIL_ACCOUNT_FIELD: emailAccount,
"cluster_id": id,
})
if err != nil {
writeError(w, err)
return
@ -313,9 +382,27 @@ func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
return
}
//kk := fmt.Sprintf("search-center_alerting.destination.email.%s.password", emailAccount.Name)
//secStr, _ := getKeystore().Retrieve(kk)
//kst := getKeystore()
//if secStr == nil {
// kst.Store(kk, []byte(password))
// kst.Save()
//}else{
// oldPwd, _ := secStr.Get()
// if bytes.Compare(oldPwd, []byte(password)) != 0 {
// kst.Store(kk, []byte(password))
// }
// kst.Save()
//}
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody,
"resp": IfaceMap{
EMAIL_ACCOUNT_FIELD: emailAccount,
"_id": queryValue(resBody, "_id", ""),
"_version": queryValue(resBody, "_version", 0),
},
}, http.StatusOK)
}
@ -328,17 +415,22 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
}
emailAccountId := ps.ByName("emailAccountId")
var (
ifSeqNo = getQueryParam(req, "ifSeqNo")
ifPrimaryTerm = getQueryParam(req, "ifPrimaryTerm")
)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId)
var emailAccount = &alerting.EmailAccount{}
err := decodeJSON(req.Body, &emailAccount)
if err != nil {
writeError(w, err)
return
}
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_accounts/%s", conf.Endpoint, API_PREFIX, emailAccountId)
res, err := doRequest(reqUrl, http.MethodPut, map[string]string{
"refresh": "wait_for",
"if_seq_no": ifSeqNo,
"if_primary_term": ifPrimaryTerm,
}, req.Body)
}, IfaceMap{
"cluster_id": id,
EMAIL_ACCOUNT_FIELD: emailAccount,
})
if err != nil {
writeError(w, err)
return
@ -351,7 +443,6 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
writeError(w, err)
return
}
//TODO error handle: check whether resBody has contains field error
writeJSON(w, IfaceMap{
"ok": true,
@ -369,9 +460,13 @@ func DeleteEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
}
emailAccountId := ps.ByName("emailAccountId")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_accounts/%s", conf.Endpoint, API_PREFIX, emailAccountId)
res, err := doRequest(reqUrl, http.MethodDelete, nil, nil)
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId)
res, err := doRequest(reqUrl, http.MethodDelete, map[string]string{
"refresh": "wait_for",
}, nil)
if err != nil {
writeError(w, err)
return
@ -384,7 +479,6 @@ func DeleteEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
writeError(w, err)
return
}
//TODO error handle: check whether resBody has contains field error
resultIfce := queryValue(resBody, "result", "")
var isOk = false
@ -394,7 +488,6 @@ func DeleteEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.
writeJSON(w, IfaceMap{
"ok": isOk,
}, http.StatusOK)
}
func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -412,24 +505,28 @@ func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
sortField = getQueryParam(req, "sortField", "name")
)
must := IfaceMap{
"match_all": IfaceMap{},
must := []IfaceMap{
{
"exists": IfaceMap{
"field": EMAIL_ACCOUNT_FIELD,
},
},
}
if clearSearch := strings.TrimSpace(search); clearSearch != "" {
clearSearch = strings.ReplaceAll(clearSearch, " ", "* *")
must = IfaceMap{
must = append(must, IfaceMap{
"query_string": IfaceMap{
"default_field": "email_account.name",
"default_operator": "AND",
"query": fmt.Sprintf(`*%s*`, clearSearch),
},
}
})
}
sortQueryMap := IfaceMap{ "name": IfaceMap{ "email_account.name.keyword": sortDirection } }
var sort interface{}
if sortQuery, ok := sortQueryMap[sortField]; ok {
if sortQuery, ok := sortQueryMap[sortField]; ok {
sort = sortQuery
}
reqBody := IfaceMap{
@ -443,8 +540,11 @@ func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
},
}
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_accounts/_search", conf.Endpoint, API_PREFIX)
res, err := doRequest(reqUrl, http.MethodPost, nil, reqBody)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}))
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
if err != nil {
writeError(w, err)
return
@ -465,12 +565,10 @@ func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
if emailAccount, ok := hit.(map[string]interface{}); ok {
newItem := IfaceMap{}
newItem["id"] = queryValue(emailAccount, "_id", "")
source := queryValue(emailAccount, "_source", nil)
source := queryValue(emailAccount, "_source."+EMAIL_ACCOUNT_FIELD, nil)
if ms, ok := source.(map[string]interface{}); ok {
assignTo(newItem, ms)
}
newItem["ifSeqNo"] = queryValue(emailAccount, "_seq_no", 0)
newItem["ifPrimaryTerm"] = queryValue(emailAccount, "_primary_term", 0)
emailAccounts = append(emailAccounts, newItem)
}
}
@ -492,8 +590,11 @@ func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Par
emailAccountId := ps.ByName("emailAccountId")
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_accounts/%s", conf.Endpoint, API_PREFIX, emailAccountId)
res, err := doRequest(reqUrl, http.MethodGet,nil, req.Body)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId)
res, err := doRequest(reqUrl, http.MethodGet,nil, nil)
if err != nil {
writeError(w, err)
return
@ -506,14 +607,18 @@ func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Par
writeError(w, err)
return
}
//name := queryValue(resBody,"_source.email_account.name", "")
//kk := fmt.Sprintf("search-center_alerting.destination.email.%s.password", name)
//secStr, _ := getKeystore().Retrieve(kk)
//if secStr != nil {
// pwd, _ := secStr.Get()
// fmt.Println(string(pwd))
//}
writeJSON(w, IfaceMap{
"ok": true,
"resp": queryValue(resBody, "email_account", nil),
"ifSeqNo": queryValue(resBody, "_seq_no", 0),
"ifPrimaryTerm": queryValue(resBody, "_primary_term", 0),
"resp": queryValue(resBody, "_source.email_account", nil),
}, http.StatusOK)
}
@ -527,10 +632,20 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
return
}
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_groups", conf.Endpoint, API_PREFIX)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{}))
var emailGroup = &alerting.EmailGroup{}
err := decodeJSON(req.Body, &emailGroup)
if err != nil {
writeError(w, err)
return
}
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh": "wait_for",
}, req.Body)
}, IfaceMap{
"cluster_id": id,
EMAIL_GROUP_FIELD: emailGroup,
})
if err != nil {
writeError(w, err)
return
@ -546,7 +661,15 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody,
"resp": IfaceMap{
EMAIL_GROUP_FIELD: IfaceMap{
"emails": emailGroup.Emails,
"name": emailGroup.Name,
"schema_version": emailGroup.SchemaVersion,
},
"_id": queryValue(resBody, "_id", ""),
"_version": queryValue(resBody, "_version", 0),
},
}, http.StatusOK)
}
@ -559,17 +682,20 @@ func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
}
emailGroupId := ps.ByName("emailGroupId")
var (
ifSeqNo = getQueryParam(req, "ifSeqNo")
ifPrimaryTerm = getQueryParam(req, "ifPrimaryTerm")
)
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_groups/%s", conf.Endpoint, API_PREFIX, emailGroupId)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId)
var emailGroup = &alerting.EmailGroup{}
err := decodeJSON(req.Body, &emailGroup)
if err != nil {
writeError(w, err)
return
}
res, err := doRequest(reqUrl, http.MethodPut, map[string]string{
"refresh": "wait_for",
"if_seq_no": ifSeqNo,
"if_primary_term": ifPrimaryTerm,
}, req.Body)
}, IfaceMap{
"cluster_id": id,
EMAIL_GROUP_FIELD: emailGroup,
})
if err != nil {
writeError(w, err)
return
@ -582,8 +708,6 @@ func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
writeError(w, err)
return
}
//TODO error handle: check whether resBody has contains field error
writeJSON(w, IfaceMap{
"ok": true,
"id": queryValue(resBody, "_id", ""),
@ -600,8 +724,9 @@ func DeleteEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
}
emailGroupId := ps.ByName("emailGroupId")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId)
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_groups/%s", conf.Endpoint, API_PREFIX, emailGroupId)
res, err := doRequest(reqUrl, http.MethodDelete, nil, nil)
if err != nil {
writeError(w, err)
@ -615,7 +740,6 @@ func DeleteEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa
writeError(w, err)
return
}
//TODO error handle: check whether resBody has contains field error
resultIfce := queryValue(resBody, "result", "")
var isOk = false
@ -643,19 +767,23 @@ func GetEmailGroups(w http.ResponseWriter, req *http.Request, ps httprouter.Para
sortField = getQueryParam(req, "sortField", "name")
)
must := IfaceMap{
"match_all": IfaceMap{},
must := []IfaceMap{
{
"exists": IfaceMap{
"field": EMAIL_GROUP_FIELD,
},
},
}
if clearSearch := strings.TrimSpace(search); clearSearch != "" {
clearSearch = strings.ReplaceAll(clearSearch, " ", "* *")
must = IfaceMap{
must = append(must, IfaceMap{
"query_string": IfaceMap{
"default_field": "email_group.name",
"default_operator": "AND",
"query": fmt.Sprintf(`*%s*`, clearSearch),
},
}
})
}
sortQueryMap := IfaceMap{ "name": IfaceMap{ "email_group.name.keyword": sortDirection } }
@ -674,9 +802,10 @@ func GetEmailGroups(w http.ResponseWriter, req *http.Request, ps httprouter.Para
},
}
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_groups/_search", conf.Endpoint, API_PREFIX)
res, err := doRequest(reqUrl, http.MethodPost, nil, reqBody)
//TODO to handle api error in doRequest function
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}))
res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
if err != nil {
writeError(w, err)
return
@ -697,12 +826,10 @@ func GetEmailGroups(w http.ResponseWriter, req *http.Request, ps httprouter.Para
if emailGroup, ok := hit.(map[string]interface{}); ok {
newItem := IfaceMap{}
newItem["id"] = queryValue(emailGroup, "_id", "")
source := queryValue(emailGroup, "_source", nil)
source := queryValue(emailGroup, "_source."+EMAIL_GROUP_FIELD, nil)
if ms, ok := source.(map[string]interface{}); ok {
assignTo(newItem, ms)
}
newItem["ifSeqNo"] = queryValue(emailGroup, "_seq_no", 0)
newItem["ifPrimaryTerm"] = queryValue(emailGroup, "_primary_term", 0)
emailGroups = append(emailGroups, newItem)
}
}
@ -722,9 +849,10 @@ func GetEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Param
return
}
emailAccountId := ps.ByName("emailGroupId")
emailGroupId := ps.ByName("emailGroupId")
reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_groups/%s", conf.Endpoint, API_PREFIX, emailAccountId)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId)
res, err := doRequest(reqUrl, http.MethodGet,nil, req.Body)
if err != nil {
writeError(w, err)
@ -738,7 +866,7 @@ func GetEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Param
writeError(w, err)
return
}
emailGroup := queryValue(resBody, "email_group", nil)
emailGroup := queryValue(resBody, "_source."+EMAIL_GROUP_FIELD, nil)
if emailGroup == nil {
writeJSON(w, IfaceMap{
"ok": false,
@ -749,8 +877,6 @@ func GetEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Param
writeJSON(w, IfaceMap{
"ok": true,
"resp": emailGroup,
"ifSeqNo": queryValue(resBody, "_seq_no", 0),
"ifPrimaryTerm": queryValue(resBody, "_primary_term", 0),
}, http.StatusOK)
}

View File

@ -13,7 +13,7 @@ import (
type SearchBody struct {
Query IfaceMap `json:"query"`
Index string `json:"index"`
Size int `json:""size`
Size int `json:"size"`
}
func Search(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
@ -29,12 +29,11 @@ func Search(w http.ResponseWriter, req *http.Request, ps httprouter.Params){
writeError(w, err)
return
}
reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/_search", conf.Endpoint, API_PREFIX)
params := map[string]string{
"index": body.Index,
}
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, body.Index)
body.Query["size"] = body.Size
res, err := doRequest(reqUrl, http.MethodPost, params, body.Query)
res, err := doRequest(reqUrl, http.MethodPost, nil, body.Query)
if err != nil {
writeError(w, err)
return

33
service/alerting/event.go Normal file
View File

@ -0,0 +1,33 @@
package alerting
import (
	"strconv"
	"strings"

	"github.com/buger/jsonparser"
)
// MonitorEvent wraps the raw JSON document produced by a monitor
// execution; individual fields are extracted lazily via GetValue.
type MonitorEvent struct {
	Fields []byte // raw JSON payload, parsed on demand
}
// GetValue extracts the value stored at the dot-separated key inside the
// event's raw JSON payload. JSON strings are returned as string; JSON
// numbers are returned as int when they are whole numbers, falling back
// to float64 otherwise. Any other JSON type yields (nil, nil).
func (ev *MonitorEvent) GetValue(key string) (interface{}, error) {
	keyParts := strings.Split(key, ".") //todo whether ctx field contains dot
	val, valType, _, err := jsonparser.Get(ev.Fields, keyParts...)
	if err != nil {
		return val, err
	}
	switch valType {
	case jsonparser.String:
		return string(val), nil
	case jsonparser.Number:
		// Try integer first to preserve the int result for whole
		// numbers; fall back to float64 for values like "1.5", which
		// strconv.Atoi alone would reject with an error.
		if n, atoiErr := strconv.Atoi(string(val)); atoiErr == nil {
			return n, nil
		}
		return strconv.ParseFloat(string(val), 64)
	}
	return nil, nil
}
//func SplitFieldsKey(key, sep string) []string {
// sr := strings.NewReader(key)
// if ch, size, sr.ReadRune()
//}

View File

@ -5,19 +5,17 @@ import (
"fmt"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"infini.sh/search-center/model/alerting"
alertUtil "infini.sh/search-center/service/alerting/util"
"net/http"
"runtime/debug"
"strconv"
"strings"
"time"
)
func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
}
}()
id := ps.ByName("id")
conf := elastic.GetConfig(id)
if conf == nil {
@ -26,8 +24,9 @@ func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
}
mid := ps.ByName("monitorID")
// /_opendistro/_alerting/monitors/uiSjqXsBHT9Hsiy5Dq6g
reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/%s", conf.Endpoint, API_PREFIX, mid)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), mid)
res, err := doRequest(reqUrl, http.MethodGet, nil, nil)
if err != nil {
writeError(w, err)
@ -40,44 +39,39 @@ func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
return
}
res.Body.Close()
if _, ok := resBody["monitor"]; !ok {
writeJSON(w, IfaceMap{
"ok": false,
}, http.StatusOK)
if found, ok := resBody["found"].(bool); ok && !found {
writeError(w, errors.New("monitor not found"))
return
}
queryDSL := ` {
"size": 0,
"query": {
"bool": {
"must": {
"term": {
"monitor_id": "%s"
}
}
}
},
"aggs": {
"active_count": {
"terms": {
"field": "state"
}
},
"24_hour_count": {
"date_range": {
"field": "start_time",
"ranges": [{ "from": "now-24h/h" }]
}
}
}
}`
queryDSL = fmt.Sprintf(queryDSL, id)
reqUrl = fmt.Sprintf("%s/%s/_alerting/monitors/_search", conf.Endpoint, API_PREFIX)
res, err = doRequest(reqUrl, http.MethodPost, map[string]string{
"index": INDEX_ALL_ALERTS,
}, queryDSL)
"size": 0,
"query": {
"bool": {
"must": {
"term": {
"monitor_id": "%s"
}
}
}
},
"aggs": {
"active_count": {
"terms": {
"field": "state"
}
},
"24_hour_count": {
"date_range": {
"field": "start_time",
"ranges": [{ "from": "now-24h/h" }]
}
}
}
}`
queryDSL = fmt.Sprintf(queryDSL, mid)
reqUrl = fmt.Sprintf("%s/%s/_search", config.Endpoint, getAlertIndexName(INDEX_ALL_ALERTS))
res, err = doRequest(reqUrl, http.MethodPost,nil, queryDSL)
if err != nil {
writeError(w, err)
@ -90,33 +84,37 @@ func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
writeError(w, err)
return
}
dayCount := queryValue(searchResBody, "aggregations.24_hour_count.buckets.0.doc_count", 0)
dayCountBuckets := queryValue(searchResBody, "aggregations.24_hour_count.buckets", 0)
dayCount := 0
if dcb, ok := dayCountBuckets.([]interface{}); ok {
if dayAgg, ok := dcb[0].(map[string]interface{}); ok {
dayCount = int(dayAgg["doc_count"].(float64))
}
}
activeBuckets := queryValue(searchResBody, "aggregations.active_count.buckets",[]interface{}{})
activeCount := 0
if ab, ok := activeBuckets.([]IfaceMap); ok {
for _, curr := range ab {
if curr["key"].(string) == "ACTIVE" {
activeCount = int(curr["doc_count"].(float64))
if ab, ok := activeBuckets.([]interface{}); ok {
for _, bk := range ab {
if curr, ok := bk.(map[string]interface{}); ok {
if curr["key"].(string) == "ACTIVE" {
activeCount = int(curr["doc_count"].(float64))
break
}
}
}
}
monitor := queryValue(resBody, "_source.monitor", nil)
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody["monitor"],
"resp": monitor,
"activeCount": activeCount,
"dayCount": dayCount,
"version": queryValue(resBody, "_version", nil),
"ifSeqNo": queryValue(resBody, "_seq_no", nil),
"ifPrimaryTerm": queryValue(resBody, "_primary_term", nil),
}, http.StatusOK)
}
func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
defer func() {
if err := recover(); err != nil {
debug.PrintStack()
}
}()
id := ps.ByName("id")
conf := elastic.GetConfig(id)
if conf == nil {
@ -130,20 +128,31 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
sortDirection = getQueryParam(req, "sortDirection")
sortField = getQueryParam(req, "sortField")
state = getQueryParam(req, "state")
must = IfaceMap{ "match_all": IfaceMap{} }
must = []IfaceMap{
{
"match": IfaceMap{
"cluster_id": id,
},
},
{
"exists": IfaceMap{
"field": MONITOR_FIELD,
},
},
}
)
if clearSearch := strings.TrimSpace(search); clearSearch != "" {
clearSearch = strings.ReplaceAll(clearSearch, " ", "* *")
must = IfaceMap{
"query_string": IfaceMap{
"default_field": "monitor.name",
"default_operator": "AND",
"query": fmt.Sprintf("*%s*", clearSearch),
},
}
must = append(must, IfaceMap{
"query_string": IfaceMap{
//"default_field": "monitor.name",
"default_operator": "AND",
"query": fmt.Sprintf("*%s*", clearSearch),
},
})
}
var filter = []IfaceMap{
IfaceMap{ "term": IfaceMap{ "monitor.type": "monitor" }},
{ "term": IfaceMap{ "monitor.type": "monitor" }},
}
if state != "all" {
filter = append(filter, IfaceMap{
@ -175,18 +184,24 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
sortPageData["from"] = intFrom
}
var params = IfaceMap{
"seq_no_primary_term": true,
"version": true,
"query": IfaceMap{
"bool": IfaceMap{
"filter": filter,
"must": must,
"must_not": []IfaceMap{
{
"exists": IfaceMap{
"field": "monitor.status",
},
},
},
},
},
}
assignTo(params, sortPageData)
reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/_search", conf.Endpoint, API_PREFIX )
res, err := doRequest(reqUrl, http.MethodPost, nil, params)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}) )
res, err := doRequest(reqUrl, http.MethodGet, nil, params)
if err != nil {
writeError(w, err)
return
@ -208,12 +223,10 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
if hit, ok := hitIface.(map[string]interface{}); ok {
id := queryValue(hit, "_id", "")
monitorIDs = append(monitorIDs, id)
monitor := queryValue(hit, "_source", IfaceMap{}).(map[string]interface{})
monitor := queryValue(hit, "_source.monitor", IfaceMap{}).(map[string]interface{})
monitorMap[id.(string)] = IfaceMap{
"id": id,
"version": queryValue(hit, "_version", ""),
"ifSeqNo": queryValue(hit, "_seq_no", false),
"ifPrimaryTerm": queryValue(hit, "_primary_term", false),
"name": queryValue(monitor, "name", ""),
"enabled": queryValue(monitor, "enabled", false),
"monitor": monitor,
@ -235,9 +248,7 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
if sortF, ok := aggsSorts[sortField]; ok {
aggsOrderData["order"] = IfaceMap{ sortF.(string): sortDirection }
}
var queryParams = map[string]string{
"index": INDEX_ALL_ALERTS,
}
var termsMap = IfaceMap{
"field": "monitor_id",
"size": intFrom + intSize,
@ -250,13 +261,13 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
"uniq_monitor_ids": IfaceMap{
"terms": termsMap,
"aggregations": IfaceMap{
"active": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": "ACTIVE" } } },
"acknowledged": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": "ACKNOWLEDGED" } } },
"errors": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": "ERROR" } } },
"active": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_ACTIVE } } },
"acknowledged": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_ACKNOWLEDGED } } },
"errors": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_ERROR } } },
"ignored": IfaceMap{
"filter": IfaceMap{
"bool": IfaceMap{
"filter": IfaceMap{ "term": IfaceMap{ "state": "COMPLETED" } },
"filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_COMPLETED } },
"must_not": IfaceMap{ "exists": IfaceMap{ "field": "acknowledged_time" } },
},
},
@ -276,9 +287,8 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
},
}
reqUrl = fmt.Sprintf("%s/%s/_alerting/monitors/_search", conf.Endpoint, API_PREFIX)
searchRes, err := doRequest(reqUrl, http.MethodPost, queryParams, aggsParams)
reqUrl = fmt.Sprintf("%s/%s/_search", config.Endpoint, getAlertIndexName(INDEX_ALL_ALERTS))
searchRes, err := doRequest(reqUrl, http.MethodPost, nil, aggsParams)
if err != nil {
writeError(w, err)
return
@ -309,6 +319,7 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params)
monitor["latestAlert"] = latestAlert
monitor["active"] = queryValue(bk, "active.doc_count", 0)
monitor["errors"] = queryValue(bk, "errors.doc_count", 0)
monitor["acknowledged"] = queryValue(bk, "acknowledged.doc_count", 0)
monitor["currentTime"] = time.Now().UnixNano() / 1e6
usedMonitors = append(usedMonitors, monitor)
delete(monitorMap, id.(string))
@ -351,10 +362,22 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
return
}
reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors", conf.Endpoint, API_PREFIX)
var monitor = &alerting.Monitor{}
err := decodeJSON(req.Body, &monitor)
if err != nil {
writeError(w, err)
return
}
monitor.LastUpdateTime = time.Now().UnixNano()/1e6
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{}))
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh": "wait_for",
}, req.Body)
}, IfaceMap{
"cluster_id": id,
MONITOR_FIELD: monitor,
})
if err != nil {
writeError(w, err)
return
@ -367,10 +390,19 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
writeError(w, err)
return
}
monitorId := queryValue(resBody, "_id", "").(string)
GetScheduler().AddMonitor(monitorId, &ScheduleMonitor{
Monitor: monitor,
ClusterID: id,
})
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody,
"resp": IfaceMap{
MONITOR_FIELD: monitor,
"_id": monitorId,
"_version": queryValue(resBody, "_version", 0),
},
}, http.StatusOK)
}
@ -383,9 +415,56 @@ func DeleteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
}
monitorId := ps.ByName("monitorID")
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/%s", conf.Endpoint, API_PREFIX, monitorId)
res, err := doRequest(reqUrl, http.MethodDelete, nil, nil)
//change alert state to deleted and move alert to history
reqUrl := fmt.Sprintf("%s/_reindex", config.Endpoint)
query := IfaceMap{
"bool": IfaceMap{
"must": []IfaceMap{
{"match": IfaceMap{
"monitor_id": monitorId,
}},
},
},
}
reqBody := IfaceMap{
"source": IfaceMap{
"index": getAlertIndexName(INDEX_ALERT),
"query": query,
},
"dest": IfaceMap{
"index": getAlertIndexName(INDEX_ALERT_HISTORY),
},
"script": IfaceMap{
"source": fmt.Sprintf("ctx._source['state'] = '%s';", ALERT_DELETED),
},
}
_, err := doRequest(reqUrl, http.MethodPost,nil, reqBody)
if err != nil {
writeError(w, err)
return
}
//delete alert
reqUrl = fmt.Sprintf("%s/%s/_delete_by_query", config.Endpoint, getAlertIndexName(INDEX_ALERT))
_, err = doRequest(reqUrl, http.MethodPost, nil, IfaceMap{
"query" : query,
})
if err != nil {
writeError(w, err)
return
}
//logic delete monitor
reqUrl = fmt.Sprintf("%s/%s/_update/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), monitorId)
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh": "wait_for",
}, IfaceMap{
"script" : IfaceMap{
"source": "ctx._source.monitor.status = 'DELETED';",
"lang": "painless",
},
})
if err != nil {
writeError(w, err)
return
@ -398,12 +477,12 @@ func DeleteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
writeError(w, err)
return
}
//TODO error handle: check whether resBody has contains field error
resultIfce := queryValue(resBody, "result", "")
var isOk = false
if result, ok := resultIfce.(string); ok && result == "deleted" {
if result, ok := resultIfce.(string); ok && result == "updated" {
isOk = true
GetScheduler().RemoveMonitor(monitorId)
}
writeJSON(w, IfaceMap{
"ok": isOk,
@ -420,17 +499,36 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
}
monitorId := ps.ByName("monitorID")
var (
ifSeqNo = getQueryParam(req, "ifSeqNo")
ifPrimaryTerm = getQueryParam(req, "ifPrimaryTerm")
)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), monitorId)
reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/%s", conf.Endpoint, API_PREFIX, monitorId)
var monitor = &alerting.Monitor{}
err := decodeJSON(req.Body, &monitor)
if err != nil {
writeError(w, err)
return
}
if len(monitor.Triggers) > 0 {
for i, trigger := range monitor.Triggers {
if trigger.ID == "" {
monitor.Triggers[i].ID = util.GetUUID()
}
if len(trigger.Actions) > 0 {
for j, action := range trigger.Actions {
if action.ID == ""{
monitor.Triggers[i].Actions[j].ID = util.GetUUID()
}
}
}
}
}
monitor.LastUpdateTime = time.Now().UnixNano()/1e6
res, err := doRequest(reqUrl, http.MethodPut, map[string]string{
"refresh": "wait_for",
"if_seq_no": ifSeqNo,
"if_primary_term": ifPrimaryTerm,
}, req.Body)
}, IfaceMap{
"cluster_id": id,
MONITOR_FIELD: monitor,
})
if err != nil {
writeError(w, err)
return
@ -444,6 +542,10 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param
return
}
GetScheduler().UpdateMonitor(monitorId, &ScheduleMonitor{
Monitor: monitor,
ClusterID: id,
})
writeJSON(w, IfaceMap{
"ok": true,
"version": queryValue(resBody, "_version", ""),
@ -461,9 +563,40 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P
}
monitorId := ps.ByName("monitorID")
var ackAlertsReq = struct {
AlertIDs []string `json:"alerts"`
}{}
err := decodeJSON(req.Body, &ackAlertsReq)
if err != nil {
writeError(w, err)
return
}
reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/%s/_acknowledge/alerts", conf.Endpoint, API_PREFIX, monitorId)
res, err := doRequest(reqUrl, http.MethodPost,nil, req.Body)
config := getDefaultConfig()
reqUrl := fmt.Sprintf("%s/%s/_update_by_query", getAlertIndexName(INDEX_ALERT), config.Endpoint)
reqBody := IfaceMap{
"query": IfaceMap{
"bool": IfaceMap{
"must":[]IfaceMap{
{"match": IfaceMap{
"monitor_id": monitorId,
}},
{
"terms": IfaceMap{
"_id": ackAlertsReq.AlertIDs,
},
},
},
},
},
"script": IfaceMap{
"source": fmt.Sprintf("ctx._source['state'] = '%s';ctx._source['acknowledged_time'] = %dL;", ALERT_ACKNOWLEDGED, time.Now().UnixNano()/1e6),
},
}
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"refresh":"",
}, reqBody)
if err != nil {
writeError(w, err)
return
@ -478,13 +611,16 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P
}
var isOk = false
if failed, ok := resBody["failed"].([]interface{}); ok && len(failed) == 0 {
if failed, ok := resBody["failures"].([]interface{}); ok && len(failed) == 0 {
isOk = true
}
writeJSON(w, IfaceMap{
"ok": isOk,
"resp": resBody,
"resp": IfaceMap{
"success": ackAlertsReq.AlertIDs,
"failed": []string{},
},
}, http.StatusOK)
}
@ -500,10 +636,23 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para
dryrun = getQueryParam(req, "dryrun", "true")
)
reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/_execute", conf.Endpoint, API_PREFIX)
res, err := doRequest(reqUrl, http.MethodPost, map[string]string{
"dryrun": dryrun,
}, req.Body)
var monitor = &alerting.Monitor{}
err := decodeJSON(req.Body, &monitor)
if err != nil {
writeError(w, err)
return
}
if monitor.Name == "TEMP_MONITOR"{
}
if len(monitor.Inputs) == 0 {
writeError(w, errors.New("no input"))
return
}
periodStart := time.Now()
reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, strings.Join(monitor.Inputs[0].Search.Indices, ","))
res, err := doRequest(reqUrl, http.MethodGet, nil, monitor.Inputs[0].Search.Query)
if err != nil {
writeError(w, err)
return
@ -516,10 +665,49 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para
writeError(w, err)
return
}
//TODO error handle: check whether resBody has contains field error
var triggerResults = IfaceMap{}
if dryrun == "true" {
sm := ScheduleMonitor{
Monitor: monitor,
}
for _, trigger := range monitor.Triggers {
triggerResult := IfaceMap{
"error": nil,
"action_results": IfaceMap{},
"name": trigger.Name,
}
monitorCtx, err := createMonitorContext(&trigger, resBody, &sm, IfaceMap{})
if err != nil {
triggerResult["error"] = err
triggerResults[trigger.ID] = triggerResult
continue
}
isTrigger, err := resolveTriggerResult(&trigger, monitorCtx)
triggerResult["triggered"] = isTrigger
if err != nil {
triggerResult["error"] = err
}
if trigger.ID == "" {
trigger.ID = util.GetUUID()
}
triggerResults[trigger.ID] = triggerResult
}
}
period := alertUtil.GetMonitorPeriod(&periodStart, &monitor.Schedule)
writeJSON(w, IfaceMap{
"ok": true,
"resp": resBody,
"resp": IfaceMap{
"error": nil,
"monitor_name": monitor.Name,
"input_results": IfaceMap{
"error": nil,
"results": []IfaceMap{resBody},
},
"trigger_results": triggerResults,
"period_start": period.Start,
"period_end": period.End,
},
}, http.StatusOK)
}

View File

@ -0,0 +1,61 @@
package alerting
import (
"fmt"
httprouter "infini.sh/framework/core/api/router"
"net/http"
)
// GetAlertMetric is the HTTP handler for alert metric queries.
// TODO(review): not implemented — the handler currently writes no response.
// Presumably it is meant to expose the histogram computed by
// getLastAlertDayCount; confirm before wiring a route to it.
func GetAlertMetric(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
}
// getLastAlertDayCount runs a date-histogram aggregation over the alerts
// index, counting alerts per day whose start_time falls within the last
// three months.
//
// NOTE(review): the aggregated metricData is built but never returned or
// stored, so callers only learn whether the request succeeded — presumably
// GetAlertMetric is meant to consume it; confirm intent.
func getLastAlertDayCount() error {
	conf := getDefaultConfig()
	// Bug fix: the request body is a search with aggregations, so it must be
	// sent to the _search endpoint; the original URL targeted the bare index.
	reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, getAlertIndexName(INDEX_ALL_ALERTS))
	reqBody := IfaceMap{
		"size": 0, // aggregation only; no hits needed
		"query": IfaceMap{
			"bool": IfaceMap{
				"filter": []IfaceMap{
					{"range": IfaceMap{
						"start_time": IfaceMap{
							"gte": "now-3M",
						},
					}},
				},
			},
		},
		"aggs": IfaceMap{
			"alert_day_count": IfaceMap{
				"date_histogram": IfaceMap{
					"field":    "start_time",
					"interval": "day",
				},
			},
		},
	}
	res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
	if err != nil {
		return err
	}
	defer res.Body.Close()
	result := IfaceMap{}
	err = decodeJSON(res.Body, &result)
	if err != nil {
		return err
	}
	buckets := queryValue(result, "aggregations.alert_day_count.buckets", []interface{}{})
	var metricData []interface{}
	if bks, ok := buckets.([]interface{}); ok {
		for _, bk := range bks {
			if bkm, ok := bk.(map[string]interface{}); ok {
				// Each point is [bucket key (ms epoch), alert count].
				metricData = append(metricData, []interface{}{
					queryValue(bkm, "key", ""),
					queryValue(bkm, "doc_count", 0),
				})
			}
		}
	}
	return nil
}

View File

@ -0,0 +1,591 @@
package alerting
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/buger/jsonparser"
log "github.com/cihub/seelog"
"github.com/elastic/go-ucfg/yaml"
cronlib "github.com/robfig/cron"
"github.com/valyala/fasttemplate"
"infini.sh/framework/core/conditions"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/search-center/model/alerting"
"infini.sh/search-center/service/alerting/action"
"io"
"net/http"
"strings"
"sync"
"time"
)
// alertScheduler is the process-wide scheduler instance, created lazily by
// GetScheduler and shared by all callers.
var alertScheduler *scheduler
var alertSchedulerOnce = &sync.Once{}

// GetScheduler returns the shared alerting scheduler singleton, constructing
// it exactly once on first use.
func GetScheduler() *scheduler {
	alertSchedulerOnce.Do(func() {
		alertScheduler = NewScheduler()
	})
	return alertScheduler
}
// NewScheduler builds a scheduler backed by a cron runner whose parser
// accepts standard five-field expressions, an optional leading seconds
// field, and descriptors such as "@every".
func NewScheduler() *scheduler {
	parser := cronlib.NewParser(
		cronlib.SecondOptional | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow | cronlib.Descriptor,
	)
	return &scheduler{
		cron: cronlib.New(cronlib.WithParser(parser)),
		mu:   &sync.Mutex{},
	}
}
// scheduler drives periodic monitor execution. Registered monitors live in a
// sync.Map keyed by monitor ID; their jobs run on the embedded cron runner.
type scheduler struct {
	monitors sync.Map      // monitor ID -> *ScheduleMonitor
	cron     *cronlib.Cron // underlying cron runner executing the jobs
	IsStart  bool          // guarded by mu; true once Start has run
	mu       *sync.Mutex
}
// Start loads every enabled monitor from the alerting config index,
// schedules each one, and starts the cron runner. Calling Start on an
// already-started scheduler is a no-op.
func (scd *scheduler) Start() error {
	scd.mu.Lock()
	if scd.IsStart {
		// Bug fix: the early return previously kept the mutex locked,
		// deadlocking every subsequent Start call.
		scd.mu.Unlock()
		return nil
	}
	// Bug fix: IsStart was never set, so the "already started" guard above
	// could never take effect and monitors could be registered twice.
	scd.IsStart = true
	scd.mu.Unlock()
	monitors, err := getEnabledMonitors()
	if err != nil {
		return err
	}
	for id, monitor := range monitors {
		// Copy the loop variable: the original stored &monitor directly,
		// which on Go < 1.22 made every entry alias the same (last) value.
		monitor := monitor
		if err = scd.AddMonitor(id, &monitor); err != nil {
			return err
		}
		// Note: AddMonitor already stores the entry in scd.monitors; the
		// original's extra Store here was redundant and is dropped.
	}
	go scd.cron.Start()
	return nil
}
// AddMonitor registers monitor under key and, when it carries a schedule,
// adds its job to the cron runner. It fails if key is already registered.
//
// A period-style schedule is converted to an "@every" expression; an
// explicit cron schedule, when present, takes precedence. A non-empty
// timezone is applied via the CRON_TZ= prefix understood by robfig/cron.
func (scd *scheduler) AddMonitor(key string, monitor *ScheduleMonitor) error {
	monitor.MonitorID = key
	if _, ok := scd.monitors.Load(key); ok {
		return errors.New("monitor already exists")
	}
	jobFunc := generateMonitorJob(monitor)
	var cron *alerting.Cron
	if monitor.Monitor.Schedule.Period != nil {
		cron = convertPeriodToCronExpression(monitor.Monitor.Schedule.Period)
	}
	if monitor.Monitor.Schedule.Cron != nil {
		cron = monitor.Monitor.Schedule.Cron
	}
	if cron != nil {
		timezone := ""
		if cron.Timezone != "" {
			timezone = fmt.Sprintf("CRON_TZ=%s ", cron.Timezone)
		}
		entryID, err := scd.cron.AddFunc(timezone+cron.Expression, jobFunc)
		if err != nil {
			return err
		}
		monitor.EntryID = entryID
	}
	// Stored even when no schedule is present, so Remove/Update still find it.
	scd.monitors.Store(key, monitor)
	return nil
}
// Stop removes every registered monitor's cron entry and clears the registry.
//
// NOTE(review): IsStart is left true and the cron runner keeps running with
// no entries, so a later Start() becomes a no-op — confirm whether Stop is
// ever meant to be followed by a restart.
func (scd *scheduler) Stop() {
	scd.monitors.Range(func(key, val interface{}) bool {
		monitor := val.(*ScheduleMonitor)
		scd.cron.Remove(monitor.EntryID)
		scd.monitors.Delete(key)
		return true
	})
}
// RemoveMonitor unschedules and forgets the monitor stored under key,
// reporting whether an entry existed for that key.
func (scd *scheduler) RemoveMonitor(key string) bool {
	value, ok := scd.monitors.Load(key)
	if !ok || value == nil {
		return ok
	}
	monitor, isMonitor := value.(*ScheduleMonitor)
	if !isMonitor {
		return ok
	}
	scd.cron.Remove(monitor.EntryID)
	scd.monitors.Delete(key)
	return true
}
// UpdateMonitor replaces the scheduled job for key with the new definition.
// A disabled monitor is simply removed; an enabled one is re-registered.
func (scd *scheduler) UpdateMonitor(key string, monitor *ScheduleMonitor) error {
	scd.RemoveMonitor(key)
	if !monitor.Monitor.Enabled {
		return nil
	}
	return scd.AddMonitor(key, monitor)
}
// convertPeriodToCronExpression maps an interval-style schedule onto an
// "@every" cron descriptor. Unrecognized units fall back to one minute.
func convertPeriodToCronExpression(period *alerting.Period) *alerting.Cron {
	var expression string
	switch period.Unit {
	case "MINUTES":
		expression = fmt.Sprintf("@every %dm", period.Interval)
	case "HOURS":
		expression = fmt.Sprintf("@every %dh", period.Interval)
	case "DAYS":
		// Days are expressed in hours because "@every" has no day unit.
		expression = fmt.Sprintf("@every %dh", period.Interval*24)
	default:
		expression = "@every 1m"
	}
	return &alerting.Cron{
		Expression: expression,
		Timezone:   "",
	}
}
// MonitorJob is the closure signature registered with the cron runner.
type MonitorJob func()

// generateMonitorJob builds the cron callback for one scheduled monitor.
// Each run executes the monitor's first input query, evaluates every
// trigger against the result, and for a firing trigger dispatches the
// configured actions (email / webhook) and records an alert document.
func generateMonitorJob(smt *ScheduleMonitor) MonitorJob {
	// Copy the monitor so later mutation of the caller's value cannot affect
	// a job that is already scheduled.
	sm := *smt
	return func() {
		// NOTE(review): only Inputs[0] is ever executed — confirm that
		// monitors are limited to a single input; an empty Inputs slice
		// would panic here.
		queryResult, err := getQueryResult(sm.ClusterID, &sm.Monitor.Inputs[0])
		if err != nil {
			log.Error(err)
		}
		for _, trigger := range sm.Monitor.Triggers {
			monitorCtx, err := createMonitorContext(&trigger, queryResult, &sm, IfaceMap{})
			if err != nil {
				log.Error(err)
				continue
			}
			isTrigger, err := resolveTriggerResult(&trigger, monitorCtx)
			if err != nil {
				log.Error(err)
				continue
			}
			alertItem := alerting.Alert{
				TriggerId:   trigger.ID,
				TriggerName: trigger.Name,
				MonitorId:   sm.MonitorID,
				MonitorName: sm.Monitor.Name,
				StartTime:   time.Now().UnixNano() / 1e6,
				Severity:    trigger.Severity,
				State:       ALERT_COMPLETED,
				ClusterID:   sm.ClusterID,
			}
			if !isTrigger {
				// Not firing: record completion (saveAlertInfo closes out a
				// previously active alert, if any).
				endTime := time.Now().UnixNano() / 1e6
				alertItem.EndTime = &endTime
				err = saveAlertInfo(&alertItem)
				if err != nil {
					log.Error(err)
				}
				continue
			}
			// An acknowledged alert suppresses further notifications until
			// its state changes.
			lastAlert, err := getLastAlert(sm.MonitorID, trigger.ID, sm.ClusterID)
			if err != nil {
				log.Error(err)
				continue
			}
			// NOTE(review): lastAlert["state"].(string) panics if the stored
			// state is missing or not a string — confirm documents always
			// carry it.
			if lastAlert != nil && lastAlert["state"].(string) == ALERT_ACKNOWLEDGED {
				continue
			}
			alertItem.State = ALERT_ACTIVE
			for _, act := range trigger.Actions {
				message, err := resolveMessage(act.MessageTemplate, monitorCtx)
				if err != nil {
					alertItem.ErrorMessage = err.Error()
					continue
				}
				destination, err := resolveDestination(act.DestinationId)
				if err != nil {
					alertItem.ErrorMessage = err.Error()
					continue
				}
				var tact action.Action
				alertItem.LastNotificationTime = time.Now().UnixNano() / 1e6
				switch destination.Type {
				case action.ACTION_EMAIL:
					sender, err := resolveEmailAccount(destination.Email.EmailAccountID)
					if err != nil {
						alertItem.ErrorMessage = err.Error()
						continue
					}
					subject, err := resolveMessage(act.SubjectTemplate, monitorCtx)
					if err != nil {
						alertItem.ErrorMessage = err.Error()
						continue
					}
					receiver, err := getEmailRecipient(destination.Email.Recipients)
					if err != nil {
						alertItem.ErrorMessage = err.Error()
						continue
					}
					tact = &action.EmailAction{
						Message:  string(message),
						Subject:  string(subject),
						Sender:   sender,
						Receiver: receiver,
					}
				case action.ACTION_WEBHOOK:
					tact = &action.WebhookAction{
						Data:    &destination.CustomWebhook,
						Message: string(message),
					}
				}
				if tact != nil {
					actResult, err := tact.Execute()
					var errStr string
					if err != nil {
						errStr = err.Error()
						alertItem.ErrorMessage += errStr
					}
					alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, alerting.ActionExecutionResult{
						ActionID:          act.ID,
						LastExecutionTime: alertItem.LastNotificationTime,
						Error:             errStr,
						Result:            string(actResult),
					})
				}
				if alertItem.ErrorMessage != "" {
					alertItem.State = ALERT_ERROR
				}
				// NOTE(review): the alert is saved once per action, and a
				// firing trigger with zero actions is never saved at all —
				// confirm this is intended.
				err = saveAlertInfo(&alertItem)
				if err != nil {
					log.Error(err)
				}
			}
		}
	}
}
// getLastAlert returns the newest alert document (sorted by start_time
// descending) for the given monitor/trigger/cluster combination, or nil
// when no such alert exists.
func getLastAlert(monitorID, triggerID, clusterID string) (map[string]interface{}, error) {
	conf := getDefaultConfig()
	reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, getAlertIndexName(INDEX_ALERT))
	reqBody := IfaceMap{
		"size": 1,
		"query": IfaceMap{
			"bool": IfaceMap{
				"must": []IfaceMap{
					{
						"match": IfaceMap{
							"monitor_id": monitorID,
						},
					},
					{
						"match": IfaceMap{
							"cluster_id": clusterID,
						},
					},
					{
						"match": IfaceMap{
							"trigger_id": triggerID,
						},
					},
				},
			},
		},
		"sort": []IfaceMap{
			{"start_time": IfaceMap{"order": "desc"}},
		},
	}
	res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
	if err != nil {
		return nil, err
	}
	// Bug fix: the response body was never closed, leaking the connection
	// on every scheduler tick.
	defer res.Body.Close()
	var resBody = &elastic.SearchResponse{}
	err = decodeJSON(res.Body, resBody)
	if err != nil {
		return nil, err
	}
	if len(resBody.Hits.Hits) > 0 {
		return resBody.Hits.Hits[0].Source, nil
	}
	return nil, nil
}
// saveAlertInfo persists alertItem's state in the live alert index:
//   - no previous alert for (monitor, cluster, trigger): index a new doc,
//     unless the alert is already COMPLETED (nothing worth recording);
//   - state changed: archive the previous doc to the history index with an
//     end_time, then delete it from the live index;
//   - state unchanged: update the live doc in place, preserving start_time.
func saveAlertInfo(alertItem *alerting.Alert) error {
	conf := getDefaultConfig()
	indexName := getAlertIndexName(INDEX_ALERT)
	reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, indexName)
	reqBody := IfaceMap{
		"size": 1,
		"query": IfaceMap{
			"bool": IfaceMap{
				"must": []IfaceMap{
					{
						"match": IfaceMap{
							"monitor_id": alertItem.MonitorId,
						},
					},
					{
						"match": IfaceMap{
							"cluster_id": alertItem.ClusterID,
						},
					},
					{
						"match": IfaceMap{
							"trigger_id": alertItem.TriggerId,
						},
					},
				},
			},
		},
	}
	res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
	if err != nil {
		return err
	}
	// Bug fix: close via defer so the body is released even when decoding
	// fails (the original skipped Close on the decode-error path).
	defer res.Body.Close()
	var resBody = elastic.SearchResponse{}
	err = decodeJSON(res.Body, &resBody)
	if err != nil {
		return err
	}
	if len(resBody.Hits.Hits) == 0 {
		if alertItem.State == ALERT_COMPLETED {
			return nil
		}
		reqUrl = fmt.Sprintf("%s/%s/_doc", conf.Endpoint, indexName)
		_, err = doRequest(reqUrl, http.MethodPost, nil, alertItem)
		return err
	}
	hit := resBody.Hits.Hits[0]
	// Guarded assertions: a panic here would kill the scheduler goroutine.
	currentState, _ := queryValue(hit.Source, "state", "").(string)
	docID, _ := hit.ID.(string)
	alertItem.Id = docID
	if currentState != alertItem.State {
		// Archive the old document to the history index with an end_time.
		reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, getAlertIndexName(INDEX_ALERT_HISTORY), alertItem.Id)
		source := hit.Source
		source["end_time"] = time.Now().UnixNano() / 1e6
		if alertItem.State == ALERT_COMPLETED && currentState == ALERT_ACTIVE {
			source["state"] = ALERT_COMPLETED
		}
		// Bug fix: this write's error was previously overwritten by the
		// delete below, so a failed archive went unnoticed and the live doc
		// was deleted anyway.
		if _, err = doRequest(reqUrl, http.MethodPut, nil, source); err != nil {
			return err
		}
		reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, indexName, alertItem.Id)
		// The original sent alertItem as the DELETE body; a document DELETE
		// takes no body, so none is sent.
		_, err = doRequest(reqUrl, http.MethodDelete, nil, nil)
		return err
	}
	// Same state: keep the original start_time when updating in place.
	if startTime, ok := queryValue(hit.Source, "start_time", 0).(float64); ok {
		alertItem.StartTime = int64(startTime)
	}
	reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, indexName, alertItem.Id)
	_, err = doRequest(reqUrl, http.MethodPut, nil, alertItem)
	return err
}
// getEmailRecipient flattens a recipient list into plain email addresses,
// expanding "email_group" recipients via their stored group documents.
func getEmailRecipient(recipients []alerting.Recipient) ([]string, error) {
	var emails []string
	for _, recipient := range recipients {
		switch recipient.Type {
		case "email":
			emails = append(emails, recipient.Email)
		case "email_group":
			group, err := resolveEmailGroup(recipient.EmailGroupID)
			if err != nil {
				return emails, err
			}
			for _, entry := range group.Emails {
				if addr, ok := entry["email"].(string); ok {
					emails = append(emails, addr)
				}
			}
		}
	}
	return emails, nil
}
// resolveDestination loads the destination document with the given ID from
// the alerting config index.
// TODO(review): consider caching, as the original code suggested.
func resolveDestination(ID string) (*alerting.Destination, error) {
	conf := getDefaultConfig()
	reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID)
	res, err := doRequest(reqUrl, http.MethodGet, nil, nil)
	if err != nil {
		return nil, err
	}
	// Bug fix: close via defer — the body previously leaked when decoding
	// failed, because Close was only reached on the success path.
	defer res.Body.Close()
	var resBody = IfaceMap{}
	if err = decodeJSON(res.Body, &resBody); err != nil {
		return nil, err
	}
	destination := &alerting.Destination{}
	buf, _ := json.Marshal(queryValue(resBody, "_source."+DESTINATION_FIELD, IfaceMap{}))
	// Unmarshal error deliberately ignored: a malformed document yields a
	// zero-value destination, matching the original behavior.
	_ = json.Unmarshal(buf, destination)
	return destination, nil
}
// resolveEmailAccount loads the sender email-account document with the given
// ID from the alerting config index.
func resolveEmailAccount(ID string) (*alerting.EmailAccount, error) {
	conf := getDefaultConfig()
	reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID)
	res, err := doRequest(reqUrl, http.MethodGet, nil, nil)
	if err != nil {
		return nil, err
	}
	// Bug fix: close via defer — the body previously leaked when decoding
	// failed, because Close was only reached on the success path.
	defer res.Body.Close()
	var resBody = IfaceMap{}
	if err = decodeJSON(res.Body, &resBody); err != nil {
		return nil, err
	}
	email := &alerting.EmailAccount{}
	buf, _ := json.Marshal(queryValue(resBody, "_source."+EMAIL_ACCOUNT_FIELD, IfaceMap{}))
	// Unmarshal error deliberately ignored: a malformed document yields a
	// zero-value account, matching the original behavior.
	_ = json.Unmarshal(buf, email)
	return email, nil
}
// resolveEmailGroup loads the email-group document with the given ID from
// the alerting config index.
func resolveEmailGroup(ID string) (*alerting.EmailGroup, error) {
	conf := getDefaultConfig()
	reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID)
	res, err := doRequest(reqUrl, http.MethodGet, nil, nil)
	if err != nil {
		return nil, err
	}
	// Bug fix: close via defer — the body previously leaked when decoding
	// failed, because Close was only reached on the success path.
	defer res.Body.Close()
	var resBody = IfaceMap{}
	if err = decodeJSON(res.Body, &resBody); err != nil {
		return nil, err
	}
	emailGroup := &alerting.EmailGroup{}
	buf, _ := json.Marshal(queryValue(resBody, "_source."+EMAIL_GROUP_FIELD, IfaceMap{}))
	// Unmarshal error deliberately ignored: a malformed document yields a
	// zero-value group, matching the original behavior.
	_ = json.Unmarshal(buf, emailGroup)
	return emailGroup, nil
}
// getQueryResult executes the monitor input's search against its target
// indices on the given cluster and returns the decoded response body.
func getQueryResult(clusterID string, input *alerting.MonitorInput) (IfaceMap, error) {
	conf := elastic.GetConfig(clusterID)
	indices := strings.Join(input.Search.Indices, ",")
	reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, indices)
	res, err := doRequest(reqUrl, http.MethodGet, nil, input.Search.Query)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	result := IfaceMap{}
	err = decodeJSON(res.Body, &result)
	return result, err
}
// resolveMessage renders messageTemplate["source"] with fasttemplate,
// replacing {{dotted.path}} tags with the corresponding values looked up in
// the JSON-encoded monitor context.
func resolveMessage(messageTemplate IfaceMap, monitorCtx []byte) ([]byte, error) {
	// Bug fix: the original's unchecked type assertion panicked when the
	// template carried no (or a non-string) "source" field; a scheduler
	// goroutine must not die on a malformed template.
	msg, ok := messageTemplate["source"].(string)
	if !ok {
		return nil, errors.New("message template missing string field 'source'")
	}
	tpl := fasttemplate.New(msg, "{{", "}}")
	msgBuffer := bytes.NewBuffer(nil)
	_, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string) (int, error) {
		keyParts := strings.Split(tag, ".")
		value, _, _, err := jsonparser.Get(monitorCtx, keyParts...)
		if err != nil {
			return 0, err
		}
		return writer.Write(value)
	})
	return msgBuffer.Bytes(), err
}
// createMonitorContext serializes the data exposed to trigger condition
// scripts and message templates, rooted under the "_ctx" key.
// NOTE(review): the extra parameter is currently unused — confirm whether it
// was meant to be merged into the context.
func createMonitorContext(trigger *alerting.Trigger, result IfaceMap, smt *ScheduleMonitor, extra IfaceMap) ([]byte, error) {
	inner := IfaceMap{
		"results":     []interface{}{result},
		"trigger":     trigger,
		"monitor":     smt.Monitor,
		"cluster_id":  smt.ClusterID,
		"periodStart": "",
		"periodEnd":   "",
	}
	return json.Marshal(IfaceMap{"_ctx": inner})
}
// resolveTriggerResult evaluates a trigger's condition against the monitor
// context. The condition's script.source is expected to be a YAML document
// in the framework's conditions.Config format (e.g. a "range" condition); it
// is unpacked and checked against the JSON-encoded context.
func resolveTriggerResult(trigger *alerting.Trigger, monitorCtx []byte) (bool, error) {
	source := queryValue(trigger.Condition, "script.source", "")
	// NOTE(review): a non-string script.source would panic on this
	// assertion — confirm upstream validation guarantees a string.
	sourceBytes := []byte(source.(string))
	config, err := yaml.NewConfig(sourceBytes)
	if err != nil {
		return false, err
	}
	var boolConfig = &conditions.Config{}
	err = config.Unpack(boolConfig)
	if err != nil {
		return false, err
	}
	cond, err := conditions.NewCondition(boolConfig)
	if err != nil {
		return false, err
	}
	ev := &MonitorEvent{
		Fields: monitorCtx,
	}
	return cond.Check(ev), nil
}
// getEnabledMonitors returns every monitor document whose monitor.enabled
// flag is true, keyed by document ID, read from the default cluster's
// alerting config index. It returns (nil, nil) when none exist.
func getEnabledMonitors() (map[string]ScheduleMonitor, error) {
	config := elastic.GetConfig("default")
	reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}))
	must := []IfaceMap{
		{
			"exists": IfaceMap{
				"field": MONITOR_FIELD,
			},
		},
		{
			"match": IfaceMap{
				MONITOR_FIELD + ".enabled": true,
			},
		},
	}
	reqBody := IfaceMap{
		// NOTE(review): hard cap of 100 monitors; anything beyond is
		// silently ignored — confirm this bound.
		"size": 100,
		"query": IfaceMap{
			"bool": IfaceMap{
				"must": must,
			},
		},
	}
	res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody)
	if err != nil {
		return nil, err
	}
	// Bug fix: the response body was never closed.
	defer res.Body.Close()
	resBody := elastic.SearchResponse{}
	if err = decodeJSON(res.Body, &resBody); err != nil {
		return nil, err
	}
	if len(resBody.Hits.Hits) == 0 {
		return nil, nil
	}
	var monitors = map[string]ScheduleMonitor{}
	for _, hit := range resBody.Hits.Hits {
		// Bug fix: guarded assertions — a document with a non-string ID or
		// missing cluster_id previously panicked during startup.
		id, ok := hit.ID.(string)
		if !ok {
			continue
		}
		clusterID, _ := hit.Source["cluster_id"].(string)
		monitor := &alerting.Monitor{}
		buf, _ := json.Marshal(hit.Source[MONITOR_FIELD])
		_ = json.Unmarshal(buf, monitor)
		monitors[id] = ScheduleMonitor{
			Monitor:   monitor,
			ClusterID: clusterID,
		}
	}
	return monitors, nil
}
// ScheduleMonitor couples a monitor definition with the cluster it targets
// and its registration in the cron runner.
type ScheduleMonitor struct {
	Monitor   *alerting.Monitor // monitor definition loaded from the config index
	ClusterID string            // cluster the input query is executed against
	EntryID   cronlib.EntryID   // cron entry handle, used for removal
	MonitorID string            // document ID of the monitor config
}

View File

@ -0,0 +1,74 @@
package alerting
import (
	"encoding/json"
	"fmt"
	"testing"

	"github.com/elastic/go-ucfg/yaml"
	"infini.sh/framework/core/conditions"
	"infini.sh/framework/core/util"
)
// TestParseYamlConfig verifies that a YAML "range" condition can be parsed
// into a conditions.Config and evaluated against a JSON-encoded search
// response wrapped in a MonitorEvent.
// NOTE(review): the test only prints the result; it asserts nothing.
func TestParseYamlConfig(t *testing.T) {
	// Condition: the dotted field in the first hit must be greater than 3.
	yamlStr := `range:
  ctx.results.hits.hits.[0]._source.number.test.gt: 3`
	config, err := yaml.NewConfig([]byte(yamlStr))
	if err != nil {
		t.Fatal(err)
	}
	var boolConfig = &conditions.Config{}
	err = config.Unpack(boolConfig)
	if err != nil {
		t.Fatal(err)
	}
	fmt.Println(boolConfig.Range)
	cond, err := conditions.NewCondition(boolConfig)
	if err != nil {
		t.Fatal(err)
	}
	// A minimal Elasticsearch search response; "number.test" is 5, so the
	// gt-3 condition should hold.
	searchResStr := `{
	"_shards": {
		"failed": 0,
		"skipped": 0,
		"successful": 1,
		"total": 1
	},
	"hits": {
		"hits": [{"_source":{"number.test": 5, "normal": 7, "number": {"value": 5}}}],
		"max_score": null,
		"total": {
			"relation": "eq",
			"value": 5
		}
	},
	"timed_out": false,
	"took": 0
}`
	searchResults := util.MapStr{}
	err = json.Unmarshal([]byte(searchResStr), &searchResults)
	if err != nil {
		t.Fatal(err)
	}
	fields := util.MapStr{
		"ctx": util.MapStr{
			"results": searchResults,
			"test": util.MapStr{
				"number": 2,
			},
		},
	}
	// MonitorEvent carries the context as raw JSON bytes.
	fieldsBytes, _ := json.Marshal(fields)
	tevent := &MonitorEvent{
		Fields: fieldsBytes,
	}
	fmt.Println(cond.Check(tevent))
}

View File

@ -0,0 +1,78 @@
package util
import (
"fmt"
cronlib "github.com/robfig/cron"
"infini.sh/search-center/model/alerting"
"time"
)
type MonitorPeriod struct {
Start int64
End int64
}
// GetMonitorPeriod resolves the evaluation window for a monitor schedule,
// preferring a fixed period over a cron expression. It returns nil when the
// schedule carries neither.
func GetMonitorPeriod(currentTime *time.Time, schedule *alerting.Schedule) *MonitorPeriod {
	switch {
	case schedule.Period != nil:
		return transformPeriod(currentTime, schedule.Period)
	case schedule.Cron != nil:
		return transformCron(currentTime, schedule.Cron)
	default:
		return nil
	}
}
// transformCron derives the [Start, End] window (ms since epoch) for a
// cron-based schedule: End is currentTime, and Start is found by stepping
// backwards from currentTime until the schedule's next firing changes,
// i.e. the firing immediately preceding the upcoming one.
func transformCron(currentTime *time.Time, cron *alerting.Cron) *MonitorPeriod {
	timezone := ""
	if cron.Timezone != "" {
		timezone = fmt.Sprintf("CRON_TZ=%s ", cron.Timezone)
	}
	parser := cronlib.NewParser(
		cronlib.SecondOptional | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow,
	)
	// Bug fix: the parse error was ignored and a bad expression then
	// dereferenced a nil schedule. Treat it like an unsupported schedule —
	// GetMonitorPeriod already returns nil in that case.
	sd, err := parser.Parse(timezone + cron.Expression)
	if err != nil {
		return nil
	}
	// Backward-search step: hours vs minutes.
	// NOTE(review): SpecSchedule.Hour is a bit set; comparing it to 1 looks
	// suspect — confirm which schedules are meant to use the hourly step.
	var duration = time.Minute
	if ssd, ok := sd.(*cronlib.SpecSchedule); ok && ssd.Hour == 1 {
		duration = time.Hour
	}
	tempTime := *currentTime
	nextTime := sd.Next(tempTime)
	var preTime = tempTime
	for {
		tempTime = tempTime.Add(-duration)
		if preTime = sd.Next(tempTime); !preTime.Equal(nextTime) {
			break
		}
	}
	mp := &MonitorPeriod{
		Start: preTime.UnixNano() / 1e6,
		End:   currentTime.UnixNano() / 1e6,
	}
	return mp
}
// transformPeriod derives the [Start, End] window (ms since epoch) for a
// fixed-interval schedule: End is currentTime and Start lies one interval
// earlier. Unknown units (and a nil period) yield nil.
func transformPeriod(currentTime *time.Time, period *alerting.Period) *MonitorPeriod {
	if period == nil {
		return nil
	}
	var unit time.Duration
	switch period.Unit {
	case "MINUTES":
		unit = time.Minute
	case "HOURS":
		unit = time.Hour
	case "DAYS":
		unit = 24 * time.Hour
	default:
		return nil
	}
	start := currentTime.Add(-unit * time.Duration(period.Interval))
	return &MonitorPeriod{
		Start: start.UnixNano() / 1e6,
		End:   currentTime.UnixNano() / 1e6,
	}
}

View File

@ -0,0 +1,22 @@
package util
import (
"fmt"
"infini.sh/search-center/model/alerting"
"testing"
"time"
)
// TestGetMonitorPeriod prints the window computed for a cron schedule
// ("at 00:00 on day 1 of every month").
// NOTE(review): this test only prints the result; it asserts nothing.
func TestGetMonitorPeriod(t *testing.T) {
	now := time.Now()
	periods := GetMonitorPeriod(&now, &alerting.Schedule{
		Cron: &alerting.Cron{
			Expression: "0 0 1 */1 *",
		},
		//Period: &alerting.Period{
		//	Unit: "MINUTES",
		//	Interval: 10,
		//},
	})
	fmt.Println(periods)
}

View File

@ -91,7 +91,7 @@ const Console = (params:props) => {
return ( <PanelContextProvider registry={registryRef.current}>
<RequestContextProvider>
<EditorContextProvider>
<ServicesContextProvider value={{ services: { history, storage, objectStorageClient, settings} }}>
<ServicesContextProvider value={{ services: { history, storage, objectStorageClient, settings}, clusterID: params.selectedCluster.id }}>
<ConsoleWrapper {...params} />
</ServicesContextProvider>
</EditorContextProvider>

View File

@ -141,7 +141,7 @@ const ConsoleInputUI = ({clusterID, initialText}:ConsoleInputProps) => {
}
}, []);
useEffect(()=>{
retrieveAutoCompleteInfo(settings, settings.getAutocomplete());
retrieveAutoCompleteInfo(settings, settings.getAutocomplete(), clusterID);
},[clusterID])
const handleSaveAsCommonCommand = async () => {

View File

@ -44,6 +44,7 @@ interface ContextServices {
export interface ContextValue {
services: ContextServices;
clusterID: string;
}
interface ContextProps {

View File

@ -20,6 +20,7 @@ export type BaseResponseType =
export interface EsRequestArgs {
requests: Array<{ url: string; method: string; data: string[] }>;
clusterID: string;
}
export interface ESResponseObject<V = unknown> {

View File

@ -47,7 +47,7 @@ function buildRawCommonCommandRequest(cmd:any){
}
export const useSendCurrentRequestToES = () => {
const dispatch = useRequestActionContext();
const { services: { history } } = useServicesContext();
const { services: { history }, clusterID } = useServicesContext();
return useCallback(async () => {
try {
@ -80,7 +80,7 @@ export const useSendCurrentRequestToES = () => {
dispatch({ type: 'sendRequest', payload: undefined });
// @ts-ignore
const results = await sendRequestToES({ requests });
const results = await sendRequestToES({ requests, clusterID });
// let saveToHistoryError: undefined | Error;

View File

@ -69,7 +69,7 @@ export function sendRequestToES(args: EsRequestArgs): Promise<ESRequestResult[]>
} // append a new line for bulk requests.
const startTime = Date.now();
send(esMethod, esPath, esData).always(
send(esMethod, esPath, esData, {clusterID: args.clusterID}).always(
(dataOrjqXHR, textStatus: string, jqXhrORerrorThrown) => {
if (reqId !== CURRENT_REQ_ID) {
return;

View File

@ -37,6 +37,7 @@ import { stringify } from 'query-string';
interface SendOptions {
asSystemRequest?: boolean;
clusterID?: string;
}
const esVersion: string[] = [];
@ -62,15 +63,15 @@ export function send(
method: string,
path: string,
data: string | object,
{ asSystemRequest }: SendOptions = {}
{ asSystemRequest, clusterID }: SendOptions = {}
) {
const wrappedDfd = $.Deferred();
const clusterID = extractClusterIDFromURL();
if(!clusterID){
console.log('can not get clusterid from url');
return;
}
// const clusterID = extractClusterIDFromURL();
// if(!clusterID){
// console.log('can not get clusterid from url');
// return;
// }
// @ts-ignore
const options: JQuery.AjaxSettings = {
url: `/elasticsearch/${clusterID}/_proxy?` + stringify({ path, method }),

View File

@ -267,7 +267,7 @@ export function clear() {
templates = [];
}
function retrieveSettings(settingsKey, settingsToRetrieve) {
function retrieveSettings(settingsKey, settingsToRetrieve, clusterID) {
const settingKeyToPathMap = {
fields: '_mapping',
indices: '_aliases',
@ -280,7 +280,7 @@ function retrieveSettings(settingsKey, settingsToRetrieve) {
if(settingsKey === 'commands'){
return es.queryCommonCommands();
}
return es.send('GET', settingKeyToPathMap[settingsKey], null, true);
return es.send('GET', settingKeyToPathMap[settingsKey], null, {clusterID, asSystemRequest: true});
} else {
const settingsPromise = new $.Deferred();
if (settingsToRetrieve[settingsKey] === false) {
@ -319,13 +319,13 @@ function getObject(value){
* @param settings Settings A way to retrieve the current settings
* @param settingsToRetrieve any
*/
export function retrieveAutoCompleteInfo(settings, settingsToRetrieve) {
export function retrieveAutoCompleteInfo(settings, settingsToRetrieve, clusterID) {
clearSubscriptions();
const mappingPromise = retrieveSettings('fields', settingsToRetrieve);
const aliasesPromise = retrieveSettings('indices', settingsToRetrieve);
const templatesPromise = retrieveSettings('templates', settingsToRetrieve);
const commandsPromise = retrieveSettings('commands', settingsToRetrieve);
const mappingPromise = retrieveSettings('fields', settingsToRetrieve, clusterID);
const aliasesPromise = retrieveSettings('indices', settingsToRetrieve, clusterID);
const templatesPromise = retrieveSettings('templates', settingsToRetrieve, clusterID);
const commandsPromise = retrieveSettings('commands', settingsToRetrieve, clusterID);
$.when(mappingPromise, aliasesPromise, templatesPromise, commandsPromise).done((mappings, aliases, templates, commands) => {

View File

@ -103,12 +103,6 @@ class BasicLayout extends React.PureComponent {
dispatch({
type: 'setting/getSetting',
});
// dispatch({
// type: 'cluster/fetchClusterVersion',
// payload: {
// cluster: 'single-es'
// }
// });
this.renderRef = requestAnimationFrame(() => {
this.setState({
rendering: false,

View File

@ -22,7 +22,7 @@ export default {
'alert.dashboard.create-monitor-text': '暂无监控项。 创建监控项以添加触发器和操作。 一旦触发警报,状态将显示在此表中。',
'alert.dashboard.create-trigger-text': '暂无监控项。 创建触发器以开始警报。 一旦触发警报,状态将显示在此表中。',
'alert.dashboard.table.columns.start_time': '告警开始时间',
'alert.dashboard.table.columns.end_time': '告警截止时间',
'alert.dashboard.table.columns.end_time': '告警结束时间',
'alert.dashboard.table.columns.monitor_name': '监控项名称',
'alert.dashboard.table.columns.trigger_name': '触发器名称',
'alert.dashboard.table.columns.severity': '告警级别',

View File

@ -5,12 +5,15 @@ import {formatESSearchResult, extractClusterIDFromURL} from '@/lib/elasticsearch
import {Modal} from 'antd';
import router from "umi/router";
const MENU_COLLAPSED_KEY = "search-center:menu:collapsed";
console.log(localStorage.getItem(MENU_COLLAPSED_KEY))
export default {
namespace: 'global',
state: {
collapsed: false,
collapsed: localStorage.getItem(MENU_COLLAPSED_KEY) === 'true',
isInitCollapsed:false,
notices: [],
clusterVisible: true,
clusterList: [],
@ -160,9 +163,18 @@ export default {
reducers: {
changeLayoutCollapsed(state, { payload }) {
//layout sider init(false) bug
if(!state.isInitCollapsed && state.collapsed){
return {
...state,
isInitCollapsed: true,
};
}
localStorage.setItem(MENU_COLLAPSED_KEY, payload);
return {
...state,
collapsed: payload,
isInitCollapsed: true,
};
},
saveNotices(state, { payload }) {

View File

@ -40,7 +40,7 @@ const AlertingUI = (props)=>{
}
useMemo(()=>{
httpClient.params.basePath.prepend = (url)=>{
return '/elasticsearch/'+ props.selectedCluster.id + url;
return '/elasticsearch/'+ props.selectedCluster.id +"/" + url;
}
}, [props.selectedCluster]);
const isDarkMode = false;

View File

@ -20,9 +20,9 @@ import { isInvalid, hasError } from '../../../../../utils/validate';
const frequencies = [
{ value: 'interval', text: 'By interval' },
{ value: 'daily', text: 'Daily' },
{ value: 'weekly', text: 'Weekly' },
{ value: 'monthly', text: 'Monthly' },
// { value: 'daily', text: 'Daily' },
// { value: 'weekly', text: 'Weekly' },
// { value: 'monthly', text: 'Monthly' },
{ value: 'cronExpression', text: 'Custom cron expression' },
];

View File

@ -21,7 +21,7 @@ import { SEARCH_TYPE, INPUTS_DETECTOR_ID } from '../../../../../utils/constants'
export default function monitorToFormik(monitor) {
const formikValues = _.cloneDeep(FORMIK_INITIAL_VALUES);
if (!monitor) return formikValues;
const {
let {
name,
enabled,
schedule: { cron: { expression: cronExpression = formikValues.cronExpression, timezone } = {} },
@ -32,6 +32,11 @@ export default function monitorToFormik(monitor) {
// In that case we don't want to guess on the UI what selections a user made, so we will default to just showing the extraction query
const { searchType = 'query', fieldName } = search;
const isAD = searchType === SEARCH_TYPE.AD;
if(monitor.schedule) {
schedule = monitor.schedule
schedule.cron && (schedule.frequency = "cronExpression")
schedule.period && (schedule.frequency = "interval")
}
return {
/* INITIALIZE WITH DEFAULTS */

View File

@ -127,12 +127,13 @@ class DefineMonitor extends Component {
async getPlugins() {
const { httpClient } = this.props;
try {
const pluginsResponse = await httpClient.get('/alerting/_plugins');
if (pluginsResponse.ok) {
this.setState({ plugins: pluginsResponse.resp.map((plugin) => plugin.component) });
} else {
console.error('There was a problem getting plugins list');
}
return [];
// const pluginsResponse = await httpClient.get('/alerting/_plugins');
// if (pluginsResponse.ok) {
// this.setState({ plugins: pluginsResponse.resp.map((plugin) => plugin.component) });
// } else {
// console.error('There was a problem getting plugins list');
// }
} catch (e) {
console.error('There was a problem getting plugins list', e);
}

View File

@ -71,8 +71,8 @@ class ConfigureActions extends React.Component {
label: `${destination.name} - (${getDestinationLabel(destination)})`,
value: destination.id,
type: destination.type,
}))
.filter(({ type }) => allowList.includes(type));
}));
// .filter(({ type }) => allowList.includes(type));
this.setState({ destinations, loadingDestinations: false });
// If actions is not defined If user choose to delete actions, it will not override customer's preferences.
if (destinations.length > 0 && !values.actions && !actionDeleted) {

View File

@ -69,7 +69,8 @@ export default class CreateTrigger extends Component {
onCreate = (trigger, triggerMetadata, { setSubmitting, setErrors }) => {
const { monitor, updateMonitor, onCloseTrigger } = this.props;
const { ui_metadata: uiMetadata, triggers } = monitor;
let { ui_metadata: uiMetadata, triggers } = monitor;
uiMetadata = uiMetadata || {triggers:[]};
const updatedTriggers = [trigger].concat(triggers);
const updatedUiMetadata = {
...uiMetadata,

View File

@ -0,0 +1,60 @@
import {List, Icon} from 'antd';
import './alertitem.scss';
import moment from 'moment';

type TimeValue = number | null;

// Outcome of one notification action that was executed for an alert.
interface ActionExecutionResult {
  action_id: string;
  last_execution_time: TimeValue,
  result?: string;
}

// Shape of a single alert document as returned by the alerting API.
export interface AlertRecord {
  acknowledged_time: TimeValue,
  action_execution_results?: ActionExecutionResult[];
  cluster_id: string;
  end_time: TimeValue;
  error_message: string;
  id: string;
  last_notification_time: TimeValue;
  monitor_id: string;
  monitor_name: string;
  severity: string;
  start_time: TimeValue;
  state: string;
  trigger_id: string;
  trigger_name: string;
}

interface AlertItemProps {
  item: AlertRecord;
  onClick: (item: AlertRecord)=>void
}

// One row in the alert list: a state-colored severity badge on the left,
// "<monitor>:<trigger>" as the title, and the relative start time plus a
// chevron on the right. The whole row is clickable.
export const AlertItem = ({
  item,
  onClick
}: AlertItemProps)=>{
  const handleClick = () => onClick(item);
  // `state` (e.g. ACTIVE / ERROR) drives the badge color via alertitem.scss.
  const statusClassName = `status ${item.state}`;
  const title = item.monitor_name + ":" + item.trigger_name;
  const startedAgo = moment(item.start_time).fromNow();

  return (
    <List.Item
      key={item.id}
      className="alert"
      onClick={handleClick}
    >
      <div className="wrapper">
        <div className={statusClassName}>
          <div>{item.severity}</div>
        </div>
        <div className="content">{title}</div>
        <div className="right">
          <div className="time">{startedAgo}</div>
          <div className="arrow">
            <Icon type="right"/>
          </div>
        </div>
      </div>
    </List.Item>
  );
};

View File

@ -0,0 +1,42 @@
import {List} from 'antd';
import {AlertItem, AlertRecord} from './AlertItem';
import './alertlist.scss';

interface AlertListProps {
  dataSource: AlertRecord[];
  pagination?: any;
  title: string;
  onItemClick: (item: AlertRecord)=>void
}

// Titled, paginated list of alerts. Each entry is rendered by AlertItem and
// forwards clicks to `onItemClick`. `pagination` is spread onto antd's List
// pagination config, so callers may override pageSize/total/onChange.
export const AlertList = ({
  dataSource,
  pagination,
  title,
  onItemClick,
}: AlertListProps)=>{
  // FIX: `pagination` is optional; without a fallback the header rendered
  // the literal text "(undefined)" when the prop was omitted.
  const total = pagination?.total ?? 0;
  return (
    <div className="alert-list">
      <div className="title">
        {title}
        <span className="total">({total})</span>
      </div>
      <List
        itemLayout="vertical"
        size="large"
        pagination={{
          // Default handler; overridden when the caller supplies onChange.
          onChange: page => {
            console.log(page);
          },
          pageSize: 20,
          ...pagination,
        }}
        dataSource={dataSource}
        renderItem={item => (
          <AlertItem item={item} onClick={onItemClick} />
        )}
      />
    </div>
  );
};

View File

@ -0,0 +1,54 @@
// Styles for one alert row (AlertItem component).
// Layout: [state-colored badge | title | relative time + chevron].
.alert {
background-color: #fff;
border-radius: 5px;
// antd List.Item draws a divider; suppress it since rows are card-like.
border-bottom: none !important;
overflow:hidden;
padding: 0 !important;
margin-bottom: 5px;
// Whole row is clickable (AlertItem attaches onClick to the List.Item).
&:hover{
cursor: pointer;
background-color: rgb(147, 158, 160);
color: #fff;
}
.wrapper{
display: flex;
min-height: 50px;
align-items: center;
// Severity badge; background color keyed off the alert's `state` value,
// which AlertItem appends as an extra class (e.g. "status ACTIVE").
.status{
width: 40px;
align-self: stretch;
display: inline-flex;
align-items: center;
justify-content: center;
color: #fff;
font-size: 20px;
&.COMPLETED{
background-color: sandybrown;
}
&.ACKNOWLEDGED{
background-color: seagreen;
}
&.ACTIVE{
background-color:tomato;
}
&.ERROR{
background-color:red;
}
&.DELETED{
background-color: gray;
}
}
.content{
padding-left: 10px;
}
// Fixed-width right column pushed to the far edge by the auto margin.
.right{
margin-left: auto;
flex: 0 0 120px;
display: inline-flex;
.arrow{
margin-left: auto;
padding-right: 10px;
}
}
}
}

View File

@ -0,0 +1,14 @@
// Container for the AlertList component: light panel background with a
// bold title row that shows the total alert count in muted text.
.alert-list{
background: #f0f2f5;
padding: 10px 5px;
.title{
color: #333;
font-weight:600;
padding-bottom: 6px;
// "(N)" total rendered next to the title by AlertList.
.total{
color: #999;
margin-left: 15px;
font-size: 12px;
}
}
}

View File

@ -0,0 +1,76 @@
import {AlertList} from '../components/AlertList/AlertList';
import _ from 'lodash';
import {useState, useEffect, useMemo} from 'react';
import './alertoverview.scss';

// Dashboard overview page: the left column shows a paginated list of open
// alerts fetched from GET /alerting/alerts; clicking an alert navigates to
// its monitor detail page. `httpClient` and `history` come from the router.
export const AlertOverview = (props: any)=>{
  const {httpClient, history} = props;
  const pageSize = 10;
  const [data, setData] = useState({
    alerts: [],
    totalAlerts: 0,
  });

  // FIX: the debounced fetcher was re-created on every render, so each call
  // used a fresh timer and _.debounce never actually coalesced requests.
  // Memoizing keeps one debounced instance alive across re-renders.
  const getAlerts = useMemo(
    () =>
      _.debounce(
        (from, size, search, sortField, sortDirection, severityLevel, alertState, monitorIds) => {
          const params: any = {
            from,
            size,
            search,
            sortField,
            sortDirection,
            severityLevel,
            alertState,
          };
          // monitorIds is optional; omit it from the query when falsy.
          if (monitorIds) {
            params["monitorIds"] = monitorIds;
          }
          httpClient.get('/alerting/alerts', { query: params }).then((resp: any) => {
            if (resp.ok) {
              const { alerts, totalAlerts } = resp;
              setData({
                alerts,
                totalAlerts,
              });
            } else {
              console.log('error getting alerts:', resp);
            }
          });
        },
        500,
        { leading: true }
      ),
    [httpClient]
  );

  useEffect(() => {
    // Initial page load.
    getAlerts(0, pageSize, "", "start_time", "desc", "ALL", "ALL", "");
    // FIX: cancel any pending trailing invocation on unmount so we don't
    // call setState on an unmounted component.
    return () => {
      getAlerts.cancel();
    };
  }, [getAlerts]);

  const onPageChange = (pageIndex: number) => {
    // antd pagination is 1-based; translate to a 0-based offset.
    const from = (pageIndex - 1) * pageSize;
    getAlerts(from, pageSize, "", "start_time", "desc", "ALL", "ALL", "");
  };

  const onItemClick = (item: any) => {
    history.push(`/monitors/${item.monitor_id}`);
  };

  return (
    <div className="alert-overview">
      <div className="left">
        <AlertList dataSource={data.alerts}
          title="Open Alerts"
          onItemClick={onItemClick}
          pagination={{
            pageSize,
            total: data.totalAlerts,
            onChange: onPageChange,
          }}/>
      </div>
      <div className="right">
        <div></div>
      </div>
    </div>
  );
};

View File

@ -16,7 +16,7 @@
import React, { Component } from 'react';
import _ from 'lodash';
import queryString from 'query-string';
import { EuiBasicTable, EuiButton, EuiHorizontalRule, EuiIcon } from '@elastic/eui';
import { EuiBasicTable, EuiButton, EuiHorizontalRule, EuiIcon,RIGHT_ALIGNMENT,EuiButtonIcon } from '@elastic/eui';
import ContentPanel from '../../../components/ContentPanel';
import DashboardEmptyPrompt from '../components/DashboardEmptyPrompt';
@ -55,6 +55,21 @@ export default class Dashboard extends Component {
sortField,
} = this.getURLQueryParams();
const tableColumns = [...columns, {
align: RIGHT_ALIGNMENT,
width: '40px',
isExpander: true,
render: (item) => {
const {itemIdToExpandedRowMap} = this.state;
(
<EuiButtonIcon
onClick={() => toggleDetails(item)}
aria-label={itemIdToExpandedRowMap[item.id] ? 'Collapse' : 'Expand'}
iconType={itemIdToExpandedRowMap[item.id] ? 'arrowUp' : 'arrowDown'}
/>
)},
}]
this.state = {
alerts: [],
alertState,
@ -67,6 +82,8 @@ export default class Dashboard extends Component {
sortDirection,
sortField,
totalAlerts: 0,
itemIdToExpandedRowMap:{},
columns: tableColumns,
};
}
@ -357,6 +374,8 @@ export default class Dashboard extends Component {
<EuiHorizontalRule margin="xs" />
<EuiBasicTable
itemIdToExpandedRowMap={this.state.itemIdToExpandedRowMap}
isExpandable={true}
items={alerts}
/*
* If using just ID, doesn't update selectedItems when doing acknowledge
@ -364,7 +383,7 @@ export default class Dashboard extends Component {
* $id-$version will correctly remove selected items
* */
itemId={(item) => `${item.id}-${item.version}`}
columns={columns}
columns={this.state.columns}
pagination={pagination}
sorting={sorting}
isSelectable={true}

View File

@ -0,0 +1,9 @@
// Two-column layout for the AlertOverview page: the alert list on the left
// grows to fill available space; the right pane is a fixed 30% sidebar
// (currently an empty placeholder in AlertOverview).
.alert-overview{
display: flex;
.right{
flex: 0 0 30%;
}
.left{
flex: 1 1 auto;
}
}

View File

@ -95,5 +95,5 @@ export const columns = [
truncateText: false,
render: renderTime,
dataType: 'date',
},
}
];

View File

@ -19,6 +19,7 @@ import {
FormikFieldText,
FormikFieldNumber,
FormikSelect,
FormikFieldPassword,
} from '../../../../../components/FormControls';
import { isInvalid, hasError } from '../../../../../utils/validate';
import { validateEmail, validateHost, validatePort, validateSenderName } from './utils/validate';
@ -101,6 +102,28 @@ const Sender = ({ sender, arrayHelpers, context, index, onDelete }) => {
}}
/>
</EuiFlexItem>
<EuiFlexItem grow={false}>
<FormikFieldPassword
name={`senders.${index}.password`}
formRow
// fieldProps={{ validate: validateEmail }}
rowProps={{
label: 'Email password',
isInvalid,
error: hasError,
}}
inputProps={{
placeholder: '',
onChange: (e, field, form) => {
field.onChange(e);
onSenderChange(index, sender, arrayHelpers);
},
isInvalid,
}}
/>
</EuiFlexItem>
</EuiFlexGroup>
<EuiFlexGroup>
<EuiFlexItem grow={false}>
<FormikFieldText
name={`senders.${index}.host`}

View File

@ -105,6 +105,7 @@ export default class ManageSenders extends React.Component {
host: sender.host,
port: sender.port,
method: sender.method,
password: sender.password,
};
try {
const response = await httpClient.post(`/alerting/destinations/email_accounts`, {
@ -136,6 +137,7 @@ export default class ManageSenders extends React.Component {
host: updatedSender.host,
port: updatedSender.port,
method: updatedSender.method,
password: updatedSender.password,
};
try {
const response = await httpClient.put(`/alerting/email_accounts/${id}`, {

View File

@ -110,7 +110,8 @@ class DestinationsList extends React.Component {
isEmailAllowed = () => {
const { allowList } = this.state;
return allowList.includes(DESTINATION_TYPE.EMAIL);
return true;
// return allowList.includes(DESTINATION_TYPE.EMAIL);
};
isDeleteAllowed = async (type, id) => {

View File

@ -21,6 +21,7 @@ import Dashboard from '../Dashboard/containers/Dashboard';
import Monitors from '../Monitors/containers/Monitors';
import DestinationsList from '../Destinations/containers/DestinationsList';
import { formatMessage } from 'umi/locale';
import {AlertOverview} from '../Dashboard/containers/AlertOverview';
const getSelectedTabId = (pathname) => {
if (pathname.includes('monitors')) return 'monitors';
@ -100,7 +101,8 @@ export default class Home extends Component {
// exact
path="/dashboard"
render={(props) => (
<Dashboard {...props} httpClient={httpClient} notifications={notifications} />
// <Dashboard {...props} httpClient={httpClient} notifications={notifications} />
<AlertOverview {...props} httpClient={httpClient} notifications={notifications} />
)}
/>
<Route

View File

@ -137,7 +137,7 @@ class MonitorHistory extends PureComponent {
acknowledgedTime,
endTime: currentEndTime,
state,
errorsCount: alert_history.length,
errorsCount: alert_history?.length || 0,
},
},
lastEndTime

View File

@ -0,0 +1,9 @@
import React from "react";
export default (props)=>{
return (
<div>
</div>
);
}

View File

@ -66,10 +66,10 @@ export const AD_PREVIEW_DAYS = 7;
export const MAX_QUERY_RESULT_SIZE = 200;
export const OPEN_DISTRO_PREFIX = 'opendistro';
export const OPEN_DISTRO_PREFIX = 'infini-search-center';
export const PLUGIN_NAME = `alerting`;
export const INDEX_PREFIX = `${OPEN_DISTRO_PREFIX}-alerting`;
export const INDEX_PREFIX = `${OPEN_DISTRO_PREFIX}_alerting`;
export const INDEX = {
SCHEDULED_JOBS: `.${INDEX_PREFIX}-config`,
ALERTS: `.${INDEX_PREFIX}-alerts`,

View File

@ -166,6 +166,7 @@ const MonitorDatePicker = ({timeRange, commonlyUsedRanges, onChange, isLoading})
return (
<EuiSuperDatePicker
dateFormat= ''
isLoading={isLoading}
start={timeRange?.min}
end={timeRange?.max}
@ -318,13 +319,11 @@ class ClusterMonitor extends PureComponent {
}
const [from, to] = x;
let timeRange = {
min: from,
max: to,
min: moment(from).toISOString(),
max: moment(to).toISOString(),
}
this.setState({
timeRange: timeRange,
lastSeconds: 0,
pickerValue: [moment(from), moment(to)],
}, () => {
this.fetchData();
});
@ -374,6 +373,26 @@ class ClusterMonitor extends PureComponent {
const commonlyUsedRanges =
[
{
from: 'now-15m',
to: 'now',
display: '最近15分钟'
},
{
from: 'now-30m',
to: 'now',
display: '最近30分钟'
},
{
from: 'now-1h',
to: 'now',
display: '最近一小时'
},
{
from: 'now-24h',
to: 'now',
display: '最近一天',
},
{
from: 'now/d',
to: 'now/d',
@ -384,26 +403,6 @@ class ClusterMonitor extends PureComponent {
to: 'now/w',
display: '这个星期'
},
{
from: 'now-15m',
to: 'now',
display: '最近15分钟'
},
{
from: 'now-30m',
to: 'now',
display: '最近三十分钟'
},
{
from: 'now-1h',
to: 'now',
display: '最近一小时'
},
{
from: 'now-24h',
to: 'now',
display: '最近一天',
},
{
from: 'now-7d',
to: 'now',
@ -503,7 +502,7 @@ class ClusterMonitor extends PureComponent {
<div key={e} className={styles.vizChartContainer}>
<Chart size={[, 200]} className={styles.vizChartItem}>
<Settings theme={theme} showLegend legendPosition={Position.Top}
// onBrushEnd={this.handleChartBrush}
onBrushEnd={this.handleChartBrush}
tooltip={{
headerFormatter: disableHeaderFormat
? undefined