change filter agg to top agg

This commit is contained in:
liugq 2022-07-07 14:10:34 +08:00
parent 14f1f0ed65
commit 29223f50a2
6 changed files with 36 additions and 63 deletions

View File

@ -93,7 +93,9 @@ func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps http
from = 0
}
q := orm.Query{}
q := orm.Query{
WildcardIndex: true,
}
queryDSL = fmt.Sprintf(queryDSL, sortBuilder.String(), mustBuilder.String(), size, from)
q.RawQuery = []byte(queryDSL)

View File

@ -19,13 +19,13 @@ type AlertAPI struct {
func (alert *AlertAPI) Init() {
api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.RequirePermission(alert.getRule,enum.PermissionAlertRuleRead))
api.HandleAPIMethod(api.POST, "/alerting/rule", alert.RequirePermission(alert.createRule, enum.PermissionAlertRuleWrite))
api.HandleAPIMethod(api.POST, "/alerting/rule/test", alert.sendTestMessage)
api.HandleAPIMethod(api.POST, "/alerting/rule/test", alert.RequireLogin(alert.sendTestMessage))
api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.RequirePermission(alert.deleteRule, enum.PermissionAlertRuleWrite))
api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.RequirePermission(alert.updateRule, enum.PermissionAlertRuleWrite))
api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.RequirePermission(alert.searchRule, enum.PermissionAlertRuleRead))
api.HandleAPIMethod(api.GET, "/alerting/stats", alert.getAlertStats)
api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.fetchAlertInfos)
api.HandleAPIMethod(api.POST, "/alerting/rule/preview_metric", alert.getPreviewMetricData)
api.HandleAPIMethod(api.GET, "/alerting/stats", alert.RequirePermission(alert.getAlertStats, enum.PermissionAlertHistoryRead))
api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.RequirePermission(alert.fetchAlertInfos, enum.PermissionAlertHistoryRead))
api.HandleAPIMethod(api.POST, "/alerting/rule/preview_metric", alert.RequireLogin(alert.getPreviewMetricData))
api.HandleAPIMethod(api.POST, "/alerting/rule/:rule_id/_enable", alert.RequirePermission(alert.enableRule, enum.PermissionAlertRuleWrite))
api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/metric", alert.RequirePermission(alert.getMetricData, enum.PermissionAlertRuleRead))
api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/info", alert.RequirePermission(alert.getRuleDetail, enum.PermissionAlertRuleRead, enum.PermissionAlertMessageRead))

View File

@ -15,7 +15,7 @@ type GatewayAPI struct {
func init() {
gateway:=GatewayAPI{}
api.HandleAPIMethod(api.POST, "/gateway/instance/try_connect", gateway.tryConnect)
api.HandleAPIMethod(api.POST, "/gateway/instance/try_connect", gateway.RequireLogin(gateway.tryConnect))
api.HandleAPIMethod(api.GET, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.getInstance, enum.PermissionGatewayInstanceRead))
api.HandleAPIMethod(api.POST, "/gateway/instance", gateway.RequirePermission(gateway.createInstance, enum.PermissionGatewayInstanceWrite))
api.HandleAPIMethod(api.PUT, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.updateInstance, enum.PermissionGatewayInstanceWrite))

View File

@ -25,10 +25,10 @@ func Init(cfg *config.AppConfig) {
api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "dict/:id"), handler.DeleteDictItemAction)
api.HandleAPIMethod(api.PUT, path.Join(pathPrefix, "dict/:id"), handler.UpdateDictItemAction)
api.HandleAPIMethod(api.POST, path.Join(esPrefix, "doc/:index/_search"), handler.HandleSearchDocumentAction)
api.HandleAPIMethod(api.POST, path.Join(esPrefix, "doc/:index"), handler.ClusterRequired(handler.HandleAddDocumentAction, "doc.create"))
api.HandleAPIMethod(api.PUT, path.Join(esPrefix, "doc/:index/:docId"), handler.ClusterRequired(handler.HandleUpdateDocumentAction, "doc.update"))
api.HandleAPIMethod(api.DELETE, path.Join(esPrefix, "doc/:index/:docId"), handler.ClusterRequired(handler.HandleDeleteDocumentAction, "doc.delete"))
api.HandleAPIMethod(api.POST, path.Join(esPrefix, "doc/:index/_search"), handler.RequireLogin(handler.HandleSearchDocumentAction))
api.HandleAPIMethod(api.POST, path.Join(esPrefix, "doc/:index"), handler.IndexRequired(handler.HandleAddDocumentAction, "doc.create"))
api.HandleAPIMethod(api.PUT, path.Join(esPrefix, "doc/:index/:docId"), handler.IndexRequired(handler.HandleUpdateDocumentAction, "doc.update"))
api.HandleAPIMethod(api.DELETE, path.Join(esPrefix, "doc/:index/:docId"), handler.IndexRequired(handler.HandleDeleteDocumentAction, "doc.delete"))
api.HandleAPIMethod(api.GET, path.Join(esPrefix, "doc/_validate"), handler.ValidateDocIDAction)
api.HandleAPIMethod(api.POST, path.Join(pathPrefix, "rebuild/*id"), handler.HandleReindexAction)

View File

@ -75,14 +75,6 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
},
}
if len(filter) > 0 {
timeAggs = util.MapStr{
"filter_agg": util.MapStr{
"filter": filter,
"aggs": timeAggs,
},
}
}
var rootAggs util.MapStr
groups := rule.Metrics.Items[0].Group
limit := rule.Metrics.Items[0].Limit
@ -116,6 +108,14 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
}else{
rootAggs = timeAggs
}
if len(filter) > 0 {
rootAggs = util.MapStr{
"filter_agg": util.MapStr{
"filter": filter,
"aggs": rootAggs,
},
}
}
return util.MapStr{
"size": 0,
@ -405,7 +405,7 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.Fi
return queryResult, err
}
metricData := []alerting.MetricData{}
collectMetricData(searchResult["aggregations"], "", &metricData)
CollectMetricData(searchResult["aggregations"], "", &metricData)
//将 rate 求导数据 除以 bucket size (单位 /s)
//statisticM := map[string] string{}
//for _, mi := range rule.Metrics.Items {
@ -502,7 +502,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool,
//sort conditions by severity desc before check , and then if condition is true, then continue check another group
func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){
var resultItems []alerting.ConditionResultItem
targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, false, nil)
targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, true, nil)
conditionResult := &alerting.ConditionResult{
QueryResult: queryResult,
}
@ -970,6 +970,15 @@ func (engine *Engine) GenerateTask(rule alerting.Rule) func(ctx context.Context)
}
}
func CollectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData){
if aggM, ok := agg.(map[string]interface{}); ok {
if targetAgg, ok := aggM["filter_agg"]; ok {
collectMetricData(targetAgg, groupValues, metricData)
}else{
collectMetricData(aggM, groupValues, metricData)
}
}
}
func collectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData){
if aggM, ok := agg.(map[string]interface{}); ok {
@ -986,40 +995,6 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti
if k == "key" || k == "key_as_string" || k== "doc_count"{
continue
}
//has filter
//if k == "filter_agg" {
// if filterM, ok := v.(map[string]interface{}); ok {
// for fk, fv := range filterM {
// if fk == "doc_count" {
// continue
// }
// if vm, ok := fv.(map[string]interface{}); ok {
// if metricVal, ok := vm["value"]; ok {
// md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], metricVal})
// }else{
// //percentiles agg type
// switch vm["values"].(type) {
// case []interface{}:
// for _, val := range vm["values"].([]interface{}) {
// if valM, ok := val.(map[string]interface{}); ok {
// md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], valM["value"]})
// }
// break
// }
// case map[string]interface{}:
// for _, val := range vm["values"].(map[string]interface{}) {
// md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], val})
// break
// }
// }
//
// }
//
// }
// }
// }
// continue
//}
if vm, ok := v.(map[string]interface{}); ok {
if metricVal, ok := vm["value"]; ok {
md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal})
@ -1065,12 +1040,8 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti
if groupValues != "" {
newGroupValues = fmt.Sprintf("%s*%s", groupValues, currentGroup)
}
if filterAgg, ok := bkVal["filter_agg"].(map[string]interface{}); ok {
collectMetricData(filterAgg, newGroupValues, metricData)
}else{
collectMetricData(bk, newGroupValues, metricData)
}
collectMetricData(bk, newGroupValues, metricData)
}
}

View File

@ -65,8 +65,8 @@ func TestEngine( t *testing.T) {
Conditions: alerting.Condition{
Operator: "any",
Items: []alerting.ConditionItem{
{MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"10"}, Severity: "error", Message: "磁盘可用率小于10%"},
{MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"20"}, Severity: "warning", Message: "磁盘可用率小于20%"},
{MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"10"}, Severity: "error"},
{MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"20"}, Severity: "warning"},
},
},
@ -212,7 +212,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
Conditions: alerting.Condition{
Operator: "any",
Items: []alerting.ConditionItem{
{MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"10"}, Severity: "warning", Message: "搜索延迟大于10ms"},
{MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"10"}, Severity: "warning"},
},
},
@ -237,7 +237,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) {
},
}
eng := &Engine{}
q, err := eng.GenerateQuery(&rule)
q, err := eng.GenerateQuery(&rule, nil)
if err != nil {
t.Fatal(err)
}