update alert api
This commit is contained in:
parent
4c7fbe7cc1
commit
b9de7b67a4
1
main.go
1
main.go
|
@ -112,6 +112,7 @@ func main() {
|
|||
orm.RegisterSchemaWithIndexName(gateway.Instance{}, "gateway-instance")
|
||||
orm.RegisterSchemaWithIndexName(alerting.Rule{}, "alert-rule")
|
||||
orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history")
|
||||
orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message")
|
||||
api.RegisterSchema()
|
||||
|
||||
go func() {
|
||||
|
|
|
@ -18,8 +18,9 @@ type Alert struct {
|
|||
Expression string `json:"expression" elastic_mapping:"expression: { type: keyword, copy_to:search_text }"`
|
||||
Objects []string `json:"objects" elastic_mapping:"objects: { type:keyword,copy_to:search_text }"`
|
||||
Severity string `json:"severity" elastic_mapping:"severity: { type: keyword }"`
|
||||
Content string `json:"content" elastic_mapping:"context: { type: keyword, copy_to:search_text }"`
|
||||
AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"`
|
||||
Title string `json:"title" elastic_mapping:"title: { type: keyword }"`
|
||||
Message string `json:"message" elastic_mapping:"context: { type: keyword, copy_to:search_text }"`
|
||||
AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"`
|
||||
ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"`
|
||||
Users []string `json:"users,omitempty"`
|
||||
State string `json:"state"`
|
||||
|
@ -43,10 +44,28 @@ type ActionExecutionResult struct {
|
|||
const (
|
||||
AlertStateActive string = "active"
|
||||
AlertStateAcknowledge = "acknowledged"
|
||||
AlertStateNormal = "normal"
|
||||
AlertStateError = "error"
|
||||
AlertStateOK = "normal"
|
||||
AlertStateError = "error"
|
||||
)
|
||||
|
||||
// Values stored in AlertMessage.Status.
const (
	// MessageStateActive is assigned to a newly created message while the
	// rule's conditions are matching.
	MessageStateActive = "active"
	// MessageStateIgnored is written by the ignore API; the engine skips
	// channel notifications for messages in this state.
	MessageStateIgnored = "ignored"
	// MessageStateRecovered is assigned once a rule check yields no
	// matching condition results.
	MessageStateRecovered = "recovered"
)
|
||||
|
||||
// AlertMessage is the user-facing message document the engine keeps for a
// rule while it is alerting. Status holds one of the MessageState* values.
//
// NOTE(review): `omitempty` on the time.Time fields follows the original
// tags; with encoding/json semantics it has no effect on struct values, so a
// zero time would still be serialized — confirm against the ORM's encoder.
type AlertMessage struct {
	ID      string    `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"`
	Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"`
	Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"`
	RuleID  string    `json:"rule_id" elastic_mapping:"rule_id: { type: keyword,copy_to:search_text }"`
	Title   string    `json:"title" elastic_mapping:"title: { type: keyword,copy_to:search_text }"`
	// The mapping key must equal the JSON field name. It used to be
	// "content", which mapped a field that is never written and left the
	// real "message" field without the keyword/copy_to mapping.
	Message     string    `json:"message" elastic_mapping:"message: { type: keyword,copy_to:search_text }"`
	Status      string    `json:"status" elastic_mapping:"status: { type: keyword,copy_to:search_text }"`
	IgnoredTime time.Time `json:"ignored_time,omitempty" elastic_mapping:"ignored_time: { type: date }"`
	Severity    string    `json:"severity" elastic_mapping:"severity: { type: keyword }"`
	// SearchText is never serialized; it only declares the copy_to target
	// that the fields above feed.
	SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"`
}
|
||||
|
||||
/*
|
||||
{
|
||||
|
@ -54,4 +73,7 @@ const (
|
|||
ResourceID
|
||||
ResourceName
|
||||
}
|
||||
*/
|
||||
*/
|
||||
|
||||
// message status (Active, Ignored, Recovered)
|
||||
//rule status (Active, Error, OK)
|
|
@ -17,7 +17,6 @@ type ConditionItem struct {
|
|||
Operator string `json:"operator"`
|
||||
Values []string `json:"values"`
|
||||
Severity string `json:"severity"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
func (cond *ConditionItem) GenerateConditionExpression()(conditionExpression string, err error){
|
||||
valueLength := len(cond.Values)
|
||||
|
|
|
@ -15,6 +15,8 @@ type Metric struct {
|
|||
Items []MetricItem `json:"items"`
|
||||
Formula string `json:"formula,omitempty"`
|
||||
Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80
|
||||
Title string `json:"title"` //text template
|
||||
Message string `json:"message"` // text template
|
||||
}
|
||||
func (m *Metric) GenerateExpression() (string, error){
|
||||
if len(m.Items) == 1 {
|
||||
|
|
|
@ -52,12 +52,13 @@ func (rule *Rule) GetOrInitExpression() (string, error){
|
|||
}
|
||||
|
||||
type RuleChannel struct {
|
||||
Normal []Channel `json:"normal"`
|
||||
Enabled bool `json:"enabled"`
|
||||
Normal []Channel `json:"normal,omitempty"`
|
||||
Escalation []Channel `json:"escalation,omitempty"`
|
||||
ThrottlePeriod string `json:"throttle_period"` //沉默周期
|
||||
AcceptTimeRange TimeRange `json:"accept_time_range"`
|
||||
ThrottlePeriod string `json:"throttle_period,omitempty"` //沉默周期
|
||||
AcceptTimeRange TimeRange `json:"accept_time_range,omitempty"`
|
||||
EscalationThrottlePeriod string `json:"escalation_throttle_period,omitempty"`
|
||||
EscalationEnabled bool `json:"escalation_enabled"`
|
||||
EscalationEnabled bool `json:"escalation_enabled,omitempty"`
|
||||
}
|
||||
|
||||
type MessageTemplate struct{
|
||||
|
@ -71,6 +72,9 @@ type TimeRange struct {
|
|||
}
|
||||
|
||||
func (tr *TimeRange) Include( t time.Time) bool {
|
||||
if tr.Start == "" || tr.End == "" {
|
||||
return true
|
||||
}
|
||||
currentTimeStr := t.Format("15:04")
|
||||
return tr.Start <= currentTimeStr && currentTimeStr <= tr.End
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func TestCreateRule( t *testing.T) {
|
|||
//Conditions: Condition{
|
||||
// Operator: "any",
|
||||
// Items: []ConditionItem{
|
||||
// { MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"1"}, Severity: "error", Message: "集群健康状态为 Red"},
|
||||
// { MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"1"}, Severity: "error", AlertMessage: "集群健康状态为 Red"},
|
||||
// },
|
||||
//},
|
||||
|
||||
|
@ -80,7 +80,7 @@ func TestCreateRule( t *testing.T) {
|
|||
Normal: []Channel{
|
||||
{Name: "钉钉", Type: ChannelWebhook, Webhook: &CustomWebhook{
|
||||
HeaderParams: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
"Message-Type": "application/json",
|
||||
},
|
||||
Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
|
||||
Method: http.MethodPost,
|
||||
|
@ -90,7 +90,7 @@ func TestCreateRule( t *testing.T) {
|
|||
Escalation: []Channel{
|
||||
{Type: ChannelWebhook, Name: "微信", Webhook: &CustomWebhook{
|
||||
HeaderParams: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
"Message-Type": "application/json",
|
||||
},
|
||||
Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
|
||||
Method: http.MethodPost,
|
||||
|
|
|
@ -92,9 +92,16 @@ func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps http
|
|||
state = h.GetParameterOrDefault(req, "state", "")
|
||||
severity = h.GetParameterOrDefault(req, "severity", "")
|
||||
sort = h.GetParameterOrDefault(req, "sort", "")
|
||||
ruleID = h.GetParameterOrDefault(req, "rule_id", "")
|
||||
min = h.GetParameterOrDefault(req, "min", "")
|
||||
max = h.GetParameterOrDefault(req, "max", "")
|
||||
mustBuilder = &strings.Builder{}
|
||||
sortBuilder = strings.Builder{}
|
||||
)
|
||||
mustBuilder.WriteString(fmt.Sprintf(`{"range":{"created":{"gte":"%s", "lte": "%s"}}}`, min, max))
|
||||
if ruleID != "" {
|
||||
mustBuilder.WriteString(fmt.Sprintf(`,{"term":{"rule_id":{"value":"%s"}}}`, ruleID))
|
||||
}
|
||||
|
||||
if sort != "" {
|
||||
sortParts := strings.Split(sort, ",")
|
||||
|
@ -103,24 +110,17 @@ func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps http
|
|||
}
|
||||
}
|
||||
sortBuilder.WriteString(`{"created":{ "order": "desc"}}`)
|
||||
hasFilter := false
|
||||
|
||||
if keyword != "" {
|
||||
mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword))
|
||||
hasFilter = true
|
||||
}
|
||||
if state != "" {
|
||||
if hasFilter {
|
||||
mustBuilder.WriteString(",")
|
||||
}
|
||||
mustBuilder.WriteString(",")
|
||||
mustBuilder.WriteString(fmt.Sprintf(`{"term":{"state":{"value":"%s"}}}`, state))
|
||||
hasFilter = true
|
||||
}
|
||||
if severity != "" {
|
||||
if hasFilter {
|
||||
mustBuilder.WriteString(",")
|
||||
}
|
||||
mustBuilder.WriteString(",")
|
||||
mustBuilder.WriteString(fmt.Sprintf(`{"term":{"severity":{"value":"%s"}}}`, severity))
|
||||
hasFilter = true
|
||||
}
|
||||
size, _ := strconv.Atoi(strSize)
|
||||
if size <= 0 {
|
||||
|
|
|
@ -35,9 +35,12 @@ func (alert *AlertAPI) Init() {
|
|||
|
||||
api.HandleAPIMethod(api.GET, "/alerting/alert/_search", alert.searchAlert)
|
||||
api.HandleAPIMethod(api.GET, "/alerting/alert/:alert_id", alert.getAlert)
|
||||
api.HandleAPIMethod(api.POST, "/alerting/alert/_acknowledge", alert.acknowledgeAlert)
|
||||
api.HandleAPIMethod(api.GET, "/alerting/template/parameters", alert.getTemplateParams)
|
||||
|
||||
api.HandleAPIMethod(api.POST, "/alerting/message/_search", alert.searchAlertMessage)
|
||||
api.HandleAPIMethod(api.POST, "/alerting/message/_ignore", alert.ignoreAlertMessage)
|
||||
api.HandleAPIMethod(api.GET, "/alerting/message/_stats", alert.getAlertMessageStats)
|
||||
|
||||
|
||||
//just for test
|
||||
//api.HandleAPIMethod(api.GET, "/alerting/rule/test", alert.testRule)
|
||||
|
|
|
@ -0,0 +1,222 @@
|
|||
/* Copyright © INFINI Ltd. All rights reserved.
|
||||
* web: https://infinilabs.com
|
||||
* mail: hello#infini.ltd */
|
||||
|
||||
package alerting
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/console/model/alerting"
|
||||
httprouter "infini.sh/framework/core/api/router"
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/orm"
|
||||
"infini.sh/framework/core/util"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (h *AlertAPI) ignoreAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
body := struct {
|
||||
MessageIDs []string `json:"ids"`
|
||||
}{}
|
||||
err := h.DecodeJSON(req, &body)
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if len(body.MessageIDs) == 0 {
|
||||
h.WriteError(w, "alert ids should not be empty", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
queryDsl := util.MapStr{
|
||||
"query": util.MapStr{
|
||||
"terms": util.MapStr{
|
||||
"_id": body.MessageIDs,
|
||||
},
|
||||
},
|
||||
"script": util.MapStr{
|
||||
"source": fmt.Sprintf("ctx._source['status'] = '%s';ctx._source['ignored_time']='%s'", alerting.MessageStateIgnored, time.Now().Format(time.RFC3339Nano)),
|
||||
},
|
||||
}
|
||||
err = orm.UpdateBy(alerting.AlertMessage{}, util.MustToJSONBytes(queryDsl))
|
||||
if err != nil {
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
h.WriteJSON(w, util.MapStr{
|
||||
"ids": body.MessageIDs,
|
||||
"result": "updated",
|
||||
}, 200)
|
||||
}
|
||||
|
||||
func (h *AlertAPI) getAlertMessageStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
esClient := elastic.GetClient(h.Config.Elasticsearch)
|
||||
queryDsl := util.MapStr{
|
||||
"size": 0,
|
||||
"query": util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must_not": []util.MapStr{
|
||||
{
|
||||
"terms": util.MapStr{
|
||||
"status": []string{
|
||||
alerting.MessageStateRecovered,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"aggs": util.MapStr{
|
||||
"terms_by_severity": util.MapStr{
|
||||
"terms": util.MapStr{
|
||||
"field": "severity",
|
||||
"size": 5,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.AlertMessage{}), util.MustToJSONBytes(queryDsl) )
|
||||
if err != nil {
|
||||
h.WriteJSON(w, util.MapStr{
|
||||
"error": err.Error(),
|
||||
}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
statusCounts := map[string]interface{}{}
|
||||
if termsAgg, ok := searchRes.Aggregations["terms_by_severity"]; ok {
|
||||
for _, bk := range termsAgg.Buckets {
|
||||
if status, ok := bk["key"].(string); ok {
|
||||
statusCounts[status] = bk["doc_count"]
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, status := range []string{"warning", "error", "critical"} {
|
||||
if _, ok := statusCounts[status]; !ok {
|
||||
statusCounts[status] = 0
|
||||
}
|
||||
}
|
||||
h.WriteJSON(w, util.MapStr{
|
||||
"alert": util.MapStr{
|
||||
"current": statusCounts,
|
||||
},
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
|
||||
func (h *AlertAPI) searchAlertMessage(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
resBody:=util.MapStr{}
|
||||
reqBody := struct{
|
||||
Keyword string `json:"keyword"`
|
||||
Size int `json:"size"`
|
||||
From int `json:"from"`
|
||||
Aggregations []elastic.SearchAggParam `json:"aggs"`
|
||||
Highlight elastic.SearchHighlightParam `json:"highlight"`
|
||||
Filter elastic.SearchFilterParam `json:"filter"`
|
||||
Sort []string `json:"sort"`
|
||||
SearchField string `json:"search_field"`
|
||||
}{}
|
||||
err := h.DecodeJSON(req, &reqBody)
|
||||
if err != nil {
|
||||
resBody["error"] = err.Error()
|
||||
h.WriteJSON(w,resBody, http.StatusInternalServerError )
|
||||
return
|
||||
}
|
||||
if reqBody.Size <= 0 {
|
||||
reqBody.Size = 20
|
||||
}
|
||||
aggs := elastic.BuildSearchTermAggregations(reqBody.Aggregations)
|
||||
filter := elastic.BuildSearchTermFilter(reqBody.Filter)
|
||||
var should []util.MapStr
|
||||
if reqBody.SearchField != ""{
|
||||
should = []util.MapStr{
|
||||
{
|
||||
"prefix": util.MapStr{
|
||||
reqBody.SearchField: util.MapStr{
|
||||
"value": reqBody.Keyword,
|
||||
"boost": 20,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"match": util.MapStr{
|
||||
reqBody.SearchField: util.MapStr{
|
||||
"query": reqBody.Keyword,
|
||||
"fuzziness": "AUTO",
|
||||
"max_expansions": 10,
|
||||
"prefix_length": 2,
|
||||
"fuzzy_transpositions": true,
|
||||
"boost": 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}else{
|
||||
if reqBody.Keyword != ""{
|
||||
should = []util.MapStr{
|
||||
{
|
||||
"match": util.MapStr{
|
||||
"search_text": util.MapStr{
|
||||
"query": reqBody.Keyword,
|
||||
"fuzziness": "AUTO",
|
||||
"max_expansions": 10,
|
||||
"prefix_length": 2,
|
||||
"fuzzy_transpositions": true,
|
||||
"boost": 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"query_string": util.MapStr{
|
||||
"fields": []string{"*"},
|
||||
"query": reqBody.Keyword,
|
||||
"fuzziness": "AUTO",
|
||||
"fuzzy_prefix_length": 2,
|
||||
"fuzzy_max_expansions": 10,
|
||||
"fuzzy_transpositions": true,
|
||||
"allow_leading_wildcard": false,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
boolQuery := util.MapStr{
|
||||
"filter": filter,
|
||||
}
|
||||
if len(should) > 0 {
|
||||
boolQuery["should"] = should
|
||||
boolQuery["minimum_should_match"] = 1
|
||||
}
|
||||
query := util.MapStr{
|
||||
"aggs": aggs,
|
||||
"size": reqBody.Size,
|
||||
"from": reqBody.From,
|
||||
"highlight": elastic.BuildSearchHighlight(&reqBody.Highlight),
|
||||
"query": util.MapStr{
|
||||
"bool": boolQuery,
|
||||
},
|
||||
}
|
||||
if len(reqBody.Sort) > 1 {
|
||||
query["sort"] = []util.MapStr{
|
||||
{
|
||||
reqBody.Sort[0]: util.MapStr{
|
||||
"order": reqBody.Sort[1],
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
dsl := util.MustToJSONBytes(query)
|
||||
response, err := elastic.GetClient(h.Config.Elasticsearch).SearchWithRawQueryDSL(orm.GetIndexName(alerting.AlertMessage{}), dsl)
|
||||
if err != nil {
|
||||
resBody["error"] = err.Error()
|
||||
h.WriteJSON(w,resBody, http.StatusInternalServerError )
|
||||
return
|
||||
}
|
||||
h.WriteJSONHeader(w)
|
||||
w.Write(util.MustToJSONBytes(response))
|
||||
|
||||
}
|
|
@ -479,6 +479,39 @@ func (alertAPI *AlertAPI) getTemplateParams(w http.ResponseWriter, req *http.Req
|
|||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
func (alertAPI *AlertAPI) getPreviewMetricData(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
rule := &alerting.Rule{}
|
||||
err := alertAPI.DecodeJSON(req, rule)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"error": err.Error(),
|
||||
}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var (
|
||||
minStr = alertAPI.Get(req, "min", "")
|
||||
maxStr = alertAPI.Get(req, "max", "")
|
||||
)
|
||||
bucketSize, min, max, err := api.GetMetricRangeAndBucketSize(minStr, maxStr, 60, 15)
|
||||
filterParam := &alerting.FilterParam{
|
||||
Start: min,
|
||||
End: max,
|
||||
BucketSize: fmt.Sprintf("%ds", bucketSize),
|
||||
}
|
||||
metricItem, err := getRuleMetricData(rule, filterParam)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"error": err.Error(),
|
||||
}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"metric": metricItem,
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
rule := &alerting.Rule{
|
||||
ID: ps.ByName("rule_id"),
|
||||
|
@ -501,8 +534,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
|
|||
End: max,
|
||||
BucketSize: fmt.Sprintf("%ds", bucketSize),
|
||||
}
|
||||
eng := alerting2.GetEngine(rule.Resource.Type)
|
||||
metricData, err := eng.GetTargetMetricData(rule, true, filterParam)
|
||||
metricItem, err := getRuleMetricData(rule, filterParam)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
|
@ -510,18 +542,29 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
|
|||
}, http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
//var filteredMetricData []alerting.MetricData
|
||||
title := rule.Metrics.Formula
|
||||
if title == "" && len( rule.Conditions.Items) > 0{
|
||||
title,_ = rule.Conditions.Items[0].GenerateConditionExpression()
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"metric": metricItem,
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam) (*common.MetricItem, error) {
|
||||
eng := alerting2.GetEngine(rule.Resource.Type)
|
||||
metricData, err := eng.GetTargetMetricData(rule, true, filterParam)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
//var filteredMetricData []alerting.MetricData
|
||||
//title := rule.Metrics.Formula
|
||||
//if title == "" && len( rule.Conditions.Items) > 0{
|
||||
// title,_ = rule.Conditions.Items[0].GenerateConditionExpression()
|
||||
//}
|
||||
var metricItem = common.MetricItem{
|
||||
Group: rule.ID,
|
||||
Key: rule.ID,
|
||||
Key: rule.ID,
|
||||
Axis: []*common.MetricAxis{
|
||||
{ID: util.GetUUID(), Group: rule.ID, Title: title, FormatType: "num", Position: "left",ShowGridLines: true,
|
||||
{ID: util.GetUUID(), Group: rule.ID, Title: "", FormatType: "num", Position: "left", ShowGridLines: true,
|
||||
TickFormat: "0,0.[00]",
|
||||
Ticks: 5},
|
||||
Ticks: 5},
|
||||
},
|
||||
}
|
||||
var sampleData []alerting.TimeMetricData
|
||||
|
@ -534,18 +577,18 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
|
|||
sampleData = md.Data["result"]
|
||||
}
|
||||
metricItem.Lines = append(metricItem.Lines, &common.MetricLine{
|
||||
Data: md.Data["result"],
|
||||
Data: md.Data["result"],
|
||||
BucketSize: filterParam.BucketSize,
|
||||
Metric: common.MetricSummary{
|
||||
Label: strings.Join(md.GroupValues, "-"),
|
||||
Group: rule.ID,
|
||||
Label: strings.Join(md.GroupValues, "-"),
|
||||
Group: rule.ID,
|
||||
TickFormat: "0,0.[00]",
|
||||
FormatType: "num",
|
||||
},
|
||||
})
|
||||
}
|
||||
//add guidelines
|
||||
for _, cond := range rule.Conditions.Items{
|
||||
for _, cond := range rule.Conditions.Items {
|
||||
if len(cond.Values) > 0 {
|
||||
val, err := strconv.ParseFloat(cond.Values[0], 64)
|
||||
if err != nil {
|
||||
|
@ -553,9 +596,9 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
|
|||
continue
|
||||
}
|
||||
if sampleData != nil {
|
||||
newData := make([]alerting.TimeMetricData,0, len(sampleData))
|
||||
newData := make([]alerting.TimeMetricData, 0, len(sampleData))
|
||||
for _, td := range sampleData {
|
||||
if len(td) < 2{
|
||||
if len(td) < 2 {
|
||||
continue
|
||||
}
|
||||
newData = append(newData, alerting.TimeMetricData{
|
||||
|
@ -563,11 +606,11 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
|
|||
})
|
||||
}
|
||||
metricItem.Lines = append(metricItem.Lines, &common.MetricLine{
|
||||
Data: newData,
|
||||
Data: newData,
|
||||
BucketSize: filterParam.BucketSize,
|
||||
Metric: common.MetricSummary{
|
||||
Label: "",
|
||||
Group: rule.ID,
|
||||
Label: "",
|
||||
Group: rule.ID,
|
||||
TickFormat: "0,0.[00]",
|
||||
FormatType: "num",
|
||||
},
|
||||
|
@ -575,9 +618,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
|
|||
}
|
||||
}
|
||||
}
|
||||
alertAPI.WriteJSON(w, util.MapStr{
|
||||
"metric": metricItem,
|
||||
}, http.StatusOK)
|
||||
return &metricItem, nil
|
||||
}
|
||||
|
||||
|
||||
|
@ -609,7 +650,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
|
|||
// Conditions: alerting.Condition{
|
||||
// Operator: "any",
|
||||
// Items: []alerting.ConditionItem{
|
||||
// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"},
|
||||
// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", AlertMessage: "cpu使用率大于90%"},
|
||||
// },
|
||||
// },
|
||||
//
|
||||
|
@ -617,7 +658,7 @@ func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request
|
|||
// Normal: []alerting.Channel{
|
||||
// {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
|
||||
// HeaderParams: map[string]string{
|
||||
// "Content-Type": "application/json",
|
||||
// "Message-Type": "application/json",
|
||||
// },
|
||||
// Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
|
||||
// Method: http.MethodPost,
|
||||
|
|
|
@ -8,6 +8,7 @@ const (
|
|||
KVLastNotificationTime = "alert_last_notification_time"
|
||||
KVLastTermStartTime = "alert_last_term_start_time"
|
||||
KVLastEscalationTime = "alert_last_escalation_time"
|
||||
KVLastMessageState = "alert_last_message_state"
|
||||
)
|
||||
|
||||
|
||||
|
@ -17,7 +18,8 @@ const (
|
|||
ParamResourceName = "resource_name" // 资源名称 如集群名称 es-v714
|
||||
ParamEventID = "event_id" // 检查事件 ID
|
||||
ParamResults = "results" //
|
||||
ParamMessage = "message" //检查消息 自定义
|
||||
ParamMessage = "message" //检查消息 自定义(模版渲染)
|
||||
ParamTitle = "title"
|
||||
ParamPresetValue = "preset_value" //检查预设值 float64
|
||||
ParamResultValue = "result_value" //检查结果 {group_tags:["cluster-xxx", "node-xxx"], check_values:[]}
|
||||
Severity = "severity" //告警等级
|
||||
|
|
|
@ -575,44 +575,93 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
|
|||
Expression: rule.Metrics.Expression,
|
||||
Objects: rule.Resource.Objects,
|
||||
Conditions: rule.Conditions,
|
||||
State: alerting.AlertStateNormal,
|
||||
State: alerting.AlertStateOK,
|
||||
}
|
||||
checkResults, err := engine.CheckCondition(rule)
|
||||
alertItem.ConditionResult = checkResults
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lastAlertItem := alerting.Alert{}
|
||||
err = getLastAlert(rule.ID, &lastAlertItem)
|
||||
alertMessage, err := getLastAlertMessage(rule.ID, 2 * time.Minute)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("get alert message error: %w", err)
|
||||
}
|
||||
conditionResults := checkResults.ResultItems
|
||||
var paramsCtx map[string]interface{}
|
||||
if len(conditionResults) == 0 {
|
||||
alertItem.Severity = "info"
|
||||
alertItem.Content = ""
|
||||
alertItem.State = alerting.AlertStateNormal
|
||||
alertItem.State = alerting.AlertStateOK
|
||||
if alertMessage != nil && alertMessage.Status != alerting.MessageStateRecovered {
|
||||
alertMessage.Status = alerting.MessageStateRecovered
|
||||
alertMessage.Updated = time.Now()
|
||||
err = saveAlertMessage(alertMessage)
|
||||
if err != nil {
|
||||
return fmt.Errorf("save alert message error: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}else{
|
||||
if lastAlertItem.State == "" || lastAlertItem.State == alerting.AlertStateNormal {
|
||||
rule.LastTermStartTime = time.Now()
|
||||
strTime := rule.LastTermStartTime.UTC().Format(time.RFC3339)
|
||||
kv.AddValue(alerting2.KVLastTermStartTime, []byte(rule.ID), []byte(strTime))
|
||||
}
|
||||
log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID )
|
||||
paramsCtx = newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Unix())
|
||||
var (
|
||||
severity = conditionResults[0].ConditionItem.Severity
|
||||
content string
|
||||
tplBytes []byte
|
||||
message string
|
||||
title string
|
||||
)
|
||||
tplBytes, err = resolveMessage(rule.Metrics.Message, paramsCtx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("resolve content template error: %w", err)
|
||||
}
|
||||
message = string(tplBytes)
|
||||
paramsCtx[alerting2.ParamMessage] = message
|
||||
tplBytes, err = resolveMessage(rule.Metrics.Title, paramsCtx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("resolve title template error: %w", err)
|
||||
}
|
||||
title = string(tplBytes)
|
||||
paramsCtx[alerting2.ParamTitle] = title
|
||||
for _, conditionResult := range conditionResults {
|
||||
if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] {
|
||||
severity = conditionResult.ConditionItem.Severity
|
||||
content = conditionResult.ConditionItem.Message
|
||||
}
|
||||
}
|
||||
|
||||
alertItem.Severity = severity
|
||||
alertItem.Content = content
|
||||
alertItem.Message = message
|
||||
alertItem.Title = title
|
||||
alertItem.State = alerting.AlertStateActive
|
||||
if alertMessage == nil || alertMessage.Status == alerting.MessageStateRecovered {
|
||||
msg := &alerting.AlertMessage{
|
||||
RuleID: rule.ID,
|
||||
Created: time.Now(),
|
||||
Updated: time.Now(),
|
||||
ID: util.GetUUID(),
|
||||
Status: alerting.MessageStateActive,
|
||||
Severity: severity,
|
||||
Title: title,
|
||||
Message: message,
|
||||
}
|
||||
err = saveAlertMessage(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("save alert message error: %w", err)
|
||||
}
|
||||
}else{
|
||||
alertMessage.Title = title
|
||||
alertMessage.Message = message
|
||||
err = saveAlertMessage(alertMessage)
|
||||
if err != nil {
|
||||
return fmt.Errorf("save alert message error: %w", err)
|
||||
}
|
||||
}
|
||||
log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID )
|
||||
}
|
||||
// if alert message status equals ignored , then skip sending message to channel
|
||||
if alertMessage != nil && alertMessage.Status == alerting.MessageStateIgnored {
|
||||
return nil
|
||||
}
|
||||
// if channel is not enabled return
|
||||
if !rule.Channels.Enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
if rule.Channels.AcceptTimeRange.Include(time.Now()) {
|
||||
|
@ -633,9 +682,11 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
|
|||
period := time.Now().Sub(rule.LastNotificationTime.Local())
|
||||
|
||||
//log.Error(lastAlertItem.ID, period, periodDuration)
|
||||
paramsCtx := newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.UnixNano()/1e6)
|
||||
if paramsCtx == nil {
|
||||
paramsCtx = newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Unix())
|
||||
}
|
||||
|
||||
if lastAlertItem.ID == "" || period > periodDuration {
|
||||
if alertMessage == nil || period > periodDuration {
|
||||
actionResults, errCount := performChannels(rule.Channels.Normal, paramsCtx)
|
||||
alertItem.ActionExecutionResults = actionResults
|
||||
//change and save last notification time in local kv store when action error count equals zero
|
||||
|
@ -646,25 +697,16 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
|
|||
alertItem.IsNotified = true
|
||||
}
|
||||
}
|
||||
isAck, err := hasAcknowledgedRule(rule.ID, rule.LastTermStartTime)
|
||||
if err != nil {
|
||||
alertItem.Error = err.Error()
|
||||
return err
|
||||
}
|
||||
if rule.Channels.EscalationEnabled && lastAlertItem.ID !="" && !isAck {
|
||||
|
||||
if rule.Channels.EscalationEnabled {
|
||||
throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
//change and save last term start time in local kv store when action error count equals zero
|
||||
if rule.LastTermStartTime.IsZero(){
|
||||
tm, err := readTimeFromKV(alerting2.KVLastTermStartTime, []byte(rule.ID))
|
||||
if err != nil {
|
||||
return fmt.Errorf("get last term start time from kv error: %w", err)
|
||||
}
|
||||
if !tm.IsZero(){
|
||||
rule.LastTermStartTime = tm
|
||||
}
|
||||
|
||||
rule.LastTermStartTime = time.Now()
|
||||
if alertMessage != nil {
|
||||
rule.LastTermStartTime = alertMessage.Created
|
||||
}
|
||||
if time.Now().Sub(rule.LastTermStartTime.Local()) > throttlePeriod {
|
||||
if rule.LastEscalationTime.IsZero(){
|
||||
|
@ -698,7 +740,6 @@ func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult
|
|||
var conditionParams []util.MapStr
|
||||
for _, resultItem := range checkResults.ResultItems {
|
||||
conditionParams = append(conditionParams, util.MapStr{
|
||||
alerting2.ParamMessage: resultItem.ConditionItem.Message,
|
||||
alerting2.ParamPresetValue: resultItem.ConditionItem.Values,
|
||||
alerting2.Severity: resultItem.ConditionItem.Severity,
|
||||
alerting2.ParamGroupValues: resultItem.GroupValues,
|
||||
|
@ -724,7 +765,7 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul
|
|||
return nil, fmt.Errorf("check condition error:%w", err)
|
||||
}
|
||||
var actionResults []alerting.ActionExecutionResult
|
||||
paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().UnixNano()/1e6)
|
||||
paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Unix())
|
||||
if len(rule.Channels.Normal) > 0 {
|
||||
actionResults, _ = performChannels(rule.Channels.Normal, paramsCtx)
|
||||
}else if len(rule.Channels.Escalation) > 0{
|
||||
|
@ -879,7 +920,7 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti
|
|||
}
|
||||
}
|
||||
|
||||
func getLastAlert(ruleID string, alertItem *alerting.Alert) error {
|
||||
func getLastAlertMessageFromES(ruleID string, message *alerting.AlertMessage) error {
|
||||
queryDsl := util.MapStr{
|
||||
"size": 1,
|
||||
"sort": []util.MapStr{
|
||||
|
@ -900,15 +941,53 @@ func getLastAlert(ruleID string, alertItem *alerting.Alert) error {
|
|||
q := orm.Query{
|
||||
RawQuery: util.MustToJSONBytes(queryDsl),
|
||||
}
|
||||
err, searchResult := orm.Search(alertItem, &q )
|
||||
err, searchResult := orm.Search(alerting.AlertMessage{}, &q )
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(searchResult.Result) == 0 {
|
||||
return nil
|
||||
}
|
||||
messageBytes := util.MustToJSONBytes(searchResult.Result[0])
|
||||
return util.FromJSONBytes(messageBytes, message)
|
||||
}
|
||||
|
||||
func getLastAlertMessage(ruleID string, duration time.Duration) (*alerting.AlertMessage, error ){
|
||||
messageBytes, err := kv.GetValue(alerting2.KVLastMessageState, []byte(ruleID))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if messageBytes == nil {
|
||||
return nil, nil
|
||||
}
|
||||
message := &alerting.AlertMessage{}
|
||||
err = util.FromJSONBytes(messageBytes, message)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if time.Now().Sub(message.Updated) > duration {
|
||||
err = getLastAlertMessageFromES(ruleID, message)
|
||||
return message, err
|
||||
}
|
||||
return message, nil
|
||||
}
|
||||
|
||||
func saveAlertMessageToES(message *alerting.AlertMessage) error {
|
||||
return orm.Save(message)
|
||||
}
|
||||
|
||||
// saveAlertMessage stores message in two places: it first persists it with
// saveAlertMessageToES, then serializes it to JSON and writes those bytes to
// the alerting2.KVLastMessageState bucket under the key message.RuleID.
// The first error encountered is returned; if persisting fails the kv bucket
// is not touched.
//
// NOTE(review): this span comes from a rendered diff. The rows that mention
// searchResult, alertItem and alertBytes use names that are not declared in
// this function and look like removed-side lines of the former getLastAlert
// body — confirm against the repository before treating them as live code.
func saveAlertMessage(message *alerting.AlertMessage) error {
|
||||
err := saveAlertMessageToES(message)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
}
|
||||
if len(searchResult.Result) == 0 {
|
||||
return nil
|
||||
|
||||
|
||||
messageBytes, err := util.ToJSONBytes(message)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
}
|
||||
alertBytes := util.MustToJSONBytes(searchResult.Result[0])
|
||||
return util.FromJSONBytes(alertBytes, alertItem)
|
||||
// Keep the kv copy in step with what was just persisted.
err = kv.AddValue(alerting2.KVLastMessageState, []byte(message.RuleID), messageBytes)
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
func hasAcknowledgedRule(ruleID string, startTime time.Time) (bool, error){
|
||||
|
|
|
@ -75,7 +75,7 @@ func TestEngine( t *testing.T) {
|
|||
Normal: []alerting.Channel{
|
||||
{Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
|
||||
HeaderParams: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
"Message-Type": "application/json",
|
||||
},
|
||||
Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
|
||||
Method: http.MethodPost,
|
||||
|
@ -85,7 +85,7 @@ func TestEngine( t *testing.T) {
|
|||
Escalation: []alerting.Channel{
|
||||
{Type: alerting.ChannelWebhook, Name: "微信", Webhook: &alerting.CustomWebhook{
|
||||
HeaderParams: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
"Message-Type": "application/json",
|
||||
},
|
||||
Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
|
||||
Method: http.MethodPost,
|
||||
|
@ -153,7 +153,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
|
|||
// Conditions: alerting.Condition{
|
||||
// Operator: "any",
|
||||
// Items: []alerting.ConditionItem{
|
||||
// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"},
|
||||
// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", AlertMessage: "cpu使用率大于90%"},
|
||||
// },
|
||||
// },
|
||||
//
|
||||
|
@ -161,7 +161,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
|
|||
// Normal: []alerting.Channel{
|
||||
// {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
|
||||
// HeaderParams: map[string]string{
|
||||
// "Content-Type": "application/json",
|
||||
// "Message-Type": "application/json",
|
||||
// },
|
||||
// Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
|
||||
// Method: http.MethodPost,
|
||||
|
@ -222,7 +222,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
|
|||
Normal: []alerting.Channel{
|
||||
{Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
|
||||
HeaderParams: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
"Message-Type": "application/json",
|
||||
},
|
||||
Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
|
||||
Method: http.MethodPost,
|
||||
|
|
|
@ -9,19 +9,22 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// NOTE(review): this span comes from a rendered diff. The first header/return
// pair (date(fmt, date) calling dateInZone) looks like the removed side and
// the second pair the added side — confirm against the repository.
//
// datetimeInZone formats date with the layout "2006-01-02 15:04:05" for the
// zone name given in zone, delegating the work to _dateInZone.
func date(fmt string, date interface{}) string {
|
||||
return dateInZone(fmt, date, "Local")
|
||||
func datetimeInZone(zone string, date interface{}) string{
|
||||
return _dateInZone("2006-01-02 15:04:05", date, zone)
|
||||
|
||||
}
|
||||
// datetime formats date with the layout "2006-01-02 15:04:05", passing the
// zone name "Local" to _dateInZone.
func datetime(date interface{}) string{
|
||||
return _dateInZone("2006-01-02 15:04:05", date, "Local")
|
||||
|
||||
}
|
||||
|
||||
// NOTE(review): this span comes from a rendered diff. The htmlDate header and
// its dateInZone call look like the removed side; date and its _dateInZone
// call look like the added side — confirm against the repository.
//
// date formats its argument with the date-only layout "2006-01-02", passing
// the zone name "Local" to _dateInZone.
func htmlDate(date interface{}) string {
|
||||
return dateInZone("2006-01-02", date, "Local")
|
||||
func date(date interface{}) string {
|
||||
return _dateInZone("2006-01-02", date, "Local")
|
||||
|
||||
}
|
||||
|
||||
// NOTE(review): this span comes from a rendered diff. The htmlDateInZone
// header and its dateInZone call look like the removed side; the
// dateInZone(zone, date) header and its _dateInZone call look like the added
// side — confirm against the repository.
//
// dateInZone formats date with the date-only layout "2006-01-02" for the zone
// name given in zone, delegating to _dateInZone. Note that zone is the first
// parameter and date the second.
func htmlDateInZone(date interface{}, zone string) string {
|
||||
return dateInZone("2006-01-02", date, zone)
|
||||
func dateInZone(zone string, date interface{}) string {
|
||||
return _dateInZone("2006-01-02", date, zone)
|
||||
|
||||
}
|
||||
|
||||
func dateInZone(fmt string, date interface{}, zone string) string {
|
||||
func _dateInZone(fmt string, date interface{}, zone string) string {
|
||||
var t time.Time
|
||||
switch date := date.(type) {
|
||||
default:
|
||||
|
@ -34,6 +37,7 @@ func dateInZone(fmt string, date interface{}, zone string) string {
|
|||
t = time.Unix(date, 0)
|
||||
case int:
|
||||
t = time.Unix(int64(date), 0)
|
||||
|
||||
case int32:
|
||||
t = time.Unix(int64(date), 0)
|
||||
case string:
|
||||
|
|
|
@ -18,11 +18,13 @@ func GenericFuncMap() template.FuncMap {
|
|||
}
|
||||
|
||||
// genericMap associates a function name (the map key) with the helper that
// implements it: byte and number formatting, date/datetime formatting with
// and without an explicit zone, and string case conversion.
// NOTE(review): this span comes from a rendered diff, so keys that appear
// twice ("hello", "to_fixed", "date", "to_upper", "to_lower") are presumably
// the removed and added rendering of the same entry, not real duplicates —
// confirm against the repository.
var genericMap = map[string]interface{}{
|
||||
"hello": func() string { return "Hello!" },
|
||||
"hello": func() string { return "Hello!" },
|
||||
"format_bytes": formatBytes,
|
||||
"to_fixed": toFixed,
|
||||
"date": date,
|
||||
"to_fixed": toFixed,
|
||||
"date": date,
|
||||
"date_in_zone": dateInZone,
|
||||
"to_upper": strings.ToUpper,
|
||||
"to_lower": strings.ToLower,
|
||||
"datetime": datetime,
|
||||
"datetime_in_zone": datetimeInZone,
|
||||
"to_upper": strings.ToUpper,
|
||||
"to_lower": strings.ToLower,
|
||||
}
|
||||
|
|
|
@ -18,8 +18,9 @@ func GetTemplateParameters() []ParameterMeta {
|
|||
{ParamResourceID, "string", "resource uuid", "c9f663tath2e5a0vksjg", nil},
|
||||
{ParamResourceName, "string", "resource name", "es-v716", nil},
|
||||
{ParamEventID, "string", "identifier for check details", "c9f663tath2e5a0vksjx", nil},
|
||||
{ParamTitle, "string", "", "xxx cpu used 95%", nil},
|
||||
{ParamMessage, "string", "", "disk used 90%", nil},
|
||||
{ParamResults, "array", "", "", []ParameterMeta{
|
||||
{ParamMessage, "string", "", "disk used 90%", nil},
|
||||
{ParamPresetValue, "array", "", "[\"90\"]", nil},
|
||||
{Severity, "string", "", "error", nil},
|
||||
{ParamGroupValues, "array", "", "[\"cluster-xxx\", \"node-xxx\"]", nil},
|
||||
|
|
Loading…
Reference in New Issue