diff --git a/main.go b/main.go index de9f289e..459ce3d5 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,7 @@ import ( "infini.sh/console/model" "infini.sh/console/model/alerting" "infini.sh/console/model/gateway" - alertSrv "infini.sh/console/service/alerting" + _ "infini.sh/console/plugin" "infini.sh/framework" "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" @@ -117,19 +117,19 @@ func main() { module.Start() + orm.RegisterSchemaWithIndexName(model.Dict{}, "dict") orm.RegisterSchemaWithIndexName(model.Reindex{}, "reindex") orm.RegisterSchemaWithIndexName(elastic.View{}, "view") - orm.RegisterSchemaWithIndexName(alerting.Config{}, "alerting-config") orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alerting-alerts") - orm.RegisterSchemaWithIndexName(alerting.AlertingHistory{}, "alerting-history") orm.RegisterSchemaWithIndexName(elastic.CommonCommand{}, "commands") orm.RegisterSchemaWithIndexName(elastic.TraceTemplate{}, "trace-template") orm.RegisterSchemaWithIndexName(gateway.Instance{} , "gateway-instance") + orm.RegisterSchemaWithIndexName(alerting.Rule{} , "alert-rule") + orm.RegisterSchemaWithIndexName(alerting.Alert{} , "alert-history") api.RegisterSchema() - alertSrv.GetScheduler().Start() }, nil) { app.Run() diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 32651236..515550e1 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -1,35 +1,40 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + package alerting -type Alert struct { - ClusterID string `json:"cluster_id" elastic_mapping:"cluster_id:{type:keyword}"` - ClusterName string `json:"cluster_name" elastic_mapping:"cluster_name:{type:text}"` - AcknowledgedTime *int64 `json:"acknowledged_time" elastic_mapping:"acknowledged_time:{type:date}"` - ActionExecutionResults []ActionExecutionResult `json:"action_execution_results" elastic_mapping:"action_execution_results:{type:object}"` - AlertHistories []AlertHistory `json:"alert_history" elastic_mapping:"alert_history:{type:object}"` - EndTime *int64 `json:"end_time" elastic_mapping:"end_time:{type:date}"` - ErrorMessage string `json:"error_message" elastic_mapping:"error_message:{type:text}"` - Id string `json:"id" elastic_mapping:"id:{type:keyword}"` - LastNotificationTime int64 `json:"last_notification_time" elastic_mapping:"last_notification_time:{type:date}"` - MonitorId string `json:"monitor_id" elastic_mapping:"monitor_id:{type:keyword}"` - MonitorName string `json:"monitor_name" elastic_mapping:"monitor_name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}"` - Severity string `json:"severity" elastic_mapping:"severity:{type:keyword}"` - StartTime int64 `json:"start_time" elastic_mapping:"start_time:{type:date}"` - State string `json:"state" elastic_mapping:"state:{type:keyword}"` - TriggerId string `json:"trigger_id" elastic_mapping:"trigger_id:{type:keyword}"` - TriggerName string `json:"trigger_name" elastic_mapping:"trigger_name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}"` -} +import ( + "time" +) -type AlertingHistory Alert +type Alert struct { + ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"` + Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"` + Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"` + RuleID string `json:"rule_id"` + ClusterID string `json:"cluster_id"` + Expression string `json:"expression"` + Objects []string `json:"objects"` + Severity string `json:"severity"` + Content string `json:"content"` + AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` + ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"` + Users []string `json:"users,omitempty"` + State string `json:"state"` + Error string `json:"error,omitempty"` +} type ActionExecutionResult struct { - ActionID string `json:"action_id" elastic_mapping:"action_id:{type:keyword}"` - LastExecutionTime int64 `json:"last_execution_time" elastic_mapping:"last_execution_time:{type:date}"` - ThrottledCount int `json:"throttled_count" elastic_mapping:"throttled_count:{type:integer}"` - Error string `json:"error,omitempty"` - Result string `json:"result"` + //ActionId string `json:"action_id"` + LastExecutionTime int `json:"last_execution_time"` + Error string `json:"error"` + Result string `json:"result"` } -type AlertHistory struct { - Message string `json:"message" elastic_mapping:"message:{type:text}"` - Timestamp int64 `json:"timestamp" elastic_mapping:"timestamp:{type:date}"` -} \ No newline at end of file +const ( + AlertStateActive string = "active" + AlertStateAcknowledge = "acknowledge" + AlertStateNormal = "normal" + AlertStateError = "error" +) \ No newline at end of file diff --git a/model/alerting/condition.go b/model/alerting/condition.go new file mode 100644 index 00000000..6e0d3cea --- /dev/null +++ b/model/alerting/condition.go @@ -0,0 +1,34 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +type Condition struct { + Operator string `json:"operator"` + Items []ConditionItem `json:"items"` +} + +type ConditionItem struct { + //MetricName string `json:"metric"` + MinimumPeriodMatch int `json:"minimum_period_match"` + Operator string `json:"operator"` + Values []string `json:"values"` + Severity string `json:"severity"` + Message string `json:"message"` +} + +type ConditionResult struct { + GroupValues []string `json:"group_values"` + ConditionItem *ConditionItem `json:"condition_item"` +} + +type Severity string + +var SeverityWeights = map[string]int{ + "verbose": 1, + "info": 2, + "warning": 3, + "error": 4, + "critical": 5, +} \ No newline at end of file diff --git a/model/alerting/config.go b/model/alerting/config.go deleted file mode 100644 index 0f2ccc95..00000000 --- a/model/alerting/config.go +++ /dev/null @@ -1,19 +0,0 @@ -package alerting - -//type Config struct { -// ID string `json:"id" index:"id"` -// Type string `json:"type" elastic_mapping:"type:{type:keyword}"` -// Destination Destination `json:"destination,omitempty" elastic_mapping:"destination:{dynamic:false,properties:{custom_webhook:{properties:{header_params:{type:object,enabled:false},host:{type:text},password:{type:text},path:{type:keyword},port:{type:integer},query_params:{type:object,enabled:false},scheme:{type:keyword},url:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},username:{type:text}}},email:{properties:{email_account_id:{type:keyword},recipients:{type:nested,properties:{email:{type:text},email_group_id:{type:keyword},type:{type:keyword}}}}},last_update_time:{type:date,format:strict_date_time||epoch_millis},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schema_version:{type:integer},slack:{properties:{url:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}}},type:{type:keyword},user:{properties:{backend_roles:{type:text,fields:{keyword:{type:keyword}}},custom_attribute_names:{type:text,fields:{keyword:{type:keyword}}},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},roles:{type:text,fields:{keyword:{type:keyword}}}}}}}"` -// EmailAccount EmailAccount `json:"email_account,omitempty" elastic_mapping:"email_account:{properties:{email:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},from:{type:text},host:{type:text},method:{type:text},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},port:{type:integer},schema_version:{type:long}}}"` -// EmailGroup EmailGroup `json:"email_group,omitempty" elastic_mapping:"email_group:{properties:{emails:{type:nested,properties:{email:{type:text}}},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schema_version:{type:long}}}"` -// Monitor Monitor `json:"monitor,omitempty" elastic_mapping:"monitor:{dynamic:false,properties:{enabled:{type:boolean},enabled_time:{type:date,format:strict_date_time||epoch_millis},inputs:{type:nested,properties:{search:{properties:{indices:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},query:{type:object,enabled:false}}}}},last_update_time:{type:date,format:strict_date_time||epoch_millis},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schedule:{properties:{cron:{properties:{expression:{type:text},timezone:{type:keyword}}},period:{properties:{interval:{type:integer},unit:{type:keyword}}}}},schema_version:{type:integer},triggers:{type:nested,properties:{actions:{type:nested,properties:{destination_id:{type:keyword},message_template:{type:object,enabled:false},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},subject_template:{type:object,enabled:false},throttle:{properties:{unit:{type:keyword},value:{type:integer}}},throttle_enabled:{type:boolean}}},condition:{type:object,enabled:false},min_time_between_executions:{type:integer},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}}},type:{type:keyword}}}"` -//} - -type Config struct { - ID string `json:"id" index:"id"` - Type string `json:"type" elastic_mapping:"type:{type:keyword}"` - Destination Destination `json:"destination,omitempty" elastic_mapping:"destination:{type:object}"` - EmailAccount EmailAccount `json:"email_account,omitempty" elastic_mapping:"email_account:{type:object}"` - EmailGroup EmailGroup `json:"email_group,omitempty" elastic_mapping:"email_group:{type:object}"` - Monitor Monitor `json:"monitor,omitempty" elastic_mapping:"monitor:{type:object}"` -} diff --git a/model/alerting/context.go b/model/alerting/context.go new file mode 100644 index 00000000..4b342b54 --- /dev/null +++ b/model/alerting/context.go @@ -0,0 +1,9 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +type Context struct { + Fields []string `json:"fields"` +} diff --git a/model/alerting/destination.go b/model/alerting/destination.go index 9902fae5..1fec2bbd 100644 --- a/model/alerting/destination.go +++ b/model/alerting/destination.go @@ -1,40 +1,23 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + package alerting -type Destination struct { - Type string `json:"type" elastic_mapping:"type:{type:keyword}"` - Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` - SchemaVersion int `json:"schema_version" elastic_mapping:"schema_version:{type:integer}"` - LastUpdateTime int64 `json:"last_update_time" elastic_mapping:"last_update_time:{type:date,format:strict_date_time||epoch_millis}"` - CustomWebhook CustomWebhook `json:"custom_webhook,omitempty" elastic_mapping:"custom_webhook:{type:object}"` - Email EmailDestination `json:"email,omitempty" elastic_mapping:"email:{type:object}"` - //Slack Slack `json:"slack,omitempty" elastic_mapping:"slack"` -} +import "infini.sh/framework/core/orm" -type EmailDestination struct { - EmailAccountID string `json:"email_account_id" elastic_mapping:"email_account_id:{type:keyword}"` - Recipients []Recipient `json:"recipients" elastic_mapping:"recipients:{type:nested}"` -} - -type Recipient struct { - EmailGroupID string `json:"email_group_id,omitempty" elastic_mapping:"email_group_id:{type:keyword}"` - Email string `json:"email,omitempty" elastic_mapping:"email:{type:text}"` - Type string `json:"type" elastic_mapping:"type:{type:keyword}"` +type Channel struct { + orm.ORMObjectBase + Name string `json:"name"` + Type string `json:"type"` // email or webhook + Priority int `json:"priority,omitempty"` + Webhook *CustomWebhook `json:"webhook,omitempty"` + //Name string `json:"name" elastic_mapping:"name:{type:keyword,copy_to:search_text}"` + //SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` } -type CustomWebhook struct { - HeaderParams map[string]string `json:"header_params" elastic_mapping:"header_params:{type:object,enabled:false}"` - Host string `json:"host" elastic_mapping:"host:{type:text}"` - Method string `json:"method" elastic_mapping:"method:{type:keyword}"` - Password string `json:"password" elastic_mapping:"password:{type:text}"` - Path string `json:"path" elastic_mapping:"path:{type:keyword}"` - Port int `json:"port" elastic_mapping:"port:{type:integer}"` - QueryParams map[string]string `json:"query_params" elastic_mapping:"query_params:{type:object,enabled:false}"` - Scheme string `json:"scheme" elastic_mapping:"scheme:{type:keyword}"` - URL string `json:"url" elastic_mapping:"url:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}"` - Username string `json:"username" elastic_mapping:"username:{type:text}"` -} - -type Slack struct { - URl string `json:"url" elastic_mapping:"url:{type:keyword}"` -} \ No newline at end of file +const ( + ChannelEmail = "email" + ChannelWebhook = "webhook" +) \ No newline at end of file diff --git a/model/alerting/email_account.go b/model/alerting/email_account.go deleted file mode 100644 index e3da6af7..00000000 --- a/model/alerting/email_account.go +++ /dev/null @@ -1,12 +0,0 @@ -package alerting - -type EmailAccount struct { - Email string `json:"email" elastic_mapping:"email:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` - //From string `json:"from" elastic_mapping:"from:{type:text}"` - Host string `json:"host" elastic_mapping:"host:{type:text}"` - Method string `json:"method" elastic_mapping:"method:{type:text}"` - Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` - Port int `json:"port" elastic_mapping:"port:{type:integer}"` - SchemaVersion int64 `json:"schema_version" elastic_mapping:"schema_version:{type:long}"` - Password string `json:"password,omitempty" elastic_mapping:"password:{type:keyword}"` -} diff --git a/model/alerting/email_group.go b/model/alerting/email_group.go deleted file mode 100644 index 9c0952ae..00000000 --- a/model/alerting/email_group.go +++ /dev/null @@ -1,7 +0,0 @@ -package alerting - -type EmailGroup struct { - Emails []map[string]interface{} `json:"emails" elastic_mapping:"emails:{type: nested, properties: {email: {type: text}}}"` - Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` - SchemaVersion int64 `json:"schema_version" elastic_mapping:"schema_version:{type:long}"` -} \ No newline at end of file diff --git a/model/alerting/filter.go b/model/alerting/filter.go new file mode 100644 index 00000000..6ae07519 --- /dev/null +++ b/model/alerting/filter.go @@ -0,0 +1,12 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +type Filter struct { + And []FilterQuery `json:"and,omitempty"` + Or []FilterQuery `json:"or,omitempty"` + Not []FilterQuery `json:"not,omitempty"` + //MinimumShouldMatch int `json:"minimum_should_match"` +} diff --git a/model/alerting/filter_query.go b/model/alerting/filter_query.go new file mode 100644 index 00000000..ece64139 --- /dev/null +++ b/model/alerting/filter_query.go @@ -0,0 +1,14 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +type FilterQuery struct { + Field string `json:"field,omitempty"` + Operator string `json:"operator,omitempty"` + Values []string `json:"values,omitempty"` + And []FilterQuery `json:"and,omitempty"` + Or []FilterQuery `json:"or,omitempty"` + Not []FilterQuery `json:"not,omitempty"` +} diff --git a/model/alerting/metric.go b/model/alerting/metric.go new file mode 100644 index 00000000..dd6ce249 --- /dev/null +++ b/model/alerting/metric.go @@ -0,0 +1,56 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import ( + "fmt" + "regexp" +) + +type Metric struct { + PeriodInterval string `json:"period_interval"` + MaxPeriods int `json:"max_periods"` + Items []MetricItem `json:"items"` + Formula string `json:"formula,omitempty"` + Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80 +} +func (m *Metric) RefreshExpression() error{ + if len(m.Items) == 1 { + m.Expression = fmt.Sprintf("%s(%s)", m.Items[0].Statistic, m.Items[0].Field) + return nil + } + if m.Formula == "" { + return fmt.Errorf("formula should not be empty since there are %d metrics", len(m.Items)) + } + var ( + expressionBytes = []byte(m.Formula) + metricExpression string + ) + for _, item := range m.Items { + metricExpression = fmt.Sprintf("%s(%s)", item.Statistic, item.Field) + reg, err := regexp.Compile(item.Name+`([^\w]|$)`) + if err != nil { + return err + } + expressionBytes = reg.ReplaceAll(expressionBytes, []byte(metricExpression+"$1")) + } + + m.Expression = string(expressionBytes) + return nil +} + +type MetricItem struct { + Name string `json:"name"` + Field string `json:"field"` + Statistic string `json:"statistic"` + Group []string `json:"group"` //bucket group +} + +type MetricData struct { + GroupValues []string `json:"group_values"` + Data map[string][]TimeMetricData `json:"data"` +} + +type TimeMetricData []interface{} \ No newline at end of file diff --git a/model/alerting/monitor.go b/model/alerting/monitor.go deleted file mode 100644 index f9fbada8..00000000 --- a/model/alerting/monitor.go +++ /dev/null @@ -1,63 +0,0 @@ -package alerting - -type Monitor struct { - Enabled bool `json:"enabled" elastic_mapping:"enabled: {type:boolean}"` - EnabledTime int64 `json:"enabled_time" elastic_mapping:"enabled_time:{type:date,format:strict_date_time||epoch_millis}"` - Inputs []MonitorInput `json:"inputs" elastic_mapping:"inputs:{type:nested}"` - LastUpdateTime int64 `json:"last_update_time" elastic_mapping:"last_update_time:{type:date,format:strict_date_time||epoch_millis}"` - Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` - Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` - SchemaVersion int `json:"schema_version" elastic_mapping:"schema_version:{type:integer}"` - Triggers []Trigger `json:"triggers" elastic_mapping:"triggers:{type:nested}"` - Type string `json:"type" elastic_mapping:"type:{type:keyword}` -} - -type MonitorInput struct { - Search MonitorSearch `json:"search" elastic_mapping:"search:{type:object}"` -} - -type MonitorSearch struct { - Indices []string `json:"indices" elastic_mapping:"indices:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` - Query map[string]interface{} `json:"query" elastic_mapping:"query:{type:object,enabled:false}"` -} - -type Cron struct { - Expression string `json:"expression" elastic_mapping:"expression:{type:text}"` - Timezone string `json:"timezone" elastic_mapping:"timezone:{type:keyword}"` -} - -type Period struct { - Interval int `json:"interval" elastic_mapping:"interval:{type:integer}"` - Unit string `json:"unit" elastic_mapping:"unit:{type:keyword}"` -} - - -type Schedule struct { - Cron *Cron `json:"cron,omitempty" elastic_mapping:"cron:{type:object}"` - Period *Period `json:"period,omitempty" elastic_mapping:"period:{type:object}"` -} - - -type Throttle struct { - Unit string `json:"unit" elastic_mapping:"unit:{type:keyword}"` - Value int `json:"value" elastic_mapping:"value:{type:integer}"` -} - -type Action struct { - ID string `json:"id"` - DestinationId string `json:"destination_id" elastic_mapping:"destination_id:{type:keyword}"` - MessageTemplate map[string]interface{} `json:"message_template" elastic_mapping:"message_template:{type:object}"` - Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` - SubjectTemplate map[string]interface{} `json:"subject_template" elastic_mapping:"subject_template:{type:object}"` - ThrottleEnabled bool `json:"throttle_enabled" elastic_mapping:"throttle_enabled:{type:boolean}"` - Throttle *Throttle `json:"throttle,omitempty" elastic_mapping:"throttle:{type:object}"` -} - -type Trigger struct { - ID string `json:"id"` - Severity string `json:"severity" elastic_mapping:"severity:{type:keyword}"` - Name string `json:"name" elastic_mapping:"name:{type:text,fields: {keyword: {type: keyword, ignore_above: 256}}}"` - Condition map[string]interface{} `json:"condition" elastic_mapping:"condition:{type:object, enabled:false}"` - Actions []Action `json:"actions" elastic_mapping:"actions:{type:nested}"` - MinTimeBetweenExecutions int `json:"min_time_between_executions" elastic_mapping:"min_time_between_executions:{type:integer}"` -} \ No newline at end of file diff --git a/model/alerting/resource.go b/model/alerting/resource.go new file mode 100644 index 00000000..bc20e204 --- /dev/null +++ b/model/alerting/resource.go @@ -0,0 +1,15 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +type Resource struct { + ID string `json:"id"` + Type string `json:"type"` + Objects []string `json:"objects" elastic_mapping:"objects:{type:keyword,copy_to:search_text}"` + Filter Filter `json:"filter,omitempty" elastic_mapping:"-"` + RawFilter map[string]interface{} `json:"raw_filter,omitempty"` + TimeField string `json:"time_field,omitempty"` + Context Context `json:"context"` +} diff --git a/model/alerting/rule.go b/model/alerting/rule.go new file mode 100644 index 00000000..8be30e94 --- /dev/null +++ b/model/alerting/rule.go @@ -0,0 +1,46 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import ( + "infini.sh/framework/core/orm" + "time" +) + +type Rule struct { + orm.ORMObjectBase + //Name string `json:"name" elastic_mapping:"name:{type:keyword,copy_to:search_text}"` + Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:keyword}"` + Resource Resource `json:"resource" elastic_mapping:"resource:{type:object}"` + Metrics Metric `json:"metrics" elastic_mapping:"metrics:{type:object}"` + Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` + Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"` + Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` + SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` +} + +type RuleChannel struct { + Normal []Channel `json:"normal"` + Escalation []Channel `json:"escalation"` + ThrottlePeriod string `json:"throttle_period"` //沉默周期 + AcceptTimeRange TimeRange `json:"accept_time_range"` + EscalationThrottlePeriod string `json:"escalation_throttle_period"` + EscalationEnabled bool `json:"escalation_enabled"` +} + +type MessageTemplate struct{ + Type string `json:"type"` + Source string `json:"source"` +} + +type TimeRange struct { + Start string `json:"start"` + End string `json:"end"` +} + +func (tr *TimeRange) Include( t time.Time) bool { + currentTimeStr := t.Format("15:04") + return tr.Start <= currentTimeStr && currentTimeStr <= tr.End +} diff --git a/model/alerting/rule_test.go b/model/alerting/rule_test.go new file mode 100644 index 00000000..e6192b68 --- /dev/null +++ b/model/alerting/rule_test.go @@ -0,0 +1,129 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import ( + "fmt" + "infini.sh/framework/core/util" + "net/http" + "testing" + "time" +) + +func TestCreateRule( t *testing.T) { + rule := Rule{ + //ORMObjectBase: orm.ORMObjectBase{ + // ID: util.GetUUID(), + // Created: time.Now(), + // Updated: time.Now(), + //}, + Enabled: true, + Resource: Resource{ + ID: "c8i18llath2blrusdjng", + Type: "elasticsearch", + Objects: []string{".infini_metrics*"}, + TimeField: "timestamp", + Filter: Filter{ + And: []FilterQuery{ + //{Field: "timestamp", Operator: "gte", Values: []string{"now-15m"}}, + //{Field: "payload.elasticsearch.cluster_health.status", Operator: "equals", Values: []string{"red"}}, + }, + }, + RawFilter: map[string]interface{}{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "range": util.MapStr{ + "timestamp": util.MapStr{ + "gte": "now-15m", + }, + }, + }, + }, + }, + }, + }, + //Metrics: Metric{ + // PeriodInterval: "1m", + // MaxPeriods: 15, + // Items: []MetricItem{ + // {Name: "red_health", Field: "*", Statistic: "count", Group: []string{"metadata.labels.cluster_id"}}, + // }, + //}, + //Conditions: Condition{ + // Operator: "any", + // Items: []ConditionItem{ + // { MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"1"}, Severity: "error", Message: "集群健康状态为 Red"}, + // }, + //}, + + Metrics: Metric{ + PeriodInterval: "1m", + MaxPeriods: 15, + Items: []MetricItem{ + {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, + {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, + }, + Formula: "a/b*100", + //Expression: "min(fs.free_in_bytes)/max(fs.total_in_bytes)*100", + }, + Conditions: Condition{ + Operator: "any", + Items: []ConditionItem{ + {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"76"}, Severity: "error", Message: "磁盘可用率小于10%"}, + }, + }, + + Channels: RuleChannel{ + Normal: []Channel{ + {Name: "钉钉", Type: ChannelWebhook, Webhook: &CustomWebhook{ + HeaderParams: map[string]string{ + "Content-Type": "application/json", + }, + Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, + Method: http.MethodPost, + URL: "https://oapi.dingtalk.com/robot/send?access_token=XXXXXX", + }}, + }, + Escalation: []Channel{ + {Type: ChannelWebhook, Name: "微信", Webhook: &CustomWebhook{ + HeaderParams: map[string]string{ + "Content-Type": "application/json", + }, + Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, + Method: http.MethodPost, + URL: "https://oapi.weixin.com/robot/send?access_token=6a5c7c9454ff74537a6de493153b1da68860942d4b0aeb33797cb68b5111b077", + }}, + }, + ThrottlePeriod: "1h", + AcceptTimeRange: TimeRange{ + Start: "8:00", + End: "21:00", + }, + EscalationEnabled: false, + EscalationThrottlePeriod: "30m", + }, + } + //err := rule.Metrics.RefreshExpression() + //if err != nil { + // t.Fatal(err) + //} + + fmt.Println(util.MustToJSON(rule)) + //fmt.Println(rule.Metrics.Expression) +} + + + + +func TestTimeRange_Include( t *testing.T) { + tr := TimeRange{ + Start: "08:00", + End: "18:31", + } + fmt.Println(tr.Include(time.Now())) + ti,_ := time.Parse(time.RFC3339, "2022-04-11T10:31:38.911000504Z") + fmt.Println(time.Now().Sub(ti)) +} diff --git a/model/alerting/schedule.go b/model/alerting/schedule.go new file mode 100644 index 00000000..3b488835 --- /dev/null +++ b/model/alerting/schedule.go @@ -0,0 +1,18 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +type Schedule struct { + Cron *Cron `json:"cron,omitempty" elastic_mapping:"cron:{type:object}"` + Interval string `json:"interval,omitempty" elastic_mapping:"interval:{type:keyword}"` +} + +type Cron struct { + Expression string `json:"expression" elastic_mapping:"expression:{type:text}"` + Timezone string `json:"timezone" elastic_mapping:"timezone:{type:keyword}"` +} + + + diff --git a/model/alerting/temp b/model/alerting/temp deleted file mode 100644 index 617439b9..00000000 --- a/model/alerting/temp +++ /dev/null @@ -1 +0,0 @@ -monitor:{dynamic:false,properties:{enabled:{type:boolean},enabled_time:{type:date,format:strict_date_time||epoch_millis},inputs:{type:nested,properties:{search:{properties:{indices:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},query:{type:object,enabled:false}}}}},last_update_time:{type:date,format:strict_date_time||epoch_millis},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},schedule:{properties:{cron:{properties:{expression:{type:text},timezone:{type:keyword}}},period:{properties:{interval:{type:integer},unit:{type:keyword}}}}},schema_version:{type:integer},triggers:{type:nested,properties:{actions:{type:nested,properties:{destination_id:{type:keyword},message_template:{type:object,enabled:false},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}},subject_template:{type:object,enabled:false},throttle:{properties:{unit:{type:keyword},value:{type:integer}}},throttle_enabled:{type:boolean}}},condition:{type:object,enabled:false},min_time_between_executions:{type:integer},name:{type:text,fields:{keyword:{type:keyword,ignore_above:256}}}}},type:{type:keyword}}} \ No newline at end of file diff --git a/model/alerting/webhook.go b/model/alerting/webhook.go new file mode 100644 index 00000000..70117faf --- /dev/null +++ b/model/alerting/webhook.go @@ -0,0 +1,12 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +type CustomWebhook struct { + HeaderParams map[string]string `json:"header_params,omitempty" elastic_mapping:"header_params:{type:object,enabled:false}"` + Method string `json:"method" elastic_mapping:"method:{type:keyword}"` + URL string `json:"url,omitempty"` + Body string `json:"body" elastic_mapping:"body:{type:text}"` +} diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go new file mode 100644 index 00000000..20770787 --- /dev/null +++ b/plugin/api/alerting/api.go @@ -0,0 +1,19 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import "infini.sh/framework/core/api" + + +type AlertAPI struct { + api.Handler +} + +func init() { + alert:=AlertAPI{} + api.HandleAPIMethod(api.GET, "/elasticsearch/:id/alerting/rule/:rule_id", alert.getRule) + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/alerting/rule", alert.createRule) +} + diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go new file mode 100644 index 00000000..61aa83fb --- /dev/null +++ b/plugin/api/alerting/rule.go @@ -0,0 +1,156 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import ( + log "github.com/cihub/seelog" + "infini.sh/console/model/alerting" + alerting2 "infini.sh/console/service/alerting" + "infini.sh/console/service/alerting/elasticsearch" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" + "net/http" + "time" +) + +func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + rules := []alerting.Rule{} + err := alertAPI.DecodeJSON(req, &rules) + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + var ids []string + for _, rule := range rules { + err = rule.Metrics.RefreshExpression() + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + rule.ID = util.GetUUID() + ids = append(ids, rule.ID) + rule.Created = time.Now() + rule.Updated = time.Now() + rule.Metrics.MaxPeriods = 15 + if rule.Schedule.Interval == ""{ + rule.Schedule.Interval = "1m" + } + + err = orm.Save(rule) + if err != nil { + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + eng := alerting2.GetEngine(rule.Resource.Type) + if rule.Enabled { + ruleTask := task.ScheduleTask{ + ID: rule.ID, + Interval: rule.Schedule.Interval, + Description: rule.Metrics.Expression, + Task: eng.GenerateTask(&rule), + } + task.RegisterScheduleTask(ruleTask) + task.StartTask(ruleTask.ID) + } + + } + + alertAPI.WriteJSON(w, util.MapStr{ + "result": "created", + "ids": ids, + }, http.StatusOK) +} +func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + rule := alerting.Rule{ + ORMObjectBase: orm.ORMObjectBase{ + ID: util.GetUUID(), + Created: time.Now(), + Updated: time.Now(), + }, + Enabled: true, + Resource: alerting.Resource{ + ID: "c8i18llath2blrusdjng", + Type: "elasticsearch", + Objects: []string{".infini_metrics*"}, + TimeField: "timestamp", + RawFilter: map[string]interface{}{ + "bool": util.MapStr{ + "must": []util.MapStr{ + //{ + // "term": util.MapStr{ + // "metadata.labels.cluster_id": util.MapStr{ + // "value": "xxx", + // }, + // }, + //}, + }, + }, + }, + }, + + Metrics: alerting.Metric{ + PeriodInterval: "1m", + MaxPeriods: 15, + Items: []alerting.MetricItem{ + {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, + {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max",Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, + }, + Formula: "a/b*100", + //Expression: "min(fs.free_in_bytes)/max(fs.total_in_bytes)*100", + }, + Conditions: alerting.Condition{ + Operator: "any", + Items: []alerting.ConditionItem{ + {MinimumPeriodMatch: 10, Operator: "lte", Values: []string{"76"}, Severity: "warning", Message: "磁盘可用率小于20%"}, + {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"75"}, Severity: "error", Message: "磁盘可用率小于10%"}, + }, + }, + + Channels: alerting.RuleChannel{ + Normal: []alerting.Channel{ + {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ + HeaderParams: map[string]string{ + "Content-Type": "application/json", + }, + Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, + Method: http.MethodPost, + URL: "https://oapi.dingtalk.com/robot/send?access_token=XXXXXX", + }}, + }, + Escalation: []alerting.Channel{ + {Type: alerting.ChannelWebhook, Name: "微信", Webhook: &alerting.CustomWebhook{ + HeaderParams: map[string]string{ + "Content-Type": "application/json", + }, + Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, + Method: http.MethodPost, + URL: "https://oapi.weixin.com/robot/send?access_token=XXXXXX", + }}, + }, + ThrottlePeriod: "1h", + AcceptTimeRange: alerting.TimeRange{ + Start: "8:00", + End: "21:00", + }, + EscalationEnabled: true, + EscalationThrottlePeriod: "30m", + }, + } + eng := &elasticsearch.Engine{} + result, err := eng.ExecuteQuery(&rule) + if err != nil { + log.Error(err) + } + alertAPI.WriteJSON(w, result, http.StatusOK) + +} diff --git a/plugin/api/init.go b/plugin/api/init.go index 1e57db0b..9e6d3a70 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -3,7 +3,6 @@ package api import ( "infini.sh/console/config" "infini.sh/console/plugin/api/index_management" - "infini.sh/console/service/alerting" "infini.sh/framework/core/api" "path" ) @@ -13,7 +12,6 @@ func Init(cfg *config.AppConfig) { handler := index_management.APIHandler{ Config: cfg, } - alerting.InitAppConfig(cfg) var pathPrefix = "/_search-center/" var esPrefix = "/elasticsearch/:id/" api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/overview"), handler.ElasticsearchOverviewAction) @@ -46,39 +44,6 @@ func Init(cfg *config.AppConfig) { api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/command"), handler.HandleQueryCommonCommandAction) api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "elasticsearch/command/:cid"), handler.HandleDeleteCommonCommandAction) - //new api - api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "alerting/overview"), alerting.GetAlertOverview) - api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "alerting/overview/alerts"), alerting.GetAlerts) - api.HandleAPIMethod(api.POST, path.Join(pathPrefix,"alerting/destinations/email_accounts"), alerting.CreateEmailAccount) - api.HandleAPIMethod(api.PUT, path.Join(pathPrefix, "alerting/email_accounts/:emailAccountId"), alerting.UpdateEmailAccount) - api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix,"alerting/email_accounts/:emailAccountId"), alerting.DeleteEmailAccount) - api.HandleAPIMethod(api.GET, path.Join(pathPrefix,"alerting/destinations/email_accounts"), alerting.GetEmailAccounts) - api.HandleAPIMethod(api.GET, path.Join(pathPrefix,"alerting/email_accounts/:emailAccountId"), alerting.GetEmailAccount) - api.HandleAPIMethod(api.POST, path.Join(pathPrefix,"alerting/destinations/email_groups"), alerting.CreateEmailGroup) - api.HandleAPIMethod(api.GET, path.Join(pathPrefix,"alerting/destinations/email_groups"), alerting.GetEmailGroups) - api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix,"alerting/email_groups/:emailGroupId"), alerting.DeleteEmailGroup) - api.HandleAPIMethod(api.PUT, path.Join(pathPrefix,"alerting/email_groups/:emailGroupId"), alerting.UpdateEmailGroup) - api.HandleAPIMethod(api.GET, path.Join(pathPrefix,"alerting/email_groups/:emailGroupId"), alerting.GetEmailGroup) - api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "alerting/destinations"), alerting.GetDestinations) - api.HandleAPIMethod(api.POST, path.Join(pathPrefix,"alerting/destinations"), alerting.CreateDestination) - api.HandleAPIMethod(api.PUT, path.Join(pathPrefix,"alerting/destinations/:destinationId"), alerting.UpdateDestination) - api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "alerting/destinations/:destinationId"), alerting.DeleteDestination) - api.HandleAPIMethod(api.GET, "/elasticsearch/:id/alerting/monitors/:monitorID", alerting.GetMonitor) - api.HandleAPIMethod(api.PUT, "/elasticsearch/:id/alerting/monitors/:monitorID", alerting.UpdateMonitor) - api.HandleAPIMethod(api.GET, "/elasticsearch/:id/alerting/monitors", alerting.GetMonitors) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/alerting/monitors", alerting.CreateMonitor) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/alerting/monitors/_execute", alerting.ExecuteMonitor) - api.HandleAPIMethod(api.DELETE, "/elasticsearch/:id/alerting/monitors/:monitorID", alerting.DeleteMonitor) - - api.HandleAPIMethod(api.GET, path.Join(pathPrefix,"alerting/_settings"), alerting.GetSettings) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/alerting/_indices", alerting.GetIndices) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/alerting/_aliases", alerting.GetAliases) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/alerting/_mappings", alerting.GetMappings) - api.HandleAPIMethod(api.POST, path.Join(pathPrefix, "alerting/_search"), alerting.Search) - api.HandleAPIMethod(api.GET, "/elasticsearch/:id/alerting/alerts", alerting.GetAlerts) - api.HandleAPIMethod(api.POST, "/elasticsearch/:id/alerting/_monitors/:monitorID/_acknowledge/alerts", alerting.AcknowledgeAlerts) - - //task.RegisterScheduleTask(task.ScheduleTask{ // Description: "sync reindex task result", // Task: func() { diff --git a/service/alerting/action/action.go b/service/alerting/action/action.go deleted file mode 100644 index 9cd12255..00000000 --- a/service/alerting/action/action.go +++ /dev/null @@ -1,7 +0,0 @@ -package action - -const ( - ACTION_EMAIL = "email" - ACTION_WEBHOOK = "custom_webhook" - ACTION_SLACK = "slack" -) \ No newline at end of file diff --git a/service/alerting/action/email.go b/service/alerting/action/email.go deleted file mode 100644 index d3273176..00000000 --- a/service/alerting/action/email.go +++ /dev/null @@ -1,89 +0,0 @@ -package action - -import ( - "crypto/tls" - "fmt" - "infini.sh/console/model/alerting" - "net" - "net/smtp" - "strings" -) - -type EmailAction struct { - Sender *alerting.EmailAccount - Message string - Subject string - Receiver []string -} - -func (act *EmailAction) Execute() ([]byte, error){ - from := act.Sender.Email - act.Sender.Host = strings.TrimSpace(act.Sender.Host) - addr := fmt.Sprintf("%s:%d", act.Sender.Host, act.Sender.Port) - msg := fmt.Sprintf("To: %s\r\nSubject: %s\r\n%s\r\n", strings.Join(act.Receiver, ","), act.Subject, act.Message) - - auth := smtp.PlainAuth("", from, act.Sender.Password, act.Sender.Host) - if act.Sender.Method == "ssl" { - err := SendEmailOverTLS(addr, auth, from, act.Receiver, []byte(msg)) - return nil, err - } - err := smtp.SendMail(addr, auth, from, act.Receiver, []byte(msg)) - - return nil, err -} - -func SendEmailOverTLS(addr string, auth smtp.Auth, from string, to []string, msg []byte) error{ - host, _, _ := net.SplitHostPort(addr) - tlsConfig := &tls.Config { - InsecureSkipVerify: true, - ServerName: host, - } - - conn, err := tls.Dial("tcp", addr, tlsConfig) - //dialer := &net.Dialer{ - // Timeout: time.Second * 15, - //} - //conn, err := tls.DialWithDialer(dialer, "tcp", addr, tlsConfig) - if err != nil { - return err - } - - c, err := smtp.NewClient(conn, host) - if err != nil { - return err - } - - if err = c.Auth(auth); err != nil { - return err - } - - // To && From - if err = c.Mail(from); err != nil { - return err - } - - for _, dst := range to { - if err = c.Rcpt(dst); err != nil { - return err - } - } - // Data - w, err := c.Data() - if err != nil { - return err - } - - _, err = w.Write(msg) - if err != nil { - return err - } - - err = w.Close() - if err != nil { - return err - } - - c.Quit() - return nil -} - diff --git a/service/alerting/action/email_test.go b/service/alerting/action/email_test.go deleted file mode 100644 index 6ca8428c..00000000 --- a/service/alerting/action/email_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package action - -import ( - "infini.sh/console/model/alerting" - "testing" -) - -func TestEmailAction(t *testing.T){ - ea := EmailAction{ - Sender: &alerting.EmailAccount{ - Email: "liugq@infini.ltd", - Host: "smtp.ym.163.com", - Port: 994, - Method: "ssl", - Password: "hello@infini$", //"", - }, - Message: "hello world", - Subject: "test email", - Receiver: []string{"786027438@qq.com"}, - } - _, err := ea.Execute() - if err != nil { - t.Fatal(err) - } -} \ No newline at end of file diff --git a/service/alerting/action/webhook.go b/service/alerting/action/webhook.go index 4e7b038e..965eb085 100644 --- a/service/alerting/action/webhook.go +++ b/service/alerting/action/webhook.go @@ -1,8 +1,11 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + package action import ( "crypto/tls" - "fmt" "infini.sh/console/model/alerting" "io/ioutil" "net/http" @@ -25,25 +28,7 @@ var actionClient = http.Client{ } func (act *WebhookAction) Execute()([]byte, error){ - var reqURL string - if act.Data.URL != "" { - reqURL = act.Data.URL - } - if act.Data.Host != "" { - reqURL = fmt.Sprintf("%s://%s:%d/%s", act.Data.Scheme, act.Data.Host, act.Data.Port, act.Data.Path ) - urlBuilder := strings.Builder{} - urlBuilder.WriteString(reqURL) - if len(act.Data.QueryParams) > 0 { - urlBuilder.WriteString("?") - } - for k, v := range act.Data.QueryParams { - urlBuilder.WriteString(k) - urlBuilder.WriteString("=") - urlBuilder.WriteString(v) - urlBuilder.WriteString("&") - } - reqURL = urlBuilder.String() - } + var reqURL = act.Data.URL reqBody := strings.NewReader(act.Message) req, err := http.NewRequest(http.MethodPost, reqURL, reqBody) if err != nil { diff --git a/service/alerting/action/webhook_test.go b/service/alerting/action/webhook_test.go deleted file mode 100644 index 31b969ba..00000000 --- a/service/alerting/action/webhook_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package action - -import ( - "infini.sh/console/model/alerting" - "net/http" - "testing" -) - -func TestWebhookAction(t *testing.T){ - ea := WebhookAction{ - Message: `{"msgtype": "text","text": {"content":"通知:我就是我, 是不一样的烟火,Trigger: {{ctx.trigger.name}}"},"at":{"atMobiles":["18692254900"],"isAtAll": false}}`, - Data: &alerting.CustomWebhook{ - HeaderParams: map[string]string{ - "Content-Type": "application/json", - }, - URL: "https://oapi.dingtalk.com/robot/send?access_token=6a5c7c9454ff74537a6de493153b1da68860942d4b0aeb33797cb68b5111b077", - Method: http.MethodPost, - }, - } - _, err := ea.Execute() - if err != nil { - t.Fatal(err) - } - -} \ No newline at end of file diff --git a/service/alerting/alert.go b/service/alerting/alert.go deleted file mode 100644 index d9f6277c..00000000 --- a/service/alerting/alert.go +++ /dev/null @@ -1,255 +0,0 @@ -package alerting - -import ( - "bytes" - "crypto/tls" - "encoding/json" - "errors" - "fmt" - "infini.sh/console/model/alerting" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "io" - "net/http" - "net/url" - "strings" -) - -func getQueryParam(req *http.Request, key string, or ...string) string { - query := req.URL.Query() - val := query.Get(key) - if val == "" && len(or)>0 { - return or[0] - } - return val -} - -func getAlertIndexName(typ string) string { - switch typ{ - case INDEX_ALL_ALERTS: - return fmt.Sprintf("%s,%s", orm.GetIndexName(alerting.AlertingHistory{}), orm.GetIndexName(alerting.Alert{})) - case INDEX_ALERT_HISTORY: - return orm.GetIndexName(alerting.AlertingHistory{}) - } - return orm.GetIndexName(alerting.Alert{}) -} - -func GetAlerts (w http.ResponseWriter, req *http.Request, ps httprouter.Params){ - id := ps.ByName("id") - - var ( - from = getQueryParam(req, "from", "0") - size = getQueryParam(req, "size", "20") - search = getQueryParam(req, "search", "") - sortDirection = getQueryParam(req, "sortDirection", "desc") - sortField = getQueryParam(req, "sortField", "start_time") - severityLevel = getQueryParam(req, "severityLevel", "ALL") - alertState = getQueryParam(req, "alertState", "ALL") - monitorIds = req.URL.Query()["monitorIds"] - params = map[string]string{ - } - alertType = getQueryParam(req, "type", INDEX_ALL_ALERTS) - ) - - switch sortField { - case "monitor_name", "trigger_name": - params["sortString"] = fmt.Sprintf(`%s.keyword`, sortField) - params["sortOrder"] = sortDirection - case "start_time": - params["sortString"] = sortField - params["sortOrder"] = sortDirection - case "end_time": - params["sortString"] = sortField - params["sortOrder"] = sortDirection - params["missing"] = "_first" - if sortDirection == "asc" { - params["missing"] = "_last" - } - case "acknowledged_time": - params["sortString"] = sortField - params["sortOrder"] = sortDirection - params["missing"] = "_last" - } - sort := IfaceMap{ - params["sortString"]: params["sortOrder"], - } - must := []IfaceMap{ - } - if id != "" { - must = append(must, IfaceMap{ - "match": IfaceMap{ - "cluster_id": id, - }, - }) - } - if severityLevel != "ALL" { - must = append(must, IfaceMap{ - "match": IfaceMap{ - "severity": severityLevel, - }, - }) - } - if alertState != "ALL" { - must = append(must, IfaceMap{ - "match": IfaceMap{ - "state": alertState, - }, - }) - } - if len(monitorIds) > 0{ - must = append(must, IfaceMap{ - "match": IfaceMap{ - "monitor_id": monitorIds[0], - }, - }) - } - - if clearSearch := strings.TrimSpace(search); clearSearch != ""{ - searches := strings.Split(clearSearch, " ") - clearSearch = strings.Join(searches, "* *") - params["searchString"] = fmt.Sprintf("*%s*", clearSearch) - must = append(must, IfaceMap{ - "query_string": IfaceMap{ - //"default_field": "destination.name", - "default_operator": "AND", - "query": fmt.Sprintf(`*%s*`, clearSearch), - }, - }) - } - - reqBody := IfaceMap{ - "size":size, - "from": from, - "query": IfaceMap{ - "bool":IfaceMap{ - "must": must, - }, - }, - "sort": sort, - } - indexName := getAlertIndexName(alertType) - - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - res, err := esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(reqBody)) - if err != nil { - writeError(w, err) - return - } - alerts := []interface{}{} - rawAlerts := res.Hits.Hits - for _, alert := range rawAlerts { - alert.Source["id"] = alert.ID - alerts = append(alerts, alert.Source) - } - writeJSON(w, IfaceMap{ - "ok": true, - "alerts": alerts, - "totalAlerts": res.GetTotal(), - }, http.StatusOK) - -} -func writeError(w http.ResponseWriter, err error) { - writeJSON(w, map[string]interface{}{ - "ok": false, - "resp": err.Error(), - }, http.StatusOK) -} - -type IfaceMap map[string]interface{} - -func decodeJSON(reader io.Reader, obj interface{}) error{ - dec := json.NewDecoder(reader) - err := dec.Decode(obj) - if err != nil { - return err - } - - if m, ok := obj.(*IfaceMap); ok { - if errStr := queryValue(*m,"error", nil); errStr != nil { - if str, ok := errStr.(string); ok { - errors.New(str) - } - buf, _ := json.Marshal(errStr) - return errors.New(string(buf)) - } - } - return nil -} - -func writeJSON(w http.ResponseWriter, data interface{}, statusCode int){ - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(statusCode) - buf, _ := json.Marshal(data) - w.Write(buf) -} - -var alertClient = http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - }, -} - -func doRequest(requestUrl string, method string, params map[string]string, body interface{}) (*http.Response, error){ - var req *http.Request - if params != nil && len(params) > 0 { - var queryValues = url.Values{} - for k, v := range params { - queryValues.Set(k, v) - } - requestUrl += "?"+ queryValues.Encode() - } - var reader io.Reader - if body != nil { - switch body.(type) { - case string: - reader = bytes.NewBufferString(body.(string)) - case io.Reader: - reader = body.(io.Reader) - default: - rw := &bytes.Buffer{} - enc := json.NewEncoder(rw) - err := enc.Encode(body) - if err != nil { - return nil, err - } - reader = rw - } - } - req, _ = http.NewRequest(method, requestUrl, reader) - req.Header.Set("content-type", "application/json") - req.Header.Set("User-Agent", "infini-client") - return alertClient.Do(req) -} - -func queryValue(obj map[string]interface{}, key string, defaultValue interface{}) interface{} { - if key == "" { - return obj - } - idx := strings.Index(key, ".") - if idx == -1 { - if v, ok := obj[key]; ok { - return v - } - return defaultValue - } - ckey := key[0:idx] - - if v, ok := obj[ckey]; ok { - if vmap, ok := v.(map[string]interface{}); ok { - return queryValue(vmap, key[idx+1:], defaultValue) - } - } - return defaultValue -} - -func assignTo(dst IfaceMap, src IfaceMap){ - if dst == nil || src == nil { - return - } - for k, v := range src { - dst[k] = v - } -} diff --git a/service/alerting/constants.go b/service/alerting/constants.go deleted file mode 100644 index ec9e4360..00000000 --- a/service/alerting/constants.go +++ /dev/null @@ -1,17 +0,0 @@ -package alerting - -const EMAIL_ACCOUNT_FIELD = "email_account" -const EMAIL_GROUP_FIELD = "email_group" -const DESTINATION_FIELD = "destination" -const MONITOR_FIELD = "monitor" -const INDEX_ALERT = "ALERT" -const INDEX_ALERT_HISTORY = "ALERT_HISTORY" -const INDEX_ALL_ALERTS = "ALL_ALERTS" - -const ( - ALERT_ACTIVE = "ACTIVE" - ALERT_COMPLETED = "COMPLETED" - ALERT_DELETED = "DELETED" - ALERT_ACKNOWLEDGED = "ACKNOWLEDGED" - ALERT_ERROR = "ERROR" -) diff --git a/service/alerting/destination.go b/service/alerting/destination.go deleted file mode 100644 index ca891f91..00000000 --- a/service/alerting/destination.go +++ /dev/null @@ -1,634 +0,0 @@ -package alerting - -import ( - "errors" - "fmt" - "infini.sh/console/config" - "infini.sh/console/model/alerting" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "net/http" - "strings" - "time" -) - -func GetDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - conf := getDefaultConfig() - dstID := ps.ByName("destID") - reqUrl := fmt.Sprintf("%s/%s/_doc/%s", conf.Endpoint, orm.GetIndexName(alerting.Config{}), dstID) - res, err := doRequest(reqUrl, http.MethodGet, nil, nil) - if err != nil { - writeError(w, err) - return - } - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - writeError(w, err) - return - } - res.Body.Close() - - writeJSON(w, IfaceMap{ - "ok": true, - "destination": queryValue(resBody, "_source.destination", nil), - }, http.StatusOK) - -} - -func GetDestinations(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - from = getQueryParam(req, "from", "0") - size = getQueryParam(req, "size", "20") - search = getQueryParam(req, "search", "") - sortDirection = getQueryParam(req, "sortDirection", "desc") - sortField = getQueryParam(req, "sortField", "") - typ = getQueryParam(req, "type", "") - ) - - must := []IfaceMap{ - { - "exists": IfaceMap{ - "field": DESTINATION_FIELD, - }, - }, - //{ - // "match": IfaceMap{ - // "cluster_id": id, - // }, - //}, - } - - if clearSearch := strings.TrimSpace(search); clearSearch != "" { - clearSearch = strings.ReplaceAll(clearSearch, " ", "* *") - must = append(must, IfaceMap{ - "query_string": IfaceMap{ - //"default_field": "destination.name", - "default_operator": "AND", - "query": fmt.Sprintf(`*%s*`, clearSearch), - }, - }) - } - - var sort interface{} = IfaceMap{} - switch (sortField) { - case "name": - sort = IfaceMap{ "destination.name.keyword": sortDirection } - case "type": - sort = IfaceMap{ "destination.type": sortDirection } - default: - } - if typ != "" && typ != "ALL" { - must = append(must, IfaceMap{ - "match": IfaceMap{ - "destination.type": typ, - }, - }) - } - - reqBody := IfaceMap{ - "from": from, - "size": size, - "sort": sort, - "query": IfaceMap{ - "bool": IfaceMap{ - "must": must, - }, - }, - } - - esConfig := getDefaultConfig() - esClient := elastic.GetClient(esConfig.ID) - res, err := esClient.SearchWithRawQueryDSL( orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(reqBody)) - if err != nil { - writeError(w, err) - return - } - - totalDestinations := res.GetTotal() - rawHits := res.Hits.Hits - dests := []IfaceMap{} - - for _, hit := range rawHits { - newItem := IfaceMap{} - newItem["id"] = hit.ID - source := queryValue(hit.Source,DESTINATION_FIELD, nil) - if ms, ok := source.(map[string]interface{}); ok { - assignTo(newItem, ms) - } - dests = append(dests, newItem) - } - - writeJSON(w, IfaceMap{ - "ok": true, - "destinations": dests, - "totalDestinations":totalDestinations, - }, http.StatusOK) -} - -func CreateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - esConfig := getDefaultConfig() - destId := util.GetUUID() - var destination = &alerting.Destination{} - err := decodeJSON(req.Body, &destination) - if err != nil { - writeError(w, err) - return - } - destination.LastUpdateTime = time.Now().UnixNano()/1e6 - var toSaveDest = IfaceMap{ - "type": destination.Type, - "name": destination.Name, - "last_update_time": destination.LastUpdateTime, - "schema_version": destination.SchemaVersion, - "id": destId, - } - switch destination.Type { - case "email" : - toSaveDest[destination.Type] = destination.Email - case "custom_webhook": - toSaveDest[destination.Type] = destination.CustomWebhook - default: - writeError(w, errors.New("type unsupported")) - return - } - esClient := elastic.GetClient(esConfig.ID) - indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", destId, IfaceMap{ - DESTINATION_FIELD: toSaveDest, - }, "wait_for") - if err != nil { - writeError(w, err) - return - } - - - writeJSON(w, IfaceMap{ - "ok": true, - "resp": IfaceMap{ - DESTINATION_FIELD: toSaveDest, - "_id": indexRes.ID, - "_version": indexRes.Version, - }, - }, http.StatusOK) - -} - -func UpdateDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - destinationId := ps.ByName("destinationId") - - config := getDefaultConfig() - var destination = &alerting.Destination{} - err := decodeJSON(req.Body, &destination) - if err != nil { - writeError(w, err) - return - } - destination.LastUpdateTime = time.Now().UnixNano()/1e6 - var toSaveDest = IfaceMap{ - "type": destination.Type, - "name": destination.Name, - "last_update_time": destination.LastUpdateTime, - "schema_version": destination.SchemaVersion, - "id": destinationId, - } - switch destination.Type { - case "email" : - toSaveDest[destination.Type] = destination.Email - case "custom_webhook": - toSaveDest[destination.Type] = destination.CustomWebhook - default: - writeError(w, errors.New("type unsupported")) - return - } - esClient := elastic.GetClient(config.ID) - indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", destinationId, IfaceMap{ - DESTINATION_FIELD: toSaveDest, - }, "wait_for") - if err != nil { - writeError(w, err) - return - } - - writeJSON(w, IfaceMap{ - "ok": true, - "version": indexRes.Version, - "id": indexRes.ID, - }, http.StatusOK) - -} - -func DeleteDestination(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - destinationId := ps.ByName("destinationId") - - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - deleteRes, err := esClient.Delete(orm.GetIndexName(alerting.Config{}), "", destinationId) - if err != nil { - writeError(w, err) - return - } - - var isOk = false - if deleteRes.Result == "deleted" { - isOk = true - } - writeJSON(w, IfaceMap{ - "ok": isOk, - }, http.StatusOK) - -} - -func getDefaultConfig() *elastic.ElasticsearchConfig { - elasticsearch := "default" - if appConfig != nil { - elasticsearch = appConfig.Elasticsearch - } - return elastic.GetConfig(elasticsearch) -} -var appConfig *config.AppConfig -func InitAppConfig(config *config.AppConfig){ - appConfig = config -} - -//var ( -// ks keystore.WritableKeystore -// ksOnce = &sync.Once{} -//) -//func getKeystore() keystore.WritableKeystore { -// ksOnce.Do(func() { -// tempKS, err := keystore.Factory(nil, "data/search-center/keystore") -// if err != nil { -// panic(err) -// } -// ks, err = keystore.AsWritableKeystore(tempKS) -// if err != nil { -// panic(err) -// } -// }) -// return ks -//} - -func CreateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - config := getDefaultConfig() - - var emailAccount = &alerting.EmailAccount{} - err := decodeJSON(req.Body, &emailAccount) - if err != nil { - writeError(w, err) - return - } - //var password = emailAccount.Password - //emailAccount.Password = "" - esClient := elastic.GetClient(config.ID) - indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", util.GetUUID(), IfaceMap{ - EMAIL_ACCOUNT_FIELD: emailAccount, - },"wait_for") - if err != nil { - writeError(w, err) - return - } - - //kk := fmt.Sprintf("search-center_alerting.destination.email.%s.password", emailAccount.Name) - //secStr, _ := getKeystore().Retrieve(kk) - //kst := getKeystore() - //if secStr == nil { - // kst.Store(kk, []byte(password)) - // kst.Save() - //}else{ - // oldPwd, _ := secStr.Get() - // if bytes.Compare(oldPwd, []byte(password)) != 0 { - // kst.Store(kk, []byte(password)) - // } - // kst.Save() - //} - - writeJSON(w, IfaceMap{ - "ok": true, - "resp": IfaceMap{ - EMAIL_ACCOUNT_FIELD: emailAccount, - "_id": indexRes.ID, - "_version": indexRes.Version, - }, - }, http.StatusOK) -} - -func UpdateEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - emailAccountId := ps.ByName("emailAccountId") - config := getDefaultConfig() - - var emailAccount = &alerting.EmailAccount{} - err := decodeJSON(req.Body, &emailAccount) - if err != nil { - writeError(w, err) - return - } - esClient := elastic.GetClient(config.ID) - indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}),"", emailAccountId, IfaceMap{ - EMAIL_ACCOUNT_FIELD: emailAccount, - }, "wait_for") - if err != nil { - writeError(w, err) - return - } - - writeJSON(w, IfaceMap{ - "ok": true, - "id": indexRes.ID, - }, http.StatusOK) - -} - -func DeleteEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - emailAccountId := ps.ByName("emailAccountId") - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - deleteRes, err := esClient.Delete(orm.GetIndexName(alerting.Config{}), "", emailAccountId, "wait_for") - if err != nil { - writeError(w, err) - return - } - var isOk = false - if deleteRes.Result == "deleted" { - isOk = true - } - writeJSON(w, IfaceMap{ - "ok": isOk, - }, http.StatusOK) -} - -func GetEmailAccounts(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - from = getQueryParam(req, "from", "0") - size = getQueryParam(req, "size", "20") - search = getQueryParam(req, "search", "") - sortDirection = getQueryParam(req, "sortDirection", "desc") - sortField = getQueryParam(req, "sortField", "name") - ) - - must := []IfaceMap{ - { - "exists": IfaceMap{ - "field": EMAIL_ACCOUNT_FIELD, - }, - }, - } - - if clearSearch := strings.TrimSpace(search); clearSearch != "" { - clearSearch = strings.ReplaceAll(clearSearch, " ", "* *") - must = append(must, IfaceMap{ - "query_string": IfaceMap{ - "default_field": "email_account.name", - "default_operator": "AND", - "query": fmt.Sprintf(`*%s*`, clearSearch), - }, - }) - } - - sortQueryMap := IfaceMap{ "name": IfaceMap{ "email_account.name.keyword": sortDirection } } - var sort interface{} - if sortQuery, ok := sortQueryMap[sortField]; ok { - sort = sortQuery - } - reqBody := IfaceMap{ - "from": from, - "size": size, - "sort": sort, - "query": IfaceMap{ - "bool": IfaceMap{ - "must": must, - }, - }, - } - - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - searchRes, err := esClient.SearchWithRawQueryDSL( orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(reqBody)) - if err != nil { - writeError(w, err) - return - } - - totalEmailAccounts := searchRes.GetTotal() - rawHits := searchRes.Hits.Hits - emailAccounts := []IfaceMap{} - - for _, hit := range rawHits { - newItem := IfaceMap{} - newItem["id"] = hit.ID - source := queryValue(hit.Source, EMAIL_ACCOUNT_FIELD, nil) - if ms, ok := source.(map[string]interface{}); ok { - assignTo(newItem, ms) - } - emailAccounts = append(emailAccounts, newItem) - } - writeJSON(w, IfaceMap{ - "ok": true, - "emailAccounts": emailAccounts, - "totalEmailAccounts": totalEmailAccounts, - }, http.StatusOK) -} - -func GetEmailAccount(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - emailAccountId := ps.ByName("emailAccountId") - - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - getRes, err := esClient.Get(orm.GetIndexName(alerting.Config{}), "", emailAccountId) - if err != nil { - writeError(w, err) - return - } - - //name := queryValue(resBody,"_source.email_account.name", "") - //kk := fmt.Sprintf("search-center_alerting.destination.email.%s.password", name) - //secStr, _ := getKeystore().Retrieve(kk) - //if secStr != nil { - // pwd, _ := secStr.Get() - // fmt.Println(string(pwd)) - //} - - writeJSON(w, IfaceMap{ - "ok": true, - "resp": queryValue(getRes.Source, "email_account", nil), - }, http.StatusOK) -} - - -// --- email group - -func CreateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - config := getDefaultConfig() - var emailGroup = &alerting.EmailGroup{} - err := decodeJSON(req.Body, &emailGroup) - if err != nil { - writeError(w, err) - return - } - esClient := elastic.GetClient(config.ID) - indexRes, err := esClient.Index(orm.GetIndexName(alerting.Config{}), "", util.GetUUID(), IfaceMap{ - EMAIL_GROUP_FIELD: emailGroup, - },"wait_for") - if err != nil { - writeError(w, err) - return - } - - writeJSON(w, IfaceMap{ - "ok": true, - "resp": IfaceMap{ - EMAIL_GROUP_FIELD: IfaceMap{ - "emails": emailGroup.Emails, - "name": emailGroup.Name, - "schema_version": emailGroup.SchemaVersion, - }, - "_id": indexRes.ID, - "_version": indexRes.Version, - }, - }, http.StatusOK) -} - -func UpdateEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - emailGroupId := ps.ByName("emailGroupId") - config := getDefaultConfig() - var emailGroup = &alerting.EmailGroup{} - err := decodeJSON(req.Body, &emailGroup) - if err != nil { - writeError(w, err) - return - } - esClient := elastic.GetClient(config.ID) - indexRes, err := esClient.Index( orm.GetIndexName(alerting.Config{}), "", emailGroupId, IfaceMap{ - EMAIL_GROUP_FIELD: emailGroup, - }, "wait_for") - if err != nil { - writeError(w, err) - return - } - - writeJSON(w, IfaceMap{ - "ok": true, - "id": indexRes.ID, - }, http.StatusOK) - -} - -func DeleteEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - emailGroupId := ps.ByName("emailGroupId") - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - res, err := esClient.Delete(orm.GetIndexName(alerting.Config{}), "", emailGroupId) - if err != nil { - writeError(w, err) - return - } - - var isOk = false - if res.Result == "deleted" { - isOk = true - } - writeJSON(w, IfaceMap{ - "ok": isOk, - }, http.StatusOK) - -} - -func GetEmailGroups(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - from = getQueryParam(req, "from", "0") - size = getQueryParam(req, "size", "20") - search = getQueryParam(req, "search", "") - sortDirection = getQueryParam(req, "sortDirection", "desc") - sortField = getQueryParam(req, "sortField", "name") - ) - - must := []IfaceMap{ - { - "exists": IfaceMap{ - "field": EMAIL_GROUP_FIELD, - }, - }, - } - - if clearSearch := strings.TrimSpace(search); clearSearch != "" { - clearSearch = strings.ReplaceAll(clearSearch, " ", "* *") - must = append(must, IfaceMap{ - "query_string": IfaceMap{ - "default_field": "email_group.name", - "default_operator": "AND", - "query": fmt.Sprintf(`*%s*`, clearSearch), - }, - }) - } - - sortQueryMap := IfaceMap{ "name": IfaceMap{ "email_group.name.keyword": sortDirection } } - var sort interface{} - if sortQuery, ok := sortQueryMap[sortField]; ok { - sort = sortQuery - } - reqBody := IfaceMap{ - "from": from, - "size": size, - "sort": sort, - "query": IfaceMap{ - "bool": IfaceMap{ - "must": must, - }, - }, - } - - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - res, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(reqBody)) - if err != nil { - writeError(w, err) - return - } - - totalEmailGroups := res.GetTotal() - rawHits := res.Hits.Hits - emailGroups := []IfaceMap{} - - for _, hit := range rawHits { - newItem := IfaceMap{} - newItem["id"] = hit.ID - source := queryValue(hit.Source, EMAIL_GROUP_FIELD, nil) - if ms, ok := source.(map[string]interface{}); ok { - assignTo(newItem, ms) - } - emailGroups = append(emailGroups, newItem) - } - writeJSON(w, IfaceMap{ - "ok": true, - "emailGroups": emailGroups, - "totalEmailGroups": totalEmailGroups, - }, http.StatusOK) -} - -func GetEmailGroup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - emailGroupId := ps.ByName("emailGroupId") - - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - getRes, err := esClient.Get(orm.GetIndexName(alerting.Config{}), "", emailGroupId) - if err != nil { - writeError(w, err) - return - } - - emailGroup := queryValue(getRes.Source, EMAIL_GROUP_FIELD, nil) - if emailGroup == nil { - writeJSON(w, IfaceMap{ - "ok": false, - }, http.StatusOK) - return - } - - writeJSON(w, IfaceMap{ - "ok": true, - "resp": emailGroup, - }, http.StatusOK) - -} - diff --git a/service/alerting/elasticsearch.go b/service/alerting/elasticsearch.go deleted file mode 100644 index 667d1955..00000000 --- a/service/alerting/elasticsearch.go +++ /dev/null @@ -1,168 +0,0 @@ -package alerting - -import ( - "errors" - "fmt" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/util" - "net/http" - "runtime/debug" - "strings" -) - -type SearchBody struct { - Query IfaceMap `json:"query"` - Index string `json:"index"` - Size int `json:"size"` -} - -func Search(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ - var body = SearchBody{} - err := decodeJSON(req.Body, &body) - if err != nil { - writeError(w, err) - return - } - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - body.Query["size"] = body.Size - searchRes, err := esClient.SearchWithRawQueryDSL(body.Index, util.MustToJSONBytes(body.Query)) - if err != nil { - writeError(w, err) - return - } - - writeJSON(w, IfaceMap{ - "ok": true, - "resp": searchRes, - }, http.StatusOK) - -} - -func GetIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - esClient := elastic.GetClient(id) - if esClient == nil { - writeError(w, errors.New("cluster not found")) - return - } - - var body = struct{ - Index string `json:"index"` - }{} - err := decodeJSON(req.Body, &body) - if err != nil { - writeError(w, err) - return - } - - indexInfos, err := esClient.GetIndices( body.Index) - if err != nil { - writeError(w, err) - return - } - indices := make([]elastic.IndexInfo, 0, len(*indexInfos)) - for _, info := range *indexInfos { - indices = append(indices, info) - } - writeJSON(w, IfaceMap{ - "ok": true, - "resp": indices, - }, http.StatusOK) -} - -func GetAliases(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - defer func() { - if err := recover(); err != nil { - debug.PrintStack() - } - }() - id := ps.ByName("id") - esClient := elastic.GetClient(id) - if esClient == nil { - writeError(w, errors.New("cluster not found")) - return - } - - var body = struct{ - Alias string `json:"alias"` - }{} - err := decodeJSON(req.Body, &body) - if err != nil { - writeError(w, err) - return - } - //reqUrl := fmt.Sprintf("%s/_cat/aliases/%s", meta.GetActiveEndpoint(), body.Alias) - //params := map[string]string{ - // "format": "json", - // "h": "alias,index", - //} - res, err := esClient.GetAliases() - if err != nil { - writeError(w, err) - return - } - aliases := make([]elastic.AliasInfo, 0, len(*res)) - for _, alias := range *res { - aliases =append(aliases, alias) - } - writeJSON(w, IfaceMap{ - "ok": true, - "resp": aliases, - }, http.StatusOK) -} - -func GetMappings(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - esClient := elastic.GetClient(id) - if esClient == nil { - writeError(w, errors.New("cluster not found")) - return - } - - var body = struct{ - Index []string `json:"index"` - }{} - err := decodeJSON(req.Body, &body) - if err != nil { - writeError(w, err) - return - } - _, _, mappings, err := esClient.GetMapping(false, strings.Join(body.Index, ",")) - if err != nil { - writeError(w, err) - return - } - - writeJSON(w, IfaceMap{ - "ok": true, - "resp": mappings, - }, http.StatusOK) -} - -func GetSettings(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - - // /_cluster/settings?include_defaults=true - config := getDefaultConfig() - reqUrl := fmt.Sprintf("%s/_cluster/settings", config.Endpoint) - res, err := doRequest(reqUrl, http.MethodGet, map[string]string{ - "include_defaults": "true", - }, nil) - if err != nil { - writeError(w, err) - return - } - defer res.Body.Close() - var resBody = IfaceMap{} - err = decodeJSON(res.Body, &resBody) - if err != nil { - writeError(w, err) - return - } - - writeJSON(w, IfaceMap{ - "ok": true, - "resp": resBody, - }, http.StatusOK) -} \ No newline at end of file diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go new file mode 100644 index 00000000..99a37d6d --- /dev/null +++ b/service/alerting/elasticsearch/engine.go @@ -0,0 +1,532 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package elasticsearch + +import ( + "bytes" + "context" + "fmt" + "github.com/Knetic/govaluate" + "github.com/buger/jsonparser" + log "github.com/cihub/seelog" + "github.com/valyala/fasttemplate" + "infini.sh/console/model/alerting" + "infini.sh/console/service/alerting/action" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "io" + "sort" + "strings" + "time" +) + +type Engine struct { + +} + +func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { + filter, err := engine.GenerateRawFilter(rule) + if err != nil { + return nil, err + } + //todo generate agg + if len(rule.Metrics.Items) == 0 { + return nil, fmt.Errorf("metric items should not be empty") + } + basicAggs := util.MapStr{} + for _, metricItem := range rule.Metrics.Items { + basicAggs[metricItem.Name] = util.MapStr{ + metricItem.Statistic: util.MapStr{ + "field": metricItem.Field, + }, + } + } + timeAggs := util.MapStr{ + "date_histogram": util.MapStr{ + "field": rule.Resource.TimeField, + "interval": rule.Metrics.PeriodInterval, + }, + "aggs": basicAggs, + } + var rootAggs util.MapStr + groups := rule.Metrics.Items[0].Group + if grpLength := len(groups); grpLength > 0 { + var lastGroupAgg util.MapStr + + for i := grpLength-1; i>=0; i-- { + groupAgg := util.MapStr{ + "terms": util.MapStr{ + "field": groups[i], + "size": 500, + }, + } + groupID := util.GetUUID() + if lastGroupAgg != nil { + groupAgg["aggs"] = util.MapStr{ + groupID: lastGroupAgg, + } + }else{ + groupAgg["aggs"] = util.MapStr{ + "time_buckets": timeAggs, + } + } + + lastGroupAgg = groupAgg + + } + rootAggs = util.MapStr{ + util.GetUUID(): lastGroupAgg, + } + }else{ + rootAggs = util.MapStr{ + "time_buckets": timeAggs, + } + } + + return util.MapStr{ + "size": 0, + "query": filter, + "aggs": rootAggs, + }, nil +} + +func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interface{}, error) { + query := map[string]interface{}{} + if rule.Resource.RawFilter != nil { + query = rule.Resource.RawFilter + } + intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval) + if err != nil { + return nil, err + } + var ( + units string + value int + ) + if intervalDuration / time.Hour >= 1 { + units = "h" + value = int(intervalDuration / time.Hour) + }else if intervalDuration / time.Minute >= 1{ + units = "m" + value = int(intervalDuration / time.Minute) + }else if intervalDuration / time.Second >= 1 { + units = "s" + value = int(intervalDuration / time.Second) + }else{ + return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.PeriodInterval) + } + timeQuery := util.MapStr{ + "range": util.MapStr{ + rule.Resource.TimeField: util.MapStr{ + "gte": fmt.Sprintf("now-%d%s", value * 15, units), + }, + }, + } + + if boolQ, ok := query["bool"].(map[string]interface{}); ok { + if mustQ, ok := boolQ["must"]; ok { + + if mustArr, ok := mustQ.([]interface{}); ok { + boolQ["must"] = append(mustArr, timeQuery) + + }else{ + return nil, fmt.Errorf("must query: %v is not valid in filter", mustQ) + } + }else{ + boolQ["must"] = []interface{}{ + timeQuery, + } + } + }else{ + query["bool"] = util.MapStr{ + "must": []interface{}{ + timeQuery, + }, + } + } + return query, nil +} + +func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, error){ + esClient := elastic.GetClient(rule.Resource.ID) + indexName := strings.Join(rule.Resource.Objects, ",") + queryDsl, err := engine.GenerateQuery(rule) + if err != nil { + return nil, err + } + queryDslBytes, err := util.ToJSONBytes(queryDsl) + if err != nil { + return nil, err + } + searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes) + if err != nil { + return nil, err + } + searchResult := map[string]interface{}{} + err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult) + if err != nil { + return nil, err + } + metricData := []alerting.MetricData{} + collectMetricData(searchResult["aggregations"], "", &metricData) + return metricData, nil +} +func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error){ + metricData, err := engine.ExecuteQuery(rule) + if err != nil { + return nil, err + } + var conditionResults []alerting.ConditionResult + for _, md := range metricData { + var targetData alerting.MetricData + if len(rule.Metrics.Items) == 1 { + targetData = md + }else{ + targetData = alerting.MetricData{ + GroupValues: md.GroupValues, + Data: map[string][]alerting.TimeMetricData{}, + } + expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula) + if err != nil { + return nil, err + } + dataLength := 0 + for _, v := range md.Data { + dataLength = len(v) + break + } + DataLoop: + for i := 0; i < dataLength; i++ { + parameters := map[string]interface{}{ + } + var timestamp interface{} + for k, v := range md.Data { + //drop nil value bucket + if v[i][1] == nil { + continue DataLoop + } + parameters[k] = v[i][1] + timestamp = v[i][0] + } + result, err := expression.Evaluate(parameters) + if err != nil { + return nil, err + } + targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, result}) + } + } + sort.Slice(rule.Conditions.Items, func(i, j int) bool { + return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity] + }) + LoopCondition: + for _, cond := range rule.Conditions.Items { + conditionExpression := "" + valueLength := len(cond.Values) + if valueLength == 0 { + return nil, fmt.Errorf("condition values: %v should not be empty", cond.Values) + } + switch cond.Operator { + case "equals": + conditionExpression = fmt.Sprintf("result == %v", cond.Values[0]) + case "gte": + conditionExpression = fmt.Sprintf("result >= %v", cond.Values[0]) + case "lte": + conditionExpression = fmt.Sprintf("result <= %v", cond.Values[0]) + case "gt": + conditionExpression = fmt.Sprintf("result > %v", cond.Values[0]) + case "lt": + conditionExpression = fmt.Sprintf("result < %v", cond.Values[0]) + case "range": + if valueLength != 2 { + return nil, fmt.Errorf("length of %s condition values should be 2", cond.Operator) + } + conditionExpression = fmt.Sprintf("result >= %v && result <= %v", cond.Values[0], cond.Values[1]) + default: + return nil, fmt.Errorf("unsupport condition operator: %s", cond.Operator) + } + expression, err := govaluate.NewEvaluableExpression(conditionExpression) + if err != nil { + return nil, err + } + dataLength := 0 + dataKey := "" + for k, v := range targetData.Data { + dataLength = len(v) + dataKey = k + } + triggerCount := 0 + for i := 0; i < dataLength; i++ { + conditionResult, err := expression.Evaluate(map[string]interface{}{ + "result": targetData.Data[dataKey][i][1], + }) + if err != nil { + return nil, err + } + if conditionResult == true { + triggerCount += 1 + }else { + triggerCount = 0 + } + if triggerCount >= cond.MinimumPeriodMatch { + log.Debugf("triggered condition %v, groups: %v\n", cond, targetData.GroupValues) + conditionResults = append(conditionResults, alerting.ConditionResult{ + GroupValues: targetData.GroupValues, + ConditionItem: &cond, + }) + break LoopCondition + } + } + + } + } + return conditionResults, nil +} +func (engine *Engine) Do(rule *alerting.Rule) error { + log.Tracef("start check condition of rule %s", rule.ID) + conditionResults, err := engine.CheckCondition(rule) + if err != nil { + return err + } + lastAlertItem := alerting.Alert{} + err = getLastAlert(rule.ID, rule.Resource.ID, &lastAlertItem) + if err != nil { + return err + } + var alertItem *alerting.Alert + defer func() { + if alertItem != nil { + for _, actionResult := range alertItem.ActionExecutionResults { + if actionResult.Error != "" { + alertItem.Error = actionResult.Error + } + } + if alertItem.Error != ""{ + alertItem.State = alerting.AlertStateError + } + err = orm.Save(alertItem) + if err != nil { + log.Error(err) + } + } + }() + if len(conditionResults) == 0 { + if lastAlertItem.State != alerting.AlertStateNormal && lastAlertItem.ID != "" { + alertItem = &alerting.Alert{ + ID: util.GetUUID(), + Created: time.Now(), + Updated: time.Now(), + RuleID: rule.ID, + ClusterID: rule.Resource.ID, + Expression: rule.Metrics.Expression, + Objects: rule.Resource.Objects, + Severity: "info", + Content: "", + State: alerting.AlertStateNormal, + } + } + return nil + }else{ + log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID ) + var ( + severity = conditionResults[0].ConditionItem.Severity + content string + ) + for _, conditionResult := range conditionResults { + if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] { + severity = conditionResult.ConditionItem.Severity + } + content += conditionResult.ConditionItem.Message + ";" + } + alertItem = &alerting.Alert{ + ID: util.GetUUID(), + Created: time.Now(), + Updated: time.Now(), + RuleID: rule.ID, + ClusterID: rule.Resource.ID, + Expression: rule.Metrics.Expression, + Objects: rule.Resource.Objects, + Severity: severity, + Content: content, + State: alerting.AlertStateActive, + } + } + + if rule.Channels.AcceptTimeRange.Include(time.Now()) { + periodDuration, err := time.ParseDuration(rule.Channels.ThrottlePeriod) + if err != nil { + alertItem.Error = err.Error() + return err + } + + if time.Now().Sub(lastAlertItem.Created) > periodDuration { + actionResults := performChannels(rule.Channels.Normal, conditionResults) + alertItem.ActionExecutionResults = actionResults + + } + if rule.Channels.EscalationEnabled && lastAlertItem.State != alerting.AlertStateNormal { + periodDuration, err = time.ParseDuration(rule.Channels.EscalationThrottlePeriod) + if err != nil { + return err + } + if time.Now().Sub(lastAlertItem.Created) > periodDuration { + actionResults := performChannels(rule.Channels.Escalation, conditionResults) + alertItem.ActionExecutionResults = actionResults + } + } + } + return nil +} + +func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResult) []alerting.ActionExecutionResult { + var message string + for _, conditionResult := range conditionResults { + message += fmt.Sprintf("severity: %s\t message:%s\t groups:%v\t timestamp: %v;", conditionResult.ConditionItem.Severity, conditionResult.ConditionItem.Message, conditionResult.GroupValues, time.Now()) + } + ctx := util.MapStr{ + "ctx": util.MapStr{ + "message": message, + }, + } + var actionResults []alerting.ActionExecutionResult + for _, channel := range channels { + resBytes, err := performChannel(&channel, util.MustToJSONBytes(ctx)) + var errStr string + if err != nil { + errStr = err.Error() + } + actionResults = append(actionResults, alerting.ActionExecutionResult{ + Result: string(resBytes), + Error: errStr, + LastExecutionTime: int(time.Now().UnixNano()/1e6), + }) + } + return actionResults +} + +func resolveMessage(messageTemplate string, ctx []byte) ([]byte, error){ + msg := messageTemplate + tpl := fasttemplate.New(msg, "{{", "}}") + msgBuffer := bytes.NewBuffer(nil) + _, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){ + keyParts := strings.Split(tag,".") + value, _, _, err := jsonparser.Get(ctx, keyParts...) + if err != nil { + return 0, err + } + return writer.Write(value) + }) + return msgBuffer.Bytes(), err +} + +func performChannel(channel *alerting.Channel, ctx []byte) ([]byte, error) { + var act action.Action + switch channel.Type { + + case alerting.ChannelWebhook: + message, err := resolveMessage(channel.Webhook.Body, ctx) + if err != nil { + return nil, err + } + act = &action.WebhookAction{ + Data: channel.Webhook, + Message: string(message), + } + default: + return nil, fmt.Errorf("unsupported action type: %s", channel.Type) + } + return act.Execute() +} +func (engine *Engine) GenerateTask(rule *alerting.Rule) func(ctx context.Context) { + return func(ctx context.Context) { + err := engine.Do(rule) + if err != nil { + log.Error(err) + } + } +} + + +func collectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData){ + if aggM, ok := agg.(map[string]interface{}); ok { + if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok { + if bks, ok := timeBks["buckets"].([]interface{}); ok { + md := alerting.MetricData{ + Data: map[string][]alerting.TimeMetricData{}, + GroupValues: strings.Split(groupValues, "*"), + } + for _, bk := range bks { + if bkM, ok := bk.(map[string]interface{}); ok{ + + for k, v := range bkM { + if k == "key" || k == "key_as_string" || k== "doc_count"{ + continue + } + if vm, ok := v.(map[string]interface{}); ok { + md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], vm["value"]}) + } + + } + } + + } + *metricData = append(*metricData,md) + } + + }else{ + for k, v := range aggM { + if k == "key" || k== "doc_count"{ + continue + } + if vm, ok := v.(map[string]interface{}); ok { + if bks, ok := vm["buckets"].([]interface{}); ok { + for _, bk := range bks { + if bkVal, ok := bk.(map[string]interface{}); ok { + currentGroup := bkVal["key"].(string) + newGroupValues := currentGroup + if groupValues != "" { + newGroupValues = fmt.Sprintf("%s*%s", groupValues, currentGroup) + } + collectMetricData(bk, newGroupValues, metricData) + } + + } + } + } + break + } + } + } +} + +func getLastAlert(ruleID, clusterID string, alertItem *alerting.Alert) error { + esClient := elastic.GetClient(clusterID) + queryDsl := util.MapStr{ + "size": 1, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "query": util.MapStr{ + "term": util.MapStr{ + "rule_id": util.MapStr{ + "value": ruleID, + }, + }, + }, + } + searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alertItem), util.MustToJSONBytes(queryDsl) ) + if err != nil { + return err + } + if len(searchRes.Hits.Hits) == 0 { + return nil + } + alertBytes := util.MustToJSONBytes(searchRes.Hits.Hits[0].Source) + return util.FromJSONBytes(alertBytes, alertItem) +} \ No newline at end of file diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go new file mode 100644 index 00000000..d8409974 --- /dev/null +++ b/service/alerting/elasticsearch/engine_test.go @@ -0,0 +1,119 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package elasticsearch + +import ( + "fmt" + "infini.sh/console/model/alerting" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + "sort" + "testing" + "time" +) + +func TestEngine( t *testing.T) { + rule := alerting.Rule{ + ORMObjectBase: orm.ORMObjectBase{ + ID: util.GetUUID(), + Created: time.Now(), + Updated: time.Now(), + }, + Enabled: true, + Resource: alerting.Resource{ + ID: "c8i18llath2blrusdjng", + Type: "elasticsearch", + Objects: []string{".infini_metrics*"}, + TimeField: "timestamp", + Filter: alerting.Filter{ + And: []alerting.FilterQuery{ + {Field: "timestamp", Operator: "gte", Values: []string{"now-15m"}}, + //{Field: "payload.elasticsearch.cluster_health.status", Operator: "equals", Values: []string{"red"}}, + }, + }, + RawFilter: map[string]interface{}{ + "bool": util.MapStr{ + "must": []util.MapStr{ + //{ + // "range": util.MapStr{ + // "timestamp": util.MapStr{ + // "gte": "now-15m", + // }, + // }, + //}, + { + "term": util.MapStr{ + "metadata.labels.cluster_id": util.MapStr{ + "value": "xxx", + }, + }, + }, + }, + }, + }, + }, + + Metrics: alerting.Metric{ + PeriodInterval: "1m", + MaxPeriods: 15, + Items: []alerting.MetricItem{ + {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, + {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, + }, + Formula: "a/b*100", + //Expression: "min(fs.free_in_bytes)/max(fs.total_in_bytes)*100", + }, + Conditions: alerting.Condition{ + Operator: "any", + Items: []alerting.ConditionItem{ + {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"10"}, Severity: "error", Message: "磁盘可用率小于10%"}, + {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"20"}, Severity: "warning", Message: "磁盘可用率小于20%"}, + }, + }, + + Channels: alerting.RuleChannel{ + Normal: []alerting.Channel{ + {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ + HeaderParams: map[string]string{ + "Content-Type": "application/json", + }, + Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, + Method: http.MethodPost, + URL: "https://oapi.dingtalk.com/robot/send?access_token=XXXXXX", + }}, + }, + Escalation: []alerting.Channel{ + {Type: alerting.ChannelWebhook, Name: "微信", Webhook: &alerting.CustomWebhook{ + HeaderParams: map[string]string{ + "Content-Type": "application/json", + }, + Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, + Method: http.MethodPost, + URL: "https://oapi.weixin.com/robot/send?access_token=XXXXXX", + }}, + }, + ThrottlePeriod: "1h", + AcceptTimeRange: alerting.TimeRange{ + Start: "8:00", + End: "21:00", + }, + EscalationEnabled: true, + EscalationThrottlePeriod: "30m", + }, + } + //eng := &Engine{} + //filter, err := eng.GenerateQuery(&rule) + ////result, err := eng.ExecuteQuery(&rule) + //if err != nil { + // t.Fatal(err) + //} + sort.Slice(rule.Conditions.Items, func(i, j int) bool { + return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity] + }) + fmt.Println(rule.Conditions.Items) + + //fmt.Println(util.MustToJSON(filter)) +} diff --git a/service/alerting/elasticsearch/init.go b/service/alerting/elasticsearch/init.go new file mode 100644 index 00000000..65ac0b21 --- /dev/null +++ b/service/alerting/elasticsearch/init.go @@ -0,0 +1,12 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package elasticsearch + +import "infini.sh/console/service/alerting" + +func init(){ + eng := Engine{} + alerting.RegistEngine("elasticsearch", &eng) +} diff --git a/service/alerting/engine.go b/service/alerting/engine.go new file mode 100644 index 00000000..2b7c6a5b --- /dev/null +++ b/service/alerting/engine.go @@ -0,0 +1,39 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import ( + "context" + "fmt" + "infini.sh/console/model/alerting" + "sync" +) + +type Engine interface { + GenerateQuery(rule *alerting.Rule) (interface{}, error) + ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, error) + CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error) + GenerateTask(rule *alerting.Rule) func(ctx context.Context) +} + +var ( + alertEngines = map[string] Engine{} + alertEnginesMutex = sync.RWMutex{} +) +func RegistEngine(typ string, engine Engine) { + alertEnginesMutex.Lock() + defer alertEnginesMutex.Unlock() + alertEngines[typ] = engine +} + +func GetEngine(typ string) Engine { + alertEnginesMutex.RLock() + eng, ok := alertEngines[typ] + alertEnginesMutex.RUnlock() + if !ok { + panic(fmt.Sprintf("alert engine of type: %s not found", typ)) + } + return eng +} \ No newline at end of file diff --git a/service/alerting/event.go b/service/alerting/event.go deleted file mode 100644 index e0386f50..00000000 --- a/service/alerting/event.go +++ /dev/null @@ -1,33 +0,0 @@ -package alerting - -import ( - "github.com/buger/jsonparser" - "strconv" - "strings" -) - -type MonitorEvent struct { - Fields []byte -} - -func (ev *MonitorEvent) GetValue(key string) (interface{}, error) { - keyParts := strings.Split(key, ".")//todo whether ctx field contains dot - val, valType, _, err := jsonparser.Get(ev.Fields, keyParts...) - if err != nil { - return val, err - } - switch valType { - case jsonparser.String: - return string(val), nil - case jsonparser.Number: - - return strconv.Atoi(string(val)) - } - return nil, nil -} - -//func SplitFieldsKey(key, sep string) []string { -// sr := strings.NewReader(key) -// if ch, size, sr.ReadRune() -//} - diff --git a/service/alerting/monitor.go b/service/alerting/monitor.go deleted file mode 100644 index 1af864d2..00000000 --- a/service/alerting/monitor.go +++ /dev/null @@ -1,707 +0,0 @@ -package alerting - -import ( - "bytes" - "errors" - "fmt" - "infini.sh/console/model/alerting" - alertUtil "infini.sh/console/service/alerting/util" - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "io" - "net/http" - "src/github.com/buger/jsonparser" - "src/github.com/valyala/fasttemplate" - "strconv" - "strings" - "time" -) - -func GetMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - conf := elastic.GetConfig(id) - if conf == nil { - writeError(w, errors.New("cluster not found")) - return - } - - mid := ps.ByName("monitorID") - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - res, err := esClient.Get(orm.GetIndexName(alerting.Config{}), "", mid) - if err != nil { - writeError(w, err) - return - } - - if !res.Found { - writeError(w, errors.New("monitor not found")) - return - } - - queryDSL := ` { - "size": 0, - "query": { - "bool": { - "must": { - "term": { - "monitor_id": "%s" - } - } - } - }, - "aggs": { - "active_count": { - "terms": { - "field": "state" - } - }, - "24_hour_count": { - "date_range": { - "field": "start_time", - "ranges": [{ "from": "now-24h/h" }] - } - } - } - }` - queryDSL = fmt.Sprintf(queryDSL, mid) - searchRes, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(INDEX_ALL_ALERTS), []byte(queryDSL)) - if err != nil { - writeError(w, err) - return - } - var dayCountBuckets interface{} - if agg, ok := searchRes.Aggregations["24_hour_count"]; ok { - dayCountBuckets = agg.Buckets - } - dayCount := 0 - if dcb, ok := dayCountBuckets.([]elastic.BucketBase); ok { - dayCount = int(dcb[0]["doc_count"].(float64)) - } - - var activeBuckets interface{} - if agg, ok := searchRes.Aggregations["active_count"]; ok { - activeBuckets = agg.Buckets - } - activeCount := 0 - if ab, ok := activeBuckets.([]elastic.BucketBase); ok { - for _, bk := range ab { - if bk["key"].(string) == "ACTIVE" { - activeCount = int(bk["doc_count"].(float64)) - break - } - } - } - monitor := queryValue(res.Source, "monitor", nil) - writeJSON(w, IfaceMap{ - "ok": true, - "resp": monitor, - "activeCount": activeCount, - "dayCount": dayCount, - "version": res.Version, - }, http.StatusOK) -} - -func GetMonitors(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - conf := elastic.GetConfig(id) - if conf == nil { - writeError(w, errors.New("cluster not found")) - return - } - var ( - from = getQueryParam(req, "from") - size = getQueryParam(req, "size") - search = getQueryParam(req, "search") - sortDirection = getQueryParam(req, "sortDirection") - sortField = getQueryParam(req, "sortField") - state = getQueryParam(req, "state") - must = []IfaceMap{ - { - "match": IfaceMap{ - "cluster_id": id, - }, - }, - { - "exists": IfaceMap{ - "field": MONITOR_FIELD, - }, - }, - } - ) - if clearSearch := strings.TrimSpace(search); clearSearch != "" { - clearSearch = strings.ReplaceAll(clearSearch, " ", "* *") - must = append(must, IfaceMap{ - "query_string": IfaceMap{ - //"default_field": "monitor.name", - "default_operator": "AND", - "query": fmt.Sprintf("*%s*", clearSearch), - }, - }) - } - var filter = []IfaceMap{ - { "term": IfaceMap{ "monitor.type": "monitor" }}, - } - if state != "all" { - filter = append(filter, IfaceMap{ - "term": IfaceMap{ "monitor.enabled": state == "enabled" }, - }) - } - var monitorSorts = IfaceMap{ "name": "monitor.name.keyword" } - var ( - intSize int - intFrom int - ) - intSize, _ = strconv.Atoi(size) - if intSize < 0 { - intSize = 1000 - } - intFrom, _ = strconv.Atoi(from) - if intFrom < 0 { - intFrom = 0 - } - - sortPageData := IfaceMap{ - "size": intSize, - "from": intFrom, - } - if msort, ok := monitorSorts[sortField]; ok { - sortPageData["sort"] = []IfaceMap{ - { msort.(string): sortDirection }, - } - } - var params = IfaceMap{ - "query": IfaceMap{ - "bool": IfaceMap{ - "filter": filter, - "must": must, - "must_not": []IfaceMap{ - { - "exists": IfaceMap{ - "field": "monitor.status", - }, - }, - }, - }, - }, - } - assignTo(params, sortPageData) - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - resBody, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(alerting.Config{}), util.MustToJSONBytes(params)) - if err != nil { - writeError(w, err) - return - } - - totalMonitors := resBody.GetTotal() - var monitors []IfaceMap - var hits = resBody.Hits.Hits - monitorIDs := []interface{}{} - monitorMap := map[string]int{} - for i, hit := range hits { - monitorIDs = append(monitorIDs, hit.ID) - monitor := hit.Source["monitor"].(map[string]interface{}) - monitorMap[hit.ID] = i - monitors = append(monitors, IfaceMap{ - "id": hit.ID, - //"version": hit., - "name": monitor["name"], - "enabled": monitor["enabled"], - "monitor": monitor, - }) - } - - - - aggsOrderData := IfaceMap{} - aggsSorts := IfaceMap{ - "active": "active", - "acknowledged": "acknowledged", - "errors": "errors", - "ignored": "ignored", - "lastNotificationTime": "last_notification_time", - } - - if sortF, ok := aggsSorts[sortField]; ok { - aggsOrderData["order"] = IfaceMap{ sortF.(string): sortDirection } - } - - var termsMap = IfaceMap{ - "field": "monitor_id", - "size": intFrom + intSize, - } - assignTo(termsMap, aggsOrderData) - var aggsParams = IfaceMap{ - "size": 0, - "query": IfaceMap{ "terms": IfaceMap{ "monitor_id": monitorIDs } }, - "aggregations": IfaceMap{ - "uniq_monitor_ids": IfaceMap{ - "terms": termsMap, - "aggregations": IfaceMap{ - "active": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_ACTIVE } } }, - "acknowledged": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_ACKNOWLEDGED } } }, - "errors": IfaceMap{ "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_ERROR } } }, - "ignored": IfaceMap{ - "filter": IfaceMap{ - "bool": IfaceMap{ - "filter": IfaceMap{ "term": IfaceMap{ "state": ALERT_COMPLETED } }, - "must_not": IfaceMap{ "exists": IfaceMap{ "field": "acknowledged_time" } }, - }, - }, - }, - "last_notification_time": IfaceMap{ "max": IfaceMap{ "field": "last_notification_time" } }, - "latest_alert": IfaceMap{ - "top_hits": IfaceMap{ - "size": 1, - "sort": []IfaceMap{ { "start_time": IfaceMap{ "order": "desc" }} }, - "_source": IfaceMap{ - "includes": []string{"last_notification_time", "trigger_name"}, - }, - }, - }, - }, - }, - }, - } - - searchRes, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(INDEX_ALL_ALERTS), util.MustToJSONBytes(aggsParams)) - if err != nil { - writeError(w, err) - return - } - - var buckets interface{} - if agg, ok := searchRes.Aggregations["uniq_monitor_ids"]; ok { - buckets = agg.Buckets - } - if bks, ok := buckets.([]elastic.BucketBase); ok { - for _, bk := range bks { - id := queryValue(bk, "key", "") - monitor := monitors[monitorMap[id.(string)]] - monitor["lastNotificationTime"] = queryValue(bk, "last_notification_time.value", "") - monitor["ignored"] = queryValue(bk, "ignored.doc_count", 0) - alertHits := queryValue(bk, "latest_alert.hits.hits", nil) - var latestAlert interface{} - if hits, ok := alertHits.([]interface{}); ok && len(hits) > 0 { - if hitMap, ok := hits[0].(map[string]interface{}); ok { - latestAlert = queryValue(hitMap, "_source.trigger_name", nil) - } - } - monitor["latestAlert"] = latestAlert - monitor["active"] = queryValue(bk, "active.doc_count", 0) - monitor["errors"] = queryValue(bk, "errors.doc_count", 0) - monitor["acknowledged"] = queryValue(bk, "acknowledged.doc_count", 0) - monitor["currentTime"] = time.Now().UnixNano() / 1e6 - delete(monitorMap, id.(string)) - } - } - - for _, idx := range monitorMap { - assignTo(monitors[idx], IfaceMap{ - "lastNotificationTime": nil, - "ignored": 0, - "active": 0, - "acknowledged": 0, - "errors": 0, - "latestAlert": "--", - "currentTime": time.Now().UnixNano()/1e6, - }) - } - - results := monitors - - //if _, ok := monitorSorts[sortField]; !ok { - // results = results[intFrom: intFrom + intSize] - //} - writeJSON(w, IfaceMap{ - "ok": true, - "monitors": results, - "totalMonitors": totalMonitors, - }, http.StatusOK) -} - - -func CreateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - conf := elastic.GetConfig(id) - if conf == nil { - writeError(w, errors.New("cluster not found")) - return - } - - var monitor = &alerting.Monitor{} - err := decodeJSON(req.Body, &monitor) - if err != nil { - writeError(w, err) - return - } - - monitor.LastUpdateTime = time.Now().UnixNano()/1e6 - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - indexName := orm.GetIndexName(alerting.Config{}) - indexRes, err := esClient.Index(indexName,"",util.GetUUID(),IfaceMap{ - "cluster_id": id, - MONITOR_FIELD: monitor, - },"wait_for") - if err != nil { - writeError(w, err) - return - } - - - monitorId := indexRes.ID - GetScheduler().AddMonitor(monitorId, &ScheduleMonitor{ - Monitor: monitor, - ClusterID: id, - }) - - writeJSON(w, IfaceMap{ - "ok": true, - "resp": IfaceMap{ - MONITOR_FIELD: monitor, - "_id": monitorId, - "_version": indexRes.Version, - }, - }, http.StatusOK) -} - -func DeleteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - conf := elastic.GetConfig(id) - if conf == nil { - writeError(w, errors.New("cluster not found")) - return - } - - monitorId := ps.ByName("monitorID") - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - //change alert state to deleted and move alert to history - query := IfaceMap{ - "bool": IfaceMap{ - "must": []IfaceMap{ - {"match": IfaceMap{ - "monitor_id": monitorId, - }}, - }, - }, - } - reqBody := IfaceMap{ - "source": IfaceMap{ - "index": getAlertIndexName(INDEX_ALERT), - "query": query, - }, - "dest": IfaceMap{ - "index": getAlertIndexName(INDEX_ALERT_HISTORY), - }, - "script": IfaceMap{ - "source": fmt.Sprintf("ctx._source['state'] = '%s';", ALERT_DELETED), - }, - } - _, err := esClient.Reindex(util.MustToJSONBytes(reqBody)) - if err != nil { - writeError(w, err) - return - } - //delete alert - _, err = esClient.DeleteByQuery(getAlertIndexName(INDEX_ALERT), util.MustToJSONBytes(IfaceMap{ - "query" : query, - }) ) - - if err != nil { - writeError(w, err) - return - } - - //logic delete monitor - var indexName = orm.GetIndexName(alerting.Config{}) - getRes, err := esClient.Get(indexName, "", monitorId) - if err != nil { - writeError(w, err) - return - } - source := util.MapStr(getRes.Source) - source.Put("monitor.status", "DELETED") - indexRes, err := esClient.Index(indexName, "", monitorId, source, "wait_for") - if err != nil { - writeError(w, err) - return - } - - var isOk = false - if indexRes.Result == "updated" { - isOk = true - GetScheduler().RemoveMonitor(monitorId) - } - writeJSON(w, IfaceMap{ - "ok": isOk, - }, http.StatusOK) - -} - -func UpdateMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - conf := elastic.GetConfig(id) - if conf == nil { - writeError(w, errors.New("cluster not found")) - return - } - - monitorId := ps.ByName("monitorID") - - var monitor = &alerting.Monitor{} - err := decodeJSON(req.Body, &monitor) - if err != nil { - writeError(w, err) - return - } - if len(monitor.Triggers) > 0 { - for i, trigger := range monitor.Triggers { - if trigger.ID == "" { - monitor.Triggers[i].ID = util.GetUUID() - } - if len(trigger.Actions) > 0 { - for j, action := range trigger.Actions { - if action.ID == ""{ - monitor.Triggers[i].Actions[j].ID = util.GetUUID() - } - } - } - } - } - monitor.LastUpdateTime = time.Now().UnixNano()/1e6 - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - indexName := orm.GetIndexName(alerting.Config{}) - getRes, err := esClient.Get(indexName, "", monitorId) - if err != nil { - writeError(w, err) - return - } - if !getRes.Found { - w.WriteHeader(http.StatusNotFound) - return - } - - indexRes, err := esClient.Index(indexName, "", monitorId, IfaceMap{ - "cluster_id": getRes.Source["cluster_id"], - MONITOR_FIELD: monitor, - }, "wait_for") - if err != nil { - writeError(w, err) - return - } - - GetScheduler().UpdateMonitor(monitorId, &ScheduleMonitor{ - Monitor: monitor, - ClusterID: id, - }) - writeJSON(w, IfaceMap{ - "ok": true, - "version": indexRes.Version, - "id": indexRes.ID, - }, http.StatusOK) - -} - -func AcknowledgeAlerts(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - conf := elastic.GetConfig(id) - if conf == nil { - writeError(w, errors.New("cluster not found")) - return - } - - monitorId := ps.ByName("monitorID") - var ackAlertsReq = struct { - AlertIDs []string `json:"alerts"` - }{} - err := decodeJSON(req.Body, &ackAlertsReq) - if err != nil { - writeError(w, err) - return - } - - config := getDefaultConfig() - reqBody := IfaceMap{ - "query": IfaceMap{ - "bool": IfaceMap{ - "must":[]IfaceMap{ - {"match": IfaceMap{ - "monitor_id": monitorId, - }}, - { - "terms": IfaceMap{ - "_id": ackAlertsReq.AlertIDs, - }, - }, - }, - }, - - }, - "script": IfaceMap{ - "source": fmt.Sprintf("ctx._source['state'] = '%s';ctx._source['acknowledged_time'] = %dL;", ALERT_ACKNOWLEDGED, time.Now().UnixNano()/1e6), - }, - } - esClient := elastic.GetClient(config.ID) - res, err := esClient.UpdateByQuery( getAlertIndexName(INDEX_ALERT), util.MustToJSONBytes(reqBody)) - if err != nil { - writeError(w, err) - return - } - - var isOk = false - if len(res.Failures) == 0 { - isOk = true - } - - writeJSON(w, IfaceMap{ - "ok": isOk, - "resp": IfaceMap{ - "success": ackAlertsReq.AlertIDs, - "failed": res.Failures, - }, - }, http.StatusOK) - -} - -func ExecuteMonitor(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.ByName("id") - esClient := elastic.GetClient(id) - if esClient == nil { - writeError(w, errors.New("cluster not found")) - return - } - var ( - dryrun = getQueryParam(req, "dryrun", "true") - ) - - var monitor = &alerting.Monitor{} - err := decodeJSON(req.Body, &monitor) - if err != nil { - writeError(w, err) - return - } - - if monitor.Name == "TEMP_MONITOR"{ - - } - if len(monitor.Inputs) == 0 { - writeError(w, errors.New("no input")) - return - } - periodStart := time.Now() - period := alertUtil.GetMonitorPeriod(periodStart, &monitor.Schedule) - //strQuery := string(util.MustToJSONBytes(monitor.Inputs[0].Search.Query)) - //resolveQuery(strQuery, IfaceMap{ - // - // "periodStart": period.Start, - // "periodEnd": period.End, - //}) - - queryDsl := util.MustToJSONBytes(monitor.Inputs[0].Search.Query) - searchRes, err := esClient.SearchWithRawQueryDSL(strings.Join(monitor.Inputs[0].Search.Indices, ","), queryDsl) - if err != nil { - writeError(w, err) - return - } - - var resBody = IfaceMap{} - util.MustFromJSONBytes(searchRes.RawResult.Body, &resBody) - - var triggerResults = IfaceMap{} - sm := ScheduleMonitor{ - Monitor: monitor, - } - - var monitorCtx []byte - if dryrun == "true" { - for _, trigger := range monitor.Triggers { - triggerResult := IfaceMap{ - "error": nil, - "action_results": IfaceMap{}, - "name": trigger.Name, - } - monitorCtx, err = createMonitorContext(&trigger, resBody, &sm, IfaceMap{ - "periodStart": period.Start, - "periodEnd": period.End, - }) - if err != nil { - triggerResult["error"] = err.Error() - triggerResults[trigger.ID] = triggerResult - continue - } - isTrigger, rerr := resolveTriggerResult(&trigger, monitorCtx) - triggerResult["triggered"] = isTrigger - if rerr != nil { - triggerResult["error"] = rerr.Error() - } - if trigger.ID == "" { - trigger.ID = util.GetUUID() - } - triggerResults[trigger.ID] = triggerResult - } - }else{ - LOOP_TRIGGER: - for _, trigger := range monitor.Triggers { - monitorCtx, err = createMonitorContext(&trigger, resBody, &sm, IfaceMap{ - "periodStart": period.Start, - "periodEnd": period.End, - }) - if err != nil { - break - } - for _, action := range trigger.Actions { - _, err = doAction(action, monitorCtx) - if err != nil { - break LOOP_TRIGGER - } - } - } - } - - - var ( - ok = true - errStr string - ) - if err != nil { - ok = false - errStr = err.Error() - } - writeJSON(w, IfaceMap{ - "ok": ok, - "resp": IfaceMap{ - "error": errStr, - "monitor_name": monitor.Name, - "input_results": IfaceMap{ - "error": nil, - "results": []IfaceMap{resBody}, - }, - "trigger_results": triggerResults, - "period_start": period.Start, - "period_end": period.End, - }, - }, http.StatusOK) -} - -func resolveQuery(query string, ctx IfaceMap ) ([]byte, error){ - ctxBytes := util.MustToJSONBytes(ctx) - msg := query - tpl := fasttemplate.New(msg, "{{", "}}") - msgBuffer := bytes.NewBuffer(nil) - _, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){ - keyParts := strings.Split(tag,".") - value, _, _, err := jsonparser.Get(ctxBytes, keyParts...) - if err != nil { - return 0, err - } - return writer.Write(value) - }) - return msgBuffer.Bytes(), err - //return json.Marshal(msg) -} \ No newline at end of file diff --git a/service/alerting/overview.go b/service/alerting/overview.go deleted file mode 100644 index adab0c7f..00000000 --- a/service/alerting/overview.go +++ /dev/null @@ -1,188 +0,0 @@ -package alerting - -import ( - httprouter "infini.sh/framework/core/api/router" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "net/http" -) - -func GetAlertOverview(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - alertDayMetricData, err := getLastAlertDayCount() - if err != nil { - writeError(w, err) - return - } - topTenData, err := getTopTenAlertCluster() - if err != nil { - writeError(w, err) - return - } - stateCount, err := getAlertByState() - if err != nil { - writeError(w, err) - return - } - writeJSON(w, IfaceMap{ - "metrics": IfaceMap{ - "last_tree_month": IfaceMap{ - "data": alertDayMetricData, - "day": 90, - }, - "top_ten_cluster": IfaceMap{ - "data": topTenData, - }, - }, - "state_count": stateCount, - "ok": true, - }, http.StatusOK) -} - -func getAlertByState() (IfaceMap, error){ - reqBody := IfaceMap{ - "size": 0, - "aggs": IfaceMap{ - "alert_count_by_state": IfaceMap{ - "terms": IfaceMap{ - "field": "state", - "size": 10, - }, - }, - }, - } - buckets, err := queryMetricBuckets(reqBody, "alert_count_by_state", INDEX_ALERT) - if err != nil { - return nil, err - } - var metricData = IfaceMap{} - if bks, ok := buckets.([]elastic.BucketBase); ok { - for _, bk := range bks { - metricData[queryValue(bk, "key", "").(string)]= queryValue(bk, "doc_count", 0) - } - } - return metricData, nil -} - -func queryMetricBuckets(reqBody IfaceMap, metricKey, indexName string)(interface{}, error){ - conf := getDefaultConfig() - esClient := elastic.GetClient(conf.ID) - res, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(indexName), util.MustToJSONBytes(reqBody)) - if err != nil { - return nil, err - } - var buckets interface{} - if agg, ok := res.Aggregations[metricKey]; ok { - buckets = agg.Buckets - } - return buckets, nil -} - -func getTopTenAlertCluster()(interface{}, error){ - reqBody := IfaceMap{ - "size": 0, - "aggs": IfaceMap{ - "alert_top_ten": IfaceMap{ - "terms": IfaceMap{ - "field": "cluster_id", - "size": 10, - }, - "aggs": IfaceMap{ - "group_by_state": IfaceMap{ - "terms": IfaceMap{ - "field": "state", - "size": 5, - }, - }, - }, - }, - }, - } - buckets, err := queryMetricBuckets(reqBody, "alert_top_ten", INDEX_ALL_ALERTS) - if err != nil { - return nil, err - } - var metricData []IfaceMap - var clusterIDs []interface{} - if bks, ok := buckets.([]elastic.BucketBase); ok { - for _, bk := range bks { - stateBuckets := queryValue(bk, "group_by_state.buckets", nil ) - key := queryValue(bk, "key", "" ) - clusterIDs = append(clusterIDs, key) - if stateBKS, ok := stateBuckets.([]interface{}); ok{ - for _, stateBK := range stateBKS { - if stateBKMap, ok := stateBK.(map[string]interface{}); ok { - metricData = append(metricData, IfaceMap{ - "x": key, - "y": queryValue(stateBKMap, "doc_count", 0), - "g": queryValue(stateBKMap, "key", ""), - }) - } - } - } - } - } - reqBody = IfaceMap{ - "_source": "name", - "query": IfaceMap{ - "terms": IfaceMap{ - "_id": clusterIDs, - }, - }, - } - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - resBody, err := esClient.SearchWithRawQueryDSL( orm.GetIndexName(elastic.ElasticsearchConfig{}), util.MustToJSONBytes(reqBody)) - if err != nil { - return nil, err - } - clusterMap := IfaceMap{} - for _, hit := range resBody.Hits.Hits { - clusterMap[hit.ID] = hit.Source["name"] - } - for _, d := range metricData { - if name, ok := clusterMap[d["x"].(string)]; ok { - d["x"] = name - } - } - return metricData, nil -} - -func getLastAlertDayCount() (interface{}, error){ - reqBody := IfaceMap{ - "size": 0, - "query": IfaceMap{ - "bool": IfaceMap{ - "filter": []IfaceMap{ - {"range": IfaceMap{ - "start_time": IfaceMap{ - "gte": "now-3M", - }, - }}, - }, - }, - }, - "aggs": IfaceMap{ - "alert_day_count": IfaceMap{ - "date_histogram": IfaceMap{ - "field": "start_time", - "interval": "day", - }, - }, - }, - } - buckets, err := queryMetricBuckets(reqBody, "alert_day_count", INDEX_ALL_ALERTS) - if err != nil { - return nil, err - } - var metricData []interface{} - if bks, ok := buckets.([]elastic.BucketBase); ok { - for _, bk := range bks { - metricData = append(metricData, []interface{}{ - queryValue(bk, "key", ""), - queryValue(bk, "doc_count", 0), - }) - } - } - return metricData, nil -} diff --git a/service/alerting/schedule.go b/service/alerting/schedule.go deleted file mode 100644 index fb862f7c..00000000 --- a/service/alerting/schedule.go +++ /dev/null @@ -1,569 +0,0 @@ -package alerting - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "github.com/buger/jsonparser" - log "github.com/cihub/seelog" - "github.com/elastic/go-ucfg/yaml" - cronlib "github.com/robfig/cron" - "github.com/valyala/fasttemplate" - "infini.sh/console/model/alerting" - "infini.sh/console/service/alerting/action" - util1 "infini.sh/console/service/alerting/util" - "infini.sh/framework/core/conditions" - "infini.sh/framework/core/elastic" - "infini.sh/framework/core/orm" - "infini.sh/framework/core/util" - "io" - "strings" - "sync" - "time" -) - -var alertScheduler *scheduler -var alertSchedulerOnce = &sync.Once{} - -func GetScheduler() *scheduler { - alertSchedulerOnce.Do(func(){ - alertScheduler = NewScheduler() - }) - return alertScheduler -} - -func NewScheduler() *scheduler{ - cr := cronlib.New(cronlib.WithParser(cronlib.NewParser( - cronlib.SecondOptional | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow | cronlib.Descriptor, - ))) - return &scheduler{ - cron: cr, - mu: &sync.Mutex{}, - } -} - -type scheduler struct{ - monitors sync.Map - cron *cronlib.Cron - IsStart bool - mu *sync.Mutex -} - -func (scd *scheduler) Start() error{ - scd.mu.Lock() - if scd.IsStart { - return nil - } - scd.mu.Unlock() - monitors, err := getEnabledMonitors() - if err != nil { - return err - } - for id, monitor := range monitors { - err = scd.AddMonitor(id, &monitor) - if err != nil { - return err - } - scd.monitors.Store(id, &monitor) - } - go scd.cron.Start() - return nil -} - -func (scd *scheduler) AddMonitor(key string, monitor *ScheduleMonitor) error{ - monitor.MonitorID = key - if _, ok := scd.monitors.Load(key); ok { - return errors.New("monitor already exists") - } - jobFunc := generateMonitorJob(monitor) - var cron *alerting.Cron - if monitor.Monitor.Schedule.Period != nil { - cron = convertPeriodToCronExpression( monitor.Monitor.Schedule.Period) - } - if monitor.Monitor.Schedule.Cron != nil { - cron = monitor.Monitor.Schedule.Cron - } - if cron != nil { - timezone := "" - if cron.Timezone != "" { - timezone = fmt.Sprintf("CRON_TZ=%s ", cron.Timezone) - } - //fmt.Println(timezone + cron.Expression) - entryID, err := scd.cron.AddFunc(timezone + cron.Expression, jobFunc) - if err != nil { - return err - } - monitor.EntryID = entryID - } - scd.monitors.Store(key, monitor) - return nil -} -func (scd *scheduler) Stop(){ - scd.monitors.Range(func (key, val interface{}) bool{ - monitor := val.(*ScheduleMonitor) - scd.cron.Remove(monitor.EntryID) - scd.monitors.Delete(key) - return true - }) - -} -func (scd *scheduler) RemoveMonitor(key string) bool{ - value, ok := scd.monitors.Load(key) - if ok && value != nil { - if monitor, ok := value.(*ScheduleMonitor); ok { - scd.cron.Remove(monitor.EntryID) - scd.monitors.Delete(key) - return ok - } - } - return ok -} - -func (scd *scheduler) UpdateMonitor(key string, monitor *ScheduleMonitor) error{ - scd.RemoveMonitor(key) - if monitor.Monitor.Enabled { - return scd.AddMonitor(key, monitor) - } - return nil -} - -func convertPeriodToCronExpression(period *alerting.Period) *alerting.Cron{ - var expression = "@every 1m" - switch period.Unit { - case "MINUTES": - expression = fmt.Sprintf("@every %dm", period.Interval) - case "HOURS": - expression = fmt.Sprintf("@every %dh", period.Interval) - case "DAYS": - expression = fmt.Sprintf("@every %dh", period.Interval * 24) - } - return &alerting.Cron{ - Expression: expression, - Timezone: "", - } -} - -type MonitorJob func() - -func generateMonitorJob(smt *ScheduleMonitor) MonitorJob{ - sm := *smt - return func() { - startTime := time.Now() - queryResult, err := getQueryResult(sm.ClusterID, &sm.Monitor.Inputs[0]) - if err != nil { - log.Error(err) - } - periods := util1.GetMonitorPeriod(startTime, &sm.Monitor.Schedule) - for _, trigger := range sm.Monitor.Triggers { - monitorCtx, err := createMonitorContext(&trigger, queryResult, &sm, IfaceMap{ - "periodStart": periods.Start, - "periodEnd": periods.End, - }) - if err != nil { - log.Error(err) - continue - } - isTrigger, err := resolveTriggerResult(&trigger, monitorCtx) - //fmt.Println("is triggered: ", isTrigger, err) - if err != nil { - log.Error(err) - continue - } - alertItem := alerting.Alert{ - TriggerId: trigger.ID, - TriggerName: trigger.Name, - MonitorId: sm.MonitorID, - MonitorName: sm.Monitor.Name, - StartTime: time.Now().UnixNano()/1e6, - Severity: trigger.Severity, - State: ALERT_COMPLETED, - ClusterID: sm.ClusterID, - ClusterName: elastic.GetMetadata(sm.ClusterID).Config.Name, - } - if !isTrigger { - endTime := time.Now().UnixNano()/1e6 - alertItem.EndTime = &endTime - err = saveAlertInfo(&alertItem) - if err != nil { - log.Error(err) - } - continue - } - //check ack state - lastAlert, err := getLastAlert(sm.MonitorID, trigger.ID, sm.ClusterID) - if err != nil { - log.Error(err) - continue - } - if lastAlert != nil && lastAlert["state"].(string) == ALERT_ACKNOWLEDGED { - continue - } - - alertItem.State = ALERT_ACTIVE - for _, act := range trigger.Actions { - actResult, err := doAction(act, monitorCtx) - var errMsg string - if err != nil { - errMsg = err.Error() - alertItem.ErrorMessage += errMsg - } - alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, alerting.ActionExecutionResult{ - ActionID: act.ID, - LastExecutionTime: alertItem.LastNotificationTime, - Error: errMsg, - Result: string(actResult), - }) - alertItem.LastNotificationTime = time.Now().UnixNano()/1e6 - - if alertItem.ErrorMessage != "" { - alertItem.State = ALERT_ERROR - } - err = saveAlertInfo(&alertItem) - if err != nil { - log.Error(err) - } - } - } - } -} - -func doAction(act alerting.Action, monitorCtx []byte) ([]byte, error) { - message, err := resolveMessage(act.MessageTemplate, monitorCtx) - if err != nil { - //alertItem.ErrorMessage = err.Error() - return nil, err - } - destination, err := resolveDestination(act.DestinationId) - if err != nil { - return nil, err - } - var tact action.Action - - switch destination.Type { - case action.ACTION_EMAIL: - sender, err := resolveEmailAccount(destination.Email.EmailAccountID) - if err != nil { - return nil, err - } - subject, err := resolveMessage(act.SubjectTemplate, monitorCtx) - if err != nil { - return nil, err - } - receiver, err := getEmailRecipient(destination.Email.Recipients) - if err != nil { - return nil, err - } - tact = &action.EmailAction{ - Message: string(message), - Subject: string(subject), - Sender: sender, - Receiver: receiver, - } - case action.ACTION_WEBHOOK: - tact = &action.WebhookAction{ - Data: &destination.CustomWebhook, - Message: string(message), - } - default: - return nil, fmt.Errorf("unsupported action type: %s", destination.Type) - } - return tact.Execute() -} - - -func getLastAlert(monitorID, triggerID, clusterID string) (map[string]interface{}, error) { - conf := getDefaultConfig() - esClient := elastic.GetClient(conf.ID) - reqBody := IfaceMap{ - "size": 1, - "query": IfaceMap{ - "bool": IfaceMap{ - "must": []IfaceMap{ - { - "match": IfaceMap{ - "monitor_id": monitorID, - }, - }, - { - "match": IfaceMap{ - "cluster_id": clusterID, - }, - }, - { - "match": IfaceMap{ - "trigger_id": triggerID, - }, - }, - }, - }, - - }, - "sort": []IfaceMap{ - {"start_time": IfaceMap{"order":"desc"}}, - }, - } - - resBody, err := esClient.SearchWithRawQueryDSL(getAlertIndexName(INDEX_ALERT), util.MustToJSONBytes(reqBody)) - if err != nil { - return nil, err - } - if len(resBody.Hits.Hits) > 0 { - return resBody.Hits.Hits[0].Source, nil - } - return nil, nil -} - -func saveAlertInfo(alertItem *alerting.Alert) error { - conf := getDefaultConfig() - esClient := elastic.GetClient(conf.ID) - indexName := getAlertIndexName(INDEX_ALERT) - reqBody := IfaceMap{ - "size": 1, - "query": IfaceMap{ - "bool": IfaceMap{ - "must": []IfaceMap{ - { - "match": IfaceMap{ - "monitor_id": alertItem.MonitorId, - }, - }, - //{ - // "match": IfaceMap{ - // "state": ALERT_ACTIVE, - // }, - //}, - { - "match": IfaceMap{ - "cluster_id": alertItem.ClusterID, - }, - }, - { - "match": IfaceMap{ - "trigger_id": alertItem.TriggerId, - }, - }, - }, - }, - - }, - } - resBody, err := esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(reqBody)) - if err != nil { - return err - } - if len(resBody.Hits.Hits) == 0 { - if alertItem.State == ALERT_COMPLETED { - return nil - } - _, err = esClient.Index(indexName,"", util.GetUUID(), alertItem, "") - return err - } - currentState := queryValue(resBody.Hits.Hits[0].Source, "state", "").(string) - alertItem.Id = resBody.Hits.Hits[0].ID - if currentState != alertItem.State { - source := resBody.Hits.Hits[0].Source - source["end_time"] = time.Now().UnixNano()/1e6 - if alertItem.State == ALERT_COMPLETED { - if currentState == ALERT_ACTIVE { - source["state"] = ALERT_COMPLETED - } - } - esClient.Index( getAlertIndexName(INDEX_ALERT_HISTORY), "", alertItem.Id, source, "") - _,err = esClient.Delete(indexName, "", resBody.Hits.Hits[0].ID) - return err - } - alertItem.StartTime = int64(queryValue(resBody.Hits.Hits[0].Source, "start_time", 0).(float64)) - - _, err = esClient.Index(indexName, "", alertItem.Id, alertItem, "" ) - return err -} - -func getEmailRecipient(recipients []alerting.Recipient) ([]string, error){ - var emails []string - for _, recipient := range recipients { - if recipient.Type == "email" { - emails = append(emails, recipient.Email) - continue - } - if recipient.Type == "email_group" { - eg, err := resolveEmailGroup(recipient.EmailGroupID) - if err != nil { - return emails, err - } - for _, em := range eg.Emails { - if email, ok := em["email"].(string); ok { - emails = append(emails, email) - } - } - } - } - return emails, nil -} - -func resolveDestination(ID string)(*alerting.Destination, error){ - conf := getDefaultConfig() - esClient := elastic.GetClient(conf.ID) - res, err := esClient.Get( orm.GetIndexName(alerting.Config{}), "", ID) - if err != nil { - return nil,err - } - destination := &alerting.Destination{} - buf, _ := json.Marshal(queryValue(res.Source, DESTINATION_FIELD, IfaceMap{})) - _ = json.Unmarshal(buf, destination) - return destination, nil -} - -func resolveEmailAccount(ID string)(*alerting.EmailAccount, error){ - conf := getDefaultConfig() - esClient := elastic.GetClient(conf.ID) - res, err := esClient.Get( orm.GetIndexName(alerting.Config{}), "", ID) - if err != nil { - return nil,err - } - email := &alerting.EmailAccount{} - buf, _ := json.Marshal(queryValue(res.Source, EMAIL_ACCOUNT_FIELD, IfaceMap{})) - _ = json.Unmarshal(buf, email) - return email, nil -} - -func resolveEmailGroup(ID string)(*alerting.EmailGroup, error){ - conf := getDefaultConfig() - esClient := elastic.GetClient(conf.ID) - res, err := esClient.Get( orm.GetIndexName(alerting.Config{}), "", ID) - if err != nil { - return nil,err - } - - emailGroup := &alerting.EmailGroup{} - buf, _ := json.Marshal(queryValue(res.Source, EMAIL_GROUP_FIELD, IfaceMap{})) - err = json.Unmarshal(buf, emailGroup) - return emailGroup, err -} - -func getQueryResult(clusterID string, input *alerting.MonitorInput) (IfaceMap, error) { - esClient := elastic.GetClient(clusterID) - queryDsl := util.MustToJSONBytes(input.Search.Query) - searchRes, err := esClient.SearchWithRawQueryDSL(strings.Join(input.Search.Indices, ","), queryDsl) - if err != nil { - return nil, err - } - var resBody = IfaceMap{} - util.MustFromJSONBytes(searchRes.RawResult.Body, &resBody) - return resBody, nil -} - -func resolveMessage(messageTemplate IfaceMap, monitorCtx []byte ) ([]byte, error){ - msg := messageTemplate["source"].(string) - tpl := fasttemplate.New(msg, "{{", "}}") - msgBuffer := bytes.NewBuffer(nil) - _, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){ - keyParts := strings.Split(tag,".") - value, _, _, err := jsonparser.Get(monitorCtx, keyParts...) - if err != nil { - return 0, err - } - return writer.Write(value) - }) - return msgBuffer.Bytes(), err - //return json.Marshal(msg) -} - -func createMonitorContext(trigger *alerting.Trigger, result IfaceMap, smt *ScheduleMonitor, extra IfaceMap) ([]byte, error){ - params := IfaceMap{ - "results": []interface{}{ - result, - }, - "trigger": trigger, - "monitor": smt.Monitor, - "cluster_id": smt.ClusterID, - } - assignTo(params, extra) - ctx := IfaceMap{ - "_ctx": params, - } - return json.Marshal(ctx) -} - -func resolveTriggerResult(trigger *alerting.Trigger, monitorCtx []byte ) (bool, error){ - source := queryValue(trigger.Condition, "script.source", "") - sourceBytes := []byte(source.(string)) - config, err := yaml.NewConfig(sourceBytes) - if err != nil { - return false, err - } - var boolConfig = &conditions.Config{} - err = config.Unpack(boolConfig) - if err != nil { - return false, err - } - - cond, err := conditions.NewCondition(boolConfig) - if err != nil { - return false, err - } - - ev := &MonitorEvent{ - Fields: monitorCtx, - } - return cond.Check(ev), nil -} - -func getEnabledMonitors() (map[string]ScheduleMonitor, error){ - config := getDefaultConfig() - esClient := elastic.GetClient(config.ID) - must := []IfaceMap{ - { - "exists": IfaceMap{ - "field": MONITOR_FIELD, - }, - }, - { - "match": IfaceMap{ - MONITOR_FIELD+".enabled": true, - }, - }, - } - reqBody := IfaceMap{ - "size": 100, - "query": IfaceMap{ - "bool": IfaceMap{ - "must": must, - "must_not": []IfaceMap{ - { - "match": IfaceMap{ - MONITOR_FIELD+".status": "DELETED", - }, - }, - }, - }, - }, - } - queryDsl := util.MustToJSONBytes(reqBody) - resBody, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(alerting.Config{}),queryDsl) - if err != nil { - return nil, err - } - if len(resBody.Hits.Hits) == 0 { - return nil, nil - } - var monitors = map[string]ScheduleMonitor{} - for _, hit := range resBody.Hits.Hits { - monitor := &alerting.Monitor{} - buf, _ := json.Marshal(hit.Source[MONITOR_FIELD]) - _ = json.Unmarshal(buf, monitor) - monitors[hit.ID] = ScheduleMonitor{ - Monitor: monitor, - ClusterID: hit.Source["cluster_id"].(string), - } - } - - return monitors, nil -} - -type ScheduleMonitor struct { - Monitor *alerting.Monitor - ClusterID string - EntryID cronlib.EntryID - MonitorID string -} \ No newline at end of file diff --git a/service/alerting/schedule_test.go b/service/alerting/schedule_test.go deleted file mode 100644 index 6e3e7751..00000000 --- a/service/alerting/schedule_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package alerting - -import ( - "encoding/json" - "fmt" - "infini.sh/framework/core/conditions" - "infini.sh/framework/core/util" - "github.com/elastic/go-ucfg/yaml" - "testing" -) - - -func TestParseYamlConfig(t *testing.T){ - yamlStr := `range: - ctx.results.hits.hits.[0]._source.number.test.gt: 3` - config, err := yaml.NewConfig([]byte(yamlStr)) - if err != nil { - t.Fatal(err) - } - var boolConfig = &conditions.Config{} - err = config.Unpack(boolConfig) - if err != nil { - t.Fatal(err) - } - fmt.Println(boolConfig.Range) - - cond, err := conditions.NewCondition(boolConfig) - if err != nil { - t.Fatal(err) - } - searchResStr := `{ - "_shards": { - "failed": 0, - "skipped": 0, - "successful": 1, - "total": 1 - }, - "hits": { - "hits": [{"_source":{"number.test": 5, "normal": 7, "number": {"value": 5}}}], - "max_score": null, - "total": { - "relation": "eq", - "value": 5 - } - }, - "timed_out": false, - "took": 0 -}` - searchResults := util.MapStr{} - err = json.Unmarshal([]byte(searchResStr), &searchResults) - if err != nil { - t.Fatal(err) - } - fields := util.MapStr{ - "ctx": util.MapStr{ - "results": searchResults, - "test": util.MapStr{ - "number": 2, - }, - }, - } - //searchEvent := &event.Event{ - // Fields: fields, - //} - fieldsBytes, _ := json.Marshal(fields) - tevent := &MonitorEvent{ - Fields: fieldsBytes, - } - //fmt.Println(cond.Check(searchEvent)) - fmt.Println(cond.Check(tevent)) -} - - - diff --git a/service/alerting/util/period.go b/service/alerting/util/period.go deleted file mode 100644 index e3ce0c5c..00000000 --- a/service/alerting/util/period.go +++ /dev/null @@ -1,78 +0,0 @@ -package util - -import ( - "fmt" - cronlib "github.com/robfig/cron" - "infini.sh/console/model/alerting" - "time" -) - -type MonitorPeriod struct { - Start time.Time - End time.Time -} - -func GetMonitorPeriod(currentTime time.Time, schedule *alerting.Schedule) *MonitorPeriod{ - if schedule.Period != nil { - return transformPeriod(currentTime, schedule.Period) - } - if schedule.Cron != nil { - return transformCron(currentTime, schedule.Cron) - } - return nil -} - - -func transformCron(currentTime time.Time, cron *alerting.Cron) *MonitorPeriod { - timezone := "" - if cron.Timezone != "" { - timezone = fmt.Sprintf("CRON_TZ=%s ", cron.Timezone) - } - - parser := cronlib.NewParser( - cronlib.SecondOptional | cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow, - ) - - sd, _ := parser.Parse(timezone + cron.Expression) - ssd := sd.(*cronlib.SpecSchedule) - var duration = time.Minute - if ssd.Hour == 1 { - duration = time.Hour - } - tempTime := currentTime - nextTime := sd.Next(tempTime) - var preTime = tempTime - for { - tempTime = tempTime.Add(-duration) - if preTime = sd.Next(tempTime); !preTime.Equal(nextTime) { - break - } - } - mp := &MonitorPeriod{ - Start: preTime, - End: currentTime, - } - return mp -} - -func transformPeriod(currentTime time.Time, period *alerting.Period) *MonitorPeriod { - if period == nil { - return nil - } - mp := &MonitorPeriod{ - End: currentTime, - } - var duration time.Duration - switch period.Unit { - case "MINUTES": - duration = time.Minute - case "HOURS": - duration = time.Hour - case "DAYS": - duration = time.Hour * 24 - default: - return nil - } - mp.Start = currentTime.Add(-duration * time.Duration(period.Interval)) - return mp -} \ No newline at end of file diff --git a/service/alerting/util/period_test.go b/service/alerting/util/period_test.go deleted file mode 100644 index d72f06e4..00000000 --- a/service/alerting/util/period_test.go +++ /dev/null @@ -1,22 +0,0 @@ -package util - -import ( - "fmt" - "infini.sh/console/model/alerting" - "testing" - "time" -) - -func TestGetMonitorPeriod(t *testing.T) { - now := time.Now() - periods := GetMonitorPeriod(&now, &alerting.Schedule{ - Cron: &alerting.Cron{ - Expression: "0 0 1 */1 *", - }, - //Period: &alerting.Period{ - // Unit: "MINUTES", - // Interval: 10, - //}, - }) - fmt.Println(periods) -} \ No newline at end of file