From e22848bd7cd53075a14b48529b05fde07bbbc38e Mon Sep 17 00:00:00 2001 From: silenceqi Date: Sat, 25 Sep 2021 15:05:07 +0800 Subject: [PATCH] new alerting api --- api/init.go | 46 +- main.go | 6 + model/alerting/alert.go | 32 + model/alerting/config.go | 19 + model/alerting/destination.go | 40 ++ model/alerting/email_account.go | 12 + model/alerting/email_group.go | 7 + model/alerting/monitor.go | 63 ++ model/alerting/temp | 1 + service/alerting/action/action.go | 7 + service/alerting/action/email.go | 86 +++ service/alerting/action/email_test.go | 25 + service/alerting/action/webhook.go | 62 ++ service/alerting/action/webhook_test.go | 25 + service/alerting/alert.go | 92 ++- service/alerting/constants.go | 21 +- service/alerting/destination.go | 488 +++++++++------ service/alerting/elasticsearch.go | 11 +- service/alerting/event.go | 33 + service/alerting/monitor.go | 400 ++++++++---- service/alerting/overview.go | 61 ++ service/alerting/schedule.go | 591 ++++++++++++++++++ service/alerting/schedule_test.go | 74 +++ service/alerting/util/period.go | 78 +++ service/alerting/util/period_test.go | 22 + .../kibana/console/components/Console.tsx | 2 +- .../console/components/ConsoleInput.tsx | 2 +- .../console/contexts/services_context.tsx | 1 + .../kibana/console/entities/es_request.ts | 1 + .../use_send_current_request_to_es/index.ts | 4 +- .../send_request_to_es.ts | 2 +- .../kibana/console/modules/es/index.ts | 13 +- .../console/modules/mappings/mappings.js | 14 +- web/src/layouts/BasicLayout.js | 6 - web/src/locales/zh-CN/alert.js | 2 +- web/src/models/global.js | 14 +- web/src/pages/Alerting/index.js | 2 +- .../Schedule/Frequencies/Frequency.js | 6 +- .../CreateMonitor/utils/monitorToFormik.js | 7 +- .../containers/DefineMonitor/DefineMonitor.js | 13 +- .../ConfigureActions/ConfigureActions.js | 4 +- .../containers/CreateTrigger/CreateTrigger.js | 3 +- .../components/AlertList/AlertItem.tsx | 60 ++ .../components/AlertList/AlertList.tsx | 42 ++ .../components/AlertList/alertitem.scss | 54 ++ .../components/AlertList/alertlist.scss | 14 + .../Dashboard/containers/AlertOverview.tsx | 76 +++ .../pages/Dashboard/containers/Dashboard.js | 23 +- .../Dashboard/containers/alertoverview.scss | 9 + .../pages/Dashboard/utils/tableUtils.js | 2 +- .../createDestinations/Email/Sender.js | 23 + .../ManageSenders/ManageSenders.js | 2 + .../DestinationsList/DestinationsList.js | 3 +- web/src/pages/Alerting/pages/Home/Home.js | 4 +- .../MonitorHistory/MonitorHistory.js | 2 +- .../pages/Alerting/pages/Overview/Overview.js | 9 + web/src/pages/Alerting/utils/constants.js | 4 +- web/src/pages/Cluster/Metrics.js | 49 +- 58 files changed, 2360 insertions(+), 414 deletions(-) create mode 100644 model/alerting/alert.go create mode 100644 model/alerting/config.go create mode 100644 model/alerting/destination.go create mode 100644 model/alerting/email_account.go create mode 100644 model/alerting/email_group.go create mode 100644 model/alerting/monitor.go create mode 100644 model/alerting/temp create mode 100644 service/alerting/action/action.go create mode 100644 service/alerting/action/email.go create mode 100644 service/alerting/action/email_test.go create mode 100644 service/alerting/action/webhook.go create mode 100644 service/alerting/action/webhook_test.go create mode 100644 service/alerting/event.go create mode 100644 service/alerting/overview.go create mode 100644 service/alerting/schedule.go create mode 100644 service/alerting/schedule_test.go create mode 100644 service/alerting/util/period.go create mode 100644 service/alerting/util/period_test.go create mode 100644 web/src/pages/Alerting/pages/Dashboard/components/AlertList/AlertItem.tsx create mode 100644 web/src/pages/Alerting/pages/Dashboard/components/AlertList/AlertList.tsx create mode 100644 web/src/pages/Alerting/pages/Dashboard/components/AlertList/alertitem.scss create mode 100644 web/src/pages/Alerting/pages/Dashboard/components/AlertList/alertlist.scss create mode 100644 web/src/pages/Alerting/pages/Dashboard/containers/AlertOverview.tsx create mode 100644 web/src/pages/Alerting/pages/Dashboard/containers/alertoverview.scss create mode 100644 web/src/pages/Alerting/pages/Overview/Overview.js diff --git a/api/init.go b/api/init.go index 0a098ce0..367cca55 100644 --- a/api/init.go +++ b/api/init.go @@ -7,8 +7,8 @@ import ( "infini.sh/framework/core/ui" "infini.sh/search-center/api/index_management" "infini.sh/search-center/config" - "path" "infini.sh/search-center/service/alerting" + "path" ) func Init(cfg *config.AppConfig) { @@ -37,36 +37,34 @@ func Init(cfg *config.AppConfig) { ui.HandleUIMethod(api.DELETE, path.Join(pathPrefix, "index/:index"), handler.HandleDeleteIndexAction) ui.HandleUIMethod(api.POST, path.Join(pathPrefix, "index/:index"), handler.HandleCreateIndexAction) - ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/alerts", alerting.GetAlerts) - ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/monitors/_search", alerting.Search) - ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_indices", alerting.GetIndices) - ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_aliases", alerting.GetAliases) - ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_mappings", alerting.GetMappings) - ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/_plugins", alerting.GetPlugins) + //new api + ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations/email_accounts", alerting.CreateEmailAccount) + ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.UpdateEmailAccount) + ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.DeleteEmailAccount) + ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations/email_accounts", alerting.GetEmailAccounts) + ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.GetEmailAccount) + ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations/email_groups", alerting.CreateEmailGroup) + ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations/email_groups", alerting.GetEmailGroups) + ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.DeleteEmailGroup) + ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.UpdateEmailGroup) + ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.GetEmailGroup) + ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations", alerting.GetDestinations) + ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations", alerting.CreateDestination) + ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/destinations/:destinationId", alerting.UpdateDestination) + ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/destinations/:destinationId", alerting.DeleteDestination) ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/monitors/:monitorID", alerting.GetMonitor) ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/monitors/:monitorID", alerting.UpdateMonitor) ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/monitors", alerting.GetMonitors) ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/monitors", alerting.CreateMonitor) ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/monitors/_execute", alerting.ExecuteMonitor) ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/monitors/:monitorID", alerting.DeleteMonitor) - ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_monitors/:monitorID/_acknowledge/alerts", alerting.AcknowledgeAlerts) + ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/_settings", alerting.GetSettings) - ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations", alerting.GetDestinations) - ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations", alerting.CreateDestination) - ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/destinations/:destinationId", alerting.UpdateDestination) - ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/destinations/:destinationId", alerting.DeleteDestination) - ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations/email_accounts", alerting.CreateEmailAccount) - ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations/email_accounts", alerting.GetEmailAccounts) - ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.DeleteEmailAccount) - ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.GetEmailAccount) - ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/email_accounts/:emailAccountId", alerting.UpdateEmailAccount) - ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/destinations/email_groups", alerting.CreateEmailGroup) - ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/destinations/email_groups", alerting.GetEmailGroups) - ui.HandleUIMethod(api.DELETE, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.DeleteEmailGroup) - ui.HandleUIMethod(api.PUT, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.UpdateEmailGroup) - ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/email_groups/:emailGroupId", alerting.GetEmailGroup) - - + ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_indices", alerting.GetIndices) + ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_aliases", alerting.GetAliases) + ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/monitors/_search", alerting.Search) + ui.HandleUIMethod(api.GET, "/elasticsearch/:id/alerting/alerts", alerting.GetAlerts) + ui.HandleUIMethod(api.POST, "/elasticsearch/:id/alerting/_monitors/:monitorID/_acknowledge/alerts", alerting.AcknowledgeAlerts) task.RegisterScheduleTask(task.ScheduleTask{ Description: "sync reindex task result", diff --git a/main.go b/main.go index 59d28f94..8a0e59fa 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,8 @@ import ( "infini.sh/framework/modules" "infini.sh/search-center/config" "infini.sh/search-center/model" + "infini.sh/search-center/model/alerting" + alertSrv "infini.sh/search-center/service/alerting" ) var appConfig *config.AppConfig @@ -72,6 +74,10 @@ func main() { orm.RegisterSchemaWithIndexName(model.Dict{}, "dict") orm.RegisterSchemaWithIndexName(model.Reindex{}, "reindex") orm.RegisterSchemaWithIndexName(elastic.IndexPattern{}, "view") + orm.RegisterSchemaWithIndexName(alerting.Config{}, "alerting-config") + orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alerting-alerts") + orm.RegisterSchemaWithIndexName(alerting.AlertHistory{}, "alerting-alert-history") + alertSrv.GetScheduler().Start() }) } diff --git a/model/alerting/alert.go b/model/alerting/alert.go new file mode 100644 index 00000000..1d6ced67 --- /dev/null +++ b/model/alerting/alert.go @@ -0,0 +1,32 @@ +package alerting + +type Alert struct { + ClusterID string `json:"cluster_id"` + AcknowledgedTime *int64 `json:"acknowledged_time" elastic_mapping:"acknowledged_time:{type:date}"` + ActionExecutionResults []ActionExecutionResult `json:"action_execution_results" elastic_mapping:"action_execution_results"` + AlertHistories []AlertHistory `json:"alert_history" elastic_mapping:"alert_history"` + EndTime *int64 `json:"end_time" elastic_mapping:"end_time:{type:date}"` + ErrorMessage string `json:"error_message" elastic_mapping:"error_message:{type:text}"` + Id string `json:"id" elastic_mapping:"id:{type:keyword}"` + LastNotificationTime int64 `json:"last_notification_time" elastic_mapping:"last_notification_time:{type:date}"` + MonitorId string `json:"monitor_id" elastic_mapping:"monitor_id:{type:keyword}"` + MonitorName string `json:"monitor_name" elastic_mapping:"monitor_name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}"` + Severity string `json:"severity" elastic_mapping:"severity:{type:keyword}"` + StartTime int64 `json:"start_time" elastic_mapping:"start_time:{type:date}"` + State string `json:"state" elastic_mapping:"state:{type:keyword}"` + TriggerId string `json:"trigger_id" elastic_mapping:"trigger_id:{type:keyword}"` + TriggerName string `json:"trigger_name" elastic_mapping:"trigger_name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}"` +} + +type ActionExecutionResult struct { + ActionID string `json:"action_id" elastic_mapping:"action_id:{type:keyword}"` + LastExecutionTime int64 `json:"last_execution_time" elastic_mapping:"last_execution_time:{type:date}"` + ThrottledCount int `json:"throttled_count" elastic_mapping:"throttled_count:{type:integer}"` + Error string `json:"error,omitempty"` + Result string `json:"result"` +} + +type AlertHistory struct { + Message string `json:"message" elastic_mapping:"message:{type:text}"` + Timestamp int64 `json:"timestamp" elastic_mapping:"timestamp:{type:date}"` +} \ No newline at end of file diff --git a/model/alerting/config.go b/model/alerting/config.go new file mode 100644 index 00000000..b72ab769 --- /dev/null +++ b/model/alerting/config.go @@ -0,0 +1,19 @@ +package alerting + +//type Config struct { +// ID string `json:"id" index:"id"` +// Type string `json:"type" elastic_mapping:"type:{type:keyword}"` +// Destination Destination `json:"destination,omitempty" elastic_mapping:"destination:{dynamic:false,properties:{custom_webhook:{properties:{header_params:{type:object,enabled:false},host:{type:text},password:{type:text},path:{type:keyword},port:{type:integer},query_params:{type:object,enabled:false},scheme:{type:keyword},url:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},username:{type:text}}},email:{properties:{email_account_id:{type:keyword},recipients:{type:nested,properties:{email:{type:text},email_group_id:{type:keyword},type:{type:keyword}}}}},last_update_time:{type:date,format:strict_date_time||epoch_millis},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schema_version:{type:integer},slack:{properties:{url:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}}},type:{type:keyword},user:{properties:{backend_roles:{type:text,fields:{keyword:{type:keyword}}},custom_attribute_names:{type:text,fields:{keyword:{type:keyword}}},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},roles:{type:text,fields:{keyword:{type:keyword}}}}}}}"` +// EmailAccount EmailAccount `json:"email_account,omitempty" elastic_mapping:"email_account:{properties:{email:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},from:{type:text},host:{type:text},method:{type:text},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},port:{type:integer},schema_version:{type:long}}}"` +// EmailGroup EmailGroup `json:"email_group,omitempty" elastic_mapping:"email_group:{properties:{emails:{type:nested,properties:{email:{type:text}}},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schema_version:{type:long}}}"` +// Monitor Monitor `json:"monitor,omitempty" elastic_mapping:"monitor:{dynamic:false,properties:{enabled:{type:boolean},enabled_time:{type:date,format:strict_date_time||epoch_millis},inputs:{type:nested,properties:{search:{properties:{indices:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},query:{type:object,enabled:false}}}}},last_update_time:{type:date,format:strict_date_time||epoch_millis},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schedule:{properties:{cron:{properties:{expression:{type:text},timezone:{type:keyword}}},period:{properties:{interval:{type:integer},unit:{type:keyword}}}}},schema_version:{type:integer},triggers:{type:nested,properties:{actions:{type:nested,properties:{destination_id:{type:keyword},message_template:{type:object,enabled:false},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},subject_template:{type:object,enabled:false},throttle:{properties:{unit:{type:keyword},value:{type:integer}}},throttle_enabled:{type:boolean}}},condition:{type:object,enabled:false},min_time_between_executions:{type:integer},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}}},type:{type:keyword}}}"` +//} + +type Config struct { + ID string `json:"id" index:"id"` + Type string `json:"type" elastic_mapping:"type:{type:keyword}"` + Destination Destination `json:"destination,omitempty" elastic_mapping:"destination"` + EmailAccount EmailAccount `json:"email_account,omitempty" elastic_mapping:"email_account"` + EmailGroup EmailGroup `json:"email_group,omitempty" elastic_mapping:"email_group"` + Monitor Monitor `json:"monitor,omitempty" elastic_mapping:"monitor"` +} diff --git a/model/alerting/destination.go b/model/alerting/destination.go new file mode 100644 index 00000000..6f97cec4 --- /dev/null +++ b/model/alerting/destination.go @@ -0,0 +1,40 @@ +package alerting + +type Destination struct { + Type string `json:"type" elastic_mapping:"type:{type:keyword}"` + Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` + SchemaVersion int `json:"schema_version" elastic_mapping:"schema_version:{type:integer}"` + LastUpdateTime int64 `json:"last_update_time" elastic_mapping:"last_update_time:{type:date,format:strict_date_time||epoch_millis}"` + CustomWebhook CustomWebhook `json:"custom_webhook,omitempty" elastic_mapping:"custom_webhook"` + Email EmailDestination `json:"email,omitempty" elastic_mapping:"email"` + Slack Slack `json:"slack,omitempty" elastic_mapping:"slack"` +} + +type EmailDestination struct { + EmailAccountID string `json:"email_account_id" elastic_mapping:"email_account_id:{type:keyword}"` + Recipients []Recipient `json:"recipients" elastic_mapping:"recipients"` +} + +type Recipient struct { + EmailGroupID string `json:"email_group_id,omitempty" elastic_mapping:"email_group_id:{type:keyword}"` + Email string `json:"email,omitempty" elastic_mapping:"email:{type:text}"` + Type string `json:"type" elastic_mapping:"type:{type:keyword}"` +} + + +type CustomWebhook struct { + HeaderParams map[string]string `json:"header_params" elastic_mapping:"header_params:{type:object,enabled:false}"` + Host string `json:"host" elastic_mapping:"host:{type:text}"` + Method string `json:"method" elastic_mapping:"host:{type:keyword}"` + Password string `json:"password" elastic_mapping:"password:{type:text}"` + Path string `json:"path" elastic_mapping:"path:{type:keyword}"` + Port int `json:"port" elastic_mapping:"port:{type:integer}"` + QueryParams map[string]string `json:"query_params" elastic_mapping:"query_params:{type:object,enabled:false}"` + Scheme string `json:"scheme" elastic_mapping:"scheme:{type:keyword}"` + URL string `json:"url" elastic_mapping:"url:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}"` + Username string `json:"username" elastic_mapping:"username:{type:text}"` +} + +type Slack struct { + URl string `json:"url" elastic_mapping:"url:{type:keyword}"` +} \ No newline at end of file diff --git a/model/alerting/email_account.go b/model/alerting/email_account.go new file mode 100644 index 00000000..e3da6af7 --- /dev/null +++ b/model/alerting/email_account.go @@ -0,0 +1,12 @@ +package alerting + +type EmailAccount struct { + Email string `json:"email" elastic_mapping:"email:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` + //From string `json:"from" elastic_mapping:"from:{type:text}"` + Host string `json:"host" elastic_mapping:"host:{type:text}"` + Method string `json:"method" elastic_mapping:"method:{type:text}"` + Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` + Port int `json:"port" elastic_mapping:"port:{type:integer}"` + SchemaVersion int64 `json:"schema_version" elastic_mapping:"schema_version:{type:long}"` + Password string `json:"password,omitempty" elastic_mapping:"password:{type:keyword}"` +} diff --git a/model/alerting/email_group.go b/model/alerting/email_group.go new file mode 100644 index 00000000..9c0952ae --- /dev/null +++ b/model/alerting/email_group.go @@ -0,0 +1,7 @@ +package alerting + +type EmailGroup struct { + Emails []map[string]interface{} `json:"emails" elastic_mapping:"emails:{type: nested, properties: {email: {type: text}}}"` + Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` + SchemaVersion int64 `json:"schema_version" elastic_mapping:"schema_version:{type:long}"` +} \ No newline at end of file diff --git a/model/alerting/monitor.go b/model/alerting/monitor.go new file mode 100644 index 00000000..4ef04147 --- /dev/null +++ b/model/alerting/monitor.go @@ -0,0 +1,63 @@ +package alerting + +type Monitor struct { + Enabled bool `json:"enabled" elastic_mapping:"enabled: {type:boolean}"` + EnabledTime int64 `json:"enabled_time" elastic_mapping:"enabled_time:{type:date,format:strict_date_time||epoch_millis}"` + Inputs []MonitorInput `json:"inputs" elastic_mapping:"inputs"` + LastUpdateTime int64 `json:"last_update_time" elastic_mapping:"last_update_time:{type:date,format:strict_date_time||epoch_millis}"` + Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` + Schedule Schedule `json:"schedule" elastic_mapping:"schedule"` + SchemaVersion int `json:"schema_version" elastic_mapping:"schema_version:{type:integer}"` + Triggers []Trigger `json:"triggers" elastic_mapping:"triggers"` + Type string `json:"type" elastic_mapping:"type:{type:keyword}` +} + +type MonitorInput struct { + Search MonitorSearch `json:"search" elastic_mapping:"search"` +} + +type MonitorSearch struct { + Indices []string `json:"indices" elastic_mapping:"indices:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` + Query map[string]interface{} `json:"query" elastic_mapping:"query:{type:object,enabled:false}}"` +} + +type Cron struct { + Expression string `json:"expression" elastic_mapping:"expression:{type:text}"` + Timezone string `json:"timezone" elastic_mapping:"timezone:{type:keyword}"` +} + +type Period struct { + Interval int `json:"interval" elastic_mapping:"interval:{type:integer}"` + Unit string `json:"unit" elastic_mapping:"unit:{type:keyword}"` +} + + +type Schedule struct { + Cron *Cron `json:"cron,omitempty" elastic_mapping:"cron"` + Period *Period `json:"period,omitempty" elastic_mapping:"period"` +} + + +type Throttle struct { + Unit string `json:"unit" elastic_mapping:"unit:{type:keyword}"` + Value int `json:"value" elastic_mapping:"value:{type:integer}"` +} + +type Action struct { + ID string `json:"id"` + DestinationId string `json:"destination_id" elastic_mapping:"destination_id:{type:keyword}"` + MessageTemplate map[string]interface{} `json:"message_template" elastic_mapping:"message_template:{type:object}"` + Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` + SubjectTemplate map[string]interface{} `json:"subject_template" elastic_mapping:"subject_template:{type:object}"` + ThrottleEnabled bool `json:"throttle_enabled" elastic_mapping:"throttle_enabled:{type:boolean}"` + Throttle *Throttle `json:"throttle,omitempty" elastic_mapping:"throttle"` +} + +type Trigger struct { + ID string `json:"id"` + Severity string `json:"severity" elastic_mapping:"severity:{type:keyword}"` + Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` + Condition map[string]interface{} `json:"condition" elastic_mapping:"condition:{type:object, enable:false}"` + Actions []Action `json:"actions" elastic_mapping:"actions"` + MinTimeBetweenExecutions int `json:"min_time_between_executions" elastic_mapping:"min_time_between_executions:{type:integer}"` +} \ No newline at end of file diff --git a/model/alerting/temp b/model/alerting/temp new file mode 100644 index 00000000..617439b9 --- /dev/null +++ b/model/alerting/temp @@ -0,0 +1 @@ +monitor:{dynamic:false,properties:{enabled:{type:boolean},enabled_time:{type:date,format:strict_date_time||epoch_millis},inputs:{type:nested,properties:{search:{properties:{indices:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},query:{type:object,enabled:false}}}}},last_update_time:{type:date,format:strict_date_time||epoch_millis},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schedule:{properties:{cron:{properties:{expression:{type:text},timezone:{type:keyword}}},period:{properties:{interval:{type:integer},unit:{type:keyword}}}}},schema_version:{type:integer},triggers:{type:nested,properties:{actions:{type:nested,properties:{destination_id:{type:keyword},message_template:{type:object,enabled:false},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},subject_template:{type:object,enabled:false},throttle:{properties:{unit:{type:keyword},value:{type:integer}}},throttle_enabled:{type:boolean}}},condition:{type:object,enabled:false},min_time_between_executions:{type:integer},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}}},type:{type:keyword}}} \ No newline at end of file diff --git a/service/alerting/action/action.go b/service/alerting/action/action.go new file mode 100644 index 00000000..9cd12255 --- /dev/null +++ b/service/alerting/action/action.go @@ -0,0 +1,7 @@ +package action + +const ( + ACTION_EMAIL = "email" + ACTION_WEBHOOK = "custom_webhook" + ACTION_SLACK = "slack" +) \ No newline at end of file diff --git a/service/alerting/action/email.go b/service/alerting/action/email.go new file mode 100644 index 00000000..a7d88bd7 --- /dev/null +++ b/service/alerting/action/email.go @@ -0,0 +1,86 @@ +package action + +import ( + "crypto/tls" + "fmt" + "infini.sh/search-center/model/alerting" + "net" + "net/smtp" + "strings" +) + +type EmailAction struct { + Sender *alerting.EmailAccount + Message string + Subject string + Receiver []string +} + +func (act *EmailAction) Execute() ([]byte, error){ + from := act.Sender.Email + //Todo add password input in frontend? + act.Sender.Host = strings.TrimSpace(act.Sender.Host) + addr := fmt.Sprintf("%s:%d", act.Sender.Host, act.Sender.Port) + msg := fmt.Sprintf("To: %s\r\nSubject: %s\r\n%s\r\n", strings.Join(act.Receiver, ","), act.Subject, act.Message) + + auth := smtp.PlainAuth("", from, act.Sender.Password, act.Sender.Host) + if act.Sender.Method == "ssl" { + err := SendEmailOverTLS(addr, auth, from, act.Receiver, []byte(msg)) + return nil, err + } + err := smtp.SendMail(addr, auth, from, act.Receiver, []byte(msg)) + + return nil, err +} + +func SendEmailOverTLS(addr string, auth smtp.Auth, from string, to []string, msg []byte) error{ + host, _, _ := net.SplitHostPort(addr) + tlsConfig := &tls.Config { + InsecureSkipVerify: true, + ServerName: host, + } + + conn, err := tls.Dial("tcp", addr, tlsConfig) + if err != nil { + return err + } + + c, err := smtp.NewClient(conn, host) + if err != nil { + return err + } + + if err = c.Auth(auth); err != nil { + return err + } + + // To && From + if err = c.Mail(from); err != nil { + return err + } + + for _, dst := range to { + if err = c.Rcpt(dst); err != nil { + return err + } + } + // Data + w, err := c.Data() + if err != nil { + return err + } + + _, err = w.Write(msg) + if err != nil { + return err + } + + err = w.Close() + if err != nil { + return err + } + + c.Quit() + return nil +} + diff --git a/service/alerting/action/email_test.go b/service/alerting/action/email_test.go new file mode 100644 index 00000000..f026a66b --- /dev/null +++ b/service/alerting/action/email_test.go @@ -0,0 +1,25 @@ +package action + +import ( + "infini.sh/search-center/model/alerting" + "testing" +) + +func TestEmailAction(t *testing.T){ + ea := EmailAction{ + Sender: &alerting.EmailAccount{ + Email: "liugq@infini.ltd", + Host: "smtp.ym.163.com", + Port: 994, + Method: "ssl", + Password: "", + }, + Message: "hello world", + Subject: "test email", + Receiver: []string{"786027438@qq.com"}, + } + _, err := ea.Execute() + if err != nil { + t.Fatal(err) + } +} \ No newline at end of file diff --git a/service/alerting/action/webhook.go b/service/alerting/action/webhook.go new file mode 100644 index 00000000..7702dc6e --- /dev/null +++ b/service/alerting/action/webhook.go @@ -0,0 +1,62 @@ +package action + +import ( + "crypto/tls" + "fmt" + "infini.sh/search-center/model/alerting" + "io/ioutil" + "net/http" + "strings" +) + +type Action interface { + Execute() ([]byte, error) +} + +type WebhookAction struct { + Data *alerting.CustomWebhook + Message string +} + +var actionClient = http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, +} + +func (act *WebhookAction) Execute()([]byte, error){ + var reqURL string + if act.Data.URL != "" { + reqURL = act.Data.URL + } + if act.Data.Host != "" { + reqURL = fmt.Sprintf("%s://%s:%d/%s", act.Data.Scheme, act.Data.Host, act.Data.Port, act.Data.Path ) + urlBuilder := strings.Builder{} + urlBuilder.WriteString(reqURL) + if len(act.Data.QueryParams) > 0 { + urlBuilder.WriteString("?") + } + for k, v := range act.Data.QueryParams { + urlBuilder.WriteString(k) + urlBuilder.WriteString("=") + urlBuilder.WriteString(v) + urlBuilder.WriteString("&") + } + reqURL = urlBuilder.String() + } + reqBody := strings.NewReader(act.Message) + req, err := http.NewRequest(http.MethodPost, reqURL, reqBody) + if err != nil { + return nil, err + } + for key, value := range act.Data.HeaderParams { + req.Header.Add(key, value) + } + res, err := actionClient.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + return ioutil.ReadAll(res.Body) +} + diff --git a/service/alerting/action/webhook_test.go b/service/alerting/action/webhook_test.go new file mode 100644 index 00000000..7fafdca7 --- /dev/null +++ b/service/alerting/action/webhook_test.go @@ -0,0 +1,25 @@ +package action + +import ( + "infini.sh/search-center/model/alerting" + "net/http" + "testing" +) + +func TestWebhookAction(t *testing.T){ + ea := WebhookAction{ + Message: `{"msgtype": "text","text": {"content":"通知:我就是我, 是不一样的烟火,Trigger: {{ctx.trigger.name}}"},"at":{"atMobiles":["18692254900"],"isAtAll": false}}`, + Data: &alerting.CustomWebhook{ + HeaderParams: map[string]string{ + "Content-Type": "application/json", + }, + URL: "https://oapi.dingtalk.com/robot/send?access_token=6a5c7c9454ff74537a6de493153b1da68860942d4b0aeb33797cb68b5111b077", + Method: http.MethodPost, + }, + } + _, err := ea.Execute() + if err != nil { + t.Fatal(err) + } + +} \ No newline at end of file diff --git a/service/alerting/alert.go b/service/alerting/alert.go index 8f68057d..85def0a6 100644 --- a/service/alerting/alert.go +++ b/service/alerting/alert.go @@ -8,6 +8,8 @@ import ( "fmt" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/search-center/model/alerting" "io" "net/http" "net/url" @@ -23,6 +25,16 @@ func getQueryParam(req *http.Request, key string, or ...string) string { return val } +func getAlertIndexName(typ string) string { + switch typ{ + case INDEX_ALL_ALERTS: + return fmt.Sprintf("%s,%s", orm.GetIndexName(alerting.AlertHistory{}), orm.GetIndexName(alerting.Alert{})) + case INDEX_ALERT_HISTORY: + return orm.GetIndexName(alerting.AlertHistory{}) + } + return orm.GetIndexName(alerting.Alert{}) +} + func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){ id := ps.ByName("id") conf := elastic.GetConfig(id) @@ -41,11 +53,6 @@ func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){ alertState = getQueryParam(req, "alertState", "ALL") monitorIds = req.URL.Query()["monitorIds"] params = map[string]string{ - "startIndex": from, - "size": size, - "severityLevel": severityLevel, - "alertState": alertState, - "searchString": search, } ) @@ -68,18 +75,67 @@ func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){ params["sortOrder"] = sortDirection params["missing"] = "_last" } + sort := IfaceMap{ + params["sortString"]: params["sortOrder"], + } + must := []IfaceMap{ + { + "match": IfaceMap{ + "cluster_id": id, + }, + }, + } + if severityLevel != "ALL" { + must = append(must, IfaceMap{ + "match": IfaceMap{ + "severity": severityLevel, + }, + }) + } + if alertState != "ALL" { + must = append(must, IfaceMap{ + "match": IfaceMap{ + "state": alertState, + }, + }) + } if len(monitorIds) > 0{ - params["monitorId"] = monitorIds[0] + must = append(must, IfaceMap{ + "match": IfaceMap{ + "monitor_id": monitorIds[0], + }, + }) } if clearSearch := strings.TrimSpace(search); clearSearch != ""{ searches := strings.Split(clearSearch, " ") clearSearch = strings.Join(searches, "* *") params["searchString"] = fmt.Sprintf("*%s*", clearSearch) + must = append(must, IfaceMap{ + "query_string": IfaceMap{ + //"default_field": "destination.name", + "default_operator": "AND", + "query": fmt.Sprintf(`*%s*`, clearSearch), + }, + }) } - reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/alerts", conf.Endpoint, API_PREFIX ) - res, err := doRequest(reqUrl, http.MethodGet, params, nil) + reqBody := IfaceMap{ + "size":size, + "from": from, + "query": IfaceMap{ + "bool":IfaceMap{ + "must": must, + }, + }, + "sort": sort, + } + indexName := getAlertIndexName(INDEX_ALL_ALERTS) + + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, indexName ) + + res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) if err != nil { writeError(w, err) return @@ -91,23 +147,25 @@ func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){ writeError(w, err) return } - alerts := []IfaceMap{} - rawAlerts := queryValue(alertRes, "alerts", nil) + alerts := []interface{}{} + rawAlerts := queryValue(alertRes, "hits.hits", nil) if ds, ok := rawAlerts.([]interface{}); ok { for _, alert := range ds { if alertItem, ok := alert.(map[string]interface{}); ok { - alertItem["version"] = queryValue(alertItem, "alert_version", "") - if queryValue(alertItem, "id", nil) == nil { - alertItem["id"] = queryValue(alertItem, "alert_id", nil) + //alertItem["version"] = queryValue(alertItem, "alert_version", "") + if alertID, ok := queryValue(alertItem, "_source.id", "").(string); ok && alertID == "" { + if source, ok := alertItem["_source"].(map[string]interface{}); ok { + source["id"] = alertItem["_id"] + } } - alerts = append(alerts, alertItem) + alerts = append(alerts, alertItem["_source"]) } } } writeJSON(w, IfaceMap{ "ok": true, "alerts": alerts, - "totalAlerts": queryValue(alertRes, "totalAlerts", 0), + "totalAlerts": queryValue(alertRes, "hits.total.value", 0), }, http.StatusOK) } @@ -140,8 +198,8 @@ func decodeJSON(reader io.Reader, obj interface{}) error{ } func writeJSON(w http.ResponseWriter, data interface{}, statusCode int){ + w.Header().Set("Content-Type", "application/json") w.WriteHeader(statusCode) - w.Header().Set("content-type", "application/json") buf, _ := json.Marshal(data) w.Write(buf) } @@ -212,4 +270,4 @@ func assignTo(dst IfaceMap, src IfaceMap){ for k, v := range src { dst[k] = v } -} \ No newline at end of file +} diff --git a/service/alerting/constants.go b/service/alerting/constants.go index da9fca8a..ec9e4360 100644 --- a/service/alerting/constants.go +++ b/service/alerting/constants.go @@ -1,10 +1,17 @@ package alerting -const INFINI_PREFIX = "opendistro" +const EMAIL_ACCOUNT_FIELD = "email_account" +const EMAIL_GROUP_FIELD = "email_group" +const DESTINATION_FIELD = "destination" +const MONITOR_FIELD = "monitor" +const INDEX_ALERT = "ALERT" +const INDEX_ALERT_HISTORY = "ALERT_HISTORY" +const INDEX_ALL_ALERTS = "ALL_ALERTS" -const PLUGIN_NAME = INFINI_PREFIX + "-alerting" -const INDEX_PREFIX = INFINI_PREFIX +"-alerting" - -const INDEX_ALL_ALERTS = "."+INDEX_PREFIX +`-alert*` - -const API_PREFIX = "_opendistro" \ No newline at end of file +const ( + ALERT_ACTIVE = "ACTIVE" + ALERT_COMPLETED = "COMPLETED" + ALERT_DELETED = "DELETED" + ALERT_ACKNOWLEDGED = "ACKNOWLEDGED" + ALERT_ERROR = "ERROR" +) diff --git a/service/alerting/destination.go b/service/alerting/destination.go index e229ece1..7639e31c 100644 --- a/service/alerting/destination.go +++ b/service/alerting/destination.go @@ -5,8 +5,13 @@ import ( "fmt" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "infini.sh/search-center/model/alerting" "net/http" + "runtime/debug" "strings" + "time" ) func GetDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -18,7 +23,7 @@ func GetDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Para } dstID := ps.ByName("destID") - reqUrl := fmt.Sprintf("%s/_opendistro/_alerting/monitors/%s", conf.Endpoint, dstID) + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), dstID) res, err := doRequest(reqUrl, http.MethodGet, nil, nil) if err != nil { writeError(w, err) @@ -32,74 +37,11 @@ func GetDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Para } res.Body.Close() - if _, ok := resBody["monitor"]; !ok { - writeJSON(w, IfaceMap{ - "ok": false, - }, http.StatusOK) - return - } - - queryDSL := ` { - "size": 0, - "query"": { - "bool": { - "must": { - "term"": { - "monitor_id": "%s", - }, - }, - }, - }, - "aggs": { - "active_count": { - "terms": { - "field": "state", - } - }, - "24_hour_count": { - "date_range": { - "field": "start_time", - "ranges": [{ "from": "now-24h/h" }] - } - } - } - }` - queryDSL = fmt.Sprintf(queryDSL, id) - reqUrl = fmt.Sprintf("%s/_opendistro/_alerting/monitors/_search", conf.Endpoint) - res, err = doRequest(reqUrl, http.MethodPost, map[string]string{ - "index": ".opendistro-alerting-alert*", - }, queryDSL) - - if err != nil { - writeError(w, err) - return - } - - var searchResBody = IfaceMap{} - err = decodeJSON(res.Body, &searchResBody) - if err != nil { - writeError(w, err) - return - } - dayCount := queryValue(searchResBody, "aggregations.24_hour_count.buckets.0.doc_count", 0) - activeBuckets := queryValue(searchResBody, "aggregations.active_count.buckets",[]interface{}{}) - activeCount := 0 - if ab, ok := activeBuckets.([]IfaceMap); ok { - for _, curr := range ab { - if curr["key"].(string) == "ACTIVE" { - activeCount = int(curr["doc_count"].(float64)) - } - } - } writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody["monitor"], - "activeCount": activeCount, - "dayCount": dayCount, - "version": queryValue(resBody, "_version", nil), - "ifSeqNo": queryValue(resBody, "_seq_no", nil), - "ifPrimaryTerm": queryValue(resBody, "_primary_term", nil), + "destination": queryValue(resBody, "_source.destination", nil), }, http.StatusOK) + } func GetDestinations(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -114,65 +56,99 @@ func GetDestinations(w http.ResponseWriter, req *http.Request, ps httprouter.Par size = getQueryParam(req, "size", "20") search = getQueryParam(req, "search", "") sortDirection = getQueryParam(req, "sortDirection", "desc") - sortField = getQueryParam(req, "sortField", "start_time") - typ = getQueryParam(req, "type", "ALL") + sortField = getQueryParam(req, "sortField", "") + typ = getQueryParam(req, "type", "") ) - var params = map[string]string{} - switch (sortField) { - case "name": - params = map[string]string{ - "sortString": "destination.name.keyword", - "sortOrder": sortDirection, - } - case "type": - params = map[string]string{ - "sortString": "destination.type", - "sortOrder": sortDirection, - } - default: - } - params["startIndex"] = from - params["size"] = size - params["searchString"] = search - params["destinationType"] = typ - if clearSearch := strings.TrimSpace(search); clearSearch != "" { - clearSearch = strings.ReplaceAll(clearSearch, " ", "* *") - params["searchString"] = clearSearch + must := []IfaceMap{ + { + "exists": IfaceMap{ + "field": DESTINATION_FIELD, + }, + }, + //{ + // "match": IfaceMap{ + // "cluster_id": id, + // }, + //}, } - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations", conf.Endpoint, API_PREFIX) - res, err := doRequest(reqUrl, http.MethodGet, params, nil) + if clearSearch := strings.TrimSpace(search); clearSearch != "" { + clearSearch = strings.ReplaceAll(clearSearch, " ", "* *") + must = append(must, IfaceMap{ + "query_string": IfaceMap{ + "default_field": "destination.name", + "default_operator": "AND", + "query": fmt.Sprintf(`*%s*`, clearSearch), + }, + }) + } + + var sort interface{} = IfaceMap{} + switch (sortField) { + case "name": + sort = IfaceMap{ "destination.name.keyword": sortDirection } + case "type": + sort = IfaceMap{ "destination.type": sortDirection } + default: + } + if typ != "" && typ != "ALL" { + must = append(must, IfaceMap{ + "match": IfaceMap{ + "destination.type": typ, + }, + }) + } + + reqBody := IfaceMap{ + "from": from, + "size": size, + "sort": sort, + "query": IfaceMap{ + "bool": IfaceMap{ + "must": must, + }, + }, + } + + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{})) + + res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) if err != nil { writeError(w, err) return } - var resBody = IfaceMap{} err = decodeJSON(res.Body, &resBody) if err != nil { writeError(w, err) return } - rawDests := queryValue(resBody, "destinations", []interface{}{}) + + totalDestinations := queryValue(resBody, "hits.total.value", 0) + rawHits := queryValue(resBody, "hits.hits", []interface{}{}) dests := []IfaceMap{} - if ds, ok := rawDests.([]interface{}); ok { - for _, dest := range ds { - if destination, ok := dest.(map[string]interface{}); ok { - - destination["version"] = queryValue(destination, "schema_version", "") - destination["ifSeqNo"] = queryValue(destination, "seq_no", 0) - destination["ifPrimaryTerm"] = queryValue(destination, "primary_term", 0) - dests = append(dests, destination) + if rh, ok := rawHits.([]interface{}); ok { + for _, hit := range rh { + if destination, ok := hit.(map[string]interface{}); ok { + newItem := IfaceMap{} + newItem["id"] = queryValue(destination, "_id", "") + source := queryValue(destination, "_source."+DESTINATION_FIELD, nil) + if ms, ok := source.(map[string]interface{}); ok { + assignTo(newItem, ms) + } + dests = append(dests, newItem) } } } + writeJSON(w, IfaceMap{ "ok": true, "destinations": dests, - "totalDestinations": queryValue(resBody, "totalDestinations", 0), + "totalDestinations":totalDestinations, }, http.StatusOK) } @@ -184,10 +160,40 @@ func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P return } - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations", conf.Endpoint, API_PREFIX) + config := getDefaultConfig() + destId := util.GetUUID() + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destId) + var destination = &alerting.Destination{} + err := decodeJSON(req.Body, &destination) + if err != nil { + writeError(w, err) + return + } + destination.LastUpdateTime = time.Now().UnixNano()/1e6 + var toSaveDest = IfaceMap{ + "type": destination.Type, + "name": destination.Name, + "last_update_time": destination.LastUpdateTime, + "schema_version": destination.SchemaVersion, + "id": destId, + } + switch destination.Type { + case "email" : + toSaveDest[destination.Type] = destination.Email + case "custom_webhook": + toSaveDest[destination.Type] = destination.CustomWebhook + case "slack": + toSaveDest[destination.Type] = destination.Slack + default: + writeError(w, errors.New("type unsupported")) + return + } res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ "refresh": "wait_for", - }, req.Body) + }, IfaceMap{ + "cluster_id": id, + DESTINATION_FIELD: toSaveDest, + }) if err != nil { writeError(w, err) return @@ -203,7 +209,11 @@ func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody, + "resp": IfaceMap{ + DESTINATION_FIELD: toSaveDest, + "_id": queryValue(resBody, "_id", ""), + "_version": queryValue(resBody, "_version", 0), + }, }, http.StatusOK) } @@ -217,18 +227,40 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P } destinationId := ps.ByName("destinationId") - var ( - ifSeqNo = getQueryParam(req, "ifSeqNo") - ifPrimaryTerm = getQueryParam(req, "ifPrimaryTerm") - ) - //PUT /_opendistro/_alerting/destinations/2g3CsHsB3EDgQAwRGzgS?if_seq_no=15&if_primary_term=2&refresh=wait_for - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/%s", conf.Endpoint, API_PREFIX, destinationId) + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId) + var destination = &alerting.Destination{} + err := decodeJSON(req.Body, &destination) + if err != nil { + writeError(w, err) + return + } + destination.LastUpdateTime = time.Now().UnixNano()/1e6 + var toSaveDest = IfaceMap{ + "type": destination.Type, + "name": destination.Name, + "last_update_time": destination.LastUpdateTime, + "schema_version": destination.SchemaVersion, + "id": destinationId, + } + switch destination.Type { + case "email" : + toSaveDest[destination.Type] = destination.Email + case "custom_webhook": + toSaveDest[destination.Type] = destination.CustomWebhook + case "slack": + toSaveDest[destination.Type] = destination.Slack + default: + writeError(w, errors.New("type unsupported")) + return + } res, err := doRequest(reqUrl, http.MethodPut, map[string]string{ "refresh": "wait_for", - "if_seq_no": ifSeqNo, - "if_primary_term": ifPrimaryTerm, - }, req.Body) + }, IfaceMap{ + "cluster_id": id, + DESTINATION_FIELD: toSaveDest, + }) if err != nil { writeError(w, err) return @@ -241,8 +273,6 @@ func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P writeError(w, err) return } - //TODO error handle: check whether resBody has contains field error - writeJSON(w, IfaceMap{ "ok": true, "version": queryValue(resBody, "_version", ""), @@ -261,7 +291,8 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P destinationId := ps.ByName("destinationId") - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/%s", conf.Endpoint, API_PREFIX, destinationId) + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), destinationId) res, err := doRequest(reqUrl, http.MethodDelete, nil, nil) if err != nil { writeError(w, err) @@ -275,7 +306,6 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P writeError(w, err) return } - //TODO error handle: check whether resBody has contains field error resultIfce := queryValue(resBody, "result", "") var isOk = false @@ -288,18 +318,57 @@ func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.P } +func getDefaultConfig() *elastic.ElasticsearchConfig{ + return elastic.GetConfig("default") +} + +//var ( +// ks keystore.WritableKeystore +// ksOnce = &sync.Once{} +//) +//func getKeystore() keystore.WritableKeystore { +// ksOnce.Do(func() { +// tempKS, err := keystore.Factory(nil, "data/search-center/keystore") +// if err != nil { +// panic(err) +// } +// ks, err = keystore.AsWritableKeystore(tempKS) +// if err != nil { +// panic(err) +// } +// }) +// return ks +//} + func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + defer func() { + if err := recover(); err != nil { + debug.PrintStack() + } + }() id := ps.ByName("id") conf := elastic.GetConfig(id) if conf == nil { writeError(w, errors.New("cluster not found")) return } + config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_accounts", conf.Endpoint, API_PREFIX) + reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{})) + var emailAccount = &alerting.EmailAccount{} + err := decodeJSON(req.Body, &emailAccount) + if err != nil { + writeError(w, err) + return + } + //var password = emailAccount.Password + //emailAccount.Password = "" res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ "refresh": "wait_for", - }, req.Body) + }, IfaceMap{ + EMAIL_ACCOUNT_FIELD: emailAccount, + "cluster_id": id, + }) if err != nil { writeError(w, err) return @@ -313,9 +382,27 @@ func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. return } + //kk := fmt.Sprintf("search-center_alerting.destination.email.%s.password", emailAccount.Name) + //secStr, _ := getKeystore().Retrieve(kk) + //kst := getKeystore() + //if secStr == nil { + // kst.Store(kk, []byte(password)) + // kst.Save() + //}else{ + // oldPwd, _ := secStr.Get() + // if bytes.Compare(oldPwd, []byte(password)) != 0 { + // kst.Store(kk, []byte(password)) + // } + // kst.Save() + //} + writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody, + "resp": IfaceMap{ + EMAIL_ACCOUNT_FIELD: emailAccount, + "_id": queryValue(resBody, "_id", ""), + "_version": queryValue(resBody, "_version", 0), + }, }, http.StatusOK) } @@ -328,17 +415,22 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. } emailAccountId := ps.ByName("emailAccountId") - var ( - ifSeqNo = getQueryParam(req, "ifSeqNo") - ifPrimaryTerm = getQueryParam(req, "ifPrimaryTerm") - ) + config := getDefaultConfig() + + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId) + var emailAccount = &alerting.EmailAccount{} + err := decodeJSON(req.Body, &emailAccount) + if err != nil { + writeError(w, err) + return + } - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_accounts/%s", conf.Endpoint, API_PREFIX, emailAccountId) res, err := doRequest(reqUrl, http.MethodPut, map[string]string{ "refresh": "wait_for", - "if_seq_no": ifSeqNo, - "if_primary_term": ifPrimaryTerm, - }, req.Body) + }, IfaceMap{ + "cluster_id": id, + EMAIL_ACCOUNT_FIELD: emailAccount, + }) if err != nil { writeError(w, err) return @@ -351,7 +443,6 @@ func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. writeError(w, err) return } - //TODO error handle: check whether resBody has contains field error writeJSON(w, IfaceMap{ "ok": true, @@ -369,9 +460,13 @@ func DeleteEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. } emailAccountId := ps.ByName("emailAccountId") + config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_accounts/%s", conf.Endpoint, API_PREFIX, emailAccountId) - res, err := doRequest(reqUrl, http.MethodDelete, nil, nil) + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId) + + res, err := doRequest(reqUrl, http.MethodDelete, map[string]string{ + "refresh": "wait_for", + }, nil) if err != nil { writeError(w, err) return @@ -384,7 +479,6 @@ func DeleteEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. writeError(w, err) return } - //TODO error handle: check whether resBody has contains field error resultIfce := queryValue(resBody, "result", "") var isOk = false @@ -394,7 +488,6 @@ func DeleteEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter. writeJSON(w, IfaceMap{ "ok": isOk, }, http.StatusOK) - } func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -412,24 +505,28 @@ func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Pa sortField = getQueryParam(req, "sortField", "name") ) - must := IfaceMap{ - "match_all": IfaceMap{}, + must := []IfaceMap{ + { + "exists": IfaceMap{ + "field": EMAIL_ACCOUNT_FIELD, + }, + }, } if clearSearch := strings.TrimSpace(search); clearSearch != "" { clearSearch = strings.ReplaceAll(clearSearch, " ", "* *") - must = IfaceMap{ + must = append(must, IfaceMap{ "query_string": IfaceMap{ "default_field": "email_account.name", "default_operator": "AND", "query": fmt.Sprintf(`*%s*`, clearSearch), }, - } + }) } sortQueryMap := IfaceMap{ "name": IfaceMap{ "email_account.name.keyword": sortDirection } } var sort interface{} - if sortQuery, ok := sortQueryMap[sortField]; ok { + if sortQuery, ok := sortQueryMap[sortField]; ok { sort = sortQuery } reqBody := IfaceMap{ @@ -443,8 +540,11 @@ func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Pa }, } - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_accounts/_search", conf.Endpoint, API_PREFIX) - res, err := doRequest(reqUrl, http.MethodPost, nil, reqBody) + config := getDefaultConfig() + + reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{})) + + res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) if err != nil { writeError(w, err) return @@ -465,12 +565,10 @@ func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Pa if emailAccount, ok := hit.(map[string]interface{}); ok { newItem := IfaceMap{} newItem["id"] = queryValue(emailAccount, "_id", "") - source := queryValue(emailAccount, "_source", nil) + source := queryValue(emailAccount, "_source."+EMAIL_ACCOUNT_FIELD, nil) if ms, ok := source.(map[string]interface{}); ok { assignTo(newItem, ms) } - newItem["ifSeqNo"] = queryValue(emailAccount, "_seq_no", 0) - newItem["ifPrimaryTerm"] = queryValue(emailAccount, "_primary_term", 0) emailAccounts = append(emailAccounts, newItem) } } @@ -492,8 +590,11 @@ func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Par emailAccountId := ps.ByName("emailAccountId") - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_accounts/%s", conf.Endpoint, API_PREFIX, emailAccountId) - res, err := doRequest(reqUrl, http.MethodGet,nil, req.Body) + config := getDefaultConfig() + + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailAccountId) + + res, err := doRequest(reqUrl, http.MethodGet,nil, nil) if err != nil { writeError(w, err) return @@ -506,14 +607,18 @@ func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Par writeError(w, err) return } + //name := queryValue(resBody,"_source.email_account.name", "") + //kk := fmt.Sprintf("search-center_alerting.destination.email.%s.password", name) + //secStr, _ := getKeystore().Retrieve(kk) + //if secStr != nil { + // pwd, _ := secStr.Get() + // fmt.Println(string(pwd)) + //} writeJSON(w, IfaceMap{ "ok": true, - "resp": queryValue(resBody, "email_account", nil), - "ifSeqNo": queryValue(resBody, "_seq_no", 0), - "ifPrimaryTerm": queryValue(resBody, "_primary_term", 0), + "resp": queryValue(resBody, "_source.email_account", nil), }, http.StatusOK) - } @@ -527,10 +632,20 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa return } - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_groups", conf.Endpoint, API_PREFIX) + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{})) + var emailGroup = &alerting.EmailGroup{} + err := decodeJSON(req.Body, &emailGroup) + if err != nil { + writeError(w, err) + return + } res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ "refresh": "wait_for", - }, req.Body) + }, IfaceMap{ + "cluster_id": id, + EMAIL_GROUP_FIELD: emailGroup, + }) if err != nil { writeError(w, err) return @@ -546,7 +661,15 @@ func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody, + "resp": IfaceMap{ + EMAIL_GROUP_FIELD: IfaceMap{ + "emails": emailGroup.Emails, + "name": emailGroup.Name, + "schema_version": emailGroup.SchemaVersion, + }, + "_id": queryValue(resBody, "_id", ""), + "_version": queryValue(resBody, "_version", 0), + }, }, http.StatusOK) } @@ -559,17 +682,20 @@ func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa } emailGroupId := ps.ByName("emailGroupId") - var ( - ifSeqNo = getQueryParam(req, "ifSeqNo") - ifPrimaryTerm = getQueryParam(req, "ifPrimaryTerm") - ) - - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_groups/%s", conf.Endpoint, API_PREFIX, emailGroupId) + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId) + var emailGroup = &alerting.EmailGroup{} + err := decodeJSON(req.Body, &emailGroup) + if err != nil { + writeError(w, err) + return + } res, err := doRequest(reqUrl, http.MethodPut, map[string]string{ "refresh": "wait_for", - "if_seq_no": ifSeqNo, - "if_primary_term": ifPrimaryTerm, - }, req.Body) + }, IfaceMap{ + "cluster_id": id, + EMAIL_GROUP_FIELD: emailGroup, + }) if err != nil { writeError(w, err) return @@ -582,8 +708,6 @@ func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa writeError(w, err) return } - //TODO error handle: check whether resBody has contains field error - writeJSON(w, IfaceMap{ "ok": true, "id": queryValue(resBody, "_id", ""), @@ -600,8 +724,9 @@ func DeleteEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa } emailGroupId := ps.ByName("emailGroupId") + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId) - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_groups/%s", conf.Endpoint, API_PREFIX, emailGroupId) res, err := doRequest(reqUrl, http.MethodDelete, nil, nil) if err != nil { writeError(w, err) @@ -615,7 +740,6 @@ func DeleteEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Pa writeError(w, err) return } - //TODO error handle: check whether resBody has contains field error resultIfce := queryValue(resBody, "result", "") var isOk = false @@ -643,19 +767,23 @@ func GetEmailGroups(w http.ResponseWriter, req *http.Request, ps httprouter.Para sortField = getQueryParam(req, "sortField", "name") ) - must := IfaceMap{ - "match_all": IfaceMap{}, + must := []IfaceMap{ + { + "exists": IfaceMap{ + "field": EMAIL_GROUP_FIELD, + }, + }, } if clearSearch := strings.TrimSpace(search); clearSearch != "" { clearSearch = strings.ReplaceAll(clearSearch, " ", "* *") - must = IfaceMap{ + must = append(must, IfaceMap{ "query_string": IfaceMap{ "default_field": "email_group.name", "default_operator": "AND", "query": fmt.Sprintf(`*%s*`, clearSearch), }, - } + }) } sortQueryMap := IfaceMap{ "name": IfaceMap{ "email_group.name.keyword": sortDirection } } @@ -674,9 +802,10 @@ func GetEmailGroups(w http.ResponseWriter, req *http.Request, ps httprouter.Para }, } - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_groups/_search", conf.Endpoint, API_PREFIX) - res, err := doRequest(reqUrl, http.MethodPost, nil, reqBody) - //TODO to handle api error in doRequest function + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{})) + + res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) if err != nil { writeError(w, err) return @@ -697,12 +826,10 @@ func GetEmailGroups(w http.ResponseWriter, req *http.Request, ps httprouter.Para if emailGroup, ok := hit.(map[string]interface{}); ok { newItem := IfaceMap{} newItem["id"] = queryValue(emailGroup, "_id", "") - source := queryValue(emailGroup, "_source", nil) + source := queryValue(emailGroup, "_source."+EMAIL_GROUP_FIELD, nil) if ms, ok := source.(map[string]interface{}); ok { assignTo(newItem, ms) } - newItem["ifSeqNo"] = queryValue(emailGroup, "_seq_no", 0) - newItem["ifPrimaryTerm"] = queryValue(emailGroup, "_primary_term", 0) emailGroups = append(emailGroups, newItem) } } @@ -722,9 +849,10 @@ func GetEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Param return } - emailAccountId := ps.ByName("emailGroupId") + emailGroupId := ps.ByName("emailGroupId") - reqUrl := fmt.Sprintf("%s/%s/_alerting/destinations/email_groups/%s", conf.Endpoint, API_PREFIX, emailAccountId) + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), emailGroupId) res, err := doRequest(reqUrl, http.MethodGet,nil, req.Body) if err != nil { writeError(w, err) @@ -738,7 +866,7 @@ func GetEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Param writeError(w, err) return } - emailGroup := queryValue(resBody, "email_group", nil) + emailGroup := queryValue(resBody, "_source."+EMAIL_GROUP_FIELD, nil) if emailGroup == nil { writeJSON(w, IfaceMap{ "ok": false, @@ -749,8 +877,6 @@ func GetEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Param writeJSON(w, IfaceMap{ "ok": true, "resp": emailGroup, - "ifSeqNo": queryValue(resBody, "_seq_no", 0), - "ifPrimaryTerm": queryValue(resBody, "_primary_term", 0), }, http.StatusOK) } diff --git a/service/alerting/elasticsearch.go b/service/alerting/elasticsearch.go index f8b023c6..dfc53b68 100644 --- a/service/alerting/elasticsearch.go +++ b/service/alerting/elasticsearch.go @@ -13,7 +13,7 @@ import ( type SearchBody struct { Query IfaceMap `json:"query"` Index string `json:"index"` - Size int `json:""size` + Size int `json:"size"` } func Search(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ @@ -29,12 +29,11 @@ func Search(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ writeError(w, err) return } - reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/_search", conf.Endpoint, API_PREFIX) - params := map[string]string{ - "index": body.Index, - } + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, body.Index) + body.Query["size"] = body.Size - res, err := doRequest(reqUrl, http.MethodPost, params, body.Query) + res, err := doRequest(reqUrl, http.MethodPost, nil, body.Query) if err != nil { writeError(w, err) return diff --git a/service/alerting/event.go b/service/alerting/event.go new file mode 100644 index 00000000..cfcac62c --- /dev/null +++ b/service/alerting/event.go @@ -0,0 +1,33 @@ +package alerting + +import ( + "src/github.com/buger/jsonparser" + "strconv" + "strings" +) + +type MonitorEvent struct { + Fields []byte +} + +func (ev *MonitorEvent) GetValue(key string) (interface{}, error) { + keyParts := strings.Split(key, ".")//todo whether ctx field contains dot + val, valType, _, err := jsonparser.Get(ev.Fields, keyParts...) + if err != nil { + return val, err + } + switch valType { + case jsonparser.String: + return string(val), nil + case jsonparser.Number: + + return strconv.Atoi(string(val)) + } + return nil, nil +} + +//func SplitFieldsKey(key, sep string) []string { +// sr := strings.NewReader(key) +// if ch, size, sr.ReadRune() +//} + diff --git a/service/alerting/monitor.go b/service/alerting/monitor.go index c1039af3..eea3017e 100644 --- a/service/alerting/monitor.go +++ b/service/alerting/monitor.go @@ -5,19 +5,17 @@ import ( "fmt" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "infini.sh/search-center/model/alerting" + alertUtil "infini.sh/search-center/service/alerting/util" "net/http" - "runtime/debug" "strconv" "strings" "time" ) func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - defer func() { - if err := recover(); err != nil { - debug.PrintStack() - } - }() id := ps.ByName("id") conf := elastic.GetConfig(id) if conf == nil { @@ -26,8 +24,9 @@ func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) } mid := ps.ByName("monitorID") - // /_opendistro/_alerting/monitors/uiSjqXsBHT9Hsiy5Dq6g - reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/%s", conf.Endpoint, API_PREFIX, mid) + config := getDefaultConfig() + + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), mid) res, err := doRequest(reqUrl, http.MethodGet, nil, nil) if err != nil { writeError(w, err) @@ -40,44 +39,39 @@ func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) return } res.Body.Close() - - if _, ok := resBody["monitor"]; !ok { - writeJSON(w, IfaceMap{ - "ok": false, - }, http.StatusOK) + if found, ok := resBody["found"].(bool); ok && !found { + writeError(w, errors.New("monitor not found")) return } queryDSL := ` { - "size": 0, - "query": { - "bool": { - "must": { - "term": { - "monitor_id": "%s" - } - } - } - }, - "aggs": { - "active_count": { - "terms": { - "field": "state" - } - }, - "24_hour_count": { - "date_range": { - "field": "start_time", - "ranges": [{ "from": "now-24h/h" }] - } - } - } - }` - queryDSL = fmt.Sprintf(queryDSL, id) - reqUrl = fmt.Sprintf("%s/%s/_alerting/monitors/_search", conf.Endpoint, API_PREFIX) - res, err = doRequest(reqUrl, http.MethodPost, map[string]string{ - "index": INDEX_ALL_ALERTS, - }, queryDSL) + "size": 0, + "query": { + "bool": { + "must": { + "term": { + "monitor_id": "%s" + } + } + } + }, + "aggs": { + "active_count": { + "terms": { + "field": "state" + } + }, + "24_hour_count": { + "date_range": { + "field": "start_time", + "ranges": [{ "from": "now-24h/h" }] + } + } + } + }` + queryDSL = fmt.Sprintf(queryDSL, mid) + reqUrl = fmt.Sprintf("%s/%s/_search", config.Endpoint, getAlertIndexName(INDEX_ALL_ALERTS)) + res, err = doRequest(reqUrl, http.MethodPost,nil, queryDSL) if err != nil { writeError(w, err) @@ -90,33 +84,37 @@ func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) writeError(w, err) return } - dayCount := queryValue(searchResBody, "aggregations.24_hour_count.buckets.0.doc_count", 0) + dayCountBuckets := queryValue(searchResBody, "aggregations.24_hour_count.buckets", 0) + dayCount := 0 + if dcb, ok := dayCountBuckets.([]interface{}); ok { + if dayAgg, ok := dcb[0].(map[string]interface{}); ok { + dayCount = int(dayAgg["doc_count"].(float64)) + } + } + activeBuckets := queryValue(searchResBody, "aggregations.active_count.buckets",[]interface{}{}) activeCount := 0 - if ab, ok := activeBuckets.([]IfaceMap); ok { - for _, curr := range ab { - if curr["key"].(string) == "ACTIVE" { - activeCount = int(curr["doc_count"].(float64)) + if ab, ok := activeBuckets.([]interface{}); ok { + for _, bk := range ab { + if curr, ok := bk.(map[string]interface{}); ok { + if curr["key"].(string) == "ACTIVE" { + activeCount = int(curr["doc_count"].(float64)) + break + } } } } + monitor := queryValue(resBody, "_source.monitor", nil) writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody["monitor"], + "resp": monitor, "activeCount": activeCount, "dayCount": dayCount, "version": queryValue(resBody, "_version", nil), - "ifSeqNo": queryValue(resBody, "_seq_no", nil), - "ifPrimaryTerm": queryValue(resBody, "_primary_term", nil), }, http.StatusOK) } func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - defer func() { - if err := recover(); err != nil { - debug.PrintStack() - } - }() id := ps.ByName("id") conf := elastic.GetConfig(id) if conf == nil { @@ -130,20 +128,31 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) sortDirection = getQueryParam(req, "sortDirection") sortField = getQueryParam(req, "sortField") state = getQueryParam(req, "state") - must = IfaceMap{ "match_all": IfaceMap{} } + must = []IfaceMap{ + { + "match": IfaceMap{ + "cluster_id": id, + }, + }, + { + "exists": IfaceMap{ + "field": MONITOR_FIELD, + }, + }, + } ) if clearSearch := strings.TrimSpace(search); clearSearch != "" { clearSearch = strings.ReplaceAll(clearSearch, " ", "* *") - must = IfaceMap{ - "query_string": IfaceMap{ - "default_field": "monitor.name", - "default_operator": "AND", - "query": fmt.Sprintf("*%s*", clearSearch), - }, - } + must = append(must, IfaceMap{ + "query_string": IfaceMap{ + //"default_field": "monitor.name", + "default_operator": "AND", + "query": fmt.Sprintf("*%s*", clearSearch), + }, + }) } var filter = []IfaceMap{ - IfaceMap{ "term": IfaceMap{ "monitor.type": "monitor" }}, + { "term": IfaceMap{ "monitor.type": "monitor" }}, } if state != "all" { filter = append(filter, IfaceMap{ @@ -175,18 +184,24 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) sortPageData["from"] = intFrom } var params = IfaceMap{ - "seq_no_primary_term": true, - "version": true, "query": IfaceMap{ "bool": IfaceMap{ "filter": filter, "must": must, + "must_not": []IfaceMap{ + { + "exists": IfaceMap{ + "field": "monitor.status", + }, + }, + }, }, }, } assignTo(params, sortPageData) - reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/_search", conf.Endpoint, API_PREFIX ) - res, err := doRequest(reqUrl, http.MethodPost, nil, params) + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{}) ) + res, err := doRequest(reqUrl, http.MethodGet, nil, params) if err != nil { writeError(w, err) return @@ -208,12 +223,10 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) if hit, ok := hitIface.(map[string]interface{}); ok { id := queryValue(hit, "_id", "") monitorIDs = append(monitorIDs, id) - monitor := queryValue(hit, "_source", IfaceMap{}).(map[string]interface{}) + monitor := queryValue(hit, "_source.monitor", IfaceMap{}).(map[string]interface{}) monitorMap[id.(string)] = IfaceMap{ "id": id, "version": queryValue(hit, "_version", ""), - "ifSeqNo": queryValue(hit, "_seq_no", false), - "ifPrimaryTerm": queryValue(hit, "_primary_term", false), "name": queryValue(monitor, "name", ""), "enabled": queryValue(monitor, "enabled", false), "monitor": monitor, @@ -235,9 +248,7 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) if sortF, ok := aggsSorts[sortField]; ok { aggsOrderData["order"] = IfaceMap{ sortF.(string): sortDirection } } - var queryParams = map[string]string{ - "index": INDEX_ALL_ALERTS, - } + var termsMap = IfaceMap{ "field": "monitor_id", "size": intFrom + intSize, @@ -250,13 +261,13 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) "uniq_monitor_ids": IfaceMap{ "terms": termsMap, "aggregations": IfaceMap{ - "active": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": "ACTIVE" } } }, - "acknowledged": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": "ACKNOWLEDGED" } } }, - "errors": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": "ERROR" } } }, + "active": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_ACTIVE } } }, + "acknowledged": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_ACKNOWLEDGED } } }, + "errors": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_ERROR } } }, "ignored": IfaceMap{ "filter": IfaceMap{ "bool": IfaceMap{ - "filter": IfaceMap{ "term": IfaceMap{ "state": "COMPLETED" } }, + "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_COMPLETED } }, "must_not": IfaceMap{ "exists": IfaceMap{ "field": "acknowledged_time" } }, }, }, @@ -276,9 +287,8 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) }, } - - reqUrl = fmt.Sprintf("%s/%s/_alerting/monitors/_search", conf.Endpoint, API_PREFIX) - searchRes, err := doRequest(reqUrl, http.MethodPost, queryParams, aggsParams) + reqUrl = fmt.Sprintf("%s/%s/_search", config.Endpoint, getAlertIndexName(INDEX_ALL_ALERTS)) + searchRes, err := doRequest(reqUrl, http.MethodPost, nil, aggsParams) if err != nil { writeError(w, err) return @@ -309,6 +319,7 @@ func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) monitor["latestAlert"] = latestAlert monitor["active"] = queryValue(bk, "active.doc_count", 0) monitor["errors"] = queryValue(bk, "errors.doc_count", 0) + monitor["acknowledged"] = queryValue(bk, "acknowledged.doc_count", 0) monitor["currentTime"] = time.Now().UnixNano() / 1e6 usedMonitors = append(usedMonitors, monitor) delete(monitorMap, id.(string)) @@ -351,10 +362,22 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param return } - reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors", conf.Endpoint, API_PREFIX) + var monitor = &alerting.Monitor{} + err := decodeJSON(req.Body, &monitor) + if err != nil { + writeError(w, err) + return + } + + monitor.LastUpdateTime = time.Now().UnixNano()/1e6 + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc", config.Endpoint, orm.GetIndexName(alerting.Config{})) res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ "refresh": "wait_for", - }, req.Body) + }, IfaceMap{ + "cluster_id": id, + MONITOR_FIELD: monitor, + }) if err != nil { writeError(w, err) return @@ -367,10 +390,19 @@ func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param writeError(w, err) return } + monitorId := queryValue(resBody, "_id", "").(string) + GetScheduler().AddMonitor(monitorId, &ScheduleMonitor{ + Monitor: monitor, + ClusterID: id, + }) writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody, + "resp": IfaceMap{ + MONITOR_FIELD: monitor, + "_id": monitorId, + "_version": queryValue(resBody, "_version", 0), + }, }, http.StatusOK) } @@ -383,9 +415,56 @@ func DeleteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param } monitorId := ps.ByName("monitorID") + config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/%s", conf.Endpoint, API_PREFIX, monitorId) - res, err := doRequest(reqUrl, http.MethodDelete, nil, nil) + //change alert state to deleted and move alert to history + reqUrl := fmt.Sprintf("%s/_reindex", config.Endpoint) + query := IfaceMap{ + "bool": IfaceMap{ + "must": []IfaceMap{ + {"match": IfaceMap{ + "monitor_id": monitorId, + }}, + }, + }, + } + reqBody := IfaceMap{ + "source": IfaceMap{ + "index": getAlertIndexName(INDEX_ALERT), + "query": query, + }, + "dest": IfaceMap{ + "index": getAlertIndexName(INDEX_ALERT_HISTORY), + }, + "script": IfaceMap{ + "source": fmt.Sprintf("ctx._source['state'] = '%s';", ALERT_DELETED), + }, + } + _, err := doRequest(reqUrl, http.MethodPost,nil, reqBody) + if err != nil { + writeError(w, err) + return + } + //delete alert + reqUrl = fmt.Sprintf("%s/%s/_delete_by_query", config.Endpoint, getAlertIndexName(INDEX_ALERT)) + _, err = doRequest(reqUrl, http.MethodPost, nil, IfaceMap{ + "query" : query, + }) + if err != nil { + writeError(w, err) + return + } + + //logic delete monitor + reqUrl = fmt.Sprintf("%s/%s/_update/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), monitorId) + res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ + "refresh": "wait_for", + }, IfaceMap{ + "script" : IfaceMap{ + "source": "ctx._source.monitor.status = 'DELETED';", + "lang": "painless", + }, + }) if err != nil { writeError(w, err) return @@ -398,12 +477,12 @@ func DeleteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param writeError(w, err) return } - //TODO error handle: check whether resBody has contains field error resultIfce := queryValue(resBody, "result", "") var isOk = false - if result, ok := resultIfce.(string); ok && result == "deleted" { + if result, ok := resultIfce.(string); ok && result == "updated" { isOk = true + GetScheduler().RemoveMonitor(monitorId) } writeJSON(w, IfaceMap{ "ok": isOk, @@ -420,17 +499,36 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param } monitorId := ps.ByName("monitorID") - var ( - ifSeqNo = getQueryParam(req, "ifSeqNo") - ifPrimaryTerm = getQueryParam(req, "ifPrimaryTerm") - ) + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", config.Endpoint, orm.GetIndexName(alerting.Config{}), monitorId) - reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/%s", conf.Endpoint, API_PREFIX, monitorId) + var monitor = &alerting.Monitor{} + err := decodeJSON(req.Body, &monitor) + if err != nil { + writeError(w, err) + return + } + if len(monitor.Triggers) > 0 { + for i, trigger := range monitor.Triggers { + if trigger.ID == "" { + monitor.Triggers[i].ID = util.GetUUID() + } + if len(trigger.Actions) > 0 { + for j, action := range trigger.Actions { + if action.ID == ""{ + monitor.Triggers[i].Actions[j].ID = util.GetUUID() + } + } + } + } + } + monitor.LastUpdateTime = time.Now().UnixNano()/1e6 res, err := doRequest(reqUrl, http.MethodPut, map[string]string{ "refresh": "wait_for", - "if_seq_no": ifSeqNo, - "if_primary_term": ifPrimaryTerm, - }, req.Body) + }, IfaceMap{ + "cluster_id": id, + MONITOR_FIELD: monitor, + }) if err != nil { writeError(w, err) return @@ -444,6 +542,10 @@ func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Param return } + GetScheduler().UpdateMonitor(monitorId, &ScheduleMonitor{ + Monitor: monitor, + ClusterID: id, + }) writeJSON(w, IfaceMap{ "ok": true, "version": queryValue(resBody, "_version", ""), @@ -461,9 +563,40 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P } monitorId := ps.ByName("monitorID") + var ackAlertsReq = struct { + AlertIDs []string `json:"alerts"` + }{} + err := decodeJSON(req.Body, &ackAlertsReq) + if err != nil { + writeError(w, err) + return + } - reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/%s/_acknowledge/alerts", conf.Endpoint, API_PREFIX, monitorId) - res, err := doRequest(reqUrl, http.MethodPost,nil, req.Body) + config := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_update_by_query", getAlertIndexName(INDEX_ALERT), config.Endpoint) + reqBody := IfaceMap{ + "query": IfaceMap{ + "bool": IfaceMap{ + "must":[]IfaceMap{ + {"match": IfaceMap{ + "monitor_id": monitorId, + }}, + { + "terms": IfaceMap{ + "_id": ackAlertsReq.AlertIDs, + }, + }, + }, + }, + + }, + "script": IfaceMap{ + "source": fmt.Sprintf("ctx._source['state'] = '%s';ctx._source['acknowledged_time'] = %dL;", ALERT_ACKNOWLEDGED, time.Now().UnixNano()/1e6), + }, + } + res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ + "refresh":"", + }, reqBody) if err != nil { writeError(w, err) return @@ -478,13 +611,16 @@ func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.P } var isOk = false - if failed, ok := resBody["failed"].([]interface{}); ok && len(failed) == 0 { + if failed, ok := resBody["failures"].([]interface{}); ok && len(failed) == 0 { isOk = true } writeJSON(w, IfaceMap{ "ok": isOk, - "resp": resBody, + "resp": IfaceMap{ + "success": ackAlertsReq.AlertIDs, + "failed": []string{}, + }, }, http.StatusOK) } @@ -500,10 +636,23 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para dryrun = getQueryParam(req, "dryrun", "true") ) - reqUrl := fmt.Sprintf("%s/%s/_alerting/monitors/_execute", conf.Endpoint, API_PREFIX) - res, err := doRequest(reqUrl, http.MethodPost, map[string]string{ - "dryrun": dryrun, - }, req.Body) + var monitor = &alerting.Monitor{} + err := decodeJSON(req.Body, &monitor) + if err != nil { + writeError(w, err) + return + } + + if monitor.Name == "TEMP_MONITOR"{ + + } + if len(monitor.Inputs) == 0 { + writeError(w, errors.New("no input")) + return + } + periodStart := time.Now() + reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, strings.Join(monitor.Inputs[0].Search.Indices, ",")) + res, err := doRequest(reqUrl, http.MethodGet, nil, monitor.Inputs[0].Search.Query) if err != nil { writeError(w, err) return @@ -516,10 +665,49 @@ func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Para writeError(w, err) return } - //TODO error handle: check whether resBody has contains field error + var triggerResults = IfaceMap{} + if dryrun == "true" { + sm := ScheduleMonitor{ + Monitor: monitor, + } + for _, trigger := range monitor.Triggers { + triggerResult := IfaceMap{ + "error": nil, + "action_results": IfaceMap{}, + "name": trigger.Name, + } + monitorCtx, err := createMonitorContext(&trigger, resBody, &sm, IfaceMap{}) + if err != nil { + triggerResult["error"] = err + triggerResults[trigger.ID] = triggerResult + continue + } + isTrigger, err := resolveTriggerResult(&trigger, monitorCtx) + triggerResult["triggered"] = isTrigger + if err != nil { + triggerResult["error"] = err + } + if trigger.ID == "" { + trigger.ID = util.GetUUID() + } + triggerResults[trigger.ID] = triggerResult + } + } + + period := alertUtil.GetMonitorPeriod(&periodStart, &monitor.Schedule) writeJSON(w, IfaceMap{ "ok": true, - "resp": resBody, + "resp": IfaceMap{ + "error": nil, + "monitor_name": monitor.Name, + "input_results": IfaceMap{ + "error": nil, + "results": []IfaceMap{resBody}, + }, + "trigger_results": triggerResults, + "period_start": period.Start, + "period_end": period.End, + }, }, http.StatusOK) } diff --git a/service/alerting/overview.go b/service/alerting/overview.go new file mode 100644 index 00000000..4c8981dd --- /dev/null +++ b/service/alerting/overview.go @@ -0,0 +1,61 @@ +package alerting + +import ( + "fmt" + httprouter "infini.sh/framework/core/api/router" + "net/http" +) + +func GetAlertMetric(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + +} + +func getLastAlertDayCount() error{ + conf := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s", conf.Endpoint, getAlertIndexName(INDEX_ALL_ALERTS)) + reqBody := IfaceMap{ + "size": 0, + "query": IfaceMap{ + "bool": IfaceMap{ + "filter": []IfaceMap{ + {"range": IfaceMap{ + "start_time": IfaceMap{ + "gte": "now-3M", + }, + }}, + }, + }, + }, + "aggs": IfaceMap{ + "alert_day_count": IfaceMap{ + "date_histogram": IfaceMap{ + "field": "start_time", + "interval": "day", + }, + }, + }, + } + res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) + if err != nil { + return err + } + result := IfaceMap{} + defer res.Body.Close() + err = decodeJSON(res.Body, &result) + if err != nil { + return err + } + buckets := queryValue(result, "aggregations.alert_day_count.buckets", []interface{}{}) + var metricData []interface{} + if bks, ok := buckets.([]interface{}); ok { + for _, bk := range bks { + if bkm, ok := bk.(map[string]interface{}); ok { + metricData = append(metricData, []interface{}{ + queryValue(bkm, "key", ""), + queryValue(bkm, "doc_count", 0), + }) + } + } + } + return nil +} diff --git a/service/alerting/schedule.go b/service/alerting/schedule.go new file mode 100644 index 00000000..76284d40 --- /dev/null +++ b/service/alerting/schedule.go @@ -0,0 +1,591 @@ +package alerting + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/buger/jsonparser" + log "github.com/cihub/seelog" + "github.com/elastic/go-ucfg/yaml" + cronlib "github.com/robfig/cron" + "github.com/valyala/fasttemplate" + "infini.sh/framework/core/conditions" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/search-center/model/alerting" + "infini.sh/search-center/service/alerting/action" + "io" + "net/http" + "strings" + "sync" + "time" +) + +var alertScheduler *scheduler +var alertSchedulerOnce = &sync.Once{} + +func GetScheduler() *scheduler { + alertSchedulerOnce.Do(func(){ + alertScheduler = NewScheduler() + }) + return alertScheduler +} + +func NewScheduler() *scheduler{ + cr := cronlib.New(cronlib.WithParser(cronlib.NewParser( + cronlib.SecondOptional | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow | cronlib.Descriptor, + ))) + return &scheduler{ + cron: cr, + mu: &sync.Mutex{}, + } +} + +type scheduler struct{ + monitors sync.Map + cron *cronlib.Cron + IsStart bool + mu *sync.Mutex +} + +func (scd *scheduler) Start() error{ + scd.mu.Lock() + if scd.IsStart { + return nil + } + scd.mu.Unlock() + monitors, err := getEnabledMonitors() + if err != nil { + return err + } + for id, monitor := range monitors { + err = scd.AddMonitor(id, &monitor) + if err != nil { + return err + } + scd.monitors.Store(id, &monitor) + } + go scd.cron.Start() + return nil +} + +func (scd *scheduler) AddMonitor(key string, monitor *ScheduleMonitor) error{ + monitor.MonitorID = key + if _, ok := scd.monitors.Load(key); ok { + return errors.New("monitor already exists") + } + jobFunc := generateMonitorJob(monitor) + var cron *alerting.Cron + if monitor.Monitor.Schedule.Period != nil { + cron = convertPeriodToCronExpression( monitor.Monitor.Schedule.Period) + } + if monitor.Monitor.Schedule.Cron != nil { + cron = monitor.Monitor.Schedule.Cron + } + if cron != nil { + timezone := "" + if cron.Timezone != "" { + timezone = fmt.Sprintf("CRON_TZ=%s ", cron.Timezone) + } + //fmt.Println(timezone + cron.Expression) + entryID, err := scd.cron.AddFunc(timezone + cron.Expression, jobFunc) + if err != nil { + return err + } + monitor.EntryID = entryID + } + scd.monitors.Store(key, monitor) + return nil +} +func (scd *scheduler) Stop(){ + scd.monitors.Range(func (key, val interface{}) bool{ + monitor := val.(*ScheduleMonitor) + scd.cron.Remove(monitor.EntryID) + scd.monitors.Delete(key) + return true + }) + +} +func (scd *scheduler) RemoveMonitor(key string) bool{ + value, ok := scd.monitors.Load(key) + if ok && value != nil { + if monitor, ok := value.(*ScheduleMonitor); ok { + scd.cron.Remove(monitor.EntryID) + scd.monitors.Delete(key) + return ok + } + } + return ok +} + +func (scd *scheduler) UpdateMonitor(key string, monitor *ScheduleMonitor) error{ + scd.RemoveMonitor(key) + if monitor.Monitor.Enabled { + return scd.AddMonitor(key, monitor) + } + return nil +} + +func convertPeriodToCronExpression(period *alerting.Period) *alerting.Cron{ + var expression = "@every 1m" + switch period.Unit { + case "MINUTES": + expression = fmt.Sprintf("@every %dm", period.Interval) + case "HOURS": + expression = fmt.Sprintf("@every %dh", period.Interval) + case "DAYS": + expression = fmt.Sprintf("@every %dh", period.Interval * 24) + } + return &alerting.Cron{ + Expression: expression, + Timezone: "", + } +} + +type MonitorJob func() + +func generateMonitorJob(smt *ScheduleMonitor) MonitorJob{ + sm := *smt + return func() { + queryResult, err := getQueryResult(sm.ClusterID, &sm.Monitor.Inputs[0]) + if err != nil { + log.Error(err) + } + for _, trigger := range sm.Monitor.Triggers { + monitorCtx, err := createMonitorContext(&trigger, queryResult, &sm, IfaceMap{}) + if err != nil { + log.Error(err) + continue + } + isTrigger, err := resolveTriggerResult(&trigger, monitorCtx) + //fmt.Println("is triggered: ", isTrigger, err) + if err != nil { + log.Error(err) + continue + } + alertItem := alerting.Alert{ + TriggerId: trigger.ID, + TriggerName: trigger.Name, + MonitorId: sm.MonitorID, + MonitorName: sm.Monitor.Name, + StartTime: time.Now().UnixNano()/1e6, + Severity: trigger.Severity, + State: ALERT_COMPLETED, + ClusterID: sm.ClusterID, + } + if !isTrigger { + endTime := time.Now().UnixNano()/1e6 + alertItem.EndTime = &endTime + err = saveAlertInfo(&alertItem) + if err != nil { + log.Error(err) + } + continue + } + //check ack state + lastAlert, err := getLastAlert(sm.MonitorID, trigger.ID, sm.ClusterID) + if err != nil { + log.Error(err) + continue + } + if lastAlert != nil && lastAlert["state"].(string) == ALERT_ACKNOWLEDGED { + continue + } + + alertItem.State = ALERT_ACTIVE + for _, act := range trigger.Actions { + message, err := resolveMessage(act.MessageTemplate, monitorCtx) + if err != nil { + alertItem.ErrorMessage = err.Error() + continue + } + destination, err := resolveDestination(act.DestinationId) + if err != nil { + alertItem.ErrorMessage = err.Error() + continue + } + var tact action.Action + alertItem.LastNotificationTime = time.Now().UnixNano()/1e6 + switch destination.Type { + case action.ACTION_EMAIL: + sender, err := resolveEmailAccount(destination.Email.EmailAccountID) + if err != nil { + alertItem.ErrorMessage = err.Error() + continue + } + subject, err := resolveMessage(act.SubjectTemplate, monitorCtx) + if err != nil { + alertItem.ErrorMessage = err.Error() + continue + } + receiver, err := getEmailRecipient(destination.Email.Recipients) + if err != nil { + alertItem.ErrorMessage = err.Error() + continue + } + tact = &action.EmailAction{ + Message: string(message), + Subject: string(subject), + Sender: sender, + Receiver: receiver, + } + case action.ACTION_WEBHOOK: + tact = &action.WebhookAction{ + Data: &destination.CustomWebhook, + Message: string(message), + } + } + if tact != nil { + actResult, err := tact.Execute() + var errStr string + if err != nil { + errStr = err.Error() + alertItem.ErrorMessage += errStr + } + alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, alerting.ActionExecutionResult{ + ActionID: act.ID, + LastExecutionTime: alertItem.LastNotificationTime, + Error: errStr, + Result: string(actResult), + }) + + } + if alertItem.ErrorMessage != "" { + alertItem.State = ALERT_ERROR + } + err = saveAlertInfo(&alertItem) + if err != nil { + log.Error(err) + } + } + } + } +} + + +func getLastAlert(monitorID, triggerID, clusterID string) (map[string]interface{}, error) { + conf := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, getAlertIndexName(INDEX_ALERT)) + reqBody := IfaceMap{ + "size": 1, + "query": IfaceMap{ + "bool": IfaceMap{ + "must": []IfaceMap{ + { + "match": IfaceMap{ + "monitor_id": monitorID, + }, + }, + { + "match": IfaceMap{ + "cluster_id": clusterID, + }, + }, + { + "match": IfaceMap{ + "trigger_id": triggerID, + }, + }, + }, + }, + + }, + "sort": []IfaceMap{ + {"start_time": IfaceMap{"order":"desc"}}, + }, + } + res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) + if err != nil { + return nil, err + } + var resBody = &elastic.SearchResponse{} + err = decodeJSON(res.Body, resBody) + if err != nil { + return nil, err + } + if len(resBody.Hits.Hits) > 0 { + return resBody.Hits.Hits[0].Source, nil + } + return nil, nil +} + +func saveAlertInfo(alertItem *alerting.Alert) error { + conf := getDefaultConfig() + indexName := getAlertIndexName(INDEX_ALERT) + reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, indexName) + reqBody := IfaceMap{ + "size": 1, + "query": IfaceMap{ + "bool": IfaceMap{ + "must": []IfaceMap{ + { + "match": IfaceMap{ + "monitor_id": alertItem.MonitorId, + }, + }, + //{ + // "match": IfaceMap{ + // "state": ALERT_ACTIVE, + // }, + //}, + { + "match": IfaceMap{ + "cluster_id": alertItem.ClusterID, + }, + }, + { + "match": IfaceMap{ + "trigger_id": alertItem.TriggerId, + }, + }, + }, + }, + + }, + } + res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) + if err != nil { + return err + } + var resBody = elastic.SearchResponse{} + err = decodeJSON(res.Body, &resBody) + if err != nil { + return err + } + res.Body.Close() + if len(resBody.Hits.Hits) == 0 { + if alertItem.State == ALERT_COMPLETED { + return nil + } + reqUrl = fmt.Sprintf("%s/%s/_doc", conf.Endpoint, indexName) + _,err = doRequest(reqUrl, http.MethodPost, nil, alertItem) + return err + } + currentState := queryValue(resBody.Hits.Hits[0].Source, "state", "").(string) + alertItem.Id = resBody.Hits.Hits[0].ID.(string) + if currentState != alertItem.State { + reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, getAlertIndexName(INDEX_ALERT_HISTORY), alertItem.Id) + source := resBody.Hits.Hits[0].Source + source["end_time"] = time.Now().UnixNano()/1e6 + if alertItem.State == ALERT_COMPLETED { + if currentState == ALERT_ACTIVE { + source["state"] = ALERT_COMPLETED + } + } + _,err = doRequest(reqUrl, http.MethodPut, nil, source) + reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, indexName, resBody.Hits.Hits[0].ID.(string)) + _,err = doRequest(reqUrl, http.MethodDelete, nil, alertItem) + return err + } + alertItem.StartTime = int64(queryValue(resBody.Hits.Hits[0].Source, "start_time", 0).(float64)) + + reqUrl = fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, indexName, alertItem.Id) + _,err = doRequest(reqUrl, http.MethodPut, nil, alertItem) + return err +} + +func getEmailRecipient(recipients []alerting.Recipient) ([]string, error){ + var emails []string + for _, recipient := range recipients { + if recipient.Type == "email" { + emails = append(emails, recipient.Email) + continue + } + if recipient.Type == "email_group" { + eg, err := resolveEmailGroup(recipient.EmailGroupID) + if err != nil { + return emails, err + } + for _, em := range eg.Emails { + if email, ok := em["email"].(string); ok { + emails = append(emails, email) + } + } + } + } + return emails, nil +} + +func resolveDestination(ID string)(*alerting.Destination, error){ + //todo may be cache destination ? + conf := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID) + res, err := doRequest(reqUrl, http.MethodGet, nil, nil) + if err != nil { + return nil,err + } + var resBody = IfaceMap{} + err = decodeJSON(res.Body, &resBody) + if err != nil { + return nil, err + } + res.Body.Close() + destination := &alerting.Destination{} + buf, _ := json.Marshal(queryValue(resBody, "_source."+DESTINATION_FIELD, IfaceMap{})) + _ = json.Unmarshal(buf, destination) + return destination, nil +} + +func resolveEmailAccount(ID string)(*alerting.EmailAccount, error){ + conf := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID) + res, err := doRequest(reqUrl, http.MethodGet, nil, nil) + if err != nil { + return nil,err + } + var resBody = IfaceMap{} + err = decodeJSON(res.Body, &resBody) + if err != nil { + return nil, err + } + res.Body.Close() + email := &alerting.EmailAccount{} + buf, _ := json.Marshal(queryValue(resBody, "_source."+EMAIL_ACCOUNT_FIELD, IfaceMap{})) + _ = json.Unmarshal(buf, email) + return email, nil +} + +func resolveEmailGroup(ID string)(*alerting.EmailGroup, error){ + conf := getDefaultConfig() + reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), ID) + res, err := doRequest(reqUrl, http.MethodGet, nil, nil) + if err != nil { + return nil,err + } + var resBody = IfaceMap{} + err = decodeJSON(res.Body, &resBody) + if err != nil { + return nil, err + } + res.Body.Close() + emailGroup := &alerting.EmailGroup{} + buf, _ := json.Marshal(queryValue(resBody, "_source."+EMAIL_GROUP_FIELD, IfaceMap{})) + _ = json.Unmarshal(buf, emailGroup) + return emailGroup, nil +} + +func getQueryResult(clusterID string, input *alerting.MonitorInput) (IfaceMap, error) { + conf := elastic.GetConfig(clusterID) + reqUrl := fmt.Sprintf("%s/%s/_search", conf.Endpoint, strings.Join(input.Search.Indices, ",")) + res, err := doRequest(reqUrl, http.MethodGet, nil, input.Search.Query) + if err != nil { + return nil, err + } + defer res.Body.Close() + resBody := IfaceMap{} + err = decodeJSON(res.Body, &resBody) + return resBody, err +} + +func resolveMessage(messageTemplate IfaceMap, monitorCtx []byte ) ([]byte, error){ + msg := messageTemplate["source"].(string) + tpl := fasttemplate.New(msg, "{{", "}}") + msgBuffer := bytes.NewBuffer(nil) + _, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){ + keyParts := strings.Split(tag,".") + value, _, _, err := jsonparser.Get(monitorCtx, keyParts...) + if err != nil { + return 0, err + } + return writer.Write(value) + }) + return msgBuffer.Bytes(), err + //return json.Marshal(msg) +} + +func createMonitorContext(trigger *alerting.Trigger, result IfaceMap, smt *ScheduleMonitor, extra IfaceMap) ([]byte, error){ + ctx := IfaceMap{ + "_ctx": IfaceMap{ + "results": []interface{}{ + result, + }, + "trigger": trigger, + "monitor": smt.Monitor, + "cluster_id": smt.ClusterID, + "periodStart": "", + "periodEnd":"", + }, + } + return json.Marshal(ctx) +} + +func resolveTriggerResult(trigger *alerting.Trigger, monitorCtx []byte ) (bool, error){ + source := queryValue(trigger.Condition, "script.source", "") + sourceBytes := []byte(source.(string)) + config, err := yaml.NewConfig(sourceBytes) + if err != nil { + return false, err + } + var boolConfig = &conditions.Config{} + err = config.Unpack(boolConfig) + if err != nil { + return false, err + } + + cond, err := conditions.NewCondition(boolConfig) + if err != nil { + return false, err + } + + ev := &MonitorEvent{ + Fields: monitorCtx, + } + return cond.Check(ev), nil +} + +func getEnabledMonitors() (map[string]ScheduleMonitor, error){ + config := elastic.GetConfig("default") + reqUrl := fmt.Sprintf("%s/%s/_search", config.Endpoint, orm.GetIndexName(alerting.Config{})) + must := []IfaceMap{ + { + "exists": IfaceMap{ + "field": MONITOR_FIELD, + }, + }, + { + "match": IfaceMap{ + MONITOR_FIELD+".enabled": true, + }, + }, + } + reqBody := IfaceMap{ + "size": 100, + "query": IfaceMap{ + "bool": IfaceMap{ + "must": must, + }, + }, + } + resBody := elastic.SearchResponse{} + res, err := doRequest(reqUrl, http.MethodGet, nil, reqBody) + if err != nil { + return nil, err + } + err = decodeJSON(res.Body, &resBody) + if err != nil { + return nil, err + } + if len(resBody.Hits.Hits) == 0 { + return nil, nil + } + var monitors = map[string]ScheduleMonitor{} + for _, hit := range resBody.Hits.Hits { + monitor := &alerting.Monitor{} + buf, _ := json.Marshal(hit.Source[MONITOR_FIELD]) + _ = json.Unmarshal(buf, monitor) + monitors[hit.ID.(string)] = ScheduleMonitor{ + Monitor: monitor, + ClusterID: hit.Source["cluster_id"].(string), + } + } + + return monitors, nil +} + +type ScheduleMonitor struct { + Monitor *alerting.Monitor + ClusterID string + EntryID cronlib.EntryID + MonitorID string +} \ No newline at end of file diff --git a/service/alerting/schedule_test.go b/service/alerting/schedule_test.go new file mode 100644 index 00000000..2a9dd4c1 --- /dev/null +++ b/service/alerting/schedule_test.go @@ -0,0 +1,74 @@ +package alerting + +import ( + "encoding/json" + "fmt" + "infini.sh/framework/core/conditions" + "infini.sh/framework/core/util" + "src/github.com/elastic/go-ucfg/yaml" + "testing" +) + + +func TestParseYamlConfig(t *testing.T){ + yamlStr := `range: + ctx.results.hits.hits.[0]._source.number.test.gt: 3` + config, err := yaml.NewConfig([]byte(yamlStr)) + if err != nil { + t.Fatal(err) + } + var boolConfig = &conditions.Config{} + err = config.Unpack(boolConfig) + if err != nil { + t.Fatal(err) + } + fmt.Println(boolConfig.Range) + + cond, err := conditions.NewCondition(boolConfig) + if err != nil { + t.Fatal(err) + } + searchResStr := `{ + "_shards": { + "failed": 0, + "skipped": 0, + "successful": 1, + "total": 1 + }, + "hits": { + "hits": [{"_source":{"number.test": 5, "normal": 7, "number": {"value": 5}}}], + "max_score": null, + "total": { + "relation": "eq", + "value": 5 + } + }, + "timed_out": false, + "took": 0 +}` + searchResults := util.MapStr{} + err = json.Unmarshal([]byte(searchResStr), &searchResults) + if err != nil { + t.Fatal(err) + } + fields := util.MapStr{ + "ctx": util.MapStr{ + "results": searchResults, + "test": util.MapStr{ + "number": 2, + }, + }, + } + //searchEvent := &event.Event{ + // Fields: fields, + //} + fieldsBytes, _ := json.Marshal(fields) + tevent := &MonitorEvent{ + Fields: fieldsBytes, + } + //fmt.Println(cond.Check(searchEvent)) + fmt.Println(cond.Check(tevent)) +} + + + diff --git a/service/alerting/util/period.go b/service/alerting/util/period.go new file mode 100644 index 00000000..cbc8a6ba --- /dev/null +++ b/service/alerting/util/period.go @@ -0,0 +1,78 @@ +package util + +import ( + "fmt" + cronlib "github.com/robfig/cron" + "infini.sh/search-center/model/alerting" + "time" +) + +type MonitorPeriod struct { + Start int64 + End int64 +} + +func GetMonitorPeriod(currentTime *time.Time, schedule *alerting.Schedule) *MonitorPeriod{ + if schedule.Period != nil { + return transformPeriod(currentTime, schedule.Period) + } + if schedule.Cron != nil { + return transformCron(currentTime, schedule.Cron) + } + return nil +} + + +func transformCron(currentTime *time.Time, cron *alerting.Cron) *MonitorPeriod { + timezone := "" + if cron.Timezone != "" { + timezone = fmt.Sprintf("CRON_TZ=%s ", cron.Timezone) + } + + parser := cronlib.NewParser( + cronlib.SecondOptional | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow, + ) + + sd, _ := parser.Parse(timezone + cron.Expression) + ssd := sd.(*cronlib.SpecSchedule) + var duration = time.Minute + if ssd.Hour == 1 { + duration = time.Hour + } + tempTime := *currentTime + nextTime := sd.Next(tempTime) + var preTime = tempTime + for { + tempTime = tempTime.Add(-duration) + if preTime = sd.Next(tempTime); !preTime.Equal(nextTime) { + break + } + } + mp := &MonitorPeriod{ + Start: preTime.UnixNano()/1e6, + End: currentTime.UnixNano()/1e6, + } + return mp +} + +func transformPeriod(currentTime *time.Time, period *alerting.Period) *MonitorPeriod { + if period == nil { + return nil + } + mp := &MonitorPeriod{ + End: currentTime.UnixNano()/1e6, + } + var duration time.Duration + switch period.Unit { + case "MINUTES": + duration = time.Minute + case "HOURS": + duration = time.Hour + case "DAYS": + duration = time.Hour * 24 + default: + return nil + } + mp.Start = currentTime.Add(-duration * time.Duration(period.Interval)).UnixNano()/1e6 + return mp +} \ No newline at end of file diff --git a/service/alerting/util/period_test.go b/service/alerting/util/period_test.go new file mode 100644 index 00000000..e3b1b433 --- /dev/null +++ b/service/alerting/util/period_test.go @@ -0,0 +1,22 @@ +package util + +import ( + "fmt" + "infini.sh/search-center/model/alerting" + "testing" + "time" +) + +func TestGetMonitorPeriod(t *testing.T) { + now := time.Now() + periods := GetMonitorPeriod(&now, &alerting.Schedule{ + Cron: &alerting.Cron{ + Expression: "0 0 1 */1 *", + }, + //Period: &alerting.Period{ + // Unit: "MINUTES", + // Interval: 10, + //}, + }) + fmt.Println(periods) +} \ No newline at end of file diff --git a/web/src/components/kibana/console/components/Console.tsx b/web/src/components/kibana/console/components/Console.tsx index 8b64c8cd..de431164 100644 --- a/web/src/components/kibana/console/components/Console.tsx +++ b/web/src/components/kibana/console/components/Console.tsx @@ -91,7 +91,7 @@ const Console = (params:props) => { return ( - + diff --git a/web/src/components/kibana/console/components/ConsoleInput.tsx b/web/src/components/kibana/console/components/ConsoleInput.tsx index 93d7b9d7..3fa955ee 100644 --- a/web/src/components/kibana/console/components/ConsoleInput.tsx +++ b/web/src/components/kibana/console/components/ConsoleInput.tsx @@ -141,7 +141,7 @@ const ConsoleInputUI = ({clusterID, initialText}:ConsoleInputProps) => { } }, []); useEffect(()=>{ - retrieveAutoCompleteInfo(settings, settings.getAutocomplete()); + retrieveAutoCompleteInfo(settings, settings.getAutocomplete(), clusterID); },[clusterID]) const handleSaveAsCommonCommand = async () => { diff --git a/web/src/components/kibana/console/contexts/services_context.tsx b/web/src/components/kibana/console/contexts/services_context.tsx index 9ab6afad..6e399e56 100644 --- a/web/src/components/kibana/console/contexts/services_context.tsx +++ b/web/src/components/kibana/console/contexts/services_context.tsx @@ -44,6 +44,7 @@ interface ContextServices { export interface ContextValue { services: ContextServices; + clusterID: string; } interface ContextProps { diff --git a/web/src/components/kibana/console/entities/es_request.ts b/web/src/components/kibana/console/entities/es_request.ts index c4dd12d8..754f1702 100644 --- a/web/src/components/kibana/console/entities/es_request.ts +++ b/web/src/components/kibana/console/entities/es_request.ts @@ -20,6 +20,7 @@ export type BaseResponseType = export interface EsRequestArgs { requests: Array<{ url: string; method: string; data: string[] }>; + clusterID: string; } export interface ESResponseObject { diff --git a/web/src/components/kibana/console/hooks/use_send_current_request_to_es/index.ts b/web/src/components/kibana/console/hooks/use_send_current_request_to_es/index.ts index 37265895..3aa7e6c7 100644 --- a/web/src/components/kibana/console/hooks/use_send_current_request_to_es/index.ts +++ b/web/src/components/kibana/console/hooks/use_send_current_request_to_es/index.ts @@ -47,7 +47,7 @@ function buildRawCommonCommandRequest(cmd:any){ } export const useSendCurrentRequestToES = () => { const dispatch = useRequestActionContext(); - const { services: { history } } = useServicesContext(); + const { services: { history }, clusterID } = useServicesContext(); return useCallback(async () => { try { @@ -80,7 +80,7 @@ export const useSendCurrentRequestToES = () => { dispatch({ type: 'sendRequest', payload: undefined }); // @ts-ignore - const results = await sendRequestToES({ requests }); + const results = await sendRequestToES({ requests, clusterID }); // let saveToHistoryError: undefined | Error; diff --git a/web/src/components/kibana/console/hooks/use_send_current_request_to_es/send_request_to_es.ts b/web/src/components/kibana/console/hooks/use_send_current_request_to_es/send_request_to_es.ts index 2a84ec08..9fa2cf30 100644 --- a/web/src/components/kibana/console/hooks/use_send_current_request_to_es/send_request_to_es.ts +++ b/web/src/components/kibana/console/hooks/use_send_current_request_to_es/send_request_to_es.ts @@ -69,7 +69,7 @@ export function sendRequestToES(args: EsRequestArgs): Promise } // append a new line for bulk requests. const startTime = Date.now(); - send(esMethod, esPath, esData).always( + send(esMethod, esPath, esData, {clusterID: args.clusterID}).always( (dataOrjqXHR, textStatus: string, jqXhrORerrorThrown) => { if (reqId !== CURRENT_REQ_ID) { return; diff --git a/web/src/components/kibana/console/modules/es/index.ts b/web/src/components/kibana/console/modules/es/index.ts index 9599bfbe..43d00643 100644 --- a/web/src/components/kibana/console/modules/es/index.ts +++ b/web/src/components/kibana/console/modules/es/index.ts @@ -37,6 +37,7 @@ import { stringify } from 'query-string'; interface SendOptions { asSystemRequest?: boolean; + clusterID?: string; } const esVersion: string[] = []; @@ -62,15 +63,15 @@ export function send( method: string, path: string, data: string | object, - { asSystemRequest }: SendOptions = {} + { asSystemRequest, clusterID }: SendOptions = {} ) { const wrappedDfd = $.Deferred(); - const clusterID = extractClusterIDFromURL(); - if(!clusterID){ - console.log('can not get clusterid from url'); - return; - } + // const clusterID = extractClusterIDFromURL(); + // if(!clusterID){ + // console.log('can not get clusterid from url'); + // return; + // } // @ts-ignore const options: JQuery.AjaxSettings = { url: `/elasticsearch/${clusterID}/_proxy?` + stringify({ path, method }), diff --git a/web/src/components/kibana/console/modules/mappings/mappings.js b/web/src/components/kibana/console/modules/mappings/mappings.js index 4faaac5e..9e2a42f9 100644 --- a/web/src/components/kibana/console/modules/mappings/mappings.js +++ b/web/src/components/kibana/console/modules/mappings/mappings.js @@ -267,7 +267,7 @@ export function clear() { templates = []; } -function retrieveSettings(settingsKey, settingsToRetrieve) { +function retrieveSettings(settingsKey, settingsToRetrieve, clusterID) { const settingKeyToPathMap = { fields: '_mapping', indices: '_aliases', @@ -280,7 +280,7 @@ function retrieveSettings(settingsKey, settingsToRetrieve) { if(settingsKey === 'commands'){ return es.queryCommonCommands(); } - return es.send('GET', settingKeyToPathMap[settingsKey], null, true); + return es.send('GET', settingKeyToPathMap[settingsKey], null, {clusterID, asSystemRequest: true}); } else { const settingsPromise = new $.Deferred(); if (settingsToRetrieve[settingsKey] === false) { @@ -319,13 +319,13 @@ function getObject(value){ * @param settings Settings A way to retrieve the current settings * @param settingsToRetrieve any */ -export function retrieveAutoCompleteInfo(settings, settingsToRetrieve) { +export function retrieveAutoCompleteInfo(settings, settingsToRetrieve, clusterID) { clearSubscriptions(); - const mappingPromise = retrieveSettings('fields', settingsToRetrieve); - const aliasesPromise = retrieveSettings('indices', settingsToRetrieve); - const templatesPromise = retrieveSettings('templates', settingsToRetrieve); - const commandsPromise = retrieveSettings('commands', settingsToRetrieve); + const mappingPromise = retrieveSettings('fields', settingsToRetrieve, clusterID); + const aliasesPromise = retrieveSettings('indices', settingsToRetrieve, clusterID); + const templatesPromise = retrieveSettings('templates', settingsToRetrieve, clusterID); + const commandsPromise = retrieveSettings('commands', settingsToRetrieve, clusterID); $.when(mappingPromise, aliasesPromise, templatesPromise, commandsPromise).done((mappings, aliases, templates, commands) => { diff --git a/web/src/layouts/BasicLayout.js b/web/src/layouts/BasicLayout.js index 08ea14aa..7253f611 100644 --- a/web/src/layouts/BasicLayout.js +++ b/web/src/layouts/BasicLayout.js @@ -103,12 +103,6 @@ class BasicLayout extends React.PureComponent { dispatch({ type: 'setting/getSetting', }); - // dispatch({ - // type: 'cluster/fetchClusterVersion', - // payload: { - // cluster: 'single-es' - // } - // }); this.renderRef = requestAnimationFrame(() => { this.setState({ rendering: false, diff --git a/web/src/locales/zh-CN/alert.js b/web/src/locales/zh-CN/alert.js index c886d655..41473547 100644 --- a/web/src/locales/zh-CN/alert.js +++ b/web/src/locales/zh-CN/alert.js @@ -22,7 +22,7 @@ export default { 'alert.dashboard.create-monitor-text': '暂无监控项。 创建监控项以添加触发器和操作。 一旦触发警报,状态将显示在此表中。', 'alert.dashboard.create-trigger-text': '暂无监控项。 创建触发器以开始警报。 一旦触发警报,状态将显示在此表中。', 'alert.dashboard.table.columns.start_time': '告警开始时间', - 'alert.dashboard.table.columns.end_time': '告警截止时间', + 'alert.dashboard.table.columns.end_time': '告警结束时间', 'alert.dashboard.table.columns.monitor_name': '监控项名称', 'alert.dashboard.table.columns.trigger_name': '触发器名称', 'alert.dashboard.table.columns.severity': '告警级别', diff --git a/web/src/models/global.js b/web/src/models/global.js index caa25951..8e0553a1 100644 --- a/web/src/models/global.js +++ b/web/src/models/global.js @@ -5,12 +5,15 @@ import {formatESSearchResult, extractClusterIDFromURL} from '@/lib/elasticsearch import {Modal} from 'antd'; import router from "umi/router"; +const MENU_COLLAPSED_KEY = "search-center:menu:collapsed"; +console.log(localStorage.getItem(MENU_COLLAPSED_KEY)) export default { namespace: 'global', state: { - collapsed: false, + collapsed: localStorage.getItem(MENU_COLLAPSED_KEY) === 'true', + isInitCollapsed:false, notices: [], clusterVisible: true, clusterList: [], @@ -160,9 +163,18 @@ export default { reducers: { changeLayoutCollapsed(state, { payload }) { + //layout sider init(false) bug + if(!state.isInitCollapsed && state.collapsed){ + return { + ...state, + isInitCollapsed: true, + }; + } + localStorage.setItem(MENU_COLLAPSED_KEY, payload); return { ...state, collapsed: payload, + isInitCollapsed: true, }; }, saveNotices(state, { payload }) { diff --git a/web/src/pages/Alerting/index.js b/web/src/pages/Alerting/index.js index aa1a2467..0c170ac0 100644 --- a/web/src/pages/Alerting/index.js +++ b/web/src/pages/Alerting/index.js @@ -40,7 +40,7 @@ const AlertingUI = (props)=>{ } useMemo(()=>{ httpClient.params.basePath.prepend = (url)=>{ - return '/elasticsearch/'+ props.selectedCluster.id + url; + return '/elasticsearch/'+ props.selectedCluster.id +"/" + url; } }, [props.selectedCluster]); const isDarkMode = false; diff --git a/web/src/pages/Alerting/pages/CreateMonitor/components/Schedule/Frequencies/Frequency.js b/web/src/pages/Alerting/pages/CreateMonitor/components/Schedule/Frequencies/Frequency.js index da9923e6..5c98cf81 100644 --- a/web/src/pages/Alerting/pages/CreateMonitor/components/Schedule/Frequencies/Frequency.js +++ b/web/src/pages/Alerting/pages/CreateMonitor/components/Schedule/Frequencies/Frequency.js @@ -20,9 +20,9 @@ import { isInvalid, hasError } from '../../../../../utils/validate'; const frequencies = [ { value: 'interval', text: 'By interval' }, - { value: 'daily', text: 'Daily' }, - { value: 'weekly', text: 'Weekly' }, - { value: 'monthly', text: 'Monthly' }, + // { value: 'daily', text: 'Daily' }, + // { value: 'weekly', text: 'Weekly' }, + // { value: 'monthly', text: 'Monthly' }, { value: 'cronExpression', text: 'Custom cron expression' }, ]; diff --git a/web/src/pages/Alerting/pages/CreateMonitor/containers/CreateMonitor/utils/monitorToFormik.js b/web/src/pages/Alerting/pages/CreateMonitor/containers/CreateMonitor/utils/monitorToFormik.js index 67fe7e0a..688928cb 100644 --- a/web/src/pages/Alerting/pages/CreateMonitor/containers/CreateMonitor/utils/monitorToFormik.js +++ b/web/src/pages/Alerting/pages/CreateMonitor/containers/CreateMonitor/utils/monitorToFormik.js @@ -21,7 +21,7 @@ import { SEARCH_TYPE, INPUTS_DETECTOR_ID } from '../../../../../utils/constants' export default function monitorToFormik(monitor) { const formikValues = _.cloneDeep(FORMIK_INITIAL_VALUES); if (!monitor) return formikValues; - const { + let { name, enabled, schedule: { cron: { expression: cronExpression = formikValues.cronExpression, timezone } = {} }, @@ -32,6 +32,11 @@ export default function monitorToFormik(monitor) { // In that case we don't want to guess on the UI what selections a user made, so we will default to just showing the extraction query const { searchType = 'query', fieldName } = search; const isAD = searchType === SEARCH_TYPE.AD; + if(monitor.schedule) { + schedule = monitor.schedule + schedule.cron && (schedule.frequency = "cronExpression") + schedule.period && (schedule.frequency = "interval") + } return { /* INITIALIZE WITH DEFAULTS */ diff --git a/web/src/pages/Alerting/pages/CreateMonitor/containers/DefineMonitor/DefineMonitor.js b/web/src/pages/Alerting/pages/CreateMonitor/containers/DefineMonitor/DefineMonitor.js index d09bb9ba..d1a22a2a 100644 --- a/web/src/pages/Alerting/pages/CreateMonitor/containers/DefineMonitor/DefineMonitor.js +++ b/web/src/pages/Alerting/pages/CreateMonitor/containers/DefineMonitor/DefineMonitor.js @@ -127,12 +127,13 @@ class DefineMonitor extends Component { async getPlugins() { const { httpClient } = this.props; try { - const pluginsResponse = await httpClient.get('/alerting/_plugins'); - if (pluginsResponse.ok) { - this.setState({ plugins: pluginsResponse.resp.map((plugin) => plugin.component) }); - } else { - console.error('There was a problem getting plugins list'); - } + return []; + // const pluginsResponse = await httpClient.get('/alerting/_plugins'); + // if (pluginsResponse.ok) { + // this.setState({ plugins: pluginsResponse.resp.map((plugin) => plugin.component) }); + // } else { + // console.error('There was a problem getting plugins list'); + // } } catch (e) { console.error('There was a problem getting plugins list', e); } diff --git a/web/src/pages/Alerting/pages/CreateTrigger/containers/ConfigureActions/ConfigureActions.js b/web/src/pages/Alerting/pages/CreateTrigger/containers/ConfigureActions/ConfigureActions.js index 9e22c4e6..74befca3 100644 --- a/web/src/pages/Alerting/pages/CreateTrigger/containers/ConfigureActions/ConfigureActions.js +++ b/web/src/pages/Alerting/pages/CreateTrigger/containers/ConfigureActions/ConfigureActions.js @@ -71,8 +71,8 @@ class ConfigureActions extends React.Component { label: `${destination.name} - (${getDestinationLabel(destination)})`, value: destination.id, type: destination.type, - })) - .filter(({ type }) => allowList.includes(type)); + })); + // .filter(({ type }) => allowList.includes(type)); this.setState({ destinations, loadingDestinations: false }); // If actions is not defined If user choose to delete actions, it will not override customer's preferences. if (destinations.length > 0 && !values.actions && !actionDeleted) { diff --git a/web/src/pages/Alerting/pages/CreateTrigger/containers/CreateTrigger/CreateTrigger.js b/web/src/pages/Alerting/pages/CreateTrigger/containers/CreateTrigger/CreateTrigger.js index 2239ccea..22f51cc8 100644 --- a/web/src/pages/Alerting/pages/CreateTrigger/containers/CreateTrigger/CreateTrigger.js +++ b/web/src/pages/Alerting/pages/CreateTrigger/containers/CreateTrigger/CreateTrigger.js @@ -69,7 +69,8 @@ export default class CreateTrigger extends Component { onCreate = (trigger, triggerMetadata, { setSubmitting, setErrors }) => { const { monitor, updateMonitor, onCloseTrigger } = this.props; - const { ui_metadata: uiMetadata, triggers } = monitor; + let { ui_metadata: uiMetadata, triggers } = monitor; + uiMetadata = uiMetadata || {triggers:[]}; const updatedTriggers = [trigger].concat(triggers); const updatedUiMetadata = { ...uiMetadata, diff --git a/web/src/pages/Alerting/pages/Dashboard/components/AlertList/AlertItem.tsx b/web/src/pages/Alerting/pages/Dashboard/components/AlertList/AlertItem.tsx new file mode 100644 index 00000000..66035ac3 --- /dev/null +++ b/web/src/pages/Alerting/pages/Dashboard/components/AlertList/AlertItem.tsx @@ -0,0 +1,60 @@ +import {List, Icon} from 'antd'; +import './alertitem.scss'; +import moment from 'moment'; + +type TimeValue = number | null; + +interface ActionExecutionResult { + action_id: string; + last_execution_time: TimeValue, + result?: string; +} + +export interface AlertRecord { + acknowledged_time: TimeValue, + action_execution_results?: ActionExecutionResult[]; + cluster_id: string; + end_time: TimeValue; + error_message: string; + id: string; + last_notification_time: TimeValue; + monitor_id: string; + monitor_name: string; + severity: string; + start_time: TimeValue; + state: string; + trigger_id: string; + trigger_name: string; +} + +interface AlertItemProps { + item: AlertRecord; + onClick: (item: AlertRecord)=>void +} + +export const AlertItem = ({ + item, + onClick +}: AlertItemProps)=>{ + return ( + {onClick(item)}} + key={item.id} + className="alert" + > +
+
+
{item.severity}
+
+
{item.monitor_name+":"+item.trigger_name}
+
+
{moment(item.start_time).fromNow()}
+
+ +
+
+
+
+ ) +} + diff --git a/web/src/pages/Alerting/pages/Dashboard/components/AlertList/AlertList.tsx b/web/src/pages/Alerting/pages/Dashboard/components/AlertList/AlertList.tsx new file mode 100644 index 00000000..b5c67150 --- /dev/null +++ b/web/src/pages/Alerting/pages/Dashboard/components/AlertList/AlertList.tsx @@ -0,0 +1,42 @@ +import {List} from 'antd'; +import {AlertItem, AlertRecord} from './AlertItem'; +import './alertlist.scss'; + +interface AlertListProps { + dataSource: AlertRecord[]; + pagination?: any; + title: string; + onItemClick: (item: AlertRecord)=>void +} + +export const AlertList = ({ + dataSource, + pagination, + title, + onItemClick, +}: AlertListProps)=>{ + return ( +
+
+ {title} + ({pagination?.total}) +
+ { + console.log(page); + }, + pageSize: 20, + ...pagination, + }} + dataSource={dataSource} + renderItem={item => ( + + )} + /> +
+ ) + +} diff --git a/web/src/pages/Alerting/pages/Dashboard/components/AlertList/alertitem.scss b/web/src/pages/Alerting/pages/Dashboard/components/AlertList/alertitem.scss new file mode 100644 index 00000000..97629ed4 --- /dev/null +++ b/web/src/pages/Alerting/pages/Dashboard/components/AlertList/alertitem.scss @@ -0,0 +1,54 @@ +.alert { + background-color: #fff; + border-radius: 5px; + border-bottom: none !important; + overflow:hidden; + padding: 0 !important; + margin-bottom: 5px; + &:hover{ + cursor: pointer; + background-color: rgb(147, 158, 160); + color: #fff; + } + .wrapper{ + display: flex; + min-height: 50px; + align-items: center; + .status{ + width: 40px; + align-self: stretch; + display: inline-flex; + align-items: center; + justify-content: center; + color: #fff; + font-size: 20px; + &.COMPLETED{ + background-color: sandybrown; + } + &.ACKNOWLEDGED{ + background-color: seagreen; + } + &.ACTIVE{ + background-color:tomato; + } + &.ERROR{ + background-color:red; + } + &.DELETED{ + background-color: gray; + } + } + .content{ + padding-left: 10px; + } + .right{ + margin-left: auto; + flex: 0 0 120px; + display: inline-flex; + .arrow{ + margin-left: auto; + padding-right: 10px; + } + } + } +} \ No newline at end of file diff --git a/web/src/pages/Alerting/pages/Dashboard/components/AlertList/alertlist.scss b/web/src/pages/Alerting/pages/Dashboard/components/AlertList/alertlist.scss new file mode 100644 index 00000000..e6e3dfe4 --- /dev/null +++ b/web/src/pages/Alerting/pages/Dashboard/components/AlertList/alertlist.scss @@ -0,0 +1,14 @@ +.alert-list{ + background: #f0f2f5; + padding: 10px 5px; + .title{ + color: #333; + font-weight:600; + padding-bottom: 6px; + .total{ + color: #999; + margin-left: 15px; + font-size: 12px; + } + } +} \ No newline at end of file diff --git a/web/src/pages/Alerting/pages/Dashboard/containers/AlertOverview.tsx b/web/src/pages/Alerting/pages/Dashboard/containers/AlertOverview.tsx new file mode 100644 index 00000000..d71fce18 --- /dev/null +++ b/web/src/pages/Alerting/pages/Dashboard/containers/AlertOverview.tsx @@ -0,0 +1,76 @@ +import {AlertList} from '../components/AlertList/AlertList'; +import _ from 'lodash'; +import {useState, useEffect} from 'react'; +import './alertoverview.scss'; + +export const AlertOverview = (props: any)=>{ + const {httpClient, history} = props; + const [data, setData] = useState({ + alerts: [], + totalAlerts: 0, + }); + + const getAlerts = _.debounce( + (from, size, search, sortField, sortDirection, severityLevel, alertState, monitorIds) => { + let params = { + from, + size, + search, + sortField, + sortDirection, + severityLevel, + alertState, + }; + if(monitorIds){ + params["monitorIds"]= monitorIds; + } + // const queryParamsString = queryString.stringify(params); + // history.replace({ ...this.props.location, search: queryParamsString }); + httpClient.get('/alerting/alerts', { query: params }).then((resp:any) => { + if (resp.ok) { + const { alerts, totalAlerts } = resp; + setData({ + alerts, + totalAlerts, + }); + } else { + console.log('error getting alerts:', resp); + } + }); + }, + 500, + { leading: true } + ); + + const pageSize = 10; + useEffect(()=>{ + getAlerts(0, pageSize, "", "start_time", "desc", "ALL", "ALL","") + },[]) + + const onPageChange = (pageIndex: number)=>{ + const from = (pageIndex - 1) * pageSize; + getAlerts(from, pageSize, "", "start_time", "desc", "ALL", "ALL","") + } + + const onItemClick = (item: any)=>{ + history.push(`/monitors/${item.monitor_id}`) + } + + return ( +
+
+ +
+
+
提示
+
+
+ ) +} \ No newline at end of file diff --git a/web/src/pages/Alerting/pages/Dashboard/containers/Dashboard.js b/web/src/pages/Alerting/pages/Dashboard/containers/Dashboard.js index 62c83a99..ca000e2c 100644 --- a/web/src/pages/Alerting/pages/Dashboard/containers/Dashboard.js +++ b/web/src/pages/Alerting/pages/Dashboard/containers/Dashboard.js @@ -16,7 +16,7 @@ import React, { Component } from 'react'; import _ from 'lodash'; import queryString from 'query-string'; -import { EuiBasicTable, EuiButton, EuiHorizontalRule, EuiIcon } from '@elastic/eui'; +import { EuiBasicTable, EuiButton, EuiHorizontalRule, EuiIcon,RIGHT_ALIGNMENT,EuiButtonIcon } from '@elastic/eui'; import ContentPanel from '../../../components/ContentPanel'; import DashboardEmptyPrompt from '../components/DashboardEmptyPrompt'; @@ -55,6 +55,21 @@ export default class Dashboard extends Component { sortField, } = this.getURLQueryParams(); + const tableColumns = [...columns, { + align: RIGHT_ALIGNMENT, + width: '40px', + isExpander: true, + render: (item) => { + const {itemIdToExpandedRowMap} = this.state; + ( + toggleDetails(item)} + aria-label={itemIdToExpandedRowMap[item.id] ? 'Collapse' : 'Expand'} + iconType={itemIdToExpandedRowMap[item.id] ? 'arrowUp' : 'arrowDown'} + /> + )}, + }] + this.state = { alerts: [], alertState, @@ -67,6 +82,8 @@ export default class Dashboard extends Component { sortDirection, sortField, totalAlerts: 0, + itemIdToExpandedRowMap:{}, + columns: tableColumns, }; } @@ -357,6 +374,8 @@ export default class Dashboard extends Component { `${item.id}-${item.version}`} - columns={columns} + columns={this.state.columns} pagination={pagination} sorting={sorting} isSelectable={true} diff --git a/web/src/pages/Alerting/pages/Dashboard/containers/alertoverview.scss b/web/src/pages/Alerting/pages/Dashboard/containers/alertoverview.scss new file mode 100644 index 00000000..a30a6d93 --- /dev/null +++ b/web/src/pages/Alerting/pages/Dashboard/containers/alertoverview.scss @@ -0,0 +1,9 @@ +.alert-overview{ + display: flex; + .right{ + flex: 0 0 30%; + } + .left{ + flex: 1 1 auto; + } +} \ No newline at end of file diff --git a/web/src/pages/Alerting/pages/Dashboard/utils/tableUtils.js b/web/src/pages/Alerting/pages/Dashboard/utils/tableUtils.js index 75108c9a..f6a64f68 100644 --- a/web/src/pages/Alerting/pages/Dashboard/utils/tableUtils.js +++ b/web/src/pages/Alerting/pages/Dashboard/utils/tableUtils.js @@ -95,5 +95,5 @@ export const columns = [ truncateText: false, render: renderTime, dataType: 'date', - }, + } ]; diff --git a/web/src/pages/Alerting/pages/Destinations/components/createDestinations/Email/Sender.js b/web/src/pages/Alerting/pages/Destinations/components/createDestinations/Email/Sender.js index ce334bd9..19178cab 100644 --- a/web/src/pages/Alerting/pages/Destinations/components/createDestinations/Email/Sender.js +++ b/web/src/pages/Alerting/pages/Destinations/components/createDestinations/Email/Sender.js @@ -19,6 +19,7 @@ import { FormikFieldText, FormikFieldNumber, FormikSelect, + FormikFieldPassword, } from '../../../../../components/FormControls'; import { isInvalid, hasError } from '../../../../../utils/validate'; import { validateEmail, validateHost, validatePort, validateSenderName } from './utils/validate'; @@ -101,6 +102,28 @@ const Sender = ({ sender, arrayHelpers, context, index, onDelete }) => { }} /> + + { + field.onChange(e); + onSenderChange(index, sender, arrayHelpers); + }, + isInvalid, + }} + /> + + + { const { allowList } = this.state; - return allowList.includes(DESTINATION_TYPE.EMAIL); + return true; + // return allowList.includes(DESTINATION_TYPE.EMAIL); }; isDeleteAllowed = async (type, id) => { diff --git a/web/src/pages/Alerting/pages/Home/Home.js b/web/src/pages/Alerting/pages/Home/Home.js index 498fd4ba..a8221edb 100644 --- a/web/src/pages/Alerting/pages/Home/Home.js +++ b/web/src/pages/Alerting/pages/Home/Home.js @@ -21,6 +21,7 @@ import Dashboard from '../Dashboard/containers/Dashboard'; import Monitors from '../Monitors/containers/Monitors'; import DestinationsList from '../Destinations/containers/DestinationsList'; import { formatMessage } from 'umi/locale'; +import {AlertOverview} from '../Dashboard/containers/AlertOverview'; const getSelectedTabId = (pathname) => { if (pathname.includes('monitors')) return 'monitors'; @@ -100,7 +101,8 @@ export default class Home extends Component { // exact path="/dashboard" render={(props) => ( - + // + )} /> { + return ( +
+ +
+ ); +} \ No newline at end of file diff --git a/web/src/pages/Alerting/utils/constants.js b/web/src/pages/Alerting/utils/constants.js index cd16eec2..cb15664b 100644 --- a/web/src/pages/Alerting/utils/constants.js +++ b/web/src/pages/Alerting/utils/constants.js @@ -66,10 +66,10 @@ export const AD_PREVIEW_DAYS = 7; export const MAX_QUERY_RESULT_SIZE = 200; -export const OPEN_DISTRO_PREFIX = 'opendistro'; +export const OPEN_DISTRO_PREFIX = 'infini-search-center'; export const PLUGIN_NAME = `alerting`; -export const INDEX_PREFIX = `${OPEN_DISTRO_PREFIX}-alerting`; +export const INDEX_PREFIX = `${OPEN_DISTRO_PREFIX}_alerting`; export const INDEX = { SCHEDULED_JOBS: `.${INDEX_PREFIX}-config`, ALERTS: `.${INDEX_PREFIX}-alerts`, diff --git a/web/src/pages/Cluster/Metrics.js b/web/src/pages/Cluster/Metrics.js index 1c925d89..9ba0b411 100644 --- a/web/src/pages/Cluster/Metrics.js +++ b/web/src/pages/Cluster/Metrics.js @@ -166,6 +166,7 @@ const MonitorDatePicker = ({timeRange, commonlyUsedRanges, onChange, isLoading}) return ( { this.fetchData(); }); @@ -374,6 +373,26 @@ class ClusterMonitor extends PureComponent { const commonlyUsedRanges = [ + { + from: 'now-15m', + to: 'now', + display: '最近15分钟' + }, + { + from: 'now-30m', + to: 'now', + display: '最近30分钟' + }, + { + from: 'now-1h', + to: 'now', + display: '最近一小时' + }, + { + from: 'now-24h', + to: 'now', + display: '最近一天', + }, { from: 'now/d', to: 'now/d', @@ -384,26 +403,6 @@ class ClusterMonitor extends PureComponent { to: 'now/w', display: '这个星期' }, - { - from: 'now-15m', - to: 'now', - display: '最近15分钟' - }, - { - from: 'now-30m', - to: 'now', - display: '最近三十分钟' - }, - { - from: 'now-1h', - to: 'now', - display: '最近一小时' - }, - { - from: 'now-24h', - to: 'now', - display: '最近一天', - }, { from: 'now-7d', to: 'now', @@ -503,7 +502,7 @@ class ClusterMonitor extends PureComponent {