Merge branch 'master' into rbac

# Conflicts:
#	main.go
This commit is contained in:
xushuhui 2022-04-20 10:32:44 +08:00
commit e16ca50773
18 changed files with 1202 additions and 112 deletions

View File

@ -31,7 +31,7 @@ elastic:
enabled: true enabled: true
interval: 30s interval: 30s
availability_check: availability_check:
enabled: false enabled: true
interval: 60s interval: 60s
metadata_refresh: metadata_refresh:
enabled: true enabled: true

14
main.go
View File

@ -9,6 +9,7 @@ import (
"infini.sh/console/model/gateway" "infini.sh/console/model/gateway"
"infini.sh/console/model/rbac" "infini.sh/console/model/rbac"
_ "infini.sh/console/plugin" _ "infini.sh/console/plugin"
alerting2 "infini.sh/console/service/alerting"
"infini.sh/framework" "infini.sh/framework"
"infini.sh/framework/core/elastic" "infini.sh/framework/core/elastic"
"infini.sh/framework/core/env" "infini.sh/framework/core/env"
@ -28,6 +29,7 @@ import (
_ "infini.sh/framework/plugins" _ "infini.sh/framework/plugins"
api2 "infini.sh/gateway/api" api2 "infini.sh/gateway/api"
_ "infini.sh/gateway/proxy" _ "infini.sh/gateway/proxy"
log "src/github.com/cihub/seelog"
) )
var appConfig *config.AppConfig var appConfig *config.AppConfig
@ -50,7 +52,7 @@ func main() {
terminalFooter := "" terminalFooter := ""
app := framework.NewApp("console", "INFINI Cloud Console, The easiest way to operate your own elasticsearch platform.", app := framework.NewApp("console", "INFINI Cloud Console, The easiest way to operate your own elasticsearch platform.",
config.Version, config.BuildNumber, config.LastCommitLog, config.BuildDate, config.EOLDate, terminalHeader, terminalFooter) config.Version,config.BuildNumber, config.LastCommitLog, config.BuildDate, config.EOLDate, terminalHeader, terminalFooter)
app.Init(nil) app.Init(nil)
defer app.Shutdown() defer app.Shutdown()
@ -59,10 +61,11 @@ func main() {
if app.Setup(func() { if app.Setup(func() {
err := bootstrapRequirementCheck() err := bootstrapRequirementCheck()
if err != nil { if err !=nil{
panic(err) panic(err)
} }
//load core modules first //load core modules first
module.RegisterSystemModule(&elastic2.ElasticModule{}) module.RegisterSystemModule(&elastic2.ElasticModule{})
module.RegisterSystemModule(&filter.FilterModule{}) module.RegisterSystemModule(&filter.FilterModule{})
@ -117,6 +120,7 @@ func main() {
module.Start() module.Start()
orm.RegisterSchemaWithIndexName(model.Dict{}, "dict") orm.RegisterSchemaWithIndexName(model.Dict{}, "dict")
orm.RegisterSchemaWithIndexName(model.Reindex{}, "reindex") orm.RegisterSchemaWithIndexName(model.Reindex{}, "reindex")
orm.RegisterSchemaWithIndexName(elastic.View{}, "view") orm.RegisterSchemaWithIndexName(elastic.View{}, "view")
@ -130,6 +134,12 @@ func main() {
orm.RegisterSchemaWithIndexName(rbac.User{}, "rbac-user") orm.RegisterSchemaWithIndexName(rbac.User{}, "rbac-user")
api.RegisterSchema() api.RegisterSchema()
go func() {
err := alerting2.InitTasks()
if err != nil {
log.Errorf("init alerting task error: %v", err)
}
}()
}, nil) { }, nil) {
app.Run() app.Run()
} }

View File

@ -12,21 +12,26 @@ type Alert struct {
ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"` ID string `json:"id,omitempty" elastic_meta:"_id" elastic_mapping:"id: { type: keyword }"`
Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"` Created time.Time `json:"created,omitempty" elastic_mapping:"created: { type: date }"`
Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"` Updated time.Time `json:"updated,omitempty" elastic_mapping:"updated: { type: date }"`
RuleID string `json:"rule_id"` RuleID string `json:"rule_id" elastic_mapping:"rule_id: { type: keyword }"`
ClusterID string `json:"cluster_id"` ResourceID string `json:"resource_id" elastic_mapping:"resource_id: { type: keyword }"`
Expression string `json:"expression"` ResourceName string `json:"resource_name" elastic_mapping:"resource_name: { type: keyword }"`
Objects []string `json:"objects"` Expression string `json:"expression" elastic_mapping:"expression: { type: keyword, copy_to:search_text }"`
Severity string `json:"severity"` Objects []string `json:"objects" elastic_mapping:"objects: { type:keyword,copy_to:search_text }"`
Content string `json:"content"` Severity string `json:"severity" elastic_mapping:"severity: { type: keyword }"`
Content string `json:"content" elastic_mapping:"context: { type: keyword, copy_to:search_text }"`
AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"` AcknowledgedTime interface{} `json:"acknowledged_time,omitempty"`
ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"` ActionExecutionResults []ActionExecutionResult `json:"action_execution_results"`
Users []string `json:"users,omitempty"` Users []string `json:"users,omitempty"`
State string `json:"state"` State string `json:"state"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
IsNotified bool `json:"is_notified" elastic_mapping:"is_notified: { type: boolean }"` //标识本次检测是否发送了告警通知
IsEscalated bool `json:"is_escalated" elastic_mapping:"is_escalated: { type: boolean }"` //标识本次检测是否发送了升级告警通知
Conditions Condition `json:"condition"`
ConditionResult *ConditionResult `json:"condition_result,omitempty" elastic_mapping:"condition_result: { type: object,enabled:false }"`
SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"`
} }
type ActionExecutionResult struct { type ActionExecutionResult struct {
//ActionId string `json:"action_id"`
LastExecutionTime int `json:"last_execution_time"` LastExecutionTime int `json:"last_execution_time"`
Error string `json:"error"` Error string `json:"error"`
Result string `json:"result"` Result string `json:"result"`

View File

@ -19,6 +19,10 @@ type ConditionItem struct {
} }
type ConditionResult struct { type ConditionResult struct {
ResultItems []ConditionResultItem `json:"result_items"`
QueryResult *QueryResult `json:"query_result"`
}
type ConditionResultItem struct {
GroupValues []string `json:"group_values"` GroupValues []string `json:"group_values"`
ConditionItem *ConditionItem `json:"condition_item"` ConditionItem *ConditionItem `json:"condition_item"`
} }

View File

@ -10,3 +10,7 @@ type Filter struct {
Not []FilterQuery `json:"not,omitempty"` Not []FilterQuery `json:"not,omitempty"`
//MinimumShouldMatch int `json:"minimum_should_match"` //MinimumShouldMatch int `json:"minimum_should_match"`
} }
// IsEmpty reports whether the filter carries no clauses in any of its
// boolean branches (and/or/not).
func (f Filter) IsEmpty() bool {
	return len(f.And)+len(f.Or)+len(f.Not) == 0
}

View File

@ -12,3 +12,11 @@ type FilterQuery struct {
Or []FilterQuery `json:"or,omitempty"` Or []FilterQuery `json:"or,omitempty"`
Not []FilterQuery `json:"not,omitempty"` Not []FilterQuery `json:"not,omitempty"`
} }
// IsComplex reports whether this query node nests further boolean branches
// (and/or/not) instead of being a single field/operator leaf.
func (fq FilterQuery) IsComplex() bool {
	return len(fq.And)+len(fq.Or)+len(fq.Not) > 0
}
// IsEmpty reports whether the query has neither nested boolean branches nor
// a leaf operator set.
// Receiver renamed from f to fq for consistency with IsComplex on this type.
func (fq FilterQuery) IsEmpty() bool {
	return !fq.IsComplex() && fq.Operator == ""
}

View File

@ -48,6 +48,12 @@ type MetricItem struct {
Group []string `json:"group"` //bucket group Group []string `json:"group"` //bucket group
} }
// QueryResult captures one execution of a rule's metrics query: the query
// DSL that was sent and the raw response that came back.
type QueryResult struct {
	// Query is the serialized query DSL sent to elasticsearch.
	Query string `json:"query"`
	// Raw is the raw search response body, kept for diagnostics.
	Raw string `json:"raw"`
	// MetricData holds the per-group time series extracted from the
	// response aggregations.
	MetricData []MetricData `json:"metric_data"`
}
type MetricData struct { type MetricData struct {
GroupValues []string `json:"group_values"` GroupValues []string `json:"group_values"`
Data map[string][]TimeMetricData `json:"data"` Data map[string][]TimeMetricData `json:"data"`

View File

@ -4,12 +4,25 @@
package alerting package alerting
import (
"fmt"
)
type Resource struct { type Resource struct {
ID string `json:"resource_id" elastic_mapping:"resource_id:{type:keyword}"` ID string `json:"resource_id" elastic_mapping:"resource_id:{type:keyword}"`
Name string `json:"resource_name" elastic_mapping:"resource_name:{type:keyword}"`
Type string `json:"type" elastic_mapping:"type:{type:keyword}"` Type string `json:"type" elastic_mapping:"type:{type:keyword}"`
Objects []string `json:"objects" elastic_mapping:"objects:{type:keyword,copy_to:search_text}"` Objects []string `json:"objects" elastic_mapping:"objects:{type:keyword,copy_to:search_text}"`
Filter Filter `json:"filter,omitempty" elastic_mapping:"-"` Filter FilterQuery `json:"filter,omitempty" elastic_mapping:"-"`
RawFilter map[string]interface{} `json:"raw_filter,omitempty"` RawFilter map[string]interface{} `json:"raw_filter,omitempty"`
TimeField string `json:"time_field,omitempty" elastic_mapping:"id:{type:keyword}"` TimeField string `json:"time_field,omitempty" elastic_mapping:"id:{type:keyword}"`
Context Context `json:"context"` Context Context `json:"context"`
} }
// Validate checks that the resource carries the fields required to run a
// rule against it; currently only the time field is mandatory.
func (r Resource) Validate() error {
	if r.TimeField != "" {
		return nil
	}
	return fmt.Errorf("TimeField can not be empty")
}

View File

@ -19,15 +19,18 @@ type Rule struct {
Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"` Conditions Condition `json:"conditions" elastic_mapping:"conditions:{type:object}"`
Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"` Channels RuleChannel `json:"channels" elastic_mapping:"channels:{type:object}"`
Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"` Schedule Schedule `json:"schedule" elastic_mapping:"schedule:{type:object}"`
LastNotificationTime time.Time `json:"-" elastic_mapping:"last_notification_time:{type:date}"`
LastTermStartTime time.Time `json:"-"` //标识最近一轮告警的开始时间
LastEscalationTime time.Time `json:"-"` //标识最近一轮告警的开始时间
SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"` SearchText string `json:"-" elastic_mapping:"search_text:{type:text,index_prefixes:{},index_phrases:true, analyzer:suggest_text_search }"`
} }
type RuleChannel struct { type RuleChannel struct {
Normal []Channel `json:"normal"` Normal []Channel `json:"normal"`
Escalation []Channel `json:"escalation"` Escalation []Channel `json:"escalation,omitempty"`
ThrottlePeriod string `json:"throttle_period"` //沉默周期 ThrottlePeriod string `json:"throttle_period"` //沉默周期
AcceptTimeRange TimeRange `json:"accept_time_range"` AcceptTimeRange TimeRange `json:"accept_time_range"`
EscalationThrottlePeriod string `json:"escalation_throttle_period"` EscalationThrottlePeriod string `json:"escalation_throttle_period,omitempty"`
EscalationEnabled bool `json:"escalation_enabled"` EscalationEnabled bool `json:"escalation_enabled"`
} }

View File

@ -0,0 +1,106 @@
/* Copyright © INFINI Ltd. All rights reserved.
* web: https://infinilabs.com
* mail: hello#infini.ltd */
package alerting
import (
"fmt"
"infini.sh/console/model/alerting"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"net/http"
log "src/github.com/cihub/seelog"
"strconv"
"strings"
)
// getAlert returns a single alert document by its id.
//
// Responds 404 with {"found": false} when the document does not exist,
// 500 on a storage error, and 200 with the document otherwise.
func (h *AlertAPI) getAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	id := ps.MustGetParameter("alert_id")

	obj := alerting.Alert{}
	obj.ID = id
	exists, err := orm.Get(&obj)
	// Surface storage failures before the not-found case; in the original
	// ordering (`if !exists || err != nil` first) the error branch was
	// unreachable dead code and errors were reported as 404.
	if err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		log.Error(err)
		return
	}
	if !exists {
		h.WriteJSON(w, util.MapStr{
			"_id":   id,
			"found": false,
		}, http.StatusNotFound)
		return
	}

	h.WriteJSON(w, util.MapStr{
		"found":   true,
		"_id":     id,
		"_source": obj,
	}, 200)
}
// acknowledgeAlert marks a batch of alert documents as acknowledged.
//
// Expects a JSON body of the form {"alert_ids": [...]}; every matching
// document has its state switched to AlertStateAcknowledge via an
// update-by-query, and the affected ids are echoed back.
func (h *AlertAPI) acknowledgeAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	body := struct {
		AlertIDs []string `json:"alert_ids"`
	}{}
	// BUG fix: the request body was never decoded, so AlertIDs was always
	// empty and the update-by-query could never match the intended alerts.
	err := h.DecodeJSON(req, &body)
	if err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		log.Error(err)
		return
	}
	// Refuse an empty id list instead of issuing an update that matches
	// nothing (or is rejected by the search backend).
	if len(body.AlertIDs) == 0 {
		h.WriteError(w, "alert_ids can not be empty", http.StatusBadRequest)
		return
	}
	queryDsl := util.MapStr{
		"query": util.MapStr{
			"terms": util.MapStr{
				"_id": body.AlertIDs,
			},
		},
		"script": util.MapStr{
			"source": fmt.Sprintf("ctx._source['state'] = '%s'", alerting.AlertStateAcknowledge),
		},
	}
	err = orm.UpdateBy(alerting.Alert{}, util.MustToJSONBytes(queryDsl))
	if err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		log.Error(err)
		return
	}
	h.WriteJSON(w, util.MapStr{
		"ids":    body.AlertIDs,
		"result": "updated",
	}, 200)
}
// searchAlert performs a paged keyword search over alert documents.
//
// Query parameters: keyword (optional full-text filter), size (page size,
// default 20) and from (offset, default 0). The raw search response is
// written back unchanged.
func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	var (
		keyword     = h.GetParameterOrDefault(req, "keyword", "")
		queryDSL    = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}`
		strSize     = h.GetParameterOrDefault(req, "size", "20")
		strFrom     = h.GetParameterOrDefault(req, "from", "0")
		mustBuilder = &strings.Builder{}
	)
	if keyword != "" {
		// Escape backslashes and double quotes so a user-supplied keyword
		// cannot break out of the JSON string literal (previously a single
		// quote character produced malformed query DSL).
		escaped := strings.ReplaceAll(strings.ReplaceAll(keyword, `\`, `\\`), `"`, `\"`)
		mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, escaped))
	}
	size, _ := strconv.Atoi(strSize)
	if size <= 0 {
		size = 20
	}
	from, _ := strconv.Atoi(strFrom)
	if from < 0 {
		from = 0
	}

	q := orm.Query{}
	queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from)
	q.RawQuery = []byte(queryDSL)

	err, res := orm.Search(&alerting.Alert{}, &q)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	h.Write(w, res.Raw)
}

View File

@ -4,19 +4,33 @@
package alerting package alerting
import "infini.sh/framework/core/api" import (
"infini.sh/console/config"
"infini.sh/framework/core/api"
)
type AlertAPI struct { type AlertAPI struct {
api.Handler api.Handler
Config *config.AppConfig
} }
func init() { func (alert *AlertAPI) Init() {
alert:=AlertAPI{}
api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.getRule) api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.getRule)
api.HandleAPIMethod(api.POST, "/alerting/rule", alert.createRule) api.HandleAPIMethod(api.POST, "/alerting/rule", alert.createRule)
api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule) api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.deleteRule)
api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule) api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.updateRule)
api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule) api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.searchRule)
api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.fetchAlertInfos)
api.HandleAPIMethod(api.GET, "/alerting/channel/:channel_id", alert.getChannel)
api.HandleAPIMethod(api.POST, "/alerting/channel", alert.createChannel)
api.HandleAPIMethod(api.DELETE, "/alerting/channel/:channel_id", alert.deleteChannel)
api.HandleAPIMethod(api.PUT, "/alerting/channel/:channel_id", alert.updateChannel)
api.HandleAPIMethod(api.GET, "/alerting/channel/_search", alert.searchChannel)
//just for test
//api.HandleAPIMethod(api.GET, "/alerting/rule/test", alert.testRule)
} }

View File

@ -0,0 +1,169 @@
/* Copyright © INFINI Ltd. All rights reserved.
* web: https://infinilabs.com
* mail: hello#infini.ltd */
package alerting
import (
"fmt"
"infini.sh/console/model/alerting"
httprouter "infini.sh/framework/core/api/router"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util"
"net/http"
log "src/github.com/cihub/seelog"
"strconv"
"strings"
)
// createChannel stores a new alerting channel from the request body and
// echoes the generated document id.
func (h *AlertAPI) createChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	channel := &alerting.Channel{}
	if err := h.DecodeJSON(req, channel); err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if err := orm.Create(channel); err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	h.WriteJSON(w, util.MapStr{
		"_id":    channel.ID,
		"result": "created",
	}, 200)
}
// getChannel fetches a single alerting channel by id.
//
// Responds 404 with {"found": false} when the document does not exist,
// 500 on a storage error, and 200 with the document otherwise.
func (h *AlertAPI) getChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	id := ps.MustGetParameter("channel_id")
	obj := alerting.Channel{}
	obj.ID = id
	exists, err := orm.Get(&obj)
	// Surface storage failures first; in the original ordering the error
	// branch below the not-found check was unreachable and errors were
	// reported as 404.
	if err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		log.Error(err)
		return
	}
	if !exists {
		h.WriteJSON(w, util.MapStr{
			"_id":   id,
			"found": false,
		}, http.StatusNotFound)
		return
	}
	h.WriteJSON(w, util.MapStr{
		"found":   true,
		"_id":     id,
		"_source": obj,
	}, 200)
}
// updateChannel replaces an existing alerting channel's definition while
// preserving its id and original creation timestamp.
func (h *AlertAPI) updateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	id := ps.MustGetParameter("channel_id")

	existing := alerting.Channel{}
	existing.ID = id
	exists, err := orm.Get(&existing)
	if !exists || err != nil {
		h.WriteJSON(w, util.MapStr{
			"_id":    id,
			"result": "not_found",
		}, http.StatusNotFound)
		return
	}

	// Remember the immutable fields before decoding the replacement payload.
	id = existing.ID
	createdAt := existing.Created

	updated := alerting.Channel{}
	if err = h.DecodeJSON(req, &updated); err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		log.Error(err)
		return
	}

	// Protect id and creation time from being overwritten by the payload.
	updated.ID = id
	updated.Created = createdAt
	if err = orm.Update(&updated); err != nil {
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		log.Error(err)
		return
	}

	h.WriteJSON(w, util.MapStr{
		"_id":    updated.ID,
		"result": "updated",
	}, 200)
}
// deleteChannel removes an alerting channel by id, answering not_found when
// no such document exists.
func (h *AlertAPI) deleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	id := ps.MustGetParameter("channel_id")

	channel := alerting.Channel{}
	channel.ID = id
	if exists, err := orm.Get(&channel); !exists || err != nil {
		h.WriteJSON(w, util.MapStr{
			"_id":    id,
			"result": "not_found",
		}, http.StatusNotFound)
		return
	}

	if err := orm.Delete(&channel); err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	h.WriteJSON(w, util.MapStr{
		"_id":    channel.ID,
		"result": "deleted",
	}, 200)
}
// searchChannel performs a paged keyword search over alerting channels.
//
// Query parameters: keyword (optional full-text filter), size (page size,
// default 20) and from (offset, default 0). The raw search response is
// written back unchanged.
func (h *AlertAPI) searchChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	var (
		keyword     = h.GetParameterOrDefault(req, "keyword", "")
		queryDSL    = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}`
		strSize     = h.GetParameterOrDefault(req, "size", "20")
		strFrom     = h.GetParameterOrDefault(req, "from", "0")
		mustBuilder = &strings.Builder{}
	)
	if keyword != "" {
		// Escape backslashes and double quotes so a user-supplied keyword
		// cannot break out of the JSON string literal (previously a single
		// quote character produced malformed query DSL).
		escaped := strings.ReplaceAll(strings.ReplaceAll(keyword, `\`, `\\`), `"`, `\"`)
		mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, escaped))
	}
	size, _ := strconv.Atoi(strSize)
	if size <= 0 {
		size = 20
	}
	from, _ := strconv.Atoi(strFrom)
	if from < 0 {
		from = 0
	}

	q := orm.Query{}
	queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from)
	q.RawQuery = []byte(queryDSL)

	err, res := orm.Search(&alerting.Channel{}, &q)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	h.Write(w, res.Raw)
}

View File

@ -109,10 +109,10 @@ func (alertAPI *AlertAPI) getRule(w http.ResponseWriter, req *http.Request, ps h
func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("rule_id") id := ps.MustGetParameter("rule_id")
obj := alerting.Rule{} obj := &alerting.Rule{}
obj.ID = id obj.ID = id
exists, err := orm.Get(&obj) exists, err := orm.Get(obj)
if !exists || err != nil { if !exists || err != nil {
alertAPI.WriteJSON(w, util.MapStr{ alertAPI.WriteJSON(w, util.MapStr{
"_id": id, "_id": id,
@ -123,8 +123,8 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
id = obj.ID id = obj.ID
create := obj.Created create := obj.Created
obj = alerting.Rule{} obj = &alerting.Rule{}
err = alertAPI.DecodeJSON(req, &obj) err = alertAPI.DecodeJSON(req, obj)
if err != nil { if err != nil {
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err) log.Error(err)
@ -135,7 +135,13 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
obj.ID = id obj.ID = id
obj.Created = create obj.Created = create
obj.Updated = time.Now() obj.Updated = time.Now()
err = orm.Update(&obj) err = obj.Metrics.RefreshExpression()
if err != nil {
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
return
}
err = orm.Update(obj)
if err != nil { if err != nil {
alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError) alertAPI.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err) log.Error(err)
@ -143,6 +149,13 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
} }
if obj.Enabled { if obj.Enabled {
exists, err = checkResourceExists(obj)
if err != nil || !exists {
alertAPI.WriteJSON(w, util.MapStr{
"error": err.Error(),
}, http.StatusInternalServerError)
return
}
//update task //update task
task.StopTask(id) task.StopTask(id)
eng := alerting2.GetEngine(obj.Resource.Type) eng := alerting2.GetEngine(obj.Resource.Type)
@ -150,7 +163,7 @@ func (alertAPI *AlertAPI) updateRule(w http.ResponseWriter, req *http.Request, p
ID: obj.ID, ID: obj.ID,
Interval: obj.Schedule.Interval, Interval: obj.Schedule.Interval,
Description: obj.Metrics.Expression, Description: obj.Metrics.Expression,
Task: eng.GenerateTask(&obj), Task: eng.GenerateTask(obj),
} }
task.RegisterScheduleTask(ruleTask) task.RegisterScheduleTask(ruleTask)
task.StartTask(ruleTask.ID) task.StartTask(ruleTask.ID)
@ -234,9 +247,114 @@ func (alertAPI *AlertAPI) searchRule(w http.ResponseWriter, req *http.Request, p
log.Error(err) log.Error(err)
return return
} }
w.Write(searchResult.Raw) w.Write(searchResult.Raw)
} }
// getRuleAlertNumbers returns, keyed by rule id, the number of alert
// documents currently in an unresolved (error or active) state for each of
// the given rules, computed via a terms aggregation on rule_id.
func (alertAPI *AlertAPI) getRuleAlertNumbers(ruleIDs []string) (map[string]interface{}, error) {
	esClient := elastic.GetClient(alertAPI.Config.Elasticsearch)
	queryDsl := util.MapStr{
		"size": 0,
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": []util.MapStr{
					{
						"terms": util.MapStr{
							"rule_id": ruleIDs,
						},
					},
					{
						"terms": util.MapStr{
							"state": []string{alerting.AlertStateError, alerting.AlertStateActive},
						},
					},
				},
			},
		},
		"aggs": util.MapStr{
			"terms_rule_id": util.MapStr{
				"terms": util.MapStr{
					"field": "rule_id",
				},
			},
		},
	}

	searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl))
	if err != nil {
		return nil, err
	}

	ruleAlertNumbers := map[string]interface{}{}
	if termRules, ok := searchRes.Aggregations["terms_rule_id"]; ok {
		for _, bk := range termRules.Buckets {
			// Guard the type assertion: a malformed bucket key must not
			// panic the API (the original `bk["key"].(string)` could).
			if key, ok := bk["key"].(string); ok {
				ruleAlertNumbers[key] = bk["doc_count"]
			}
		}
	}
	return ruleAlertNumbers, nil
}
// fetchAlertInfos returns, for each requested rule id, the state of its most
// recent alert document plus the count of its unresolved alerts.
//
// Expects a JSON array of rule ids in the request body and answers with a
// map keyed by rule id: {"<rule_id>": {"status": ..., "alert_count": ...}}.
func (alertAPI *AlertAPI) fetchAlertInfos(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	var ruleIDs = []string{}
	// The decode error was previously ignored, silently treating a malformed
	// payload as an empty request; report it to the caller instead.
	if err := alertAPI.DecodeJSON(req, &ruleIDs); err != nil {
		alertAPI.WriteJSON(w, util.MapStr{
			"error": err.Error(),
		}, http.StatusBadRequest)
		return
	}

	if len(ruleIDs) == 0 {
		alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK)
		return
	}

	esClient := elastic.GetClient(alertAPI.Config.Elasticsearch)
	// Collapse on rule_id so only the newest alert per rule is returned.
	queryDsl := util.MapStr{
		"_source": []string{"state", "rule_id"},
		"sort": []util.MapStr{
			{
				"created": util.MapStr{
					"order": "desc",
				},
			},
		},
		"collapse": util.MapStr{
			"field": "rule_id",
		},
		"query": util.MapStr{
			"terms": util.MapStr{
				"rule_id": ruleIDs,
			},
		},
	}
	searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alerting.Alert{}), util.MustToJSONBytes(queryDsl))
	if err != nil {
		alertAPI.WriteJSON(w, util.MapStr{
			"error": err.Error(),
		}, http.StatusInternalServerError)
		return
	}
	if len(searchRes.Hits.Hits) == 0 {
		alertAPI.WriteJSON(w, util.MapStr{}, http.StatusOK)
		return
	}

	// Typo fix: local was previously named aletNumbers.
	alertNumbers, err := alertAPI.getRuleAlertNumbers(ruleIDs)
	if err != nil {
		alertAPI.WriteJSON(w, util.MapStr{
			"error": err.Error(),
		}, http.StatusInternalServerError)
		return
	}

	latestAlertInfos := map[string]util.MapStr{}
	for _, hit := range searchRes.Hits.Hits {
		if ruleID, ok := hit.Source["rule_id"].(string); ok {
			latestAlertInfos[ruleID] = util.MapStr{
				"status":      hit.Source["state"],
				"alert_count": alertNumbers[ruleID],
			}
		}
	}
	alertAPI.WriteJSON(w, latestAlertInfos, http.StatusOK)
}
func checkResourceExists(rule *alerting.Rule) (bool, error) { func checkResourceExists(rule *alerting.Rule) (bool, error) {
if rule.Resource.ID == "" { if rule.Resource.ID == "" {
return false, fmt.Errorf("resource id can not be empty") return false, fmt.Errorf("resource id can not be empty")
@ -249,8 +367,71 @@ func checkResourceExists(rule *alerting.Rule) (bool, error) {
if err != nil { if err != nil {
return false, err return false, err
} }
if rule.Resource.Name == "" {
rule.Resource.Name = obj.Name
}
return ok && obj.Name != "", nil return ok && obj.Name != "", nil
default: default:
return false, fmt.Errorf("unsupport resource type: %s", rule.Resource.Type) return false, fmt.Errorf("unsupport resource type: %s", rule.Resource.Type)
} }
} }
//func (alertAPI *AlertAPI) testRule(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
// rule := alerting.Rule{
// ID: util.GetUUID(),
// Created: time.Now(),
// Updated: time.Now(),
// Enabled: true,
// Resource: alerting.Resource{
// ID: "c8i18llath2blrusdjng",
// Type: "elasticsearch",
// Objects: []string{".infini_metrics*"},
// TimeField: "timestamp",
// RawFilter: map[string]interface{}{
// "bool": util.MapStr{
// "must": []util.MapStr{},
// },
// },
// },
//
// Metrics: alerting.Metric{
// PeriodInterval: "1m",
// MaxPeriods: 15,
// Items: []alerting.MetricItem{
// {Name: "a", Field: "payload.elasticsearch.node_stats.os.cpu.percent", Statistic: "p99", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
// },
// },
// Conditions: alerting.Condition{
// Operator: "any",
// Items: []alerting.ConditionItem{
// {MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"},
// },
// },
//
// Channels: alerting.RuleChannel{
// Normal: []alerting.Channel{
// {Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
// HeaderParams: map[string]string{
// "Content-Type": "application/json",
// },
// Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
// Method: http.MethodPost,
// URL: "https://oapi.dingtalk.com/robot/send?access_token=XXXXXX",
// }},
// },
// ThrottlePeriod: "1h",
// AcceptTimeRange: alerting.TimeRange{
// Start: "8:00",
// End: "21:00",
// },
// EscalationEnabled: true,
// EscalationThrottlePeriod: "30m",
// },
// }
// eng := alerting2.GetEngine(rule.Resource.Type)
// data, err := eng.ExecuteQuery(&rule)
// if err != nil {
// log.Error(err)
// }
// alertAPI.WriteJSON(w, data, http.StatusOK)
//}

View File

@ -2,6 +2,7 @@ package api
import ( import (
"infini.sh/console/config" "infini.sh/console/config"
"infini.sh/console/plugin/api/alerting"
"infini.sh/console/plugin/api/index_management" "infini.sh/console/plugin/api/index_management"
"infini.sh/framework/core/api" "infini.sh/framework/core/api"
"path" "path"
@ -53,5 +54,9 @@ func Init(cfg *config.AppConfig) {
// } // }
// }, // },
//}) //})
alertAPI := alerting.AlertAPI{
Config: cfg,
}
alertAPI.Init()
} }

View File

@ -18,7 +18,10 @@ import (
"infini.sh/framework/core/orm" "infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"io" "io"
"math"
"runtime/debug"
"sort" "sort"
"strconv"
"strings" "strings"
"time" "time"
) )
@ -26,7 +29,12 @@ import (
type Engine struct { type Engine struct {
} }
//GenerateQuery generate a final elasticsearch query dsl object
//when RawFilter of rule is not empty, priority use it, otherwise to covert from Filter of rule (todo)
//auto generate time filter query and then attach to final query
//auto generate elasticsearch aggregations by metrics of rule
//group of metric item converted to terms aggregation and TimeField of rule converted to date_histogram aggregation
//convert statistic of metric item to elasticsearch aggregation
func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) { func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) {
filter, err := engine.GenerateRawFilter(rule) filter, err := engine.GenerateRawFilter(rule)
if err != nil { if err != nil {
@ -38,10 +46,9 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) {
} }
basicAggs := util.MapStr{} basicAggs := util.MapStr{}
for _, metricItem := range rule.Metrics.Items { for _, metricItem := range rule.Metrics.Items {
basicAggs[metricItem.Name] = util.MapStr{ metricAggs := engine.generateAgg(&metricItem)
metricItem.Statistic: util.MapStr{ if err = util.MergeFields(basicAggs, metricAggs, true); err != nil {
"field": metricItem.Field, return nil, err
},
} }
} }
timeAggs := util.MapStr{ timeAggs := util.MapStr{
@ -92,11 +99,167 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule) (interface{}, error) {
"aggs": rootAggs, "aggs": rootAggs,
}, nil }, nil
} }
// generateAgg converts the statistic of a metric item into the matching
// elasticsearch aggregation clause, keyed by the metric item's name.
//
// "rate" is modeled as a max metric wrapped in a derivative pipeline
// aggregation; percentile statistics (p50..p99) become a percentiles
// aggregation restricted to the single requested percent. Unknown
// statistics fall back to value_count.
func (engine *Engine) generateAgg(metricItem *alerting.MetricItem) map[string]interface{} {
	var (
		aggType = "value_count"
		field   = metricItem.Field
	)
	// An absent or wildcard field degrades to counting documents via _id.
	if field == "" || field == "*" {
		field = "_id"
	}
	var percent = 0.0
	var isPipeline = false
	switch metricItem.Statistic {
	case "max", "min", "sum", "avg":
		aggType = metricItem.Statistic
	case "count", "value_count":
		aggType = "value_count"
	case "rate":
		aggType = "max"
		isPipeline = true
	case "medium":
		aggType = "median_absolute_deviation"
	case "p99", "p95", "p90", "p80", "p50":
		aggType = "percentiles"
		percentStr := strings.TrimPrefix(metricItem.Statistic, "p")
		// Parse at full float64 precision; the original bitSize of 32
		// needlessly rounded the parsed value.
		percent, _ = strconv.ParseFloat(percentStr, 64)
	}
	aggValue := util.MapStr{
		"field": field,
	}
	if aggType == "percentiles" {
		aggValue["percents"] = []interface{}{percent}
	}
	aggs := util.MapStr{
		metricItem.Name: util.MapStr{
			aggType: aggValue,
		},
	}
	if !isPipeline {
		return aggs
	}
	// For pipeline statistics, publish the derivative under the metric's
	// name and move the underlying metric to a generated bucket id that the
	// derivative's buckets_path points at.
	pipelineAggID := util.GetUUID()
	aggs[pipelineAggID] = aggs[metricItem.Name]
	aggs[metricItem.Name] = util.MapStr{
		"derivative": util.MapStr{
			"buckets_path": pipelineAggID,
		},
	}
	return aggs
}
// ConvertFilterQueryToDsl converts a FilterQuery tree into an equivalent
// elasticsearch query DSL fragment.
//
// A leaf query (no nested and/or/not branches) is translated according to
// its Operator; a branch query recurses into exactly one of the and/or/not
// lists and wraps the children in a bool query (must/should/must_not).
// Mixing two boolean branches at the same level is rejected as ambiguous.
func (engine *Engine) ConvertFilterQueryToDsl(fq *alerting.FilterQuery) (map[string]interface{}, error) {
	if !fq.IsComplex() {
		q := map[string]interface{}{}
		if len(fq.Values) == 0 {
			return nil, fmt.Errorf("values should not be empty")
		}
		//equals/gte/gt/lt/lte/in/match/regexp/wildcard/range/prefix/suffix/contain/
		switch fq.Operator {
		case "equals":
			q["term"] = util.MapStr{
				fq.Field: util.MapStr{
					"value": fq.Values[0],
				},
			}
		case "in":
			q["terms"] = util.MapStr{
				fq.Field: fq.Values,
			}
		case "match":
			q[fq.Operator] = util.MapStr{
				fq.Field: fq.Values[0],
			}
		case "gte", "gt", "lt", "lte":
			q["range"] = util.MapStr{
				fq.Field: util.MapStr{
					fq.Operator: fq.Values[0],
				},
			}
		case "range":
			if len(fq.Values) != 2 {
				return nil, fmt.Errorf("values length of range query must be 2, but got %d", len(fq.Values))
			}
			q["range"] = util.MapStr{
				fq.Field: util.MapStr{
					"gte": fq.Values[0],
					"lte": fq.Values[1],
				},
			}
		case "prefix":
			q["prefix"] = util.MapStr{
				fq.Field: fq.Values[0],
			}
		case "regexp", "wildcard":
			q[fq.Operator] = util.MapStr{
				fq.Field: util.MapStr{
					"value": fq.Values[0],
				},
			}
		default:
			// Error message typo fixed ("unsupport" -> "unsupported").
			return nil, fmt.Errorf("unsupported query operator %s", fq.Operator)
		}
		return q, nil
	}
	// Conflict checks use len() rather than nil-ness so an empty-but-non-nil
	// slice does not trigger a spurious "same level" error; IsComplex and the
	// branch selection below are len-based too, keeping the logic consistent.
	if len(fq.Or) > 0 && len(fq.And) > 0 {
		return nil, fmt.Errorf("filter format error: or, and bool operation in same level")
	}
	if len(fq.Or) > 0 && len(fq.Not) > 0 {
		return nil, fmt.Errorf("filter format error: or, not bool operation in same level")
	}
	if len(fq.And) > 0 && len(fq.Not) > 0 {
		return nil, fmt.Errorf("filter format error: and, not bool operation in same level")
	}
	var (
		boolOperator  string
		filterQueries []alerting.FilterQuery
	)
	if len(fq.Not) > 0 {
		boolOperator = "must_not"
		filterQueries = fq.Not
	} else if len(fq.Or) > 0 {
		boolOperator = "should"
		filterQueries = fq.Or
	} else {
		boolOperator = "must"
		filterQueries = fq.And
	}
	var subQueries []interface{}
	for _, subQ := range filterQueries {
		subQuery, err := engine.ConvertFilterQueryToDsl(&subQ)
		if err != nil {
			return nil, err
		}
		subQueries = append(subQueries, subQuery)
	}
	boolQuery := util.MapStr{
		boolOperator: subQueries,
	}
	// A bare "should" clause needs an explicit minimum match count to
	// actually filter.
	if boolOperator == "should" {
		boolQuery["minimum_should_match"] = 1
	}
	return map[string]interface{}{
		"bool": boolQuery,
	}, nil
}
func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interface{}, error) { func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interface{}, error) {
query := map[string]interface{}{} query := map[string]interface{}{}
var err error
if rule.Resource.RawFilter != nil { if rule.Resource.RawFilter != nil {
query = rule.Resource.RawFilter query = rule.Resource.RawFilter
}else{
if !rule.Resource.Filter.IsEmpty(){
query, err = engine.ConvertFilterQueryToDsl(&rule.Resource.Filter)
if err != nil {
return nil, err
}
}
} }
intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval) intervalDuration, err := time.ParseDuration(rule.Metrics.PeriodInterval)
if err != nil { if err != nil {
@ -141,17 +304,26 @@ func (engine *Engine) GenerateRawFilter(rule *alerting.Rule) (map[string]interfa
} }
} }
}else{ }else{
query["bool"] = util.MapStr{ must := []interface{}{
"must": []interface{}{ timeQuery,
timeQuery, }
if len(query) > 0 {
if _, ok = query["match_all"]; !ok {
must = append(must, query)
}
}
query = util.MapStr{
"bool": util.MapStr{
"must": must,
}, },
} }
} }
return query, nil return query, nil
} }
func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, error){ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error){
esClient := elastic.GetClient(rule.Resource.ID) esClient := elastic.GetClient(rule.Resource.ID)
queryResult := &alerting.QueryResult{}
indexName := strings.Join(rule.Resource.Objects, ",") indexName := strings.Join(rule.Resource.Objects, ",")
queryDsl, err := engine.GenerateQuery(rule) queryDsl, err := engine.GenerateQuery(rule)
if err != nil { if err != nil {
@ -161,10 +333,15 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
queryResult.Query = string(queryDslBytes)
searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes) searchRes, err := esClient.SearchWithRawQueryDSL(indexName, queryDslBytes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if searchRes.StatusCode != 200 {
return nil, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body))
}
queryResult.Raw = string(searchRes.RawResult.Body)
searchResult := map[string]interface{}{} searchResult := map[string]interface{}{}
err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult) err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult)
if err != nil { if err != nil {
@ -172,15 +349,24 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, e
} }
metricData := []alerting.MetricData{} metricData := []alerting.MetricData{}
collectMetricData(searchResult["aggregations"], "", &metricData) collectMetricData(searchResult["aggregations"], "", &metricData)
return metricData, nil queryResult.MetricData = metricData
return queryResult, nil
} }
func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error){ //CheckCondition check whether rule conditions triggered or not
metricData, err := engine.ExecuteQuery(rule) //if triggered returns an array of ConditionResult
if err != nil { //sort conditions by severity desc before check , and then if condition is true, then continue check another group
return nil, err func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){
queryResult, err := engine.ExecuteQuery(rule)
conditionResult := &alerting.ConditionResult{
QueryResult: queryResult,
} }
var conditionResults []alerting.ConditionResult if err != nil {
for _, md := range metricData { return conditionResult, err
}
var resultItems []alerting.ConditionResultItem
var targetMetricData []alerting.MetricData
for _, md := range queryResult.MetricData {
var targetData alerting.MetricData var targetData alerting.MetricData
if len(rule.Metrics.Items) == 1 { if len(rule.Metrics.Items) == 1 {
targetData = md targetData = md
@ -191,7 +377,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
} }
expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula) expression, err := govaluate.NewEvaluableExpression(rule.Metrics.Formula)
if err != nil { if err != nil {
return nil, err return conditionResult, err
} }
dataLength := 0 dataLength := 0
for _, v := range md.Data { for _, v := range md.Data {
@ -204,8 +390,14 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
} }
var timestamp interface{} var timestamp interface{}
for k, v := range md.Data { for k, v := range md.Data {
if len(k) == 20 {
continue
}
//drop nil value bucket //drop nil value bucket
if v[i][1] == nil { if len(v[i]) < 2 {
continue DataLoop
}
if _, ok := v[i][1].(float64); !ok {
continue DataLoop continue DataLoop
} }
parameters[k] = v[i][1] parameters[k] = v[i][1]
@ -213,11 +405,18 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
} }
result, err := expression.Evaluate(parameters) result, err := expression.Evaluate(parameters)
if err != nil { if err != nil {
return nil, err return conditionResult, err
} }
if r, ok := result.(float64); ok {
if math.IsNaN(r){
continue
}
}
targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, result}) targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, result})
} }
} }
targetMetricData = append(targetMetricData, targetData)
sort.Slice(rule.Conditions.Items, func(i, j int) bool { sort.Slice(rule.Conditions.Items, func(i, j int) bool {
return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity] return alerting.SeverityWeights[rule.Conditions.Items[i].Severity] > alerting.SeverityWeights[rule.Conditions.Items[j].Severity]
}) })
@ -226,7 +425,7 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
conditionExpression := "" conditionExpression := ""
valueLength := len(cond.Values) valueLength := len(cond.Values)
if valueLength == 0 { if valueLength == 0 {
return nil, fmt.Errorf("condition values: %v should not be empty", cond.Values) return conditionResult, fmt.Errorf("condition values: %v should not be empty", cond.Values)
} }
switch cond.Operator { switch cond.Operator {
case "equals": case "equals":
@ -241,15 +440,15 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
conditionExpression = fmt.Sprintf("result < %v", cond.Values[0]) conditionExpression = fmt.Sprintf("result < %v", cond.Values[0])
case "range": case "range":
if valueLength != 2 { if valueLength != 2 {
return nil, fmt.Errorf("length of %s condition values should be 2", cond.Operator) return conditionResult, fmt.Errorf("length of %s condition values should be 2", cond.Operator)
} }
conditionExpression = fmt.Sprintf("result >= %v && result <= %v", cond.Values[0], cond.Values[1]) conditionExpression = fmt.Sprintf("result >= %v && result <= %v", cond.Values[0], cond.Values[1])
default: default:
return nil, fmt.Errorf("unsupport condition operator: %s", cond.Operator) return conditionResult, fmt.Errorf("unsupport condition operator: %s", cond.Operator)
} }
expression, err := govaluate.NewEvaluableExpression(conditionExpression) expression, err := govaluate.NewEvaluableExpression(conditionExpression)
if err != nil { if err != nil {
return nil, err return conditionResult, err
} }
dataLength := 0 dataLength := 0
dataKey := "" dataKey := ""
@ -259,20 +458,20 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
} }
triggerCount := 0 triggerCount := 0
for i := 0; i < dataLength; i++ { for i := 0; i < dataLength; i++ {
conditionResult, err := expression.Evaluate(map[string]interface{}{ evaluateResult, err := expression.Evaluate(map[string]interface{}{
"result": targetData.Data[dataKey][i][1], "result": targetData.Data[dataKey][i][1],
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
if conditionResult == true { if evaluateResult == true {
triggerCount += 1 triggerCount += 1
}else { }else {
triggerCount = 0 triggerCount = 0
} }
if triggerCount >= cond.MinimumPeriodMatch { if triggerCount >= cond.MinimumPeriodMatch {
log.Debugf("triggered condition %v, groups: %v\n", cond, targetData.GroupValues) log.Debugf("triggered condition %v, groups: %v\n", cond, targetData.GroupValues)
conditionResults = append(conditionResults, alerting.ConditionResult{ resultItems = append(resultItems, alerting.ConditionResultItem{
GroupValues: targetData.GroupValues, GroupValues: targetData.GroupValues,
ConditionItem: &cond, ConditionItem: &cond,
}) })
@ -282,21 +481,31 @@ func (engine *Engine) CheckCondition(rule *alerting.Rule)([]alerting.ConditionRe
} }
} }
return conditionResults, nil conditionResult.QueryResult.MetricData = targetMetricData
conditionResult.ResultItems = resultItems
return conditionResult, nil
} }
func (engine *Engine) Do(rule *alerting.Rule) error { func (engine *Engine) Do(rule *alerting.Rule) error {
log.Tracef("start check condition of rule %s", rule.ID)
conditionResults, err := engine.CheckCondition(rule) var (
if err != nil { alertItem *alerting.Alert
return err err error
} )
lastAlertItem := alerting.Alert{}
err = getLastAlert(rule.ID, rule.Resource.ID, &lastAlertItem)
if err != nil {
return err
}
var alertItem *alerting.Alert
defer func() { defer func() {
if err != nil && alertItem == nil {
alertItem = &alerting.Alert{
ID: util.GetUUID(),
Created: time.Now(),
Updated: time.Now(),
RuleID: rule.ID,
ResourceID: rule.Resource.ID,
ResourceName: rule.Resource.Name,
Expression: rule.Metrics.Expression,
Objects: rule.Resource.Objects,
State: alerting.AlertStateError,
Error: err.Error(),
}
}
if alertItem != nil { if alertItem != nil {
for _, actionResult := range alertItem.ActionExecutionResults { for _, actionResult := range alertItem.ActionExecutionResults {
if actionResult.Error != "" { if actionResult.Error != "" {
@ -312,23 +521,37 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
} }
} }
}() }()
log.Tracef("start check condition of rule %s", rule.ID)
checkResults, err := engine.CheckCondition(rule)
alertItem = &alerting.Alert{
ID: util.GetUUID(),
Created: time.Now(),
Updated: time.Now(),
RuleID: rule.ID,
ResourceID: rule.Resource.ID,
Expression: rule.Metrics.Expression,
Objects: rule.Resource.Objects,
ConditionResult: checkResults,
Conditions: rule.Conditions,
}
if err != nil {
return err
}
lastAlertItem := alerting.Alert{}
err = getLastAlert(rule.ID, &lastAlertItem)
if err != nil {
return err
}
conditionResults := checkResults.ResultItems
if len(conditionResults) == 0 { if len(conditionResults) == 0 {
if lastAlertItem.State != alerting.AlertStateNormal && lastAlertItem.ID != "" { alertItem.Severity = "info"
alertItem = &alerting.Alert{ alertItem.Content = ""
ID: util.GetUUID(), alertItem.State = alerting.AlertStateNormal
Created: time.Now(),
Updated: time.Now(),
RuleID: rule.ID,
ClusterID: rule.Resource.ID,
Expression: rule.Metrics.Expression,
Objects: rule.Resource.Objects,
Severity: "info",
Content: "",
State: alerting.AlertStateNormal,
}
}
return nil return nil
}else{ }else{
if lastAlertItem.State == "" || lastAlertItem.State == alerting.AlertStateNormal {
rule.LastTermStartTime = time.Now()
}
log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID ) log.Debugf("check condition result of rule %s is %v", conditionResults, rule.ID )
var ( var (
severity = conditionResults[0].ConditionItem.Severity severity = conditionResults[0].ConditionItem.Severity
@ -337,21 +560,12 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
for _, conditionResult := range conditionResults { for _, conditionResult := range conditionResults {
if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] { if alerting.SeverityWeights[severity] < alerting.SeverityWeights[conditionResult.ConditionItem.Severity] {
severity = conditionResult.ConditionItem.Severity severity = conditionResult.ConditionItem.Severity
content = conditionResult.ConditionItem.Message
} }
content += conditionResult.ConditionItem.Message + ";"
}
alertItem = &alerting.Alert{
ID: util.GetUUID(),
Created: time.Now(),
Updated: time.Now(),
RuleID: rule.ID,
ClusterID: rule.Resource.ID,
Expression: rule.Metrics.Expression,
Objects: rule.Resource.Objects,
Severity: severity,
Content: content,
State: alerting.AlertStateActive,
} }
alertItem.Severity = severity
alertItem.Content = content
alertItem.State = alerting.AlertStateActive
} }
if rule.Channels.AcceptTimeRange.Include(time.Now()) { if rule.Channels.AcceptTimeRange.Include(time.Now()) {
@ -360,27 +574,43 @@ func (engine *Engine) Do(rule *alerting.Rule) error {
alertItem.Error = err.Error() alertItem.Error = err.Error()
return err return err
} }
period := time.Now().Sub(rule.LastNotificationTime)
//log.Error(lastAlertItem.ID, period, periodDuration)
if time.Now().Sub(lastAlertItem.Created) > periodDuration { if lastAlertItem.ID == "" || period > periodDuration {
actionResults := performChannels(rule.Channels.Normal, conditionResults) actionResults := performChannels(rule.Channels.Normal, conditionResults)
alertItem.ActionExecutionResults = actionResults alertItem.ActionExecutionResults = actionResults
//todo init last notification time when create task (by last alert item is notified)
rule.LastNotificationTime = time.Now()
alertItem.IsNotified = true
} }
if rule.Channels.EscalationEnabled && lastAlertItem.State != alerting.AlertStateNormal { isAck, err := hasAcknowledgedRule(rule.ID, rule.LastTermStartTime)
periodDuration, err = time.ParseDuration(rule.Channels.EscalationThrottlePeriod) if err != nil {
alertItem.Error = err.Error()
return err
}
if rule.Channels.EscalationEnabled && lastAlertItem.ID !="" && !isAck {
throttlePeriod, err := time.ParseDuration(rule.Channels.EscalationThrottlePeriod)
if err != nil { if err != nil {
return err return err
} }
if time.Now().Sub(lastAlertItem.Created) > periodDuration { //todo init last term start time when create task (by last alert item of state normal)
actionResults := performChannels(rule.Channels.Escalation, conditionResults) if time.Now().Sub(rule.LastTermStartTime) > throttlePeriod {
alertItem.ActionExecutionResults = actionResults if time.Now().Sub(rule.LastEscalationTime) > periodDuration {
actionResults := performChannels(rule.Channels.Escalation, conditionResults)
alertItem.ActionExecutionResults = append(alertItem.ActionExecutionResults, actionResults...)
//todo init last escalation time when create task (by last alert item is escalated)
rule.LastEscalationTime = time.Now()
alertItem.IsEscalated = true
}
} }
} }
} }
return nil return nil
} }
func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResult) []alerting.ActionExecutionResult { func performChannels(channels []alerting.Channel, conditionResults []alerting.ConditionResultItem) []alerting.ActionExecutionResult {
var message string var message string
for _, conditionResult := range conditionResults { for _, conditionResult := range conditionResults {
message += fmt.Sprintf("severity: %s\t message:%s\t groups:%v\t timestamp: %v;", conditionResult.ConditionItem.Severity, conditionResult.ConditionItem.Message, conditionResult.GroupValues, time.Now()) message += fmt.Sprintf("severity: %s\t message:%s\t groups:%v\t timestamp: %v;", conditionResult.ConditionItem.Severity, conditionResult.ConditionItem.Message, conditionResult.GroupValues, time.Now())
@ -441,6 +671,12 @@ func performChannel(channel *alerting.Channel, ctx []byte) ([]byte, error) {
} }
func (engine *Engine) GenerateTask(rule *alerting.Rule) func(ctx context.Context) { func (engine *Engine) GenerateTask(rule *alerting.Rule) func(ctx context.Context) {
return func(ctx context.Context) { return func(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
log.Error(err)
debug.PrintStack()
}
}()
err := engine.Do(rule) err := engine.Do(rule)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
@ -465,7 +701,27 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti
continue continue
} }
if vm, ok := v.(map[string]interface{}); ok { if vm, ok := v.(map[string]interface{}); ok {
md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], vm["value"]}) if metricVal, ok := vm["value"]; ok {
md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal})
}else{
//percentiles agg type
switch vm["values"].(type) {
case []interface{}:
for _, val := range vm["values"].([]interface{}) {
if valM, ok := val.(map[string]interface{}); ok {
md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], valM["value"]})
}
break
}
case map[string]interface{}:
for _, val := range vm["values"].(map[string]interface{}) {
md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], val})
break
}
}
}
} }
} }
@ -501,8 +757,7 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti
} }
} }
func getLastAlert(ruleID, clusterID string, alertItem *alerting.Alert) error { func getLastAlert(ruleID string, alertItem *alerting.Alert) error {
esClient := elastic.GetClient(clusterID)
queryDsl := util.MapStr{ queryDsl := util.MapStr{
"size": 1, "size": 1,
"sort": []util.MapStr{ "sort": []util.MapStr{
@ -520,13 +775,60 @@ func getLastAlert(ruleID, clusterID string, alertItem *alerting.Alert) error {
}, },
}, },
} }
searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(alertItem), util.MustToJSONBytes(queryDsl) ) q := orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl),
}
err, searchResult := orm.Search(alertItem, &q )
if err != nil { if err != nil {
return err return err
} }
if len(searchRes.Hits.Hits) == 0 { if len(searchResult.Result) == 0 {
return nil return nil
} }
alertBytes := util.MustToJSONBytes(searchRes.Hits.Hits[0].Source) alertBytes := util.MustToJSONBytes(searchResult.Result[0])
return util.FromJSONBytes(alertBytes, alertItem) return util.FromJSONBytes(alertBytes, alertItem)
} }
// hasAcknowledgedRule reports whether the rule identified by ruleID has an
// alert in state "acknowledge" created at or after startTime (the start of
// the current alerting term). It is used to suppress escalation once an
// operator has acknowledged the ongoing incident.
func hasAcknowledgedRule(ruleID string, startTime time.Time) (bool, error){
	queryDsl := util.MapStr{
		// Only existence matters, so a single hit is enough.
		"size": 1,
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": []util.MapStr{
					{
						"term": util.MapStr{
							"rule_id": util.MapStr{
								"value": ruleID,
							},
						},
					},
					{
						"term": util.MapStr{
							"state": alerting.AlertStateAcknowledge,
						},
					},
					{
						"range": util.MapStr{
							"created": util.MapStr{
								"gte": startTime,
							},
						},
					},
				},
			},
		},
	}
	q := orm.Query{
		WildcardIndex: true,
		RawQuery:      util.MustToJSONBytes(queryDsl),
	}
	err, searchResult := orm.Search(alerting.Alert{}, &q)
	if err != nil {
		return false, err
	}
	// Any matching hit means the rule was acknowledged during this term.
	return len(searchResult.Result) > 0, nil
}

View File

@ -7,7 +7,6 @@ package elasticsearch
import ( import (
"fmt" "fmt"
"infini.sh/console/model/alerting" "infini.sh/console/model/alerting"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"net/http" "net/http"
"sort" "sort"
@ -17,18 +16,16 @@ import (
func TestEngine( t *testing.T) { func TestEngine( t *testing.T) {
rule := alerting.Rule{ rule := alerting.Rule{
ORMObjectBase: orm.ORMObjectBase{ ID: util.GetUUID(),
ID: util.GetUUID(), Created: time.Now(),
Created: time.Now(), Updated: time.Now(),
Updated: time.Now(),
},
Enabled: true, Enabled: true,
Resource: alerting.Resource{ Resource: alerting.Resource{
ID: "c8i18llath2blrusdjng", ID: "c8i18llath2blrusdjng",
Type: "elasticsearch", Type: "elasticsearch",
Objects: []string{".infini_metrics*"}, Objects: []string{".infini_metrics*"},
TimeField: "timestamp", TimeField: "timestamp",
Filter: alerting.Filter{ Filter: alerting.FilterQuery{
And: []alerting.FilterQuery{ And: []alerting.FilterQuery{
{Field: "timestamp", Operator: "gte", Values: []string{"now-15m"}}, {Field: "timestamp", Operator: "gte", Values: []string{"now-15m"}},
//{Field: "payload.elasticsearch.cluster_health.status", Operator: "equals", Values: []string{"red"}}, //{Field: "payload.elasticsearch.cluster_health.status", Operator: "equals", Values: []string{"red"}},
@ -117,3 +114,169 @@ func TestEngine( t *testing.T) {
//fmt.Println(util.MustToJSON(filter)) //fmt.Println(util.MustToJSON(filter))
} }
// TestGenerateAgg prints the aggregation DSL generated for a p99
// percentile metric so it can be inspected manually.
func TestGenerateAgg(t *testing.T) {
	engine := &Engine{}
	metricItem := &alerting.MetricItem{
		Name:      "a",
		Field:     "cpu.percent",
		Statistic: "p99",
	}
	agg := engine.generateAgg(metricItem)
	fmt.Println(util.MustToJSON(agg))
}
// TestGeneratePercentilesAggQuery builds a rule with two rate metrics
// combined by the formula "b/a" (average search latency per query) and
// prints the generated query DSL for manual inspection.
func TestGeneratePercentilesAggQuery(t *testing.T) {
	// Alternative example kept for reference: a single p99 percentile metric
	// over node CPU usage, grouped by cluster and node.
	//rule := alerting.Rule{
	//	ID: util.GetUUID(),
	//	Created: time.Now(),
	//	Updated: time.Now(),
	//	Enabled: true,
	//	Resource: alerting.Resource{
	//		ID: "c8i18llath2blrusdjng",
	//		Type: "elasticsearch",
	//		Objects: []string{".infini_metrics*"},
	//		TimeField: "timestamp",
	//		RawFilter: map[string]interface{}{
	//			"match_all": util.MapStr{
	//
	//			},
	//		},
	//	},
	//
	//	Metrics: alerting.Metric{
	//		PeriodInterval: "1m",
	//		MaxPeriods: 15,
	//		Items: []alerting.MetricItem{
	//			{Name: "a", Field: "payload.elasticsearch.node_stats.os.cpu.percent", Statistic: "p99", Group: []string{"metadata.labels.cluster_id", "metadata.labels.node_id"}},
	//		},
	//	},
	//	Conditions: alerting.Condition{
	//		Operator: "any",
	//		Items: []alerting.ConditionItem{
	//			{MinimumPeriodMatch: 5, Operator: "gte", Values: []string{"90"}, Severity: "error", Message: "cpu使用率大于90%"},
	//		},
	//	},
	//
	//	Channels: alerting.RuleChannel{
	//		Normal: []alerting.Channel{
	//			{Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
	//				HeaderParams: map[string]string{
	//					"Content-Type": "application/json",
	//				},
	//				Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
	//				Method: http.MethodPost,
	//				URL: "https://oapi.dingtalk.com/robot/send?access_token=XXXXXX",
	//			}},
	//		},
	//		ThrottlePeriod: "1h",
	//		AcceptTimeRange: alerting.TimeRange{
	//			Start: "8:00",
	//			End: "21:00",
	//		},
	//		EscalationEnabled: true,
	//		EscalationThrottlePeriod: "30m",
	//	},
	//}
	rule := alerting.Rule{
		ID: util.GetUUID(),
		Created: time.Now(),
		Updated: time.Now(),
		Enabled: true,
		Resource: alerting.Resource{
			ID: "c8i18llath2blrusdjng",
			Type: "elasticsearch",
			Objects: []string{".infini_metrics*"},
			TimeField: "timestamp",
			// Restrict the query to index_stats metric documents.
			RawFilter: map[string]interface{}{
				"bool": map[string]interface{}{
					"must": []interface{}{
						util.MapStr{
							"term": util.MapStr{
								"metadata.name": util.MapStr{
									"value": "index_stats",
								},
							},
						},
					},
				},
			},
		},
		Metrics: alerting.Metric{
			PeriodInterval: "1m",
			MaxPeriods: 15,
			// a = query throughput rate, b = query time rate; b/a is the
			// average latency (ms) per query.
			Items: []alerting.MetricItem{
				{Name: "a", Field: "payload.elasticsearch.index_stats.total.search.query_total", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},
				{Name: "b", Field: "payload.elasticsearch.index_stats.total.search.query_time_in_millis", Statistic: "rate", Group: []string{"metadata.labels.cluster_id"}},
			},
			Formula: "b/a",
		},
		Conditions: alerting.Condition{
			Operator: "any",
			Items: []alerting.ConditionItem{
				{MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"10"}, Severity: "warning", Message: "搜索延迟大于10ms"},
			},
		},
		Channels: alerting.RuleChannel{
			Normal: []alerting.Channel{
				{Name: "钉钉", Type: alerting.ChannelWebhook, Webhook: &alerting.CustomWebhook{
					HeaderParams: map[string]string{
						"Content-Type": "application/json",
					},
					Body: `{"msgtype": "text","text": {"content":"告警通知: {{ctx.message}}"}}`,
					Method: http.MethodPost,
					URL: "https://oapi.dingtalk.com/robot/send?access_token=XXXXXX",
				}},
			},
			ThrottlePeriod: "1h",
			AcceptTimeRange: alerting.TimeRange{
				Start: "08:00",
				End: "21:00",
			},
			EscalationEnabled: true,
			EscalationThrottlePeriod: "30m",
		},
	}
	eng := &Engine{}
	q, err := eng.GenerateQuery(&rule)
	if err != nil {
		t.Fatal(err)
	}
	// Print the DSL for manual inspection; no assertion is made here.
	fmt.Println(util.MustToJSON(q))
}
// TestConvertFilterQuery verifies that a nested FilterQuery — an equals
// clause, an in clause, and a negated range clause — is translated into
// the expected bool query DSL.
func TestConvertFilterQuery(t *testing.T) {
	filter := alerting.FilterQuery{
		And: []alerting.FilterQuery{
			{
				Field:    "metadata.category",
				Values:   []string{"elasticsearch"},
				Operator: "equals",
			},
			{
				Field:    "metadata.name",
				Values:   []string{"index_stats", "node_stats"},
				Operator: "in",
			},
			{
				Not: []alerting.FilterQuery{
					{
						Field:    "timestamp",
						Operator: "gt",
						Values:   []string{"2022-04-16T16:16:39.168605+08:00"},
					},
				},
			},
		},
	}
	const targetDsl = `{"bool":{"must":[{"term":{"metadata.category":{"value":"elasticsearch"}}},{"terms":{"metadata.name":["index_stats","node_stats"]}},{"bool":{"must_not":[{"range":{"timestamp":{"gt":"2022-04-16T16:16:39.168605+08:00"}}}]}}]}}`
	engine := &Engine{}
	dslMap, err := engine.ConvertFilterQueryToDsl(&filter)
	if err != nil {
		t.Fatal(err)
	}
	if dsl := util.MustToJSON(dslMap); dsl != targetDsl {
		t.Errorf("expect dsl %s but got %s", targetDsl, dsl)
	}
}

View File

@ -13,8 +13,8 @@ import (
type Engine interface { type Engine interface {
GenerateQuery(rule *alerting.Rule) (interface{}, error) GenerateQuery(rule *alerting.Rule) (interface{}, error)
ExecuteQuery(rule *alerting.Rule)([]alerting.MetricData, error) ExecuteQuery(rule *alerting.Rule)(*alerting.QueryResult, error)
CheckCondition(rule *alerting.Rule)([]alerting.ConditionResult, error) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error)
GenerateTask(rule *alerting.Rule) func(ctx context.Context) GenerateTask(rule *alerting.Rule) func(ctx context.Context)
} }

97
service/alerting/init.go Normal file
View File

@ -0,0 +1,97 @@
/* Copyright © INFINI Ltd. All rights reserved.
* web: https://infinilabs.com
* mail: hello#infini.ltd */
package alerting
import (
"infini.sh/console/model/alerting"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/task"
"infini.sh/framework/core/util"
log "src/github.com/cihub/seelog"
"time"
)
// InitTasks loads every enabled alerting rule from storage, registers a
// schedule task for it, and starts the task immediately.
func InitTasks() error {
	// Fetch alerting rules from elasticsearch.
	query := orm.Query{
		Size:          10000,
		WildcardIndex: true,
	}
	query.Conds = orm.And(orm.Eq("enabled", true))
	err, result := orm.Search(alerting.Rule{}, &query)
	if err != nil {
		return err
	}
	for _, row := range result.Result {
		rule := &alerting.Rule{}
		if err = util.FromJSONBytes(util.MustToJSONBytes(row), rule); err != nil {
			return err
		}
		// Defensive re-check of the enabled flag after decoding.
		if !rule.Enabled {
			continue
		}
		engine := GetEngine(rule.Resource.Type)
		task.RegisterScheduleTask(task.ScheduleTask{
			ID:          rule.ID,
			Interval:    rule.Schedule.Interval,
			Description: rule.Metrics.Expression,
			Task:        engine.GenerateTask(rule),
		})
		task.StartTask(rule.ID)
	}
	return nil
}
// getRuleLastTermTime returns, for each rule that has produced an alert in
// state "normal", the creation time of its most recent such alert. The
// result maps rule ID to that timestamp and is used to initialize each
// rule's last term start time.
func getRuleLastTermTime() (map[string]time.Time, error) {
	query := util.MapStr{
		// rule_id must be part of the source filter as well; with only
		// "created" the collapsed hits come back without rule_id and every
		// map entry would be keyed by the empty string.
		"_source": []string{"created", "rule_id"},
		"sort": []util.MapStr{
			{
				"created": util.MapStr{
					"order": "desc",
				},
			},
		},
		// Keep only the newest alert per rule.
		"collapse": util.MapStr{
			"field": "rule_id",
		},
		"query": util.MapStr{
			"term": util.MapStr{
				"state": util.MapStr{
					"value": "normal",
				},
			},
		},
	}
	q := &orm.Query{
		RawQuery:      util.MustToJSONBytes(query),
		Size:          1000,
		WildcardIndex: true,
	}
	err, result := orm.Search(alerting.Alert{}, q)
	if err != nil {
		return nil, err
	}
	times := map[string]time.Time{}
	for _, item := range result.Result {
		// Decode into a fresh struct on every iteration so a document with
		// a missing field cannot inherit values from the previous hit.
		obj := &ruleTime{}
		if err = util.FromJSONBytes(util.MustToJSONBytes(item), obj); err != nil {
			// Best-effort: skip undecodable documents instead of aborting.
			log.Error(err)
			continue
		}
		times[obj.RuleID] = obj.Created
	}
	return times, nil
}
// ruleTime is a minimal projection of an alert document holding only the
// fields needed to map a rule to the creation time of its latest alert.
type ruleTime struct {
	// Created is the creation time of the alert document.
	Created time.Time `json:"created"`
	// RuleID identifies the rule that produced the alert.
	RuleID string `json:"rule_id"`
}