diff --git a/main.go b/main.go index cb158c3d..51d3dd55 100644 --- a/main.go +++ b/main.go @@ -112,6 +112,7 @@ func main() { orm.RegisterSchemaWithIndexName(gateway.Instance{}, "gateway-instance") orm.RegisterSchemaWithIndexName(alerting.Rule{}, "alert-rule") orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history") + orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message") api.RegisterSchema() go func() { diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 6d80afb8..2307b888 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -13,13 +13,15 @@ type Alert struct { Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"` Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"` RuleID string `json:"rule_id" elastic_mapping:"rule_id: { type: keyword }"` + RuleName string `json:"rule_name" elastic_mapping:"rule_name: { type: keyword }"` ResourceID string `json:"resource_id" elastic_mapping:"resource_id: { type: keyword }"` ResourceName string `json:"resource_name" elastic_mapping:"resource_name: { type: keyword }"` Expression string `json:"expression" elastic_mapping:"expression: { type: keyword, copy_to:search_text }"` Objects []string `json:"objects" elastic_mapping:"objects: { type:keyword,copy_to:search_text }"` Severity string `json:"severity" elastic_mapping:"severity: { type: keyword }"` - Content string `json:"content" elastic_mapping:"context: { type: keyword, copy_to:search_text }"` - AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` + Title string `json:"title" elastic_mapping:"title: { type: keyword }"` + Message string `json:"message" elastic_mapping:"context: { type: keyword, copy_to:search_text }"` + AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"` Users []string `json:"users,omitempty"` State string `json:"state"` @@ -41,12 +43,30 @@ type ActionExecutionResult struct { } const ( - AlertStateActive string = "active" - AlertStateAcknowledge = "acknowledged" - AlertStateNormal = "normal" - AlertStateError = "error" + AlertStateAlerting string = "alerting" + AlertStateOK = "ok" + AlertStateError = "error" + AlertStateNodata = "nodata" ) +const ( + MessageStateAlerting = "alerting" + MessageStateIgnored = "ignored" + MessageStateRecovered = "recovered" +) + +type AlertMessage struct { + ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"` + Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"` + Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"` + RuleID string `json:"rule_id" elastic_mapping:"rule_id: { type: keyword,copy_to:search_text }"` + Title string `json:"title" elastic_mapping:"title: { type: keyword,copy_to:search_text }"` + Message string `json:"message" elastic_mapping:"content: { type: keyword,copy_to:search_text }"` + Status string `json:"status" elastic_mapping:"status: { type: keyword,copy_to:search_text }"` + IgnoredTime time.Time `json:"ignored_time,omitempty" elastic_mapping:"ignored_time: { type: date }"` + Severity string `json:"severity" elastic_mapping:"severity: { type: keyword }"` + SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` +} /* { @@ -54,4 +74,7 @@ const ( ResourceID ResourceName } -*/ \ No newline at end of file +*/ + 
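The model hunk above splits alert state into two types: Alert now records the outcome of a single rule evaluation (states alerting / ok / error / nodata), while the new AlertMessage represents the ongoing incident shown to users, with its own status (alerting / ignored / recovered). The sketch below illustrates that intended lifecycle; the helper name and its boolean inputs are illustrative only, not part of this change, and the nodata case is left out for brevity.

package main

import "fmt"

// These constants mirror model/alerting in the hunk above.
const (
	MessageStateAlerting  = "alerting"
	MessageStateIgnored   = "ignored"
	MessageStateRecovered = "recovered"
)

// nextMessageStatus sketches how a message moves between states:
// a user can ignore a message only while it is still alerting, and any
// open message is marked recovered once the rule stops firing.
func nextMessageStatus(current string, stillFiring, ignoredByUser bool) string {
	switch {
	case !stillFiring && current != MessageStateRecovered:
		return MessageStateRecovered
	case ignoredByUser && current == MessageStateAlerting:
		return MessageStateIgnored
	default:
		return current
	}
}

func main() {
	fmt.Println(nextMessageStatus(MessageStateAlerting, false, false)) // recovered
}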
+//message status (Active, Ignore, Recover) +//rule status (Active, Error, OK) \ No newline at end of file diff --git a/model/alerting/condition.go b/model/alerting/condition.go index 494f790a..cec3d075 100644 --- a/model/alerting/condition.go +++ b/model/alerting/condition.go @@ -10,6 +10,15 @@ type Condition struct { Operator string `json:"operator"` Items []ConditionItem `json:"items"` } +func (cond *Condition) GetMinimumPeriodMatch() int{ + var minPeriodMatch = 0 + for _, citem := range cond.Items { + if citem.MinimumPeriodMatch > minPeriodMatch { + minPeriodMatch = citem.MinimumPeriodMatch + } + } + return minPeriodMatch +} type ConditionItem struct { //MetricName string `json:"metric"` @@ -17,8 +26,9 @@ type ConditionItem struct { Operator string `json:"operator"` Values []string `json:"values"` Severity string `json:"severity"` - Message string `json:"message"` + Expression string `json:"expression,omitempty"` } + func (cond *ConditionItem) GenerateConditionExpression()(conditionExpression string, err error){ valueLength := len(cond.Values) if valueLength == 0 { diff --git a/model/alerting/metric.go b/model/alerting/metric.go index ca40e945..5336d628 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -11,10 +11,11 @@ import ( type Metric struct { PeriodInterval string `json:"period_interval"` - MaxPeriods int `json:"max_periods"` Items []MetricItem `json:"items"` Formula string `json:"formula,omitempty"` Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80 + Title string `json:"title"` //text template + Message string `json:"message"` // text template } func (m *Metric) GenerateExpression() (string, error){ if len(m.Items) == 1 { @@ -51,6 +52,7 @@ type QueryResult struct { Query string `json:"query"` Raw string `json:"raw"` MetricData []MetricData `json:"metric_data"` + Nodata bool `json:"nodata"` } type MetricData struct { diff --git a/model/alerting/rule.go b/model/alerting/rule.go index dafeb501..e676c61d 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -13,7 +13,7 @@ type Rule struct { ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"` Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"` Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"` - //Name string `json:"name" elastic_mapping:"name:{type:keyword,copy_to:search_text}"` + Name string `json:"name" elastic_mapping:"name:{type:keyword,copy_to:search_text}"` Enabled bool `json:"enabled" elastic_mapping:"enabled:{type:keyword}"` Resource Resource `json:"resource" elastic_mapping:"resource:{type:object}"` Metrics Metric `json:"metrics" elastic_mapping:"metrics:{type:object}"` @@ -52,12 +52,13 @@ func (rule *Rule) GetOrInitExpression() (string, error){ } type RuleChannel struct { - Normal []Channel `json:"normal"` + Enabled bool `json:"enabled"` + Normal []Channel `json:"normal,omitempty"` Escalation []Channel `json:"escalation,omitempty"` - ThrottlePeriod string `json:"throttle_period"` //沉默周期 - AcceptTimeRange TimeRange `json:"accept_time_range"` + ThrottlePeriod string `json:"throttle_period,omitempty"` //沉默周期 + AcceptTimeRange TimeRange `json:"accept_time_range,omitempty"` EscalationThrottlePeriod string `json:"escalation_throttle_period,omitempty"` - EscalationEnabled bool `json:"escalation_enabled"` + EscalationEnabled bool `json:"escalation_enabled,omitempty"` } type MessageTemplate struct{ @@ -71,6 
+72,9 @@ type TimeRange struct { } func (tr *TimeRange) Include( t time.Time) bool { + if tr.Start == "" || tr.End == "" { + return true + } currentTimeStr := t.Format("15:04") return tr.Start <= currentTimeStr && currentTimeStr <= tr.End } diff --git a/model/alerting/rule_test.go b/model/alerting/rule_test.go index 21583231..1f1b4e43 100644 --- a/model/alerting/rule_test.go +++ b/model/alerting/rule_test.go @@ -55,13 +55,12 @@ func TestCreateRule( t *testing.T) { //Conditions: Condition{ // Operator: "any", // Items: []ConditionItem{ - // { MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"1"}, Severity: "error", Message: "集群健康状态为 Red"}, + // { MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"1"}, Severity: "error", AlertMessage: "集群健康状态为 Red"}, // }, //}, Metrics: Metric{ PeriodInterval: "1m", - MaxPeriods: 15, Items: []MetricItem{ {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, @@ -80,7 +79,7 @@ func TestCreateRule( t *testing.T) { Normal: []Channel{ {Name: "钉钉", Type: ChannelWebhook, Webhook: &CustomWebhook{ HeaderParams: map[string]string{ - "Content-Type": "application/json", + "Message-Type": "application/json", }, Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, Method: http.MethodPost, @@ -90,7 +89,7 @@ func TestCreateRule( t *testing.T) { Escalation: []Channel{ {Type: ChannelWebhook, Name: "微信", Webhook: &CustomWebhook{ HeaderParams: map[string]string{ - "Content-Type": "application/json", + "Message-Type": "application/json", }, Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, Method: http.MethodPost, diff --git a/plugin/api/alerting/alert.go b/plugin/api/alerting/alert.go index 0fdf70c7..6e3117fc 100644 --- a/plugin/api/alerting/alert.go +++ b/plugin/api/alerting/alert.go @@ -44,44 +44,6 @@ func (h *AlertAPI) getAlert(w http.ResponseWriter, req *http.Request, ps httprou }, 200) } -func (h *AlertAPI) acknowledgeAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - body := struct { - AlertIDs []string `json:"ids"` - }{} - err := h.DecodeJSON(req, &body) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - if len(body.AlertIDs) == 0 { - h.WriteError(w, "alert ids should not be empty", http.StatusInternalServerError) - return - } - queryDsl := util.MapStr{ - "query": util.MapStr{ - "terms": util.MapStr{ - "_id": body.AlertIDs, - }, - }, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['state'] = '%s'", alerting.AlertStateAcknowledge), - }, - } - err = orm.UpdateBy(alerting.Alert{}, util.MustToJSONBytes(queryDsl)) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - h.WriteJSON(w, util.MapStr{ - "ids": body.AlertIDs, - "result": "updated", - }, 200) -} - - func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var ( @@ -92,9 +54,16 @@ func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps http state = h.GetParameterOrDefault(req, "state", "") severity = h.GetParameterOrDefault(req, "severity", "") sort = h.GetParameterOrDefault(req, "sort", "") + ruleID = h.GetParameterOrDefault(req, "rule_id", "") + min = 
h.GetParameterOrDefault(req, "min", "") + max = h.GetParameterOrDefault(req, "max", "") mustBuilder = &strings.Builder{} sortBuilder = strings.Builder{} ) + mustBuilder.WriteString(fmt.Sprintf(`{"range":{"created":{"gte":"%s", "lte": "%s"}}}`, min, max)) + if ruleID != "" { + mustBuilder.WriteString(fmt.Sprintf(`,{"term":{"rule_id":{"value":"%s"}}}`, ruleID)) + } if sort != "" { sortParts := strings.Split(sort, ",") @@ -103,24 +72,17 @@ func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps http } } sortBuilder.WriteString(`{"created":{ "order": "desc"}}`) - hasFilter := false + if keyword != "" { mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword)) - hasFilter = true } if state != "" { - if hasFilter { - mustBuilder.WriteString(",") - } + mustBuilder.WriteString(",") mustBuilder.WriteString(fmt.Sprintf(`{"term":{"state":{"value":"%s"}}}`, state)) - hasFilter = true } if severity != "" { - if hasFilter { - mustBuilder.WriteString(",") - } + mustBuilder.WriteString(",") mustBuilder.WriteString(fmt.Sprintf(`{"term":{"severity":{"value":"%s"}}}`, severity)) - hasFilter = true } size, _ := strconv.Atoi(strSize) if size <= 0 { diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 0a29d4b1..e9fc7ba3 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -7,6 +7,7 @@ package alerting import ( "infini.sh/console/config" "infini.sh/framework/core/api" + "infini.sh/framework/core/api/rbac/enum" ) @@ -16,28 +17,33 @@ type AlertAPI struct { } func (alert *AlertAPI) Init() { - api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.getRule) - api.HandleAPIMethod(api.POST, "/alerting/rule", alert.createRule) + api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.RequirePermission(alert.getRule,enum.PermissionAlertRuleRead)) + api.HandleAPIMethod(api.POST, "/alerting/rule", alert.RequirePermission(alert.createRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.POST, "/alerting/rule/test", alert.sendTestMessage) - api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule) - api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule) - api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule) + api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.RequirePermission(alert.deleteRule, enum.PermissionAlertRuleWrite)) + api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.RequirePermission(alert.updateRule, enum.PermissionAlertRuleWrite)) + api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.RequirePermission(alert.searchRule, enum.PermissionAlertRuleRead)) api.HandleAPIMethod(api.GET, "/alerting/stats", alert.getAlertStats) api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.fetchAlertInfos) - api.HandleAPIMethod(api.POST, "/alerting/rule/:rule_id/_enable", alert.enableRule) - api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/metric", alert.getMetricData) + api.HandleAPIMethod(api.POST, "/alerting/rule/:rule_id/_enable", alert.RequirePermission(alert.enableRule, enum.PermissionAlertRuleWrite)) + api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/metric", alert.RequirePermission(alert.getMetricData, enum.PermissionAlertRuleRead)) + api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/info", alert.RequirePermission(alert.getRuleDetail, enum.PermissionAlertRuleRead, enum.PermissionAlertMessageRead)) - api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.getChannel) - 
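The route registrations above now pass each handler through RequirePermission together with the rbac enum permissions it needs. The sketch below is an illustrative stand-in for that kind of guard, not the framework's actual implementation: it only shows the idea of checking a list of required permissions before calling the wrapped handler, and the hasPermission lookup is a hypothetical placeholder.

package rbacdemo

import (
	"net/http"
	"strings"
)

// requirePermission is an illustrative stand-in for the framework's
// RequirePermission wrapper used in the routes above: every listed
// permission must be granted before the wrapped handler runs.
func requirePermission(next http.HandlerFunc, permissions ...string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		for _, p := range permissions {
			if !hasPermission(r, p) {
				http.Error(w, "permission denied: "+p, http.StatusForbidden)
				return
			}
		}
		next(w, r)
	}
}

// hasPermission is purely illustrative (reads granted permissions from a
// demo header); the real check lives in the framework's RBAC module and
// is not shown in this diff.
func hasPermission(r *http.Request, permission string) bool {
	granted := strings.Split(r.Header.Get("X-Demo-Permissions"), ",")
	for _, g := range granted {
		if strings.TrimSpace(g) == permission {
			return true
		}
	}
	return false
}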
api.HandleAPIMethod(api.POST, "/alerting/channel", alert.createChannel) - api.HandleAPIMethod(api.DELETE, "/alerting/channel/:channel_id", alert.deleteChannel) - api.HandleAPIMethod(api.PUT, "/alerting/channel/:channel_id", alert.updateChannel) - api.HandleAPIMethod(api.GET, "/alerting/channel/_search", alert.searchChannel) + api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.RequirePermission(alert.getChannel, enum.PermissionAlertChannelRead)) + api.HandleAPIMethod(api.POST, "/alerting/channel", alert.RequirePermission(alert.createChannel, enum.PermissionAlertChannelWrite)) + api.HandleAPIMethod(api.DELETE, "/alerting/channel", alert.RequirePermission(alert.deleteChannel, enum.PermissionAlertChannelWrite)) + api.HandleAPIMethod(api.PUT, "/alerting/channel/:channel_id", alert.RequirePermission(alert.updateChannel, enum.PermissionAlertChannelWrite)) + api.HandleAPIMethod(api.GET, "/alerting/channel/_search", alert.RequirePermission(alert.searchChannel, enum.PermissionAlertChannelRead)) - api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.searchAlert) - api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.getAlert) - api.HandleAPIMethod(api.POST, "/alerting/alert/_acknowledge", alert.acknowledgeAlert) + api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.RequirePermission(alert.searchAlert, enum.PermissionAlertHistoryRead)) + api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.RequirePermission(alert.getAlert, enum.PermissionAlertHistoryRead)) api.HandleAPIMethod(api.GET, "/alerting/template/parameters", alert.getTemplateParams) + api.HandleAPIMethod(api.GET, "/alerting/message/_search", alert.RequirePermission(alert.searchAlertMessage, enum.PermissionElasticsearchMetricRead)) + api.HandleAPIMethod(api.POST, "/alerting/message/_ignore", alert.RequirePermission(alert.ignoreAlertMessage, enum.PermissionAlertMessageWrite)) + api.HandleAPIMethod(api.GET, "/alerting/message/_stats", alert.RequirePermission(alert.getAlertMessageStats, enum.PermissionAlertMessageRead)) + api.HandleAPIMethod(api.GET, "/alerting/message/:message_id", alert.RequirePermission(alert.getAlertMessage, enum.PermissionAlertMessageRead)) + //just for test //api.HandleAPIMethod(api.GET, "/alerting/rule/test", alert.testRule) diff --git a/plugin/api/alerting/channel.go b/plugin/api/alerting/channel.go index c372954d..f38a5fa0 100644 --- a/plugin/api/alerting/channel.go +++ b/plugin/api/alerting/channel.go @@ -107,21 +107,32 @@ func (h *AlertAPI) updateChannel(w http.ResponseWriter, req *http.Request, ps ht } func (h *AlertAPI) deleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("channel_id") - - obj := alerting.Channel{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "result": "not_found", - }, http.StatusNotFound) + reqBody := struct { + ChannelIDs []string `json:"ids"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) return } + if len(reqBody.ChannelIDs) == 0 { + if err != nil { + h.WriteError(w, "channel ids required", http.StatusInternalServerError) + log.Error(err) + return + } + } - err = orm.Delete(&obj) + queryDsl := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": reqBody.ChannelIDs, + }, + }, + } + + err = orm.DeleteBy(alerting.Channel{}, util.MustToJSONBytes(queryDsl)) if err != nil { h.WriteError(w, err.Error(), 
http.StatusInternalServerError) log.Error(err) @@ -129,7 +140,7 @@ func (h *AlertAPI) deleteChannel(w http.ResponseWriter, req *http.Request, ps ht } h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, + "ids": reqBody.ChannelIDs , "result": "deleted", }, 200) } diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go new file mode 100644 index 00000000..25ea5c5d --- /dev/null +++ b/plugin/api/alerting/message.go @@ -0,0 +1,273 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import ( + "fmt" + log "github.com/cihub/seelog" + "infini.sh/console/model/alerting" + alerting2 "infini.sh/console/service/alerting" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/kv" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + "strconv" + "strings" + "time" +) + +func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + body := struct { + Messages []alerting.AlertMessage `json:"messages"` + }{} + err := h.DecodeJSON(req, &body) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(body.Messages) == 0 { + h.WriteError(w, "messages should not be empty", http.StatusInternalServerError) + return + } + messageIDs := make([]string, 0, len(body.Messages)) + for _, msg := range body.Messages { + messageIDs = append(messageIDs, msg.ID) + } + queryDsl := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "_id": messageIDs, + }, + }, + { + "term": util.MapStr{ + "status": util.MapStr{ + "value": alerting.MessageStateAlerting, + }, + }, + }, + }, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s';ctx._source['ignored_time']='%s'", alerting.MessageStateIgnored, time.Now().Format(time.RFC3339Nano)), + }, + } + err = orm.UpdateBy(alerting.AlertMessage{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + //delete kv cache + for _, msg := range body.Messages { + _ = kv.DeleteKey(alerting2.KVLastMessageState, []byte(msg.RuleID)) + } + + + h.WriteJSON(w, util.MapStr{ + "ids": messageIDs, + "result": "updated", + }, 200) +} + +func (h *AlertAPI) getAlertMessageStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + esClient := elastic.GetClient(h.Config.Elasticsearch) + queryDsl := util.MapStr{ + "size": 0, + "query": util.MapStr{ + "bool": util.MapStr{ + "must_not": []util.MapStr{ + { + "terms": util.MapStr{ + "status": []string{ + alerting.MessageStateRecovered, + }, + }, + }, + }, + }, + }, + "aggs": util.MapStr{ + "terms_by_severity": util.MapStr{ + "terms": util.MapStr{ + "field": "severity", + "size": 5, + }, + }, + }, + } + + searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.AlertMessage{}), util.MustToJSONBytes(queryDsl) ) + if err != nil { + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + statusCounts := map[string]interface{}{} + if termsAgg, ok := searchRes.Aggregations["terms_by_severity"]; ok { + for _, bk := range termsAgg.Buckets { + if status, ok := bk["key"].(string); ok { + statusCounts[status] = bk["doc_count"] + } + } + } + for _, status := range []string{"warning", "error", "critical"} { + if _, ok := 
statusCounts[status]; !ok { + statusCounts[status] = 0 + } + } + h.WriteJSON(w, util.MapStr{ + "alert": util.MapStr{ + "current": statusCounts, + }, + }, http.StatusOK) +} + + +func (h *AlertAPI) searchAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + var ( + queryDSL = `{"sort":[%s],"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + status = h.GetParameterOrDefault(req, "status", "") + severity = h.GetParameterOrDefault(req, "severity", "") + sort = h.GetParameterOrDefault(req, "sort", "") + ruleID = h.GetParameterOrDefault(req, "rule_id", "") + min = h.GetParameterOrDefault(req, "min", "now-1d") + max = h.GetParameterOrDefault(req, "max", "now") + mustBuilder = &strings.Builder{} + sortBuilder = strings.Builder{} + ) + mustBuilder.WriteString(fmt.Sprintf(`{"range":{"created":{"gte":"%s", "lte": "%s"}}}`, min, max)) + if ruleID != "" { + mustBuilder.WriteString(fmt.Sprintf(`,{"term":{"rule_id":{"value":"%s"}}}`, ruleID)) + } + + if sort != "" { + sortParts := strings.Split(sort, ",") + if len(sortParts) == 2 && sortParts[1] != "created" { + sortBuilder.WriteString(fmt.Sprintf(`{"%s":{ "order": "%s"}},`, sortParts[0], sortParts[1])) + } + } + sortBuilder.WriteString(`{"created":{ "order": "desc"}}`) + + if status != "" { + mustBuilder.WriteString(",") + mustBuilder.WriteString(fmt.Sprintf(`{"term":{"status":{"value":"%s"}}}`, status)) + } + if severity != "" { + mustBuilder.WriteString(",") + mustBuilder.WriteString(fmt.Sprintf(`{"term":{"severity":{"value":"%s"}}}`, severity)) + } + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + q := orm.Query{} + queryDSL = fmt.Sprintf(queryDSL, sortBuilder.String(), mustBuilder.String(), size, from) + q.RawQuery = []byte(queryDSL) + + err, res := orm.Search(&alerting.AlertMessage{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + esRes := elastic.SearchResponse{} + err = util.FromJSONBytes(res.Raw, &esRes) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for _, hit := range esRes.Hits.Hits { + created, _ := parseTime(hit.Source["created"], time.RFC3339) + updated, _ := parseTime(hit.Source["updated"], time.RFC3339) + if !created.IsZero() && !updated.IsZero() { + endTime := time.Now() + if hit.Source["status"] == alerting.MessageStateRecovered { + endTime = updated + } + hit.Source["duration"] = endTime.Sub(created).Milliseconds() + } + + } + h.WriteJSON(w, esRes, http.StatusOK) +} + +func parseTime( t interface{}, layout string) (time.Time, error){ + switch t.(type) { + case string: + return time.Parse(layout, t.(string)) + default: + return time.Time{}, fmt.Errorf("unsupport time type") + } +} + +func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + message := &alerting.AlertMessage{ + ID: ps.ByName("message_id"), + } + exists, err := orm.Get(message) + if !exists || err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "_id": message.ID, + "found": false, + }, http.StatusNotFound) + return + } + rule := &alerting.Rule{ + ID: message.RuleID, + } + exists, err = orm.Get(rule) + if !exists || err != nil { + log.Error(err) + h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError) + return + } + 
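The statements that follow (and the same logic in getRuleDetail and searchRule) turn each condition into a display expression: the metric expression is generated from the rule's metric items and formula, then substituted for the literal token "result" in the generated condition expression. A worked sketch, assuming GenerateExpression expands the formula over the aggregated fields and GenerateConditionExpression emits a comparison against "result":

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Assumed example values; the real strings come from
	// rule.Metrics.GenerateExpression() and cond.GenerateConditionExpression().
	metricExpression := "(min(fs.total.free_in_bytes)/max(fs.total.total_in_bytes))*100"
	conditionExpression := "result <= 10" // e.g. Operator "lte", Values ["10"]

	// The same substitution used in getAlertMessage, getRuleDetail and searchRule.
	rendered := strings.ReplaceAll(conditionExpression, "result", metricExpression)
	fmt.Println(rendered)
	// (min(fs.total.free_in_bytes)/max(fs.total.total_in_bytes))*100 <= 10
}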
metricExpression, _ := rule.Metrics.GenerateExpression() + for i, cond := range rule.Conditions.Items { + expression, _ := cond.GenerateConditionExpression() + rule.Conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression) + } + var duration time.Duration + if message.Status == alerting.MessageStateRecovered { + duration = message.Updated.Sub(message.Created) + }else{ + duration = time.Now().Sub(message.Created) + } + detailObj := util.MapStr{ + "message_id": message.ID, + "rule_id": message.RuleID, + "rule_name": rule.Name, + "title": message.Title, + "message": message.Message, + "severity": message.Severity, + "created": message.Created, + "updated": message.Updated, + "resource_name": rule.Resource.Name, + "resource_objects": rule.Resource.Objects, + "conditions": rule.Conditions, + "duration": duration.Milliseconds(), + "ignored_time": message.IgnoredTime, + "status": message.Status, + } + h.WriteJSON(w, detailObj, http.StatusOK) +} \ No newline at end of file diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index 1abd055d..4b5adad8 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -7,19 +7,21 @@ package alerting import ( "fmt" log "github.com/cihub/seelog" + "github.com/r3labs/diff/v2" "infini.sh/console/model/alerting" alerting2 "infini.sh/console/service/alerting" _ "infini.sh/console/service/alerting/elasticsearch" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/event" "infini.sh/framework/core/kv" "infini.sh/framework/core/orm" + "infini.sh/framework/core/queue" "infini.sh/framework/core/task" "infini.sh/framework/core/util" "infini.sh/framework/modules/elastic/api" "infini.sh/framework/modules/elastic/common" "net/http" - "strconv" "strings" "time" ) @@ -56,7 +58,6 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p ids = append(ids, rule.ID) rule.Created = time.Now() rule.Updated = time.Now() - rule.Metrics.MaxPeriods = 15 if rule.Schedule.Interval == ""{ rule.Schedule.Interval = "1m" } @@ -69,6 +70,11 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p }, http.StatusInternalServerError) return } + saveAlertActivity("alerting_rule_change", "create", util.MapStr{ + "cluster_id": rule.Resource.ID, + "rule_id": rule.ID, + "cluster_name": rule.Resource.Name, + },nil, &rule) eng := alerting2.GetEngine(rule.Resource.Type) if rule.Enabled { ruleTask := task.ScheduleTask{ @@ -90,7 +96,6 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p } func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("rule_id") - obj := alerting.Rule{} obj.ID = id @@ -103,12 +108,6 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h }, http.StatusNotFound) return } - if err != nil { - log.Error(err) - alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } alertAPI.WriteJSON(w, util.MapStr{ "found": true, @@ -118,12 +117,135 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h } +func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("rule_id") + obj := alerting.Rule{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + log.Error(err) + alertAPI.WriteJSON(w, util.MapStr{ + "_id": id, + "found": 
false, + }, http.StatusNotFound) + return + } + metricExpression, _ := obj.Metrics.GenerateExpression() + for i, cond := range obj.Conditions.Items { + expression, _ := cond.GenerateConditionExpression() + obj.Conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression) + } + alertNumbers, err := alertAPI.getRuleAlertMessageNumbers([]string{obj.ID}) + if err != nil { + log.Error(err) + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + queryDSL := util.MapStr{ + "_source": "state", + "size": 1, + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + "query": util.MapStr{ + "term": util.MapStr{ + "rule_id": util.MapStr{ + "value": obj.ID, + }, + }, + }, + } + q := &orm.Query{ + WildcardIndex: true, + RawQuery: util.MustToJSONBytes(queryDSL), + } + err, result := orm.Search(alerting.Alert{}, q) + if err != nil { + log.Error(err) + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + var state interface{} = "N/A" + if len(result.Result) > 0 { + if resultM, ok := result.Result[0].(map[string]interface{}); ok { + state = resultM["state"] + } + } + + detailObj := util.MapStr{ + "rule_name": obj.Name, + "resource_name": obj.Resource.Name, + "resource_objects": obj.Resource.Objects, + "period_interval": obj.Metrics.PeriodInterval, //统计周期 + "updated": obj.Updated, + "conditions": obj.Conditions, + "message_count": alertNumbers[obj.ID], //所有关联告警消息数(包括已恢复的) + "state": state, + "enabled": obj.Enabled, + } + + alertAPI.WriteJSON(w, detailObj, 200) + +} + +func saveActivity(activityInfo *event.Activity){ + queueConfig := queue.GetOrInitConfig("platform##activities") + if queueConfig.Labels == nil { + queueConfig.Labels = map[string]interface{}{ + "type": "platform", + "name": "activity", + "category": "elasticsearch", + "activity": true, + } + } + err := queue.Push(queueConfig, util.MustToJSONBytes(event.Event{ + Timestamp: time.Now(), + Metadata: event.EventMetadata{ + Category: "elasticsearch", + Name: "activity", + }, + Fields: util.MapStr{ + "activity": activityInfo, + }})) + if err != nil { + log.Error(err) + } +} + +func saveAlertActivity(name, typ string, labels map[string]interface{}, changelog diff.Changelog, oldState interface{}){ + activityInfo := &event.Activity{ + ID: util.GetUUID(), + Timestamp: time.Now(), + Metadata: event.ActivityMetadata{ + Category: "elasticsearch", + Group: "platform", + Name: name, + Type: typ, + Labels: labels, + }, + Changelog: changelog, + Fields: util.MapStr{ + "rule": oldState, + }, + } + saveActivity(activityInfo) +} + func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("rule_id") - obj := &alerting.Rule{} + oldRule := &alerting.Rule{} - obj.ID = id - exists, err := orm.Get(obj) + oldRule.ID = id + exists, err := orm.Get(oldRule) if !exists || err != nil { log.Error(err) alertAPI.WriteJSON(w, util.MapStr{ @@ -133,35 +255,46 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p return } - id = obj.ID - create := obj.Created - obj = &alerting.Rule{} - err = alertAPI.DecodeJSON(req, obj) + id = oldRule.ID + create := oldRule.Created + rule := &alerting.Rule{ + } + err = alertAPI.DecodeJSON(req, rule) if err != nil { alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) log.Error(err) return } + rule.Metrics.Expression, err = rule.Metrics.GenerateExpression() + if 
err != nil { + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + changeLog, err := util.DiffTwoObject(oldRule, rule) + if err != nil { + log.Error(err) + } //protect - obj.ID = id - obj.Created = create - obj.Updated = time.Now() - obj.Metrics.Expression, err = obj.Metrics.GenerateExpression() - if err != nil { - alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - err = orm.Update(obj) - if err != nil { - alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } + rule.ID = id + rule.Created = create + rule.Updated = time.Now() - if obj.Enabled { - exists, err = checkResourceExists(obj) + err = orm.Update(rule) + if err != nil { + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + saveAlertActivity("alerting_rule_change", "update", util.MapStr{ + "cluster_id": rule.Resource.ID, + "rule_id": rule.ID, + "cluster_name": rule.Resource.Name, + },changeLog, oldRule) + + if rule.Enabled { + exists, err = checkResourceExists(rule) if err != nil || !exists { log.Error(err) alertAPI.WriteJSON(w, util.MapStr{ @@ -171,22 +304,22 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p } //update task task.StopTask(id) - eng := alerting2.GetEngine(obj.Resource.Type) + eng := alerting2.GetEngine(rule.Resource.Type) ruleTask := task.ScheduleTask{ - ID: obj.ID, - Interval: obj.Schedule.Interval, - Description: obj.Metrics.Expression, - Task: eng.GenerateTask(obj), + ID: rule.ID, + Interval: rule.Schedule.Interval, + Description: rule.Metrics.Expression, + Task: eng.GenerateTask(rule), } task.RegisterScheduleTask(ruleTask) task.StartTask(ruleTask.ID) }else{ task.DeleteTask(id) } - clearKV(obj.ID) + clearKV(rule.ID) alertAPI.WriteJSON(w, util.MapStr{ - "_id": obj.ID, + "_id": rule.ID, "result": "updated", }, 200) } @@ -218,9 +351,30 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p log.Error(err) return } + saveAlertActivity("alerting_rule_change", "delete", util.MapStr{ + "cluster_id": obj.Resource.ID, + "rule_id": obj.ID, + "cluster_name": obj.Resource.Name, + },nil, &obj) task.DeleteTask(obj.ID) clearKV(obj.ID) + delDsl := util.MapStr{ + "query": util.MapStr{ + "term": util.MapStr{ + "rule_id": id, + }, + }, + } + err = orm.DeleteBy(alerting.AlertMessage{}, util.MustToJSONBytes(delDsl)) + if err != nil { + log.Error(err) + } + err = orm.DeleteBy(alerting.Alert{}, util.MustToJSONBytes(delDsl)) + if err != nil { + log.Error(err) + } + alertAPI.WriteJSON(w, util.MapStr{ "_id": obj.ID, "result": "deleted", @@ -268,11 +422,29 @@ func (alertAPI *AlertAPI) searchRule(w http.ResponseWriter, req *http.Request, p log.Error(err) return } + searchRes := elastic.SearchResponse{} + err = util.FromJSONBytes(searchResult.Raw, &searchRes) + if err != nil { + alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + for _, hit := range searchRes.Hits.Hits { + hitRule := alerting.Rule{} + hitBytes, _ := util.ToJSONBytes(hit.Source) + util.FromJSONBytes(hitBytes, &hitRule) + metricExpression, _ := hitRule.Metrics.GenerateExpression() + for i, cond := range hitRule.Conditions.Items { + expression, _ := cond.GenerateConditionExpression() + hitRule.Conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression) + } + hit.Source["conditions"] = hitRule.Conditions + } - w.Write(searchResult.Raw) + 
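Earlier in this hunk, createRule, updateRule and deleteRule start recording an "alerting_rule_change" activity event on the platform##activities queue, and for updates the event carries a field-level changelog built by util.DiffTwoObject from the old and new rule (the handler imports github.com/r3labs/diff/v2 and saveAlertActivity accepts a diff.Changelog). A small sketch of what such a changelog contains, assuming util.DiffTwoObject behaves like a thin wrapper around diff.Diff, which this diff does not show:

package main

import (
	"fmt"

	"github.com/r3labs/diff/v2"
)

// Minimal rule shape, for illustration only.
type Rule struct {
	Name    string
	Enabled bool
}

func main() {
	oldRule := Rule{Name: "disk-free", Enabled: true}
	newRule := Rule{Name: "disk-free", Enabled: false}

	// Each entry lists the change type, the field path, and the old/new
	// values; this is the shape attached to the activity event.
	changelog, err := diff.Diff(oldRule, newRule)
	if err != nil {
		panic(err)
	}
	for _, c := range changelog {
		fmt.Println(c.Type, c.Path, c.From, "->", c.To)
	}
}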
alertAPI.WriteJSON(w, searchRes, http.StatusOK) } -func (alertAPI *AlertAPI) getRuleAlertNumbers(ruleIDs []string) ( map[string]interface{},error) { +func (alertAPI *AlertAPI) getRuleAlertMessageNumbers(ruleIDs []string) ( map[string]interface{},error) { esClient := elastic.GetClient(alertAPI.Config.Elasticsearch) queryDsl := util.MapStr{ "size": 0, @@ -284,11 +456,11 @@ func (alertAPI *AlertAPI) getRuleAlertNumbers(ruleIDs []string) ( map[string]int "rule_id": ruleIDs, }, }, - { - "terms": util.MapStr{ - "state": []string{alerting.AlertStateError, alerting.AlertStateActive}, - }, - }, + //{ + // "terms": util.MapStr{ + // "status": []string{alerting.MessageStateAlerting, alerting.MessageStateIgnored}, + // }, + //}, }, }, }, @@ -301,7 +473,7 @@ func (alertAPI *AlertAPI) getRuleAlertNumbers(ruleIDs []string) ( map[string]int }, } - searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) ) + searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.AlertMessage{}), util.MustToJSONBytes(queryDsl) ) if err != nil { return nil, err } @@ -356,21 +528,12 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK) return } - alertNumbers, err := alertAPI.getRuleAlertNumbers(ruleIDs) - if err != nil { - log.Error(err) - alertAPI.WriteJSON(w, util.MapStr{ - "error": err.Error(), - }, http.StatusInternalServerError) - return - } latestAlertInfos := map[string]util.MapStr{} for _, hit := range searchRes.Hits.Hits { if ruleID, ok := hit.Source["rule_id"].(string); ok { latestAlertInfos[ruleID] = util.MapStr{ "status": hit.Source["state"], - "alert_count": alertNumbers[ruleID], } } @@ -479,6 +642,39 @@ func (alertAPI *AlertAPI) getTemplateParams(w http.ResponseWriter, req *http.Req }, http.StatusOK) } +func (alertAPI *AlertAPI) getPreviewMetricData(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + rule := &alerting.Rule{} + err := alertAPI.DecodeJSON(req, rule) + if err != nil { + log.Error(err) + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + var ( + minStr = alertAPI.Get(req, "min", "") + maxStr = alertAPI.Get(req, "max", "") + ) + bucketSize, min, max, err := api.GetMetricRangeAndBucketSize(minStr, maxStr, 60, 15) + filterParam := &alerting.FilterParam{ + Start: min, + End: max, + BucketSize: fmt.Sprintf("%ds", bucketSize), + } + metricItem, _, err := getRuleMetricData(rule, filterParam) + if err != nil { + log.Error(err) + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + alertAPI.WriteJSON(w, util.MapStr{ + "metric": metricItem, + }, http.StatusOK) +} + func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { rule := &alerting.Rule{ ID: ps.ByName("rule_id"), @@ -501,8 +697,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request End: max, BucketSize: fmt.Sprintf("%ds", bucketSize), } - eng := alerting2.GetEngine(rule.Resource.Type) - metricData, err := eng.GetTargetMetricData(rule, true, filterParam) + metricItem, queryResult, err := getRuleMetricData(rule, filterParam) if err != nil { log.Error(err) alertAPI.WriteJSON(w, util.MapStr{ @@ -510,18 +705,33 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request }, http.StatusInternalServerError) return } - //var filteredMetricData 
[]alerting.MetricData - title := rule.Metrics.Formula - if title == "" && len( rule.Conditions.Items) > 0{ - title,_ = rule.Conditions.Items[0].GenerateConditionExpression() + resBody := util.MapStr{ + "metric": metricItem, } + if alertAPI.GetParameter(req, "debug") == "1" { + resBody["query"] = queryResult.Query + } + alertAPI.WriteJSON(w,resBody, http.StatusOK) +} + +func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) (*common.MetricItem, *alerting.QueryResult, error) { + eng := alerting2.GetEngine(rule.Resource.Type) + metricData, queryResult, err := eng.GetTargetMetricData(rule, true, filterParam) + if err != nil { + return nil,queryResult, err + } + //var filteredMetricData []alerting.MetricData + //title := rule.Metrics.Formula + //if title == "" && len( rule.Conditions.Items) > 0{ + // title,_ = rule.Conditions.Items[0].GenerateConditionExpression() + //} var metricItem = common.MetricItem{ Group: rule.ID, - Key: rule.ID, + Key: rule.ID, Axis: []*common.MetricAxis{ - {ID: util.GetUUID(), Group: rule.ID, Title: title, FormatType: "num", Position: "left",ShowGridLines: true, + {ID: util.GetUUID(), Group: rule.ID, Title: "", FormatType: "num", Position: "left", ShowGridLines: true, TickFormat: "0,0.[00]", - Ticks: 5}, + Ticks: 5}, }, } var sampleData []alerting.TimeMetricData @@ -529,55 +739,59 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request if len(md.Data) == 0 { continue } - //filteredMetricData = append(filteredMetricData, md) + targetData := md.Data["result"] + if len(rule.Metrics.Items) == 1 { + for k, _ := range md.Data { + targetData = md.Data[k] + break + } + } if sampleData == nil { - sampleData = md.Data["result"] + sampleData = targetData } metricItem.Lines = append(metricItem.Lines, &common.MetricLine{ - Data: md.Data["result"], + Data: targetData, BucketSize: filterParam.BucketSize, Metric: common.MetricSummary{ - Label: strings.Join(md.GroupValues, "-"), - Group: rule.ID, + Label: strings.Join(md.GroupValues, "-"), + Group: rule.ID, TickFormat: "0,0.[00]", FormatType: "num", }, }) } //add guidelines - for _, cond := range rule.Conditions.Items{ - if len(cond.Values) > 0 { - val, err := strconv.ParseFloat(cond.Values[0], 64) - if err != nil { - log.Errorf("parse condition value error: %v", err) - continue - } - if sampleData != nil { - newData := make([]alerting.TimeMetricData,0, len(sampleData)) - for _, td := range sampleData { - if len(td) < 2{ - continue - } - newData = append(newData, alerting.TimeMetricData{ - td[0], val, - }) - } - metricItem.Lines = append(metricItem.Lines, &common.MetricLine{ - Data: newData, - BucketSize: filterParam.BucketSize, - Metric: common.MetricSummary{ - Label: "", - Group: rule.ID, - TickFormat: "0,0.[00]", - FormatType: "num", - }, - }) - } - } - } - alertAPI.WriteJSON(w, util.MapStr{ - "metric": metricItem, - }, http.StatusOK) + //for _, cond := range rule.Conditions.Items { + // if len(cond.Values) > 0 { + // val, err := strconv.ParseFloat(cond.Values[0], 64) + // if err != nil { + // log.Errorf("parse condition value error: %v", err) + // continue + // } + // if sampleData != nil { + // newData := make([]alerting.TimeMetricData, 0, len(sampleData)) + // for _, td := range sampleData { + // if len(td) < 2 { + // continue + // } + // newData = append(newData, alerting.TimeMetricData{ + // td[0], val, + // }) + // } + // metricItem.Lines = append(metricItem.Lines, &common.MetricLine{ + // Data: newData, + // BucketSize: filterParam.BucketSize, + // Metric: 
common.MetricSummary{ + // Label: "", + // Group: rule.ID, + // TickFormat: "0,0.[00]", + // FormatType: "num", + // }, + // }) + // } + // } + //} + return &metricItem,queryResult, nil } @@ -609,7 +823,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request // Conditions: alerting.Condition{ // Operator: "any", // Items: []alerting.ConditionItem{ -// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"}, +// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", AlertMessage: "cpu使用率大于90%"}, // }, // }, // @@ -617,7 +831,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request // Normal: []alerting.Channel{ // {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ // HeaderParams: map[string]string{ -// "Content-Type": "application/json", +// "Message-Type": "application/json", // }, // Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, // Method: http.MethodPost, diff --git a/plugin/api/gateway/api.go b/plugin/api/gateway/api.go index 7624624b..ab1ac7f4 100644 --- a/plugin/api/gateway/api.go +++ b/plugin/api/gateway/api.go @@ -6,6 +6,7 @@ package gateway import ( "infini.sh/framework/core/api" + "infini.sh/framework/core/api/rbac/enum" ) type GatewayAPI struct { @@ -15,12 +16,12 @@ type GatewayAPI struct { func init() { gateway:=GatewayAPI{} api.HandleAPIMethod(api.POST, "/gateway/instance/try_connect", gateway.tryConnect) - api.HandleAPIMethod(api.GET, "/gateway/instance/:instance_id", gateway.getInstance) - api.HandleAPIMethod(api.POST, "/gateway/instance", gateway.createInstance) - api.HandleAPIMethod(api.PUT, "/gateway/instance/:instance_id", gateway.updateInstance) - api.HandleAPIMethod(api.DELETE, "/gateway/instance/:instance_id", gateway.deleteInstance) - api.HandleAPIMethod(api.GET, "/gateway/instance/_search", gateway.searchInstance) - api.HandleAPIMethod(api.POST, "/gateway/instance/status", gateway.getInstanceStatus) + api.HandleAPIMethod(api.GET, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.getInstance, enum.PermissionGatewayInstanceRead)) + api.HandleAPIMethod(api.POST, "/gateway/instance", gateway.RequirePermission(gateway.createInstance, enum.PermissionGatewayInstanceWrite)) + api.HandleAPIMethod(api.PUT, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.updateInstance, enum.PermissionGatewayInstanceWrite)) + api.HandleAPIMethod(api.DELETE, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.deleteInstance, enum.PermissionGatewayInstanceWrite)) + api.HandleAPIMethod(api.GET, "/gateway/instance/_search", gateway.RequirePermission(gateway.searchInstance, enum.PermissionGatewayInstanceRead)) + api.HandleAPIMethod(api.POST, "/gateway/instance/status", gateway.RequirePermission(gateway.getInstanceStatus, enum.PermissionGatewayInstanceRead)) - api.HandleAPIMethod(api.POST, "/gateway/instance/:instance_id/_proxy", gateway.proxy) + api.HandleAPIMethod(api.POST, "/gateway/instance/:instance_id/_proxy", gateway.RequirePermission(gateway.proxy, enum.PermissionGatewayInstanceRead)) } diff --git a/plugin/api/index_management/elasticsearch.go b/plugin/api/index_management/elasticsearch.go index 4356c13d..345898f1 100644 --- a/plugin/api/index_management/elasticsearch.go +++ b/plugin/api/index_management/elasticsearch.go @@ -25,8 +25,24 @@ func (handler APIHandler) ElasticsearchOverviewAction(w http.ResponseWriter, req // return true //}) esClient 
:= elastic.GetClient(handler.Config.Elasticsearch) + queryDsl := util.MapStr{ + "size": 100, + } + clusterFilter, hasAllPrivilege := handler.GetClusterFilter(req, "_id") + if !hasAllPrivilege && clusterFilter == nil{ + handler.WriteJSON(w, util.MapStr{ + "nodes_count": 0, + "cluster_count":0, + "total_used_store_in_bytes": 0, + "hosts_count": 0, + }, http.StatusOK) + return + } + if !hasAllPrivilege { + queryDsl["query"] = clusterFilter + } - searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(elastic.ElasticsearchConfig{}), nil) + searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetIndexName(elastic.ElasticsearchConfig{}), util.MustToJSONBytes(queryDsl)) if err != nil { log.Error(err) handler.WriteJSON(w, util.MapStr{ @@ -64,11 +80,11 @@ func (handler APIHandler) ElasticsearchOverviewAction(w http.ResponseWriter, req } } - hostCount, err := handler.getMetricCount(orm.GetIndexName(elastic.NodeConfig{}), "metadata.host") + hostCount, err := handler.getMetricCount(orm.GetIndexName(elastic.NodeConfig{}), "metadata.host", clusterIDs) if err != nil{ log.Error(err) } - nodeCount, err := handler.getMetricCount(orm.GetIndexName(elastic.NodeConfig{}), "id") + nodeCount, err := handler.getMetricCount(orm.GetIndexName(elastic.NodeConfig{}), "id", clusterIDs) if err != nil{ log.Error(err) } @@ -130,20 +146,26 @@ func (handler APIHandler) getLatestClusterMonitorData(clusterIDs []interface{}) } -func (handler APIHandler) getMetricCount(indexName, field string) (interface{}, error){ +func (handler APIHandler) getMetricCount(indexName, field string, clusterIDs []interface{}) (interface{}, error){ client := elastic.GetClient(handler.Config.Elasticsearch) - queryDSL := `{ + queryDSL := util.MapStr{ "size": 0, - "aggs": { - "field_count": { - "cardinality": { - "field": "%s" - } - } - } -}` - queryDSL = fmt.Sprintf(queryDSL, field) - searchRes, err := client.SearchWithRawQueryDSL(indexName, []byte(queryDSL)) + "aggs": util.MapStr{ + "field_count": util.MapStr{ + "cardinality": util.MapStr{ + "field": field, + }, + }, + }, +} + if len(clusterIDs) > 0 { + queryDSL["query"] = util.MapStr{ + "terms": util.MapStr{ + "metadata.cluster_id": clusterIDs, + }, + } + } + searchRes, err := client.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(queryDSL)) if err != nil { log.Error(err) return 0, err diff --git a/plugin/api/index_management/indices.go b/plugin/api/index_management/indices.go index 6ae775d6..227668e3 100644 --- a/plugin/api/index_management/indices.go +++ b/plugin/api/index_management/indices.go @@ -1,11 +1,12 @@ package index_management import ( + log "github.com/cihub/seelog" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" "infini.sh/framework/core/util" "net/http" - log "github.com/cihub/seelog" + "strings" ) func (handler APIHandler) HandleGetMappingsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -39,7 +40,17 @@ func (handler APIHandler) HandleGetMappingsAction(w http.ResponseWriter, req *ht func (handler APIHandler) HandleGetIndicesAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { targetClusterID := ps.ByName("id") client := elastic.GetClient(targetClusterID) - catIndices, err := client.GetIndices("") + //filter indices + allowedIndices, hasAllPrivilege := handler.GetAllowedIndices(req, targetClusterID) + if !hasAllPrivilege && len(allowedIndices) == 0 { + handler.WriteJSON(w, []interface{}{} , http.StatusOK) + return + } + strIndices := "" + if !hasAllPrivilege { + strIndices = 
strings.Join(allowedIndices, ",") + } + catIndices, err := client.GetIndices(strIndices) resBody := util.MapStr{} if err != nil { log.Error(err) diff --git a/plugin/api/init.go b/plugin/api/init.go index 178e64ab..5670da83 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -5,6 +5,7 @@ import ( "infini.sh/console/plugin/api/alerting" "infini.sh/console/plugin/api/index_management" "infini.sh/framework/core/api" + "infini.sh/framework/core/api/rbac/enum" "path" ) @@ -15,7 +16,7 @@ func Init(cfg *config.AppConfig) { } var pathPrefix = "/_search-center/" var esPrefix = "/elasticsearch/:id/" - api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/overview"), handler.ElasticsearchOverviewAction) + api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/overview"), handler.RequirePermission(handler.ElasticsearchOverviewAction, enum.PermissionElasticsearchMetricRead)) //api.HandleAPIMethod(api.POST, "/api/get_indices",index_management.API1) api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "dict/_search"), handler.GetDictListAction) @@ -34,17 +35,17 @@ func Init(cfg *config.AppConfig) { api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "rebuild/_search"), handler.HandleGetRebuildListAction) api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "rebuild/:id"), handler.HandleDeleteRebuildAction) - api.HandleAPIMethod(api.GET, path.Join(esPrefix, "_cat/indices"), handler.HandleGetIndicesAction) + api.HandleAPIMethod(api.GET, path.Join(esPrefix, "_cat/indices"), handler.RequireLogin(handler.HandleGetIndicesAction)) api.HandleAPIMethod(api.GET, path.Join(esPrefix, "index/:index/_mappings"), handler.HandleGetMappingsAction) api.HandleAPIMethod(api.GET, path.Join(esPrefix, "index/:index/_settings"), handler.HandleGetSettingsAction) api.HandleAPIMethod(api.PUT, path.Join(esPrefix, "index/:index/_settings"),handler.HandleUpdateSettingsAction) api.HandleAPIMethod(api.DELETE, path.Join(esPrefix, "index/:index"), handler.HandleDeleteIndexAction) api.HandleAPIMethod(api.POST, path.Join(esPrefix, "index/:index"), handler.HandleCreateIndexAction) - api.HandleAPIMethod(api.POST, path.Join(pathPrefix, "elasticsearch/command"), handler.HandleAddCommonCommandAction) - api.HandleAPIMethod(api.PUT, path.Join(pathPrefix, "elasticsearch/command/:cid"), handler.HandleSaveCommonCommandAction) - api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/command"), handler.HandleQueryCommonCommandAction) - api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "elasticsearch/command/:cid"), handler.HandleDeleteCommonCommandAction) + api.HandleAPIMethod(api.POST, path.Join(pathPrefix, "elasticsearch/command"), handler.RequirePermission(handler.HandleAddCommonCommandAction, enum.PermissionCommandWrite)) + api.HandleAPIMethod(api.PUT, path.Join(pathPrefix, "elasticsearch/command/:cid"), handler.RequirePermission(handler.HandleSaveCommonCommandAction, enum.PermissionCommandWrite)) + api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/command"), handler.RequirePermission(handler.HandleQueryCommonCommandAction, enum.PermissionCommandRead)) + api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "elasticsearch/command/:cid"), handler.RequirePermission(handler.HandleDeleteCommonCommandAction,enum.PermissionCommandWrite)) //task.RegisterScheduleTask(task.ScheduleTask{ // Description: "sync reindex task result", diff --git a/service/alerting/constants.go b/service/alerting/constants.go index dba24266..c546ca8c 100644 --- a/service/alerting/constants.go +++ 
b/service/alerting/constants.go @@ -8,6 +8,7 @@ const ( KVLastNotificationTime = "alert_last_notification_time" KVLastTermStartTime = "alert_last_term_start_time" KVLastEscalationTime = "alert_last_escalation_time" + KVLastMessageState = "alert_last_message_state" ) @@ -17,11 +18,12 @@ const ( ParamResourceName = "resource_name" // 资源名称 如集群名称 es-v714 ParamEventID = "event_id" // 检查事件 ID ParamResults = "results" // - ParamMessage = "message" //检查消息 自定义 - ParamPresetValue = "preset_value" //检查预设值 float64 - ParamResultValue = "result_value" //检查结果 {group_tags:["cluster-xxx", "node-xxx"], check_values:[]} - Severity = "severity" //告警等级 - ParamTimestamp = "timestamp" //事件产生时间戳 + ParamMessage = "message" //检查消息 自定义(模版渲染) + ParamTitle = "title" + ParamThreshold = "threshold" //检查预设值 []string + ParamResultValue = "result_value" //检查结果 {group_tags:["cluster-xxx", "node-xxx"], check_values:[]} + Severity = "severity" //告警等级 + ParamTimestamp = "timestamp" //事件产生时间戳 ParamGroupValues = "group_values" ParamIssueTimestamp = "issue_timestamp" ParamRelationValues = "relation_values" diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index e514ab4f..0d44db3d 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -41,11 +41,15 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F if err != nil { return nil, err } - //todo generate agg + timeFilter, err := engine.generateTimeFilter(rule, filterParam) + if err != nil { + return nil, err + } if len(rule.Metrics.Items) == 0 { return nil, fmt.Errorf("metric items should not be empty") } basicAggs := util.MapStr{} + //todo bucket sort (es 6.1) bucket script (es 2.0) for _, metricItem := range rule.Metrics.Items { metricAggs := engine.generateAgg(&metricItem) if err = util.MergeFields(basicAggs, metricAggs, true); err != nil { @@ -62,11 +66,22 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F periodInterval = filterParam.BucketSize } timeAggs := util.MapStr{ - "date_histogram": util.MapStr{ - "field": rule.Resource.TimeField, - intervalField: periodInterval, + "time_buckets": util.MapStr{ + "date_histogram": util.MapStr{ + "field": rule.Resource.TimeField, + intervalField: periodInterval, + }, + "aggs": basicAggs, }, - "aggs": basicAggs, + } + + if len(filter) > 0 { + timeAggs = util.MapStr{ + "filter_agg": util.MapStr{ + "filter": filter, + "aggs": timeAggs, + }, + } } var rootAggs util.MapStr groups := rule.Metrics.Items[0].Group @@ -91,26 +106,20 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F groupID: lastGroupAgg, } }else{ - groupAgg["aggs"] = util.MapStr{ - "time_buckets": timeAggs, - } + groupAgg["aggs"] = timeAggs } - lastGroupAgg = groupAgg - } rootAggs = util.MapStr{ util.GetUUID(): lastGroupAgg, } }else{ - rootAggs = util.MapStr{ - "time_buckets": timeAggs, - } + rootAggs = timeAggs } return util.MapStr{ "size": 0, - "query": filter, + "query": timeFilter, "aggs": rootAggs, }, nil } @@ -133,7 +142,7 @@ func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) map[string]in case "rate": aggType = "max" isPipeline = true - case "medium": + case "medium": // from es version 6.6 aggType = "median_absolute_deviation" case "p99", "p95","p90","p80","p50": aggType = "percentiles" @@ -263,19 +272,7 @@ func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[str return resultQuery, nil } -func (engine *Engine) GenerateRawFilter(rule 
*alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) { - query := map[string]interface{}{} - var err error - if rule.Resource.RawFilter != nil { - query = util.DeepCopy(rule.Resource.RawFilter).(map[string]interface{}) - }else{ - if !rule.Resource.Filter.IsEmpty(){ - query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter) - if err != nil { - return nil, err - } - } - } +func (engine *Engine) generateTimeFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error){ var ( timeStart interface{} timeEnd interface{} @@ -304,12 +301,16 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerti }else{ return nil, fmt.Errorf("period interval: %s is too small", rule.Metrics.PeriodInterval) } - duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * 15, units)) + bucketCount := rule.Conditions.GetMinimumPeriodMatch() + 1 + if bucketCount <= 0 { + bucketCount = 1 + } + duration, err := time.ParseDuration(fmt.Sprintf("%d%s", value * bucketCount, units)) if err != nil { return nil, err } - timeStart = time.Now().Add(-duration).Format(time.RFC3339Nano) - timeEnd = time.Now().Format(time.RFC3339Nano) + timeStart = time.Now().Add(-duration).UnixMilli() //.Format(time.RFC3339Nano) + timeEnd = time.Now().UnixMilli() } timeQuery := util.MapStr{ @@ -320,36 +321,56 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerti }, }, } + return timeQuery, nil +} - if boolQ, ok := query["bool"].(map[string]interface{}); ok { - if mustQ, ok := boolQ["must"]; ok { - - if mustArr, ok := mustQ.([]interface{}); ok { - boolQ["must"] = append(mustArr, timeQuery) - - }else{ - return nil, fmt.Errorf("must query: %v is not valid in filter", mustQ) - } - }else{ - boolQ["must"] = []interface{}{ - timeQuery, - } - } +func (engine *Engine) GenerateRawFilter(rule *alerting.Rule, filterParam *alerting.FilterParam) (map[string]interface{}, error) { + query := map[string]interface{}{} + var err error + if rule.Resource.RawFilter != nil { + query = util.DeepCopy(rule.Resource.RawFilter).(map[string]interface{}) }else{ - must := []interface{}{ - timeQuery, - } - if len(query) > 0 { - if _, ok = query["match_all"]; !ok { - must = append(must, query) + if !rule.Resource.Filter.IsEmpty(){ + query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter) + if err != nil { + return nil, err } } - query = util.MapStr{ - "bool": util.MapStr{ - "must": must, - }, - } } + //timeQuery, err := engine.generateTimeFilter(rule, filterParam) + //if err != nil { + // return nil, err + //} + // + //if boolQ, ok := query["bool"].(map[string]interface{}); ok { + // if mustQ, ok := boolQ["must"]; ok { + // + // if mustArr, ok := mustQ.([]interface{}); ok { + // boolQ["must"] = append(mustArr, timeQuery) + // + // }else{ + // return nil, fmt.Errorf("must query: %v is not valid in filter", mustQ) + // } + // }else{ + // boolQ["must"] = []interface{}{ + // timeQuery, + // } + // } + //}else{ + // must := []interface{}{ + // timeQuery, + // } + // if len(query) > 0 { + // if _, ok = query["match_all"]; !ok { + // must = append(must, query) + // } + // } + // query = util.MapStr{ + // "bool": util.MapStr{ + // "must": must, + // }, + // } + //} return query, nil } @@ -369,26 +390,55 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.Fi queryResult.Query = string(queryDslBytes) searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes) if err != nil { - return nil, err + return 
queryResult, err + } + if searchRes.GetTotal() == 0 { + queryResult.Nodata = true } if searchRes.StatusCode != 200 { - return nil, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body)) + return queryResult, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body)) } queryResult.Raw = string(searchRes.RawResult.Body) searchResult := map[string]interface{}{} err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult) if err != nil { - return nil, err + return queryResult, err } metricData := []alerting.MetricData{} collectMetricData(searchResult["aggregations"], "", &metricData) + //将 rate 求导数据 除以 bucket size (单位 /s) + //statisticM := map[string] string{} + //for _, mi := range rule.Metrics.Items { + // statisticM[mi.Name] = mi.Statistic + //} + //var periodInterval = rule.Metrics.PeriodInterval + //if filterParam != nil && filterParam.BucketSize != "" { + // periodInterval = filterParam.BucketSize + //} + //interval, err := time.ParseDuration(periodInterval) + //if err != nil { + // log.Error(err) + //} + //for i, _ := range metricData { + // for k, d := range metricData[i].Data { + // if statisticM[k] == "rate" { + // for _, td := range d { + // if len(td) > 1 { + // if v, ok := td[1].(float64); ok { + // td[1] = v / interval.Seconds() + // } + // } + // } + // } + // } + //} queryResult.MetricData = metricData return queryResult, nil } -func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error){ +func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, *alerting.QueryResult, error){ queryResult, err := engine.ExecuteQuery(rule, filterParam) if err != nil { - return nil, err + return nil, queryResult, err } var targetMetricData []alerting.MetricData for _, md := range queryResult.MetricData { @@ -402,7 +452,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, } expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula) if err != nil { - return nil, err + return nil, queryResult, err } dataLength := 0 for _, v := range md.Data { @@ -429,7 +479,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, } result, err := expression.Evaluate(parameters) if err != nil { - return nil, err + return nil, queryResult, err } if r, ok := result.(float64); ok { if math.IsNaN(r) || math.IsInf(r, 0 ){ @@ -445,25 +495,20 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, } targetMetricData = append(targetMetricData, targetData) } - return targetMetricData, nil + return targetMetricData, queryResult, nil } //CheckCondition check whether rule conditions triggered or not -//if triggered returns an array of ConditionResult +//if triggered returns an ConditionResult //sort conditions by severity desc before check , and then if condition is true, then continue check another group func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){ - queryResult, err := engine.ExecuteQuery(rule, nil) + var resultItems []alerting.ConditionResultItem + targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, false, nil) conditionResult := &alerting.ConditionResult{ QueryResult: queryResult, } if err != nil { return conditionResult, err } - - var resultItems []alerting.ConditionResultItem - targetMetricData, err := engine.GetTargetMetricData(rule, false, nil) - if err != nil { - 
return nil, err - } for idx, targetData := range targetMetricData { if idx == 0 { sort.Slice(rule.Conditions.Items, func(i, j int) bool { @@ -488,6 +533,10 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes } triggerCount := 0 for i := 0; i < dataLength; i++ { + //clear nil value + if targetData.Data[dataKey][i][1] == nil { + continue + } if r, ok := targetData.Data[dataKey][i][1].(float64); ok { if math.IsNaN(r){ continue @@ -497,7 +546,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes "result": targetData.Data[dataKey][i][1], }) if err != nil { - return nil, err + return conditionResult, fmt.Errorf("evaluate rule [%s] error: %w", rule.ID, err) } if evaluateResult == true { triggerCount += 1 @@ -540,6 +589,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error { Created: time.Now(), Updated: time.Now(), RuleID: rule.ID, + RuleName: rule.Name, ResourceID: rule.Resource.ID, ResourceName: rule.Resource.Name, Expression: rule.Metrics.Expression, @@ -549,14 +599,18 @@ func (engine *Engine) Do(rule *alerting.Rule) error { } } if alertItem != nil { - for _, actionResult := range alertItem.ActionExecutionResults { - if actionResult.Error != "" { - alertItem.Error = actionResult.Error + if err != nil{ + alertItem.State = alerting.AlertStateError + alertItem.Error = err.Error() + }else { + for _, actionResult := range alertItem.ActionExecutionResults { + if actionResult.Error != "" { + alertItem.Error = actionResult.Error + alertItem.State = alerting.AlertStateError + } } } - if alertItem.Error != ""{ - alertItem.State = alerting.AlertStateError - } + err = orm.Save(alertItem) if err != nil { log.Error(err) @@ -565,54 +619,106 @@ func (engine *Engine) Do(rule *alerting.Rule) error { }() log.Tracef("start check condition of rule %s", rule.ID) + //todo do only once when rule not change + metricExpression, _ := rule.Metrics.GenerateExpression() + for i, cond := range rule.Conditions.Items { + expression, _ := cond.GenerateConditionExpression() + rule.Conditions.Items[i].Expression = strings.ReplaceAll(expression, "result", metricExpression) + } alertItem = &alerting.Alert{ ID: util.GetUUID(), Created: time.Now(), Updated: time.Now(), RuleID: rule.ID, + RuleName: rule.Name, ResourceID: rule.Resource.ID, ResourceName: rule.Resource.Name, Expression: rule.Metrics.Expression, Objects: rule.Resource.Objects, Conditions: rule.Conditions, - State: alerting.AlertStateNormal, + State: alerting.AlertStateOK, } checkResults, err := engine.CheckCondition(rule) alertItem.ConditionResult = checkResults if err != nil { return err } - lastAlertItem := alerting.Alert{} - err = getLastAlert(rule.ID, &lastAlertItem) + alertMessage, err := getLastAlertMessage(rule.ID, 2 * time.Minute) + if err != nil { + return fmt.Errorf("get alert message error: %w", err) + } + conditionResults := checkResults.ResultItems + var paramsCtx map[string]interface{} + if len(conditionResults) == 0 { + alertItem.Severity = "info" + if checkResults.QueryResult.Nodata { + alertItem.State = alerting.AlertStateNodata + } + + if alertMessage != nil && alertMessage.Status != alerting.MessageStateRecovered && !checkResults.QueryResult.Nodata { + alertMessage.Status = alerting.MessageStateRecovered + alertMessage.Updated = time.Now() + err = saveAlertMessage(alertMessage) + if err != nil { + return fmt.Errorf("save alert message error: %w", err) + } + } + return nil + } + alertItem.State = alerting.AlertStateAlerting + + var ( + severity = 
conditionResults[0].ConditionItem.Severity + ) + for _, conditionResult := range conditionResults { + if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] { + severity = conditionResult.ConditionItem.Severity + } + } + paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{ + alerting2.ParamEventID: alertItem.ID, + alerting2.ParamTimestamp: alertItem.Created.Unix(), + }) + + alertItem.Severity = severity + err = attachTitleMessageToCtx(rule, paramsCtx) if err != nil { return err } - conditionResults := checkResults.ResultItems - if len(conditionResults) == 0 { - alertItem.Severity = "info" - alertItem.Content = "" - alertItem.State = alerting.AlertStateNormal - return nil + alertItem.Message = paramsCtx[alerting2.ParamMessage].(string) + alertItem.Title = paramsCtx[alerting2.ParamTitle].(string) + if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered { + msg := &alerting.AlertMessage{ + RuleID: rule.ID, + Created: time.Now(), + Updated: time.Now(), + ID: util.GetUUID(), + Status: alerting.MessageStateAlerting, + Severity: severity, + Title: alertItem.Title, + Message: alertItem.Message, + } + err = saveAlertMessage(msg) + if err != nil { + return fmt.Errorf("save alert message error: %w", err) + } }else{ - if lastAlertItem.State == "" || lastAlertItem.State == alerting.AlertStateNormal { - rule.LastTermStartTime = time.Now() - strTime := rule.LastTermStartTime.UTC().Format(time.RFC3339) - kv.AddValue(alerting2.KVLastTermStartTime, []byte(rule.ID), []byte(strTime)) + alertMessage.Title = alertItem.Title + alertMessage.Message = alertItem.Message + err = saveAlertMessage(alertMessage) + if err != nil { + return fmt.Errorf("save alert message error: %w", err) } - log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID ) - var ( - severity = conditionResults[0].ConditionItem.Severity - content string - ) - for _, conditionResult := range conditionResults { - if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] { - severity = conditionResult.ConditionItem.Severity - content = conditionResult.ConditionItem.Message - } - } - alertItem.Severity = severity - alertItem.Content = content - alertItem.State = alerting.AlertStateActive + } + log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID ) + + // if alert message status equals ignored , then skip sending message to channel + if alertMessage != nil && alertMessage.Status == alerting.MessageStateIgnored { + return nil + } + // if channel is not enabled return + if !rule.Channels.Enabled { + return nil } if rule.Channels.AcceptTimeRange.Include(time.Now()) { @@ -633,9 +739,15 @@ func (engine *Engine) Do(rule *alerting.Rule) error { period := time.Now().Sub(rule.LastNotificationTime.Local()) //log.Error(lastAlertItem.ID, period, periodDuration) - paramsCtx := newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.UnixNano()/1e6) + if paramsCtx == nil { + paramsCtx = newParameterCtx(rule, checkResults, util.MapStr{ + alerting2.ParamEventID: alertItem.ID, + alerting2.ParamTimestamp: alertItem.Created.Unix(), + "severity": severity, + }) + } - if lastAlertItem.ID == "" || period > periodDuration { + if alertMessage == nil || period > periodDuration { actionResults, errCount := performChannels(rule.Channels.Normal, paramsCtx) alertItem.ActionExecutionResults = actionResults //change and save last notification time in local kv store when action error count 
equals zero @@ -646,25 +758,16 @@ func (engine *Engine) Do(rule *alerting.Rule) error { alertItem.IsNotified = true } } - isAck, err := hasAcknowledgedRule(rule.ID, rule.LastTermStartTime) - if err != nil { - alertItem.Error = err.Error() - return err - } - if rule.Channels.EscalationEnabled && lastAlertItem.ID !="" && !isAck { + + if rule.Channels.EscalationEnabled { throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod) if err != nil { return err } - //change and save last term start time in local kv store when action error count equals zero - if rule.LastTermStartTime.IsZero(){ - tm, err := readTimeFromKV(alerting2.KVLastTermStartTime, []byte(rule.ID)) - if err != nil { - return fmt.Errorf("get last term start time from kv error: %w", err) - } - if !tm.IsZero(){ - rule.LastTermStartTime = tm - } + + rule.LastTermStartTime = time.Now() + if alertMessage != nil { + rule.LastTermStartTime = alertMessage.Created } if time.Now().Sub(rule.LastTermStartTime.Local()) > throttlePeriod { if rule.LastEscalationTime.IsZero(){ @@ -694,12 +797,57 @@ func (engine *Engine) Do(rule *alerting.Rule) error { return nil } -func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, eventID string, eventTimestamp interface{} ) map[string]interface{}{ - var conditionParams []util.MapStr - for _, resultItem := range checkResults.ResultItems { +func attachTitleMessageToCtx(rule *alerting.Rule, paramsCtx map[string]interface{}) error{ + var ( + tplBytes []byte + err error + ) + tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx) + if err != nil { + return fmt.Errorf("resolve message template error: %w", err) + } + paramsCtx[alerting2.ParamMessage] = string(tplBytes) + tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx) + if err != nil { + return fmt.Errorf("resolve title template error: %w", err) + } + paramsCtx[alerting2.ParamTitle] = string(tplBytes) + return nil +} + +func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult, extraParams map[string]interface{} ) map[string]interface{}{ + var ( + conditionParams []util.MapStr + firstGroupValue string + firstThreshold string + severity string + ) + if len(checkResults.ResultItems) > 0 { + severity = checkResults.ResultItems[0].ConditionItem.Severity + sort.Slice(checkResults.ResultItems, func(i, j int) bool { + if alerting.SeverityWeights[checkResults.ResultItems[i].ConditionItem.Severity] > alerting.SeverityWeights[checkResults.ResultItems[j].ConditionItem.Severity] { + return true + } + return false + }) + sort.Slice(checkResults.ResultItems, func(i, j int) bool { + if vi, ok := checkResults.ResultItems[i].ResultValue.(float64); ok { + if vj, ok := checkResults.ResultItems[j].ResultValue.(float64); ok { + return vi > vj + } + } + return false + }) + } + + for i, resultItem := range checkResults.ResultItems { + + if i == 0 { + firstGroupValue = strings.Join(resultItem.GroupValues, ",") + firstThreshold = strings.Join(resultItem.ConditionItem.Values, ",") + } conditionParams = append(conditionParams, util.MapStr{ - alerting2.ParamMessage: resultItem.ConditionItem.Message, - alerting2.ParamPresetValue: resultItem.ConditionItem.Values, + alerting2.ParamThreshold: resultItem.ConditionItem.Values, alerting2.Severity: resultItem.ConditionItem.Severity, alerting2.ParamGroupValues: resultItem.GroupValues, alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp, @@ -711,9 +859,15 @@ func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult 
alerting2.ParamRuleID: rule.ID, alerting2.ParamResourceID: rule.Resource.ID, alerting2.ParamResourceName: rule.Resource.Name, - alerting2.ParamEventID: eventID, - alerting2.ParamTimestamp: eventTimestamp, alerting2.ParamResults: conditionParams, + "first_group_value": firstGroupValue, + "first_threshold": firstThreshold, + "rule_name": rule.Name, + "severity": severity, + } + err := util.MergeFields(paramsCtx, extraParams, true) + if err != nil { + log.Errorf("merge template params error: %v", err) } return paramsCtx } @@ -724,7 +878,15 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul return nil, fmt.Errorf("check condition error:%w", err) } var actionResults []alerting.ActionExecutionResult - paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().UnixNano()/1e6) + + paramsCtx := newParameterCtx(rule, checkResults,util.MapStr{ + alerting2.ParamEventID: util.GetUUID(), + alerting2.ParamTimestamp: time.Now().Unix(), + } ) + err = attachTitleMessageToCtx(rule, paramsCtx) + if err != nil { + return nil, err + } if len(rule.Channels.Normal) > 0 { actionResults, _ = performChannels(rule.Channels.Normal, paramsCtx) }else if len(rule.Channels.Escalation) > 0{ @@ -822,6 +984,40 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti if k == "key" || k == "key_as_string" || k== "doc_count"{ continue } + //has filter + //if k == "filter_agg" { + // if filterM, ok := v.(map[string]interface{}); ok { + // for fk, fv := range filterM { + // if fk == "doc_count" { + // continue + // } + // if vm, ok := fv.(map[string]interface{}); ok { + // if metricVal, ok := vm["value"]; ok { + // md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], metricVal}) + // }else{ + // //percentiles agg type + // switch vm["values"].(type) { + // case []interface{}: + // for _, val := range vm["values"].([]interface{}) { + // if valM, ok := val.(map[string]interface{}); ok { + // md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], valM["value"]}) + // } + // break + // } + // case map[string]interface{}: + // for _, val := range vm["values"].(map[string]interface{}) { + // md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], val}) + // break + // } + // } + // + // } + // + // } + // } + // } + // continue + //} if vm, ok := v.(map[string]interface{}); ok { if metricVal, ok := vm["value"]; ok { md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal}) @@ -867,7 +1063,12 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti if groupValues != "" { newGroupValues = fmt.Sprintf("%s*%s", groupValues, currentGroup) } - collectMetricData(bk, newGroupValues, metricData) + if filterAgg, ok := bkVal["filter_agg"].(map[string]interface{}); ok { + collectMetricData(filterAgg, newGroupValues, metricData) + }else{ + collectMetricData(bk, newGroupValues, metricData) + } + } } @@ -879,7 +1080,7 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti } } -func getLastAlert(ruleID string, alertItem *alerting.Alert) error { +func getLastAlertMessageFromES(ruleID string) (*alerting.AlertMessage, error) { queryDsl := util.MapStr{ "size": 1, "sort": []util.MapStr{ @@ -900,61 +1101,60 @@ func getLastAlert(ruleID string, alertItem *alerting.Alert) error { q := orm.Query{ RawQuery: util.MustToJSONBytes(queryDsl), } - err, searchResult := orm.Search(alertItem, &q ) + err, searchResult := orm.Search(alerting.AlertMessage{}, &q ) + if err != 
nil { + return nil, err + } + if len(searchResult.Result) == 0 { + return nil, nil + } + messageBytes := util.MustToJSONBytes(searchResult.Result[0]) + message := &alerting.AlertMessage{} + err = util.FromJSONBytes(messageBytes, message) + return message, err +} + +func getLastAlertMessage(ruleID string, duration time.Duration) (*alerting.AlertMessage, error ){ + messageBytes, err := kv.GetValue(alerting2.KVLastMessageState, []byte(ruleID)) + if err != nil { + return nil, err + } + message := &alerting.AlertMessage{} + if messageBytes != nil { + + err = util.FromJSONBytes(messageBytes, message) + if err != nil { + return nil, err + } + if time.Now().Sub(message.Updated) <= duration { + return message, nil + } + } + message, err = getLastAlertMessageFromES(ruleID) + return message, err +} + +func saveAlertMessageToES(message *alerting.AlertMessage) error { + message.Updated = time.Now() + return orm.Save(message) +} + +func saveAlertMessage(message *alerting.AlertMessage) error { + //todo diff message if not change , then skip save to es ? + err := saveAlertMessageToES(message) if err != nil { return err } - if len(searchResult.Result) == 0 { - return nil - } - alertBytes := util.MustToJSONBytes(searchResult.Result[0]) - return util.FromJSONBytes(alertBytes, alertItem) -} -func hasAcknowledgedRule(ruleID string, startTime time.Time) (bool, error){ - queryDsl := util.MapStr{ - "size": 1, - "query": util.MapStr{ - "bool": util.MapStr{ - "must":[]util.MapStr{ - { - "term": util.MapStr{ - "rule_id": util.MapStr{ - "value": ruleID, - }, - }, - }, - { - "term": util.MapStr{ - "state": alerting.AlertStateAcknowledge, - }, - }, - { - "range": util.MapStr{ - "created": util.MapStr{ - "gte": startTime, - }, - }, - }, - }, - - }, - }, - } - q := orm.Query{ - WildcardIndex: true, - RawQuery: util.MustToJSONBytes(queryDsl), - } - err, searchResult := orm.Search(alerting.Alert{}, &q ) + messageBytes, err := util.ToJSONBytes(message) if err != nil { - return false, err + return err } - if len(searchResult.Result) == 0 { - return false, nil - } - return true, nil + err = kv.AddValue(alerting2.KVLastMessageState, []byte(message.RuleID), messageBytes) + return err } + func readTimeFromKV(bucketKey string, key []byte)(time.Time, error){ timeBytes, err := kv.GetValue(bucketKey, key) zeroTime := time.Time{} diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go index 492a4324..6a0eccb2 100644 --- a/service/alerting/elasticsearch/engine_test.go +++ b/service/alerting/elasticsearch/engine_test.go @@ -55,7 +55,6 @@ func TestEngine( t *testing.T) { Metrics: alerting.Metric{ PeriodInterval: "1m", - MaxPeriods: 15, Items: []alerting.MetricItem{ {Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, {Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}}, @@ -75,7 +74,7 @@ func TestEngine( t *testing.T) { Normal: []alerting.Channel{ {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{ - "Content-Type": "application/json", + "Message-Type": "application/json", }, Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, Method: http.MethodPost, @@ -85,7 +84,7 @@ func TestEngine( t *testing.T) { Escalation: []alerting.Channel{ {Type: alerting.ChannelWebhook, Name: "微信", 
Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{ - "Content-Type": "application/json", + "Message-Type": "application/json", }, Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, Method: http.MethodPost, @@ -153,7 +152,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { // Conditions: alerting.Condition{ // Operator: "any", // Items: []alerting.ConditionItem{ - // {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"}, + // {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", AlertMessage: "cpu使用率大于90%"}, // }, // }, // @@ -161,7 +160,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { // Normal: []alerting.Channel{ // {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ // HeaderParams: map[string]string{ - // "Content-Type": "application/json", + // "Message-Type": "application/json", // }, // Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, // Method: http.MethodPost, @@ -204,7 +203,6 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { Metrics: alerting.Metric{ PeriodInterval: "1m", - MaxPeriods: 15, Items: []alerting.MetricItem{ {Name: "a", Field: "payload.elasticsearch.index_stats.total.search.query_total", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}}, {Name: "b", Field: "payload.elasticsearch.index_stats.total.search.query_time_in_millis", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}}, @@ -222,7 +220,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { Normal: []alerting.Channel{ {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{ - "Content-Type": "application/json", + "Message-Type": "application/json", }, Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, Method: http.MethodPost, diff --git a/service/alerting/engine.go b/service/alerting/engine.go index 5f3c548b..a6a4debd 100644 --- a/service/alerting/engine.go +++ b/service/alerting/engine.go @@ -17,7 +17,7 @@ type Engine interface { CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error) GenerateTask(rule *alerting.Rule) func(ctx context.Context) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error) - GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, error) + GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, filterParam *alerting.FilterParam)([]alerting.MetricData, *alerting.QueryResult, error) } var ( diff --git a/service/alerting/funcs/date.go b/service/alerting/funcs/date.go index 468931ba..8ace3f05 100644 --- a/service/alerting/funcs/date.go +++ b/service/alerting/funcs/date.go @@ -9,19 +9,22 @@ import ( "time" ) -func date(fmt string, date interface{}) string { - return dateInZone(fmt, date, "Local") +func datetimeInZone(zone string, date interface{}) string{ + return _dateInZone("2006-01-02 15:04:05", date, zone) +} +func datetime(date interface{}) string{ + return _dateInZone("2006-01-02 15:04:05", date, "Local") } -func htmlDate(date interface{}) string { - return dateInZone("2006-01-02", date, "Local") +func date(date interface{}) string { + return _dateInZone("2006-01-02", date, "Local") } -func htmlDateInZone(date interface{}, zone string) string { - return dateInZone("2006-01-02", date, zone) +func dateInZone(zone string, date interface{}) string { + return _dateInZone("2006-01-02", date, zone) } -func 
dateInZone(fmt string, date interface{}, zone string) string { +func _dateInZone(fmt string, date interface{}, zone string) string { var t time.Time switch date := date.(type) { default: @@ -34,6 +37,7 @@ func dateInZone(fmt string, date interface{}, zone string) string { t = time.Unix(date, 0) case int: t = time.Unix(int64(date), 0) + case int32: t = time.Unix(int64(date), 0) case string: diff --git a/service/alerting/funcs/function.go b/service/alerting/funcs/function.go index 7c0a15ad..f6fcaba6 100644 --- a/service/alerting/funcs/function.go +++ b/service/alerting/funcs/function.go @@ -18,11 +18,13 @@ func GenericFuncMap() template.FuncMap { } var genericMap = map[string]interface{}{ - "hello": func() string { return "Hello!" }, + "hello": func() string { return "Hello!" }, "format_bytes": formatBytes, - "to_fixed": toFixed, - "date": date, + "to_fixed": toFixed, + "date": date, "date_in_zone": dateInZone, - "to_upper": strings.ToUpper, - "to_lower": strings.ToLower, + "datetime": datetime, + "datetime_in_zone": datetimeInZone, + "to_upper": strings.ToUpper, + "to_lower": strings.ToLower, } diff --git a/service/alerting/parameter.go b/service/alerting/parameter.go index 2fe8128b..25ba5fcc 100644 --- a/service/alerting/parameter.go +++ b/service/alerting/parameter.go @@ -18,9 +18,10 @@ func GetTemplateParameters() []ParameterMeta { {ParamResourceID, "string", "resource uuid", "c9f663tath2e5a0vksjg", nil}, {ParamResourceName, "string", "resource name", "es-v716", nil}, {ParamEventID, "string", "identifier for check details", "c9f663tath2e5a0vksjx", nil}, + {ParamTitle, "string", "", "xxx cpu used 95%", nil}, + {ParamMessage, "string", "", "disk used 90%", nil}, {ParamResults, "array", "", "", []ParameterMeta{ - {ParamMessage, "string", "", "disk used 90%", nil}, - {ParamPresetValue, "array", "", "[\"90\"]", nil}, + {ParamThreshold, "array", "", "[\"90\"]", nil}, {Severity, "string", "", "error", nil}, {ParamGroupValues, "array", "", "[\"cluster-xxx\", \"node-xxx\"]", nil}, {ParamIssueTimestamp, "date", "", "2022-05-11T11:50:55+08:00", nil},
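A quick, self-contained sketch of how the renamed template helpers and the new title/message parameters could fit together. This is illustrative only: it assumes the rule's Metrics.Title field is a plain Go text/template, stubs the datetime helper locally instead of importing funcs.GenericFuncMap(), and uses made-up values for the context keys (rule_name, severity, first_group_value, first_threshold, timestamp) introduced by this patch; the console's actual resolveMessage path and template syntax may differ.

package main

import (
	"os"
	"text/template"
	"time"
)

func main() {
	// Local stand-in for the "datetime" helper registered in funcs/function.go;
	// the real helper accepts interface{} and handles more input types via _dateInZone.
	funcs := template.FuncMap{
		"datetime": func(ts int64) string {
			return time.Unix(ts, 0).Format("2006-01-02 15:04:05")
		},
	}

	// Hypothetical parameter context using the key names added by this patch.
	params := map[string]interface{}{
		"rule_name":         "disk-usage-check",
		"severity":          "error",
		"first_group_value": "cluster-xxx,node-xxx",
		"first_threshold":   "90",
		"timestamp":         int64(1652238655),
	}

	// A title template a rule author might configure; rendering it against the
	// context is roughly what attachTitleMessageToCtx does for Metrics.Title and
	// Metrics.Message before the result is stored on the Alert and AlertMessage.
	tpl := template.Must(template.New("title").Funcs(funcs).Parse(
		"[{{.severity}}] {{.rule_name}}: {{.first_group_value}} exceeded {{.first_threshold}} at {{datetime .timestamp}}"))
	if err := tpl.Execute(os.Stdout, params); err != nil {
		panic(err)
	}
}

Running the sketch prints something like "[error] disk-usage-check: cluster-xxx,node-xxx exceeded 90 at 2022-05-11 11:50:55" (the time depends on the local zone).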