update alerting api

This commit is contained in:
liugq 2022-05-23 11:43:50 +08:00
parent bfd79d751b
commit 9f9ed6894f
5 changed files with 99 additions and 39 deletions

View File

@ -11,7 +11,6 @@ import (
type Metric struct {
PeriodInterval string `json:"period_interval"`
MaxPeriods int `json:"max_periods"`
Items []MetricItem `json:"items"`
Formula string `json:"formula,omitempty"`
Expression string `json:"expression" elastic_mapping:"expression:{type:keyword,copy_to:search_text}"` //告警表达式,自动生成 eg: avg(cpu) > 80

View File

@ -61,7 +61,6 @@ func TestCreateRule( t *testing.T) {
Metrics: Metric{
PeriodInterval: "1m",
MaxPeriods: 15,
Items: []MetricItem{
{Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
{Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},

View File

@ -215,7 +215,7 @@ func (h *AlertAPI) getAlertMessage(w http.ResponseWriter, req *http.Request, ps
exists, err = orm.Get(rule)
if !exists || err != nil {
log.Error(err)
h.WriteError(w, fmt.Sprintf("rule[%s] not found", rule.ID), http.StatusInternalServerError)
h.WriteError(w, fmt.Sprintf("rule [%s] not found", rule.ID), http.StatusInternalServerError)
return
}
metricExpression, _ := rule.Metrics.GenerateExpression()

View File

@ -7,13 +7,16 @@ package alerting
import (
"fmt"
log "github.com/cihub/seelog"
"github.com/r3labs/diff/v2"
"infini.sh/console/model/alerting"
alerting2 "infini.sh/console/service/alerting"
_ "infini.sh/console/service/alerting/elasticsearch"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/event"
"infini.sh/framework/core/kv"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/queue"
"infini.sh/framework/core/task"
"infini.sh/framework/core/util"
"infini.sh/framework/modules/elastic/api"
@ -55,7 +58,6 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p
ids = append(ids, rule.ID)
rule.Created = time.Now()
rule.Updated = time.Now()
rule.Metrics.MaxPeriods = 15
if rule.Schedule.Interval == ""{
rule.Schedule.Interval = "1m"
}
@ -68,6 +70,11 @@ func (alertAPI *AlertAPI) createRule(w http.ResponseWriter, req *http.Request, p
}, http.StatusInternalServerError)
return
}
saveAlertActivity("alerting_rule_change", "create", util.MapStr{
"cluster_id": rule.Resource.ID,
"rule_id": rule.ID,
"cluster_name": rule.Resource.Name,
},nil, &rule)
eng := alerting2.GetEngine(rule.Resource.Type)
if rule.Enabled {
ruleTask := task.ScheduleTask{
@ -189,12 +196,55 @@ func (alertAPI *AlertAPI) getRuleDetail(w http.ResponseWriter, req *http.Request
}
func saveActivity(activityInfo *event.Activity){
queueConfig := queue.GetOrInitConfig("platform##activities")
if queueConfig.Labels == nil {
queueConfig.Labels = map[string]interface{}{
"type": "platform",
"name": "activity",
"category": "elasticsearch",
"activity": true,
}
}
err := queue.Push(queueConfig, util.MustToJSONBytes(event.Event{
Timestamp: time.Now(),
Metadata: event.EventMetadata{
Category: "elasticsearch",
Name: "activity",
},
Fields: util.MapStr{
"activity": activityInfo,
}}))
if err != nil {
log.Error(err)
}
}
func saveAlertActivity(name, typ string, labels map[string]interface{}, changelog diff.Changelog, oldState interface{}){
activityInfo := &event.Activity{
ID: util.GetUUID(),
Timestamp: time.Now(),
Metadata: event.ActivityMetadata{
Category: "elasticsearch",
Group: "platform",
Name: name,
Type: typ,
Labels: labels,
},
Changelog: changelog,
Fields: util.MapStr{
"rule": oldState,
},
}
saveActivity(activityInfo)
}
func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("rule_id")
obj := &alerting.Rule{}
oldRule := &alerting.Rule{}
obj.ID = id
exists, err := orm.Get(obj)
oldRule.ID = id
exists, err := orm.Get(oldRule)
if !exists || err != nil {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
@ -204,35 +254,46 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
return
}
id = obj.ID
create := obj.Created
obj = &alerting.Rule{}
err = alertAPI.DecodeJSON(req, obj)
id = oldRule.ID
create := oldRule.Created
rule := &alerting.Rule{
}
err = alertAPI.DecodeJSON(req, rule)
if err != nil {
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
rule.Metrics.Expression, err = rule.Metrics.GenerateExpression()
if err != nil {
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
changeLog, err := util.DiffTwoObject(oldRule, rule)
if err != nil {
log.Error(err)
}
//protect
obj.ID = id
obj.Created = create
obj.Updated = time.Now()
obj.Metrics.Expression, err = obj.Metrics.GenerateExpression()
if err != nil {
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
err = orm.Update(obj)
if err != nil {
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
rule.ID = id
rule.Created = create
rule.Updated = time.Now()
if obj.Enabled {
exists, err = checkResourceExists(obj)
err = orm.Update(rule)
if err != nil {
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
saveAlertActivity("alerting_rule_change", "update", util.MapStr{
"cluster_id": rule.Resource.ID,
"rule_id": rule.ID,
"cluster_name": rule.Resource.Name,
},changeLog, oldRule)
if rule.Enabled {
exists, err = checkResourceExists(rule)
if err != nil || !exists {
log.Error(err)
alertAPI.WriteJSON(w, util.MapStr{
@ -242,22 +303,22 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
}
//update task
task.StopTask(id)
eng := alerting2.GetEngine(obj.Resource.Type)
eng := alerting2.GetEngine(rule.Resource.Type)
ruleTask := task.ScheduleTask{
ID: obj.ID,
Interval: obj.Schedule.Interval,
Description: obj.Metrics.Expression,
Task: eng.GenerateTask(obj),
ID: rule.ID,
Interval: rule.Schedule.Interval,
Description: rule.Metrics.Expression,
Task: eng.GenerateTask(rule),
}
task.RegisterScheduleTask(ruleTask)
task.StartTask(ruleTask.ID)
}else{
task.DeleteTask(id)
}
clearKV(obj.ID)
clearKV(rule.ID)
alertAPI.WriteJSON(w, util.MapStr{
"_id": obj.ID,
"_id": rule.ID,
"result": "updated",
}, 200)
}
@ -289,6 +350,11 @@ func (alertAPI *AlertAPI) deleteRule(w http.ResponseWriter, req *http.Request, p
log.Error(err)
return
}
saveAlertActivity("alerting_rule_change", "delete", util.MapStr{
"cluster_id": obj.Resource.ID,
"rule_id": obj.ID,
"cluster_name": obj.Resource.Name,
},nil, &obj)
task.DeleteTask(obj.ID)
clearKV(obj.ID)
@ -654,8 +720,6 @@ func getRuleMetricData( rule *alerting.Rule, filterParam *alerting.FilterParam)
if len(md.Data) == 0 {
continue
}
//filteredMetricData = append(filteredMetricData, md)
targetData := md.Data["result"]
if len(rule.Metrics.Items) == 1 {
for k, _ := range md.Data {

View File

@ -55,7 +55,6 @@ func TestEngine( t *testing.T) {
Metrics: alerting.Metric{
PeriodInterval: "1m",
MaxPeriods: 15,
Items: []alerting.MetricItem{
{Name: "a", Field: "payload.elasticsearch.node_stats.fs.total.free_in_bytes", Statistic: "min", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
{Name: "b", Field: "payload.elasticsearch.node_stats.fs.total.total_in_bytes", Statistic: "max", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
@ -204,7 +203,6 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
Metrics: alerting.Metric{
PeriodInterval: "1m",
MaxPeriods: 15,
Items: []alerting.MetricItem{
{Name: "a", Field: "payload.elasticsearch.index_stats.total.search.query_total", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},
{Name: "b", Field: "payload.elasticsearch.index_stats.total.search.query_time_in_millis", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},