recalculate time after rule updated

This commit is contained in:
liugq 2022-05-11 17:41:33 +08:00
parent 9a4456be59
commit 2770a8b9ab
6 changed files with 105 additions and 49 deletions

View File

@ -25,6 +25,7 @@ func (alert *AlertAPI) Init() {
api.HandleAPIMethod(api.GET, "/alerting/stats", alert.getAlertStats)
api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.fetchAlertInfos)
api.HandleAPIMethod(api.POST, "/alerting/rule/:rule_id/_enable", alert.enableRule)
api.HandleAPIMethod(api.POST, "/alerting/rule/metric", alert.getMetricData)
api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.getChannel)
api.HandleAPIMethod(api.POST, "/alerting/channel", alert.createChannel)

View File

@ -6,16 +6,17 @@ package alerting
import (
"fmt"
log "github.com/cihub/seelog"
"infini.sh/console/model/alerting"
alerting2 "infini.sh/console/service/alerting"
_ "infini.sh/console/service/alerting/elasticsearch"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/kv"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/task"
"infini.sh/framework/core/util"
"net/http"
log "src/github.com/cihub/seelog"
"time"
)
@ -23,6 +24,7 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p
rules := []alerting.Rule{}
err := alertAPI.DecodeJSON(req, &rules)
if err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
@ -32,6 +34,7 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p
for _, rule := range rules {
exists, err := checkResourceExists(&rule)
if err != nil || !exists {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
@ -39,6 +42,7 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p
}
rule.Metrics.Expression, err = rule.Metrics.GenerateExpression()
if err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
@ -55,6 +59,7 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p
err = orm.Save(rule)
if err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
@ -87,6 +92,7 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h
exists, err := orm.Get(&obj)
if !exists || err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"_id": id,
"found": false,
@ -94,6 +100,7 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h
return
}
if err != nil {
log.Error(err)
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
@ -114,6 +121,7 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
obj.ID = id
exists, err := orm.Get(obj)
if !exists || err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"_id": id,
"result": "not_found",
@ -151,6 +159,7 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
if obj.Enabled {
exists, err = checkResourceExists(obj)
if err != nil || !exists {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
@ -170,6 +179,7 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
}else{
task.DeleteTask(id)
}
clearKV(obj.ID)
alertAPI.WriteJSON(w, util.MapStr{
"_id": obj.ID,
@ -177,6 +187,11 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
}, 200)
}
// clearKV removes the timestamps stored for ruleID in the local KV store under
// the last-notification and last-escalation buckets. It is invoked after a rule
// is updated or deleted.
//
// Cleanup is best-effort: a failed delete is logged instead of being silently
// dropped, and never aborts the caller.
func clearKV(ruleID string) {
	key := []byte(ruleID)
	if err := kv.DeleteKey(alerting2.KVLastNotificationTime, key); err != nil {
		log.Errorf("delete last notification time of rule [%s] error: %v", ruleID, err)
	}
	if err := kv.DeleteKey(alerting2.KVLastEscalationTime, key); err != nil {
		log.Errorf("delete last escalation time of rule [%s] error: %v", ruleID, err)
	}
}
func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("rule_id")
@ -185,6 +200,7 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p
exists, err := orm.Get(&obj)
if !exists || err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"_id": id,
"result": "not_found",
@ -199,6 +215,7 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p
return
}
task.DeleteTask(obj.ID)
clearKV(obj.ID)
alertAPI.WriteJSON(w, util.MapStr{
"_id": obj.ID,
@ -325,6 +342,7 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque
searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl) )
if err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
@ -334,8 +352,9 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque
alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK)
return
}
aletNumbers, err := alertAPI.getRuleAlertNumbers(ruleIDs)
alertNumbers, err := alertAPI.getRuleAlertNumbers(ruleIDs)
if err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
@ -346,8 +365,8 @@ func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Reque
for _, hit := range searchRes.Hits.Hits {
if ruleID, ok := hit.Source["rule_id"].(string); ok {
latestAlertInfos[ruleID] = util.MapStr{
"status": hit.Source["state"],
"alert_count": aletNumbers[ruleID],
"status": hit.Source["state"],
"alert_count": alertNumbers[ruleID],
}
}
@ -359,6 +378,7 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p
reqObj := alerting.Rule{}
err := alertAPI.DecodeJSON(req, &reqObj)
if err != nil {
log.Error(err)
alertAPI.WriteError(w, fmt.Sprintf("request format error:%v", err), http.StatusInternalServerError)
return
}
@ -368,6 +388,7 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p
exists, err := orm.Get(&obj)
if !exists || err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"_id": id,
"result": "not_found",
@ -390,6 +411,7 @@ func (alertAPI *AlertAPI) enableRule(w http.ResponseWriter, req *http.Request, p
obj.Enabled = reqObj.Enabled
err = orm.Save(obj)
if err != nil {
log.Error(err)
alertAPI.WriteError(w, fmt.Sprintf("save rule error:%v", err), http.StatusInternalServerError)
return
}
@ -403,6 +425,7 @@ func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Reque
rule := alerting.Rule{}
err := alertAPI.DecodeJSON(req, &rule)
if err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
@ -411,6 +434,7 @@ func (alertAPI *AlertAPI) sendTestMessage(w http.ResponseWriter, req *http.Reque
eng := alerting2.GetEngine(rule.Resource.Type)
actionResults, err := eng.Test(&rule)
if err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
@ -448,6 +472,38 @@ func (alertAPI *AlertAPI) getTemplateParams(w http.ResponseWriter, req *http.Req
}, http.StatusOK)
}
// getMetricData handles POST /alerting/rule/metric. It decodes an alerting rule
// from the request body, asks the engine registered for the rule's resource type
// for the rule's target metric data, drops series that carry no data points, and
// writes the remaining series under the "metric" key.
//
// Any decode or query failure is logged and reported as
// {"error": "..."} with HTTP 500, matching the other handlers in this file.
func (alertAPI *AlertAPI) getMetricData(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	rule := alerting.Rule{}
	if err := alertAPI.DecodeJSON(req, &rule); err != nil {
		log.Error(err)
		alertAPI.WriteJSON(w, util.MapStr{
			"error": err.Error(),
		}, http.StatusInternalServerError)
		return
	}

	eng := alerting2.GetEngine(rule.Resource.Type)
	// isFilterNaN=true: NaN/Inf results are skipped rather than returned as
	// NaN placeholders.
	metricData, err := eng.GetTargetMetricData(&rule, true)
	if err != nil {
		log.Error(err)
		alertAPI.WriteJSON(w, util.MapStr{
			"error": err.Error(),
		}, http.StatusInternalServerError)
		return
	}

	// Start from a non-nil slice so that "metric" is serialized as [] instead
	// of null when every series turns out to be empty.
	filteredMetricData := make([]alerting.MetricData, 0, len(metricData))
	for _, md := range metricData {
		if len(md.Data) > 0 {
			filteredMetricData = append(filteredMetricData, md)
		}
	}

	alertAPI.WriteJSON(w, util.MapStr{
		"metric": filteredMetricData,
	}, http.StatusOK)
}
//func (alertAPI *AlertAPI) testRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
// rule := alerting.Rule{
// ID: util.GetUUID(),

View File

@ -16,15 +16,15 @@ const (
ParamResourceID = "resource_id" // 资源 UUID
ParamResourceName = "resource_name" // 资源名称 如集群名称 es-v714
ParamEventID = "event_id" // 检查事件 ID
ParamResults = "results" //
ParamResults = "results" //
ParamMessage = "message" //检查消息 自定义
ParamPresetValue = "preset_value" //检查预设值 float64
ParamResultValue = "result_value" //检查结果 {group_tags:["cluster-xxx", "node-xxx"], check_values:[]}
ParamStatus = "status" //状态
Severity = "severity" //告警等级
ParamTimestamp = "timestamp" //事件产生时间戳
ParamGroupValues = "group_values"
ParamIssueTimestamp = "issue_timestamp"
ParamRelationValues = "relation_values"
//rule expression, rule_id, resource_id, resource_name, event_id, condition_name, preset_value,[group_tags, check_values],
//check_status ,timestamp,
)
)

View File

@ -363,7 +363,7 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, e
queryResult.MetricData = metricData
return queryResult, nil
}
func (engine *Engine) GetTargetMetricData(rule *alerting.Rule)([]alerting.MetricData, error){
func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool)([]alerting.MetricData, error){
queryResult, err := engine.ExecuteQuery(rule)
if err != nil {
return nil, err
@ -411,7 +411,9 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule)([]alerting.Metric
}
if r, ok := result.(float64); ok {
if math.IsNaN(r) || math.IsInf(r, 0 ){
targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
if !isFilterNaN {
targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()})
}
continue
}
}
@ -436,7 +438,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionRes
}
var resultItems []alerting.ConditionResultItem
targetMetricData, err := engine.GetTargetMetricData(rule)
targetMetricData, err := engine.GetTargetMetricData(rule, false)
if err != nil {
return nil, err
}
@ -611,13 +613,15 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
paramsCtx := newParameterCtx(rule, checkResults,alertItem.ID, alertItem.Created.Format(time.RFC3339))
if lastAlertItem.ID == "" || period > periodDuration {
actionResults := performChannels(rule.Channels.Normal, paramsCtx)
actionResults, errCount := performChannels(rule.Channels.Normal, paramsCtx)
alertItem.ActionExecutionResults = actionResults
//todo init last notification time when create task (by last alert item is notified)
rule.LastNotificationTime = time.Now()
strTime := time.Now().UTC().Format(time.RFC3339)
kv.AddValue(alerting2.KVLastNotificationTime, []byte(rule.ID), []byte(strTime))
alertItem.IsNotified = true
//change and save last notification time in local kv store when action error count equals zero
if errCount == 0 {
rule.LastNotificationTime = time.Now()
strTime := time.Now().UTC().Format(time.RFC3339)
kv.AddValue(alerting2.KVLastNotificationTime, []byte(rule.ID), []byte(strTime))
alertItem.IsNotified = true
}
}
isAck, err := hasAcknowledgedRule(rule.ID, rule.LastTermStartTime)
if err != nil {
@ -629,7 +633,7 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
if err != nil {
return err
}
//todo init last term start time when create task (by last alert item of state normal)
//change and save last term start time in local kv store when action error count equals zero
if rule.LastTermStartTime.IsZero(){
tm, err := readTimeFromKV(alerting2.KVLastTermStartTime, []byte(rule.ID))
if err != nil {
@ -650,13 +654,15 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
}
}
if time.Now().Sub(rule.LastEscalationTime.Local()) > periodDuration {
actionResults := performChannels(rule.Channels.Escalation, paramsCtx)
alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, actionResults...)
actionResults, errCount := performChannels(rule.Channels.Escalation, paramsCtx)
alertItem.ActionExecutionResults = actionResults
//todo init last escalation time when create task (by last alert item is escalated)
rule.LastEscalationTime = time.Now()
alertItem.IsEscalated = true
strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339)
kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime))
if errCount == 0 {
rule.LastEscalationTime = time.Now()
alertItem.IsEscalated = true
strTime := rule.LastEscalationTime.UTC().Format(time.RFC3339)
kv.AddValue(alerting2.KVLastEscalationTime, []byte(rule.ID), []byte(strTime))
}
}
}
@ -669,12 +675,12 @@ func newParameterCtx(rule *alerting.Rule, checkResults *alerting.ConditionResult
var conditionParams []util.MapStr
for _, resultItem := range checkResults.ResultItems {
conditionParams = append(conditionParams, util.MapStr{
alerting2.ParamMessage: resultItem.ConditionItem.Message,
alerting2.ParamPresetValue: resultItem.ConditionItem.Values,
alerting2.ParamStatus: resultItem.ConditionItem.Severity,
alerting2.ParamGroupValues: resultItem.GroupValues,
alerting2.ParamMessage: resultItem.ConditionItem.Message,
alerting2.ParamPresetValue: resultItem.ConditionItem.Values,
alerting2.Severity: resultItem.ConditionItem.Severity,
alerting2.ParamGroupValues: resultItem.GroupValues,
alerting2.ParamIssueTimestamp: resultItem.IssueTimestamp,
alerting2.ParamResultValue: resultItem.ResultValue,
alerting2.ParamResultValue: resultItem.ResultValue,
alerting2.ParamRelationValues: resultItem.RelationValues,
})
}
@ -697,22 +703,23 @@ func (engine *Engine) Test(rule *alerting.Rule) ([]alerting.ActionExecutionResul
var actionResults []alerting.ActionExecutionResult
paramsCtx := newParameterCtx(rule, checkResults, util.GetUUID(), time.Now().Format(time.RFC3339))
if len(rule.Channels.Normal) > 0 {
actionResults = performChannels(rule.Channels.Normal, paramsCtx)
actionResults, _ = performChannels(rule.Channels.Normal, paramsCtx)
}else if len(rule.Channels.Escalation) > 0{
actionResults = performChannels(rule.Channels.Escalation, paramsCtx)
actionResults, _ = performChannels(rule.Channels.Escalation, paramsCtx)
}else{
return nil, fmt.Errorf("no useable channel")
}
return actionResults, nil
}
func performChannels(channels []alerting.Channel, ctx map[string]interface{}) []alerting.ActionExecutionResult {
func performChannels(channels []alerting.Channel, ctx map[string]interface{}) ([]alerting.ActionExecutionResult, int) {
var errCount int
var actionResults []alerting.ActionExecutionResult
for _, channel := range channels {
resBytes, err := performChannel(&channel, ctx)
var errStr string
if err != nil {
errCount++
errStr = err.Error()
}
actionResults = append(actionResults, alerting.ActionExecutionResult{
@ -721,23 +728,14 @@ func performChannels(channels []alerting.Channel, ctx map[string]interface{}) []
LastExecutionTime: int(time.Now().UnixNano()/1e6),
})
}
return actionResults
return actionResults, errCount
}
func resolveMessage(messageTemplate string, ctx map[string]interface{}) ([]byte, error){
msg := messageTemplate
//tpl := fasttemplate.New(msg, "{{", "}}")
//msgBuffer := bytes.NewBuffer(nil)
//_, err := tpl.ExecuteFunc(msgBuffer, func(writer io.Writer, tag string)(int, error){
// keyParts := strings.Split(tag,".")
// value, _, _, err := jsonparser.Get(ctx, keyParts...)
// if err != nil {
// return 0, err
// }
// return writer.Write(value)
//})
//return msgBuffer.Bytes(), err
tmpl, err := template.New("alert-message").Parse(msg)
tmpl, err := template.New("alert-message").Funcs(template.FuncMap{
"format_bytes": func(precision int, bytes float64) string { return util.FormatBytes(bytes, precision)},
}).Parse(msg)
if err !=nil {
return nil, fmt.Errorf("parse message temlate error: %w", err)
}

View File

@ -17,6 +17,7 @@ type Engine interface {
CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error)
GenerateTask(rule *alerting.Rule) func(ctx context.Context)
Test(rule *alerting.Rule) ([]alerting.ActionExecutionResult, error)
GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool)([]alerting.MetricData, error)
}
var (

View File

@ -20,13 +20,13 @@ func GetTemplateParameters() []ParameterMeta {
{ParamEventID, "string", "identifier for check details", "c9f663tath2e5a0vksjx", nil},
{ParamResults, "array", "", "", []ParameterMeta{
{ParamMessage, "string", "", "disk used 90%", nil},
{ParamPresetValue, "float", "", "", nil},
{ParamStatus, "string", "", "error", nil},
{ParamPresetValue, "array", "", "[\"90\"]", nil},
{Severity, "string", "", "error", nil},
{ParamGroupValues, "array", "", "[\"cluster-xxx\", \"node-xxx\"]", nil},
{ParamIssueTimestamp, "date", "", "1652184211252", nil},
{ParamIssueTimestamp, "date", "", "2022-05-11T11:50:55+08:00", nil},
{ParamResultValue, "float", "", "91.2", nil},
{ParamRelationValues, "map", "", "{a:100, b:91.2}", nil},
}},
{ParamTimestamp, "date", "", "", nil},
{ParamTimestamp, "date", "", "2022-05-11T11:50:55+08:00", nil},
}
}