diff --git a/main.go b/main.go index cb158c3d..51d3dd55 100644 --- a/main.go +++ b/main.go @@ -112,6 +112,7 @@ func main() { orm.RegisterSchemaWithIndexName(gateway.Instance{}, "gateway-instance") orm.RegisterSchemaWithIndexName(alerting.Rule{}, "alert-rule") orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history") + orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message") api.RegisterSchema() go func() { diff --git a/model/alerting/alert.go b/model/alerting/alert.go index 6d80afb8..293ffcd1 100644 --- a/model/alerting/alert.go +++ b/model/alerting/alert.go @@ -18,8 +18,9 @@ type Alert struct { Expression string `json:"expression" elastic_mapping:"expression: { type: keyword, copy_to:search_text }"` Objects []string `json:"objects" elastic_mapping:"objects: { type:keyword,copy_to:search_text }"` Severity string `json:"severity" elastic_mapping:"severity: { type: keyword }"` - Content string `json:"content" elastic_mapping:"context: { type: keyword, copy_to:search_text }"` - AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` + Title string `json:"title" elastic_mapping:"title: { type: keyword }"` + Message string `json:"message" elastic_mapping:"context: { type: keyword, copy_to:search_text }"` + AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"` Users []string `json:"users,omitempty"` State string `json:"state"` @@ -43,10 +44,28 @@ type ActionExecutionResult struct { const ( AlertStateActive string = "active" AlertStateAcknowledge = "acknowledged" - AlertStateNormal = "normal" - AlertStateError = "error" + AlertStateOK = "normal" + AlertStateError = "error" ) +const ( + MessageStateActive = "active" + MessageStateIgnored = "ignored" + MessageStateRecovered = "recovered" +) + +type AlertMessage struct { + ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"` + Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"` + Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"` + RuleID string `json:"rule_id" elastic_mapping:"rule_id: { type: keyword,copy_to:search_text }"` + Title string `json:"title" elastic_mapping:"title: { type: keyword,copy_to:search_text }"` + Message string `json:"message" elastic_mapping:"content: { type: keyword,copy_to:search_text }"` + Status string `json:"status" elastic_mapping:"status: { type: keyword,copy_to:search_text }"` + IgnoredTime time.Time `json:"ignored_time,omitempty" elastic_mapping:"ignored_time: { type: date }"` + Severity string `json:"severity" elastic_mapping:"severity: { type: keyword }"` + SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` +} /* { @@ -54,4 +73,7 @@ const ( ResourceID ResourceName } -*/ \ No newline at end of file +*/ + +//message status (Active, Ignore, Recover) +//rule status (Active, Error, OK) \ No newline at end of file diff --git a/model/alerting/condition.go b/model/alerting/condition.go index 494f790a..097f3c6f 100644 --- a/model/alerting/condition.go +++ b/model/alerting/condition.go @@ -17,7 +17,6 @@ type ConditionItem struct { Operator string `json:"operator"` Values []string `json:"values"` Severity string `json:"severity"` - Message string `json:"message"` } func (cond *ConditionItem) GenerateConditionExpression()(conditionExpression string, err error){ valueLength := len(cond.Values) diff --git a/model/alerting/metric.go b/model/alerting/metric.go index ca40e945..5f8fd0cb 100644 --- a/model/alerting/metric.go +++ b/model/alerting/metric.go @@ -15,6 +15,8 @@ type Metric struct { Items []MetricItem `json:"items"` Formula string `json:"formula,omitempty"` Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80 + Title string `json:"title"` //text template + Message string `json:"message"` // text template } func (m *Metric) GenerateExpression() (string, error){ if len(m.Items) == 1 { diff --git a/model/alerting/rule.go b/model/alerting/rule.go index dafeb501..c6e4c223 100644 --- a/model/alerting/rule.go +++ b/model/alerting/rule.go @@ -52,12 +52,13 @@ func (rule *Rule) GetOrInitExpression() (string, error){ } type RuleChannel struct { - Normal []Channel `json:"normal"` + Enabled bool `json:"enabled"` + Normal []Channel `json:"normal,omitempty"` Escalation []Channel `json:"escalation,omitempty"` - ThrottlePeriod string `json:"throttle_period"` //沉默周期 - AcceptTimeRange TimeRange `json:"accept_time_range"` + ThrottlePeriod string `json:"throttle_period,omitempty"` //沉默周期 + AcceptTimeRange TimeRange `json:"accept_time_range,omitempty"` EscalationThrottlePeriod string `json:"escalation_throttle_period,omitempty"` - EscalationEnabled bool `json:"escalation_enabled"` + EscalationEnabled bool `json:"escalation_enabled,omitempty"` } type MessageTemplate struct{ @@ -71,6 +72,9 @@ type TimeRange struct { } func (tr *TimeRange) Include( t time.Time) bool { + if tr.Start == "" || tr.End == "" { + return true + } currentTimeStr := t.Format("15:04") return tr.Start <= currentTimeStr && currentTimeStr <= tr.End } diff --git a/model/alerting/rule_test.go b/model/alerting/rule_test.go index 21583231..93e1e257 100644 --- a/model/alerting/rule_test.go +++ b/model/alerting/rule_test.go @@ -55,7 +55,7 @@ func TestCreateRule( t *testing.T) { //Conditions: Condition{ // Operator: "any", // Items: []ConditionItem{ - // { MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"1"}, Severity: "error", Message: "集群健康状态为 Red"}, + // { MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"1"}, Severity: "error", AlertMessage: "集群健康状态为 Red"}, // }, //}, @@ -80,7 +80,7 @@ func TestCreateRule( t *testing.T) { Normal: []Channel{ {Name: "钉钉", Type: ChannelWebhook, Webhook: &CustomWebhook{ HeaderParams: map[string]string{ - "Content-Type": "application/json", + "Message-Type": "application/json", }, Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, Method: http.MethodPost, @@ -90,7 +90,7 @@ func TestCreateRule( t *testing.T) { Escalation: []Channel{ {Type: ChannelWebhook, Name: "微信", Webhook: &CustomWebhook{ HeaderParams: map[string]string{ - "Content-Type": "application/json", + "Message-Type": "application/json", }, Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, Method: http.MethodPost, diff --git a/plugin/api/alerting/alert.go b/plugin/api/alerting/alert.go index 0fdf70c7..a3138561 100644 --- a/plugin/api/alerting/alert.go +++ b/plugin/api/alerting/alert.go @@ -92,9 +92,16 @@ func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps http state = h.GetParameterOrDefault(req, "state", "") severity = h.GetParameterOrDefault(req, "severity", "") sort = h.GetParameterOrDefault(req, "sort", "") + ruleID = h.GetParameterOrDefault(req, "rule_id", "") + min = h.GetParameterOrDefault(req, "min", "") + max = h.GetParameterOrDefault(req, "max", "") mustBuilder = &strings.Builder{} sortBuilder = strings.Builder{} ) + mustBuilder.WriteString(fmt.Sprintf(`{"range":{"created":{"gte":"%s", "lte": "%s"}}}`, min, max)) + if ruleID != "" { + mustBuilder.WriteString(fmt.Sprintf(`,{"term":{"rule_id":{"value":"%s"}}}`, ruleID)) + } if sort != "" { sortParts := strings.Split(sort, ",") @@ -103,24 +110,17 @@ func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps http } } sortBuilder.WriteString(`{"created":{ "order": "desc"}}`) - hasFilter := false + if keyword != "" { mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword)) - hasFilter = true } if state != "" { - if hasFilter { - mustBuilder.WriteString(",") - } + mustBuilder.WriteString(",") mustBuilder.WriteString(fmt.Sprintf(`{"term":{"state":{"value":"%s"}}}`, state)) - hasFilter = true } if severity != "" { - if hasFilter { - mustBuilder.WriteString(",") - } + mustBuilder.WriteString(",") mustBuilder.WriteString(fmt.Sprintf(`{"term":{"severity":{"value":"%s"}}}`, severity)) - hasFilter = true } size, _ := strconv.Atoi(strSize) if size <= 0 { diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 0a29d4b1..c54a0a8a 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -35,9 +35,12 @@ func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.searchAlert) api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.getAlert) - api.HandleAPIMethod(api.POST, "/alerting/alert/_acknowledge", alert.acknowledgeAlert) api.HandleAPIMethod(api.GET, "/alerting/template/parameters", alert.getTemplateParams) + api.HandleAPIMethod(api.POST, "/alerting/message/_search", alert.searchAlertMessage) + api.HandleAPIMethod(api.POST, "/alerting/message/_ignore", alert.ignoreAlertMessage) + api.HandleAPIMethod(api.GET, "/alerting/message/_stats", alert.getAlertMessageStats) + //just for test //api.HandleAPIMethod(api.GET, "/alerting/rule/test", alert.testRule) diff --git a/plugin/api/alerting/message.go b/plugin/api/alerting/message.go new file mode 100644 index 00000000..5644f816 --- /dev/null +++ b/plugin/api/alerting/message.go @@ -0,0 +1,222 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package alerting + +import ( + "fmt" + log "github.com/cihub/seelog" + "infini.sh/console/model/alerting" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + "time" +) + +func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + body := struct { + MessageIDs []string `json:"ids"` + }{} + err := h.DecodeJSON(req, &body) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + if len(body.MessageIDs) == 0 { + h.WriteError(w, "alert ids should not be empty", http.StatusInternalServerError) + return + } + queryDsl := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "_id": body.MessageIDs, + }, + }, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s';ctx._source['ignored_time']='%s'", alerting.MessageStateIgnored, time.Now().Format(time.RFC3339Nano)), + }, + } + err = orm.UpdateBy(alerting.AlertMessage{}, util.MustToJSONBytes(queryDsl)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "ids": body.MessageIDs, + "result": "updated", + }, 200) +} + +func (h *AlertAPI) getAlertMessageStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + esClient := elastic.GetClient(h.Config.Elasticsearch) + queryDsl := util.MapStr{ + "size": 0, + "query": util.MapStr{ + "bool": util.MapStr{ + "must_not": []util.MapStr{ + { + "terms": util.MapStr{ + "status": []string{ + alerting.MessageStateRecovered, + }, + }, + }, + }, + }, + }, + "aggs": util.MapStr{ + "terms_by_severity": util.MapStr{ + "terms": util.MapStr{ + "field": "severity", + "size": 5, + }, + }, + }, + } + + searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.AlertMessage{}), util.MustToJSONBytes(queryDsl) ) + if err != nil { + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + statusCounts := map[string]interface{}{} + if termsAgg, ok := searchRes.Aggregations["terms_by_severity"]; ok { + for _, bk := range termsAgg.Buckets { + if status, ok := bk["key"].(string); ok { + statusCounts[status] = bk["doc_count"] + } + } + } + for _, status := range []string{"warning", "error", "critical"} { + if _, ok := statusCounts[status]; !ok { + statusCounts[status] = 0 + } + } + h.WriteJSON(w, util.MapStr{ + "alert": util.MapStr{ + "current": statusCounts, + }, + }, http.StatusOK) +} + + +func (h *AlertAPI) searchAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + resBody:=util.MapStr{} + reqBody := struct{ + Keyword string `json:"keyword"` + Size int `json:"size"` + From int `json:"from"` + Aggregations []elastic.SearchAggParam `json:"aggs"` + Highlight elastic.SearchHighlightParam `json:"highlight"` + Filter elastic.SearchFilterParam `json:"filter"` + Sort []string `json:"sort"` + SearchField string `json:"search_field"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + resBody["error"] = err.Error() + h.WriteJSON(w,resBody, http.StatusInternalServerError ) + return + } + if reqBody.Size <= 0 { + reqBody.Size = 20 + } + aggs := elastic.BuildSearchTermAggregations(reqBody.Aggregations) + filter := elastic.BuildSearchTermFilter(reqBody.Filter) + var should []util.MapStr + if reqBody.SearchField != ""{ + should = []util.MapStr{ + { + "prefix": util.MapStr{ + reqBody.SearchField: util.MapStr{ + "value": reqBody.Keyword, + "boost": 20, + }, + }, + }, + { + "match": util.MapStr{ + reqBody.SearchField: util.MapStr{ + "query": reqBody.Keyword, + "fuzziness": "AUTO", + "max_expansions": 10, + "prefix_length": 2, + "fuzzy_transpositions": true, + "boost": 2, + }, + }, + }, + } + }else{ + if reqBody.Keyword != ""{ + should = []util.MapStr{ + { + "match": util.MapStr{ + "search_text": util.MapStr{ + "query": reqBody.Keyword, + "fuzziness": "AUTO", + "max_expansions": 10, + "prefix_length": 2, + "fuzzy_transpositions": true, + "boost": 2, + }, + }, + }, + { + "query_string": util.MapStr{ + "fields": []string{"*"}, + "query": reqBody.Keyword, + "fuzziness": "AUTO", + "fuzzy_prefix_length": 2, + "fuzzy_max_expansions": 10, + "fuzzy_transpositions": true, + "allow_leading_wildcard": false, + }, + }, + } + } + } + boolQuery := util.MapStr{ + "filter": filter, + } + if len(should) > 0 { + boolQuery["should"] = should + boolQuery["minimum_should_match"] = 1 + } + query := util.MapStr{ + "aggs": aggs, + "size": reqBody.Size, + "from": reqBody.From, + "highlight": elastic.BuildSearchHighlight(&reqBody.Highlight), + "query": util.MapStr{ + "bool": boolQuery, + }, + } + if len(reqBody.Sort) > 1 { + query["sort"] = []util.MapStr{ + { + reqBody.Sort[0]: util.MapStr{ + "order": reqBody.Sort[1], + }, + }, + } + } + dsl := util.MustToJSONBytes(query) + response, err := elastic.GetClient(h.Config.Elasticsearch).SearchWithRawQueryDSL(orm.GetIndexName(alerting.AlertMessage{}), dsl) + if err != nil { + resBody["error"] = err.Error() + h.WriteJSON(w,resBody, http.StatusInternalServerError ) + return + } + h.WriteJSONHeader(w) + w.Write(util.MustToJSONBytes(response)) + +} \ No newline at end of file diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go index 1abd055d..0d422f35 100644 --- a/plugin/api/alerting/rule.go +++ b/plugin/api/alerting/rule.go @@ -479,6 +479,39 @@ func (alertAPI *AlertAPI) getTemplateParams(w http.ResponseWriter, req *http.Req }, http.StatusOK) } +func (alertAPI *AlertAPI) getPreviewMetricData(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + rule := &alerting.Rule{} + err := alertAPI.DecodeJSON(req, rule) + if err != nil { + log.Error(err) + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + var ( + minStr = alertAPI.Get(req, "min", "") + maxStr = alertAPI.Get(req, "max", "") + ) + bucketSize, min, max, err := api.GetMetricRangeAndBucketSize(minStr, maxStr, 60, 15) + filterParam := &alerting.FilterParam{ + Start: min, + End: max, + BucketSize: fmt.Sprintf("%ds", bucketSize), + } + metricItem, err := getRuleMetricData(rule, filterParam) + if err != nil { + log.Error(err) + alertAPI.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + alertAPI.WriteJSON(w, util.MapStr{ + "metric": metricItem, + }, http.StatusOK) +} + func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { rule := &alerting.Rule{ ID: ps.ByName("rule_id"), @@ -501,8 +534,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request End: max, BucketSize: fmt.Sprintf("%ds", bucketSize), } - eng := alerting2.GetEngine(rule.Resource.Type) - metricData, err := eng.GetTargetMetricData(rule, true, filterParam) + metricItem, err := getRuleMetricData(rule, filterParam) if err != nil { log.Error(err) alertAPI.WriteJSON(w, util.MapStr{ @@ -510,18 +542,29 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request }, http.StatusInternalServerError) return } - //var filteredMetricData []alerting.MetricData - title := rule.Metrics.Formula - if title == "" && len( rule.Conditions.Items) > 0{ - title,_ = rule.Conditions.Items[0].GenerateConditionExpression() + alertAPI.WriteJSON(w, util.MapStr{ + "metric": metricItem, + }, http.StatusOK) +} + +func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) (*common.MetricItem, error) { + eng := alerting2.GetEngine(rule.Resource.Type) + metricData, err := eng.GetTargetMetricData(rule, true, filterParam) + if err != nil { + return nil, err } + //var filteredMetricData []alerting.MetricData + //title := rule.Metrics.Formula + //if title == "" && len( rule.Conditions.Items) > 0{ + // title,_ = rule.Conditions.Items[0].GenerateConditionExpression() + //} var metricItem = common.MetricItem{ Group: rule.ID, - Key: rule.ID, + Key: rule.ID, Axis: []*common.MetricAxis{ - {ID: util.GetUUID(), Group: rule.ID, Title: title, FormatType: "num", Position: "left",ShowGridLines: true, + {ID: util.GetUUID(), Group: rule.ID, Title: "", FormatType: "num", Position: "left", ShowGridLines: true, TickFormat: "0,0.[00]", - Ticks: 5}, + Ticks: 5}, }, } var sampleData []alerting.TimeMetricData @@ -534,18 +577,18 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request sampleData = md.Data["result"] } metricItem.Lines = append(metricItem.Lines, &common.MetricLine{ - Data: md.Data["result"], + Data: md.Data["result"], BucketSize: filterParam.BucketSize, Metric: common.MetricSummary{ - Label: strings.Join(md.GroupValues, "-"), - Group: rule.ID, + Label: strings.Join(md.GroupValues, "-"), + Group: rule.ID, TickFormat: "0,0.[00]", FormatType: "num", }, }) } //add guidelines - for _, cond := range rule.Conditions.Items{ + for _, cond := range rule.Conditions.Items { if len(cond.Values) > 0 { val, err := strconv.ParseFloat(cond.Values[0], 64) if err != nil { @@ -553,9 +596,9 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request continue } if sampleData != nil { - newData := make([]alerting.TimeMetricData,0, len(sampleData)) + newData := make([]alerting.TimeMetricData, 0, len(sampleData)) for _, td := range sampleData { - if len(td) < 2{ + if len(td) < 2 { continue } newData = append(newData, alerting.TimeMetricData{ @@ -563,11 +606,11 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request }) } metricItem.Lines = append(metricItem.Lines, &common.MetricLine{ - Data: newData, + Data: newData, BucketSize: filterParam.BucketSize, Metric: common.MetricSummary{ - Label: "", - Group: rule.ID, + Label: "", + Group: rule.ID, TickFormat: "0,0.[00]", FormatType: "num", }, @@ -575,9 +618,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request } } } - alertAPI.WriteJSON(w, util.MapStr{ - "metric": metricItem, - }, http.StatusOK) + return &metricItem, nil } @@ -609,7 +650,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request // Conditions: alerting.Condition{ // Operator: "any", // Items: []alerting.ConditionItem{ -// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"}, +// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", AlertMessage: "cpu使用率大于90%"}, // }, // }, // @@ -617,7 +658,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request // Normal: []alerting.Channel{ // {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ // HeaderParams: map[string]string{ -// "Content-Type": "application/json", +// "Message-Type": "application/json", // }, // Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, // Method: http.MethodPost, diff --git a/service/alerting/constants.go b/service/alerting/constants.go index dba24266..80860944 100644 --- a/service/alerting/constants.go +++ b/service/alerting/constants.go @@ -8,6 +8,7 @@ const ( KVLastNotificationTime = "alert_last_notification_time" KVLastTermStartTime = "alert_last_term_start_time" KVLastEscalationTime = "alert_last_escalation_time" + KVLastMessageState = "alert_last_message_state" ) @@ -17,7 +18,8 @@ const ( ParamResourceName = "resource_name" // 资源名称 如集群名称 es-v714 ParamEventID = "event_id" // 检查事件 ID ParamResults = "results" // - ParamMessage = "message" //检查消息 自定义 + ParamMessage = "message" //检查消息 自定义(模版渲染) + ParamTitle = "title" ParamPresetValue = "preset_value" //检查预设值 float64 ParamResultValue = "result_value" //检查结果 {group_tags:["cluster-xxx", "node-xxx"], check_values:[]} Severity = "severity" //告警等级 diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index e514ab4f..ae78db78 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -575,44 +575,93 @@ func (engine *Engine) Do(rule *alerting.Rule) error { Expression: rule.Metrics.Expression, Objects: rule.Resource.Objects, Conditions: rule.Conditions, - State: alerting.AlertStateNormal, + State: alerting.AlertStateOK, } checkResults, err := engine.CheckCondition(rule) alertItem.ConditionResult = checkResults if err != nil { return err } - lastAlertItem := alerting.Alert{} - err = getLastAlert(rule.ID, &lastAlertItem) + alertMessage, err := getLastAlertMessage(rule.ID, 2 * time.Minute) if err != nil { - return err + return fmt.Errorf("get alert message error: %w", err) } conditionResults := checkResults.ResultItems + var paramsCtx map[string]interface{} if len(conditionResults) == 0 { alertItem.Severity = "info" - alertItem.Content = "" - alertItem.State = alerting.AlertStateNormal + alertItem.State = alerting.AlertStateOK + if alertMessage != nil && alertMessage.Status != alerting.MessageStateRecovered { + alertMessage.Status = alerting.MessageStateRecovered + alertMessage.Updated = time.Now() + err = saveAlertMessage(alertMessage) + if err != nil { + return fmt.Errorf("save alert message error: %w", err) + } + } return nil }else{ - if lastAlertItem.State == "" || lastAlertItem.State == alerting.AlertStateNormal { - rule.LastTermStartTime = time.Now() - strTime := rule.LastTermStartTime.UTC().Format(time.RFC3339) - kv.AddValue(alerting2.KVLastTermStartTime, []byte(rule.ID), []byte(strTime)) - } - log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID ) + paramsCtx = newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Unix()) var ( severity = conditionResults[0].ConditionItem.Severity - content string + tplBytes []byte + message string + title string ) + tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx) + if err != nil { + return fmt.Errorf("resolve content template error: %w", err) + } + message = string(tplBytes) + paramsCtx[alerting2.ParamMessage] = message + tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx) + if err != nil { + return fmt.Errorf("resolve title template error: %w", err) + } + title = string(tplBytes) + paramsCtx[alerting2.ParamTitle] = title for _, conditionResult := range conditionResults { if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] { severity = conditionResult.ConditionItem.Severity - content = conditionResult.ConditionItem.Message } } + alertItem.Severity = severity - alertItem.Content = content + alertItem.Message = message + alertItem.Title = title alertItem.State = alerting.AlertStateActive + if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered { + msg := &alerting.AlertMessage{ + RuleID: rule.ID, + Created: time.Now(), + Updated: time.Now(), + ID: util.GetUUID(), + Status: alerting.MessageStateActive, + Severity: severity, + Title: title, + Message: message, + } + err = saveAlertMessage(msg) + if err != nil { + return fmt.Errorf("save alert message error: %w", err) + } + }else{ + alertMessage.Title = title + alertMessage.Message = message + err = saveAlertMessage(alertMessage) + if err != nil { + return fmt.Errorf("save alert message error: %w", err) + } + } + log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID ) + } + // if alert message status equals ignored , then skip sending message to channel + if alertMessage != nil && alertMessage.Status == alerting.MessageStateIgnored { + return nil + } + // if channel is not enabled return + if !rule.Channels.Enabled { + return nil } if rule.Channels.AcceptTimeRange.Include(time.Now()) { @@ -633,9 +682,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error { period := time.Now().Sub(rule.LastNotificationTime.Local()) //log.Error(lastAlertItem.ID, period, periodDuration) - paramsCtx := newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.UnixNano()/1e6) + if paramsCtx == nil { + paramsCtx = newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Unix()) + } - if lastAlertItem.ID == "" || period > periodDuration { + if alertMessage == nil || period > periodDuration { actionResults, errCount := performChannels(rule.Channels.Normal, paramsCtx) alertItem.ActionExecutionResults = actionResults //change and save last notification time in local kv store when action error count equals zero @@ -646,25 +697,16 @@ func (engine *Engine) Do(rule *alerting.Rule) error { alertItem.IsNotified = true } } - isAck, err := hasAcknowledgedRule(rule.ID, rule.LastTermStartTime) - if err != nil { - alertItem.Error = err.Error() - return err - } - if rule.Channels.EscalationEnabled && lastAlertItem.ID !="" && !isAck { + + if rule.Channels.EscalationEnabled { throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod) if err != nil { return err } - //change and save last term start time in local kv store when action error count equals zero - if rule.LastTermStartTime.IsZero(){ - tm, err := readTimeFromKV(alerting2.KVLastTermStartTime, []byte(rule.ID)) - if err != nil { - return fmt.Errorf("get last term start time from kv error: %w", err) - } - if !tm.IsZero(){ - rule.LastTermStartTime = tm - } + + rule.LastTermStartTime = time.Now() + if alertMessage != nil { + rule.LastTermStartTime = alertMessage.Created } if time.Now().Sub(rule.LastTermStartTime.Local()) > throttlePeriod { if rule.LastEscalationTime.IsZero(){ @@ -698,7 +740,6 @@ func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult var conditionParams []util.MapStr for _, resultItem := range checkResults.ResultItems { conditionParams = append(conditionParams, util.MapStr{ - alerting2.ParamMessage: resultItem.ConditionItem.Message, alerting2.ParamPresetValue: resultItem.ConditionItem.Values, alerting2.Severity: resultItem.ConditionItem.Severity, alerting2.ParamGroupValues: resultItem.GroupValues, @@ -724,7 +765,7 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul return nil, fmt.Errorf("check condition error:%w", err) } var actionResults []alerting.ActionExecutionResult - paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().UnixNano()/1e6) + paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Unix()) if len(rule.Channels.Normal) > 0 { actionResults, _ = performChannels(rule.Channels.Normal, paramsCtx) }else if len(rule.Channels.Escalation) > 0{ @@ -879,7 +920,7 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti } } -func getLastAlert(ruleID string, alertItem *alerting.Alert) error { +func getLastAlertMessageFromES(ruleID string, message *alerting.AlertMessage) error { queryDsl := util.MapStr{ "size": 1, "sort": []util.MapStr{ @@ -900,15 +941,53 @@ func getLastAlert(ruleID string, alertItem *alerting.Alert) error { q := orm.Query{ RawQuery: util.MustToJSONBytes(queryDsl), } - err, searchResult := orm.Search(alertItem, &q ) + err, searchResult := orm.Search(alerting.AlertMessage{}, &q ) + if err != nil { + return err + } + if len(searchResult.Result) == 0 { + return nil + } + messageBytes := util.MustToJSONBytes(searchResult.Result[0]) + return util.FromJSONBytes(messageBytes, message) +} + +func getLastAlertMessage(ruleID string, duration time.Duration) (*alerting.AlertMessage, error ){ + messageBytes, err := kv.GetValue(alerting2.KVLastMessageState, []byte(ruleID)) + if err != nil { + return nil, err + } + if messageBytes == nil { + return nil, nil + } + message := &alerting.AlertMessage{} + err = util.FromJSONBytes(messageBytes, message) + if err != nil { + return nil, err + } + if time.Now().Sub(message.Updated) > duration { + err = getLastAlertMessageFromES(ruleID, message) + return message, err + } + return message, nil +} + +func saveAlertMessageToES(message *alerting.AlertMessage) error { + return orm.Save(message) +} + +func saveAlertMessage(message *alerting.AlertMessage) error { + err := saveAlertMessageToES(message) if err != nil { return err } - if len(searchResult.Result) == 0 { - return nil + + messageBytes, err := util.ToJSONBytes(message) + if err != nil { + return err } - alertBytes := util.MustToJSONBytes(searchResult.Result[0]) - return util.FromJSONBytes(alertBytes, alertItem) + err = kv.AddValue(alerting2.KVLastMessageState, []byte(message.RuleID), messageBytes) + return err } func hasAcknowledgedRule(ruleID string, startTime time.Time) (bool, error){ diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go index 492a4324..82d68215 100644 --- a/service/alerting/elasticsearch/engine_test.go +++ b/service/alerting/elasticsearch/engine_test.go @@ -75,7 +75,7 @@ func TestEngine( t *testing.T) { Normal: []alerting.Channel{ {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{ - "Content-Type": "application/json", + "Message-Type": "application/json", }, Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, Method: http.MethodPost, @@ -85,7 +85,7 @@ func TestEngine( t *testing.T) { Escalation: []alerting.Channel{ {Type: alerting.ChannelWebhook, Name: "微信", Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{ - "Content-Type": "application/json", + "Message-Type": "application/json", }, Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, Method: http.MethodPost, @@ -153,7 +153,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { // Conditions: alerting.Condition{ // Operator: "any", // Items: []alerting.ConditionItem{ - // {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"}, + // {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", AlertMessage: "cpu使用率大于90%"}, // }, // }, // @@ -161,7 +161,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { // Normal: []alerting.Channel{ // {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ // HeaderParams: map[string]string{ - // "Content-Type": "application/json", + // "Message-Type": "application/json", // }, // Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, // Method: http.MethodPost, @@ -222,7 +222,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { Normal: []alerting.Channel{ {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{ HeaderParams: map[string]string{ - "Content-Type": "application/json", + "Message-Type": "application/json", }, Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`, Method: http.MethodPost, diff --git a/service/alerting/funcs/date.go b/service/alerting/funcs/date.go index 468931ba..8ace3f05 100644 --- a/service/alerting/funcs/date.go +++ b/service/alerting/funcs/date.go @@ -9,19 +9,22 @@ import ( "time" ) -func date(fmt string, date interface{}) string { - return dateInZone(fmt, date, "Local") +func datetimeInZone(zone string, date interface{}) string{ + return _dateInZone("2006-01-02 15:04:05", date, zone) +} +func datetime(date interface{}) string{ + return _dateInZone("2006-01-02 15:04:05", date, "Local") } -func htmlDate(date interface{}) string { - return dateInZone("2006-01-02", date, "Local") +func date(date interface{}) string { + return _dateInZone("2006-01-02", date, "Local") } -func htmlDateInZone(date interface{}, zone string) string { - return dateInZone("2006-01-02", date, zone) +func dateInZone(zone string, date interface{}) string { + return _dateInZone("2006-01-02", date, zone) } -func dateInZone(fmt string, date interface{}, zone string) string { +func _dateInZone(fmt string, date interface{}, zone string) string { var t time.Time switch date := date.(type) { default: @@ -34,6 +37,7 @@ func dateInZone(fmt string, date interface{}, zone string) string { t = time.Unix(date, 0) case int: t = time.Unix(int64(date), 0) + case int32: t = time.Unix(int64(date), 0) case string: diff --git a/service/alerting/funcs/function.go b/service/alerting/funcs/function.go index 7c0a15ad..f6fcaba6 100644 --- a/service/alerting/funcs/function.go +++ b/service/alerting/funcs/function.go @@ -18,11 +18,13 @@ func GenericFuncMap() template.FuncMap { } var genericMap = map[string]interface{}{ - "hello": func() string { return "Hello!" }, + "hello": func() string { return "Hello!" }, "format_bytes": formatBytes, - "to_fixed": toFixed, - "date": date, + "to_fixed": toFixed, + "date": date, "date_in_zone": dateInZone, - "to_upper": strings.ToUpper, - "to_lower": strings.ToLower, + "datetime": datetime, + "datetime_in_zone": datetimeInZone, + "to_upper": strings.ToUpper, + "to_lower": strings.ToLower, } diff --git a/service/alerting/parameter.go b/service/alerting/parameter.go index 2fe8128b..48c5d226 100644 --- a/service/alerting/parameter.go +++ b/service/alerting/parameter.go @@ -18,8 +18,9 @@ func GetTemplateParameters() []ParameterMeta { {ParamResourceID, "string", "resource uuid", "c9f663tath2e5a0vksjg", nil}, {ParamResourceName, "string", "resource name", "es-v716", nil}, {ParamEventID, "string", "identifier for check details", "c9f663tath2e5a0vksjx", nil}, + {ParamTitle, "string", "", "xxx cpu used 95%", nil}, + {ParamMessage, "string", "", "disk used 90%", nil}, {ParamResults, "array", "", "", []ParameterMeta{ - {ParamMessage, "string", "", "disk used 90%", nil}, {ParamPresetValue, "array", "", "[\"90\"]", nil}, {Severity, "string", "", "error", nil}, {ParamGroupValues, "array", "", "[\"cluster-xxx\", \"node-xxx\"]", nil},