From 2770a8b9ab5e867fbe89d4a7bede940b93084067 Mon Sep 17 00:00:00 2001
From: liugq
Date: Wed, 11 May 2022 17:41:33 +0800
Subject: [PATCH] recalculate time after rule updated

---
 plugin/api/alerting/api.go               |  1 +
 plugin/api/alerting/rule.go              | 64 ++++++++++++++++++--
 service/alerting/constants.go            |  6 +-
 service/alerting/elasticsearch/engine.go | 74 ++++++++++++------------
 service/alerting/engine.go               |  1 +
 service/alerting/parameter.go            |  8 +--
 6 files changed, 105 insertions(+), 49 deletions(-)

diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go
index daf3c0a9..c8fb0cc6 100644
--- a/plugin/api/alerting/api.go
+++ b/plugin/api/alerting/api.go
@@ -25,6 +25,7 @@ func (alert *AlertAPI) Init() {
 	api.HandleAPIMethod(api.GET, "/alerting/stats", alert.getAlertStats)
 	api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.fetchAlertInfos)
 	api.HandleAPIMethod(api.POST, "/alerting/rule/:rule_id/_enable", alert.enableRule)
+	api.HandleAPIMethod(api.POST, "/alerting/rule/metric", alert.getMetricData)
 
 	api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.getChannel)
 	api.HandleAPIMethod(api.POST, "/alerting/channel", alert.createChannel)
diff --git a/plugin/api/alerting/rule.go b/plugin/api/alerting/rule.go
index edf46a52..5f58a5fb 100644
--- a/plugin/api/alerting/rule.go
+++ b/plugin/api/alerting/rule.go
@@ -6,16 +6,17 @@
 package alerting
 
 import (
 	"fmt"
+	log "github.com/cihub/seelog"
 	"infini.sh/console/model/alerting"
 	alerting2 "infini.sh/console/service/alerting"
 	_ "infini.sh/console/service/alerting/elasticsearch"
 	httprouter "infini.sh/framework/core/api/router"
 	"infini.sh/framework/core/elastic"
+	"infini.sh/framework/core/kv"
 	"infini.sh/framework/core/orm"
 	"infini.sh/framework/core/task"
 	"infini.sh/framework/core/util"
 	"net/http"
-	log "src/github.com/cihub/seelog"
 	"time"
 )
@@ -23,6 +24,7 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p
 	rules := []alerting.Rule{}
 	err := alertAPI.DecodeJSON(req, &rules)
 	if err != nil {
+		log.Error(err)
 		alertAPI.WriteJSON(w, util.MapStr{
 			"error": err.Error(),
 		}, http.StatusInternalServerError)
@@ -32,6 +34,7 @@
 	for _, rule := range rules {
 		exists, err := checkResourceExists(&rule)
 		if err != nil || !exists {
+			log.Error(err)
 			alertAPI.WriteJSON(w, util.MapStr{
 				"error": err.Error(),
 			}, http.StatusInternalServerError)
@@ -39,6 +42,7 @@
 		}
 		rule.Metrics.Expression, err = rule.Metrics.GenerateExpression()
 		if err != nil {
+			log.Error(err)
 			alertAPI.WriteJSON(w, util.MapStr{
 				"error": err.Error(),
 			}, http.StatusInternalServerError)
@@ -55,6 +59,7 @@
 
 		err = orm.Save(rule)
 		if err != nil {
+			log.Error(err)
 			alertAPI.WriteJSON(w, util.MapStr{
 				"error": err.Error(),
 			}, http.StatusInternalServerError)
@@ -87,6 +92,7 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h
 
 	exists, err := orm.Get(&obj)
 	if !exists || err != nil {
+		log.Error(err)
 		alertAPI.WriteJSON(w, util.MapStr{
 			"_id": id,
 			"found": false,
@@ -94,6 +100,7 @@
 		return
 	}
 	if err != nil {
+		log.Error(err)
 		alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
 		log.Error(err)
 		return
@@ -114,6 +121,7 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
 	obj.ID = id
 	exists, err := orm.Get(obj)
 	if !exists || err != nil {
+		log.Error(err)
 		alertAPI.WriteJSON(w, util.MapStr{
 			"_id": id,
 			"result": "not_found",
@@ -151,6 +159,7 @@
 	if obj.Enabled {
 		exists, err = checkResourceExists(obj)
 		if err != nil || !exists {
+			log.Error(err)
 			alertAPI.WriteJSON(w, util.MapStr{
 				"error": err.Error(),
 			}, http.StatusInternalServerError)
@@ -170,6 +179,7 @@
 	}else{
 		task.DeleteTask(id)
 	}
+	clearKV(obj.ID)
 
 	alertAPI.WriteJSON(w, util.MapStr{
 		"_id": obj.ID,
@@ -177,6 +187,11 @@
 	}, 200)
 }
 
+func clearKV(ruleID string){
+	_ = kv.DeleteKey(alerting2.KVLastNotificationTime, []byte(ruleID))
+	_ = kv.DeleteKey(alerting2.KVLastEscalationTime, []byte(ruleID))
+}
+
 func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	id := ps.MustGetParameter("rule_id")
 
@@ -185,6 +200,7 @@
 
 	exists, err := orm.Get(&obj)
 	if !exists || err != nil {
+		log.Error(err)
 		alertAPI.WriteJSON(w, util.MapStr{
 			"_id": id,
 			"result": "not_found",
@@ -199,6 +215,7 @@
 		return
 	}
 	task.DeleteTask(obj.ID)
+	clearKV(obj.ID)
 
 	alertAPI.WriteJSON(w, util.MapStr{
 		"_id": obj.ID,
@@ -325,6 +342,7 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque
 
 	searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) )
 	if err != nil {
+		log.Error(err)
 		alertAPI.WriteJSON(w, util.MapStr{
 			"error": err.Error(),
 		}, http.StatusInternalServerError)
@@ -334,8 +352,9 @@
 		alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK)
 		return
 	}
-	aletNumbers, err := alertAPI.getRuleAlertNumbers(ruleIDs)
+	alertNumbers, err := alertAPI.getRuleAlertNumbers(ruleIDs)
 	if err != nil {
+		log.Error(err)
 		alertAPI.WriteJSON(w, util.MapStr{
 			"error": err.Error(),
 		}, http.StatusInternalServerError)
@@ -346,8 +365,8 @@
 	for _, hit := range searchRes.Hits.Hits {
 		if ruleID, ok := hit.Source["rule_id"].(string); ok {
 			latestAlertInfos[ruleID] = util.MapStr{
-				"status": hit.Source["state"],
-				"alert_count": aletNumbers[ruleID],
+				"status":      hit.Source["state"],
+				"alert_count": alertNumbers[ruleID],
 			}
 		}
 
@@ -359,6 +378,7 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p
 	reqObj := alerting.Rule{}
 	err := alertAPI.DecodeJSON(req, &reqObj)
 	if err != nil {
+		log.Error(err)
 		alertAPI.WriteError(w, fmt.Sprintf("request format error:%v", err), http.StatusInternalServerError)
 		return
 	}
@@ -368,6 +388,7 @@
 
 	exists, err := orm.Get(&obj)
 	if !exists || err != nil {
+		log.Error(err)
 		alertAPI.WriteJSON(w, util.MapStr{
 			"_id": id,
 			"result": "not_found",
@@ -390,6 +411,7 @@
 	obj.Enabled = reqObj.Enabled
 	err = orm.Save(obj)
 	if err != nil {
+		log.Error(err)
 		alertAPI.WriteError(w, fmt.Sprintf("save rule error:%v", err), http.StatusInternalServerError)
 		return
 	}
@@ -403,6 +425,7 @@ func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Reque
 	rule := alerting.Rule{}
 	err := alertAPI.DecodeJSON(req, &rule)
 	if err != nil {
+		log.Error(err)
 		alertAPI.WriteJSON(w, util.MapStr{
 			"error": err.Error(),
 		}, http.StatusInternalServerError)
@@ -411,6 +434,7 @@
 	eng := alerting2.GetEngine(rule.Resource.Type)
 	actionResults, err := eng.Test(&rule)
 	if err != nil {
+		log.Error(err)
 		alertAPI.WriteJSON(w, util.MapStr{
 			"error": err.Error(),
 		}, http.StatusInternalServerError)
@@ -448,6 +472,38 @@
 	}, http.StatusOK)
 }
 
+func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	rule := alerting.Rule{}
+	err := alertAPI.DecodeJSON(req, &rule)
+	if err != nil {
+		log.Error(err)
+		alertAPI.WriteJSON(w, util.MapStr{
+			"error": err.Error(),
+		}, http.StatusInternalServerError)
+		return
+	}
+	eng := alerting2.GetEngine(rule.Resource.Type)
+	metricData, err := eng.GetTargetMetricData(&rule, true)
+	if err != nil {
+		log.Error(err)
+		alertAPI.WriteJSON(w, util.MapStr{
+			"error": err.Error(),
+		}, http.StatusInternalServerError)
+		return
+	}
+	var filteredMetricData []alerting.MetricData
+	for _, md := range metricData {
+		if len(md.Data) == 0 {
+			continue
+		}
+		filteredMetricData = append(filteredMetricData, md)
+	}
+	alertAPI.WriteJSON(w, util.MapStr{
+		"metric": filteredMetricData,
+	}, http.StatusOK)
+}
+
+
 //func (alertAPI *AlertAPI) testRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 //	rule := alerting.Rule{
 //		ID: util.GetUUID(),
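A minimal usage sketch for the new POST /alerting/rule/metric route registered in api.go and handled by getMetricData above (not part of the diff itself). The host/port and the alerting.Rule JSON field names are assumptions for illustration; only resource.type is shown because the handler uses rule.Resource.Type to select the engine:

package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	// Hypothetical rule payload; a real rule also carries metrics, conditions
	// and schedule fields.
	body := `{"resource": {"type": "elasticsearch"}}`

	// POST the rule to the metric preview endpoint added by this patch.
	resp, err := http.Post("http://localhost:9000/alerting/rule/metric",
		"application/json", strings.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler replies with {"metric": [...]}; series with empty Data are
	// dropped, and NaN points are filtered because GetTargetMetricData is
	// called with isFilterNaN=true.
	data, _ := io.ReadAll(resp.Body)
	fmt.Println(string(data))
}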
diff --git a/service/alerting/constants.go b/service/alerting/constants.go
index ad36c95b..dba24266 100644
--- a/service/alerting/constants.go
+++ b/service/alerting/constants.go
@@ -16,15 +16,15 @@ const (
 	ParamResourceID     = "resource_id"     // resource UUID
 	ParamResourceName   = "resource_name"   // resource name, e.g. cluster name es-v714
 	ParamEventID        = "event_id"        // check event ID
-	ParamResults        = "results"         // 
+	ParamResults        = "results"         //
 	ParamMessage        = "message"         // check message (custom)
 	ParamPresetValue    = "preset_value"    // preset check value, float64
 	ParamResultValue    = "result_value"    // check result {group_tags:["cluster-xxx", "node-xxx"], check_values:[]}
-	ParamStatus         = "status"          // status
+	Severity            = "severity"        // alert severity level
 	ParamTimestamp      = "timestamp"       // event creation timestamp
 	ParamGroupValues    = "group_values"
 	ParamIssueTimestamp = "issue_timestamp"
 	ParamRelationValues = "relation_values"
 	//rule expression, rule_id, resource_id, resource_name, event_id, condition_name, preset_value,[group_tags, check_values],
 	//check_status ,timestamp,
-)
+)
\ No newline at end of file
diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go
index 886a1a3f..09b77f6c 100644
--- a/service/alerting/elasticsearch/engine.go
+++ b/service/alerting/elasticsearch/engine.go
@@ -363,7 +363,7 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, e
 	queryResult.MetricData = metricData
 	return queryResult, nil
 }
-func (engine *Engine) GetTargetMetricData(rule *alerting.Rule)([]alerting.MetricData, error){
+func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool)([]alerting.MetricData, error){
 	queryResult, err := engine.ExecuteQuery(rule)
 	if err != nil {
 		return nil, err
 	}
@@ -411,7 +411,9 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule)([]alerting.Metric
 			}
 			if r, ok := result.(float64); ok {
 				if math.IsNaN(r) || math.IsInf(r, 0 ){
-					targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
+					if !isFilterNaN {
+						targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
+					}
 					continue
 				}
 			}
@@ -436,7 +438,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes
 	}
 
 	var resultItems []alerting.ConditionResultItem
-	targetMetricData, err := engine.GetTargetMetricData(rule)
+	targetMetricData, err := engine.GetTargetMetricData(rule, false)
 	if err != nil {
 		return nil, err
 	}
@@ -611,13 +613,15 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
 		paramsCtx := newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Format(time.RFC3339))
 
 		if lastAlertItem.ID == "" || period > periodDuration {
-			actionResults := performChannels(rule.Channels.Normal, paramsCtx)
+			actionResults, errCount := performChannels(rule.Channels.Normal, paramsCtx)
 			alertItem.ActionExecutionResults = actionResults
-			//todo init last notification time when create task (by last alert item is notified)
-			rule.LastNotificationTime = time.Now()
-			strTime := time.Now().UTC().Format(time.RFC3339)
-			kv.AddValue(alerting2.KVLastNotificationTime, []byte(rule.ID), []byte(strTime))
-			alertItem.IsNotified = true
+			//change and save last notification time in local kv store when action error count equals zero
+			if errCount == 0 {
+				rule.LastNotificationTime = time.Now()
+				strTime := time.Now().UTC().Format(time.RFC3339)
+				kv.AddValue(alerting2.KVLastNotificationTime, []byte(rule.ID), []byte(strTime))
+				alertItem.IsNotified = true
+			}
 		}
 		isAck, err := hasAcknowledgedRule(rule.ID, rule.LastTermStartTime)
 		if err != nil {
@@ -629,7 +633,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
 		if err != nil {
 			return err
 		}
-		//todo init last term start time when create task (by last alert item of state normal)
+		//change and save last term start time in local kv store when action error count equals zero
 		if rule.LastTermStartTime.IsZero(){
 			tm, err := readTimeFromKV(alerting2.KVLastTermStartTime, []byte(rule.ID))
 			if err != nil {
@@ -650,13 +654,15 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
 			}
 		}
 		if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration {
-			actionResults := performChannels(rule.Channels.Escalation, paramsCtx)
-			alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, actionResults...)
+			actionResults, errCount := performChannels(rule.Channels.Escalation, paramsCtx)
+			alertItem.ActionExecutionResults = actionResults
 			//todo init last escalation time when create task (by last alert item is escalated)
-			rule.LastEscalationTime = time.Now()
-			alertItem.IsEscalated = true
-			strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339)
-			kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime))
+			if errCount == 0 {
+				rule.LastEscalationTime = time.Now()
+				alertItem.IsEscalated = true
+				strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339)
+				kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime))
+			}
 		}
 	}
 
@@ -669,12 +675,12 @@ func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult
 	var conditionParams []util.MapStr
 	for _, resultItem := range checkResults.ResultItems {
 		conditionParams = append(conditionParams, util.MapStr{
-			alerting2.ParamMessage: resultItem.ConditionItem.Message,
-			alerting2.ParamPresetValue: resultItem.ConditionItem.Values,
-			alerting2.ParamStatus: resultItem.ConditionItem.Severity,
-			alerting2.ParamGroupValues: resultItem.GroupValues,
+			alerting2.ParamMessage:        resultItem.ConditionItem.Message,
+			alerting2.ParamPresetValue:    resultItem.ConditionItem.Values,
+			alerting2.Severity:            resultItem.ConditionItem.Severity,
+			alerting2.ParamGroupValues:    resultItem.GroupValues,
 			alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp,
-			alerting2.ParamResultValue: resultItem.ResultValue,
+			alerting2.ParamResultValue:    resultItem.ResultValue,
 			alerting2.ParamRelationValues: resultItem.RelationValues,
 		})
 	}
@@ -697,22 +703,23 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul
 	var actionResults []alerting.ActionExecutionResult
 	paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Format(time.RFC3339))
 	if len(rule.Channels.Normal) > 0 {
-		actionResults = performChannels(rule.Channels.Normal, paramsCtx)
+		actionResults, _ = performChannels(rule.Channels.Normal, paramsCtx)
 	}else if len(rule.Channels.Escalation) > 0{
-		actionResults = performChannels(rule.Channels.Escalation, paramsCtx)
+		actionResults, _ = performChannels(rule.Channels.Escalation, paramsCtx)
 	}else{
 		return nil, fmt.Errorf("no useable channel")
 	}
 	return actionResults, nil
 }
 
-func performChannels(channels []alerting.Channel, ctx map[string]interface{}) []alerting.ActionExecutionResult {
-
+func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([]alerting.ActionExecutionResult, int) {
+	var errCount int
 	var actionResults []alerting.ActionExecutionResult
 	for _, channel := range channels {
 		resBytes, err := performChannel(&channel, ctx)
 		var errStr string
 		if err != nil {
+			errCount++
 			errStr = err.Error()
 		}
 		actionResults = append(actionResults, alerting.ActionExecutionResult{
@@ -721,23 +728,14 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) []
 			LastExecutionTime: int(time.Now().UnixNano()/1e6),
 		})
 	}
-	return actionResults
+	return actionResults, errCount
 }
 
 func resolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){
 	msg := messageTemplate
-	//tpl := fasttemplate.New(msg, "{{", "}}")
-	//msgBuffer := bytes.NewBuffer(nil)
-	//_, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){
-	//	keyParts := strings.Split(tag,".")
-	//	value, _, _, err := jsonparser.Get(ctx, keyParts...)
-	//	if err != nil {
-	//		return 0, err
-	//	}
-	//	return writer.Write(value)
-	//})
-	//return msgBuffer.Bytes(), err
-	tmpl, err := template.New("alert-message").Parse(msg)
+	tmpl, err := template.New("alert-message").Funcs(template.FuncMap{
+		"format_bytes": func(precision int, bytes float64) string { return util.FormatBytes(bytes, precision)},
+	}).Parse(msg)
 	if err !=nil {
 		return nil, fmt.Errorf("parse message temlate error: %w", err)
 	}
diff --git a/service/alerting/engine.go b/service/alerting/engine.go
index d9dece81..46dc11c3 100644
--- a/service/alerting/engine.go
+++ b/service/alerting/engine.go
@@ -17,6 +17,7 @@ type Engine interface {
 	CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error)
 	GenerateTask(rule *alerting.Rule) func(ctx context.Context)
 	Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error)
+	GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool)([]alerting.MetricData, error)
 }
 
 var (
diff --git a/service/alerting/parameter.go b/service/alerting/parameter.go
index f29c258c..2fe8128b 100644
--- a/service/alerting/parameter.go
+++ b/service/alerting/parameter.go
@@ -20,13 +20,13 @@ func GetTemplateParameters() []ParameterMeta {
 		{ParamEventID, "string", "identifier for check details", "c9f663tath2e5a0vksjx", nil},
 		{ParamResults, "array", "", "", []ParameterMeta{
 			{ParamMessage, "string", "", "disk used 90%", nil},
-			{ParamPresetValue, "float", "", "", nil},
-			{ParamStatus, "string", "", "error", nil},
+			{ParamPresetValue, "array", "", "[\"90\"]", nil},
+			{Severity, "string", "", "error", nil},
 			{ParamGroupValues, "array", "", "[\"cluster-xxx\", \"node-xxx\"]", nil},
-			{ParamIssueTimestamp, "date", "", "1652184211252", nil},
+			{ParamIssueTimestamp, "date", "", "2022-05-11T11:50:55+08:00", nil},
 			{ParamResultValue, "float", "", "91.2", nil},
 			{ParamRelationValues, "map", "", "{a:100, b:91.2}", nil},
 		}},
-		{ParamTimestamp, "date", "", "", nil},
+		{ParamTimestamp, "date", "", "2022-05-11T11:50:55+08:00", nil},
 	}
 }
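For reference, a self-contained sketch (not part of the diff) of the format_bytes helper that resolveMessage now registers for alert message templates. util.FormatBytes from infini.sh/framework is replaced by a stand-in so the snippet runs on its own, and the data map is a stand-in for the real template context; the argument order (precision first, then the byte count) follows the FuncMap entry added above:

package main

import (
	"fmt"
	"os"
	"text/template"
)

func main() {
	// Stand-in for util.FormatBytes: format a byte count with the requested
	// precision. Illustration only.
	formatBytes := func(precision int, bytes float64) string {
		return fmt.Sprintf("%.*f MB", precision, bytes/1024/1024)
	}

	// Same registration pattern as resolveMessage: the helper is callable from
	// a message template as {{format_bytes <precision> <value>}}.
	tmpl, err := template.New("alert-message").Funcs(template.FuncMap{
		"format_bytes": formatBytes,
	}).Parse(`disk used: {{format_bytes 2 .result_value}}`)
	if err != nil {
		panic(err)
	}
	_ = tmpl.Execute(os.Stdout, map[string]interface{}{"result_value": 123456789.0})
}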