From 29223f50a20cc797380d3249f89d178e4f8299d8 Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 7 Jul 2022 14:10:34 +0800 Subject: [PATCH] change filter agg to top agg --- plugin/api/alerting/alert.go | 4 +- plugin/api/alerting/api.go | 8 +-- plugin/api/gateway/api.go | 2 +- plugin/api/init.go | 8 +-- service/alerting/elasticsearch/engine.go | 69 ++++++------------- service/alerting/elasticsearch/engine_test.go | 8 +-- 6 files changed, 36 insertions(+), 63 deletions(-) diff --git a/plugin/api/alerting/alert.go b/plugin/api/alerting/alert.go index 6e3117fc..4ccdd730 100644 --- a/plugin/api/alerting/alert.go +++ b/plugin/api/alerting/alert.go @@ -93,7 +93,9 @@ func (h *AlertAPI) searchAlert(w http.ResponseWriter, req *http.Request, ps http from = 0 } - q := orm.Query{} + q := orm.Query{ + WildcardIndex: true, + } queryDSL = fmt.Sprintf(queryDSL, sortBuilder.String(), mustBuilder.String(), size, from) q.RawQuery = []byte(queryDSL) diff --git a/plugin/api/alerting/api.go b/plugin/api/alerting/api.go index 8bcb55da..0e0627b2 100644 --- a/plugin/api/alerting/api.go +++ b/plugin/api/alerting/api.go @@ -19,13 +19,13 @@ type AlertAPI struct { func (alert *AlertAPI) Init() { api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id", alert.RequirePermission(alert.getRule,enum.PermissionAlertRuleRead)) api.HandleAPIMethod(api.POST, "/alerting/rule", alert.RequirePermission(alert.createRule, enum.PermissionAlertRuleWrite)) - api.HandleAPIMethod(api.POST, "/alerting/rule/test", alert.sendTestMessage) + api.HandleAPIMethod(api.POST, "/alerting/rule/test", alert.RequireLogin(alert.sendTestMessage)) api.HandleAPIMethod(api.DELETE, "/alerting/rule/:rule_id", alert.RequirePermission(alert.deleteRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.PUT, "/alerting/rule/:rule_id", alert.RequirePermission(alert.updateRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.GET, "/alerting/rule/_search", alert.RequirePermission(alert.searchRule, enum.PermissionAlertRuleRead)) - api.HandleAPIMethod(api.GET, "/alerting/stats", alert.getAlertStats) - api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.fetchAlertInfos) - api.HandleAPIMethod(api.POST, "/alerting/rule/preview_metric", alert.getPreviewMetricData) + api.HandleAPIMethod(api.GET, "/alerting/stats", alert.RequirePermission(alert.getAlertStats, enum.PermissionAlertHistoryRead)) + api.HandleAPIMethod(api.POST, "/alerting/rule/info", alert.RequirePermission(alert.fetchAlertInfos, enum.PermissionAlertHistoryRead)) + api.HandleAPIMethod(api.POST, "/alerting/rule/preview_metric", alert.RequireLogin(alert.getPreviewMetricData)) api.HandleAPIMethod(api.POST, "/alerting/rule/:rule_id/_enable", alert.RequirePermission(alert.enableRule, enum.PermissionAlertRuleWrite)) api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/metric", alert.RequirePermission(alert.getMetricData, enum.PermissionAlertRuleRead)) api.HandleAPIMethod(api.GET, "/alerting/rule/:rule_id/info", alert.RequirePermission(alert.getRuleDetail, enum.PermissionAlertRuleRead, enum.PermissionAlertMessageRead)) diff --git a/plugin/api/gateway/api.go b/plugin/api/gateway/api.go index ab1ac7f4..d41a9524 100644 --- a/plugin/api/gateway/api.go +++ b/plugin/api/gateway/api.go @@ -15,7 +15,7 @@ type GatewayAPI struct { func init() { gateway:=GatewayAPI{} - api.HandleAPIMethod(api.POST, "/gateway/instance/try_connect", gateway.tryConnect) + api.HandleAPIMethod(api.POST, "/gateway/instance/try_connect", gateway.RequireLogin(gateway.tryConnect)) api.HandleAPIMethod(api.GET, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.getInstance, enum.PermissionGatewayInstanceRead)) api.HandleAPIMethod(api.POST, "/gateway/instance", gateway.RequirePermission(gateway.createInstance, enum.PermissionGatewayInstanceWrite)) api.HandleAPIMethod(api.PUT, "/gateway/instance/:instance_id", gateway.RequirePermission(gateway.updateInstance, enum.PermissionGatewayInstanceWrite)) diff --git a/plugin/api/init.go b/plugin/api/init.go index fec22f23..4e43870e 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -25,10 +25,10 @@ func Init(cfg *config.AppConfig) { api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "dict/:id"), handler.DeleteDictItemAction) api.HandleAPIMethod(api.PUT, path.Join(pathPrefix, "dict/:id"), handler.UpdateDictItemAction) - api.HandleAPIMethod(api.POST, path.Join(esPrefix, "doc/:index/_search"), handler.HandleSearchDocumentAction) - api.HandleAPIMethod(api.POST, path.Join(esPrefix, "doc/:index"), handler.ClusterRequired(handler.HandleAddDocumentAction, "doc.create")) - api.HandleAPIMethod(api.PUT, path.Join(esPrefix, "doc/:index/:docId"), handler.ClusterRequired(handler.HandleUpdateDocumentAction, "doc.update")) - api.HandleAPIMethod(api.DELETE, path.Join(esPrefix, "doc/:index/:docId"), handler.ClusterRequired(handler.HandleDeleteDocumentAction, "doc.delete")) + api.HandleAPIMethod(api.POST, path.Join(esPrefix, "doc/:index/_search"), handler.RequireLogin(handler.HandleSearchDocumentAction)) + api.HandleAPIMethod(api.POST, path.Join(esPrefix, "doc/:index"), handler.IndexRequired(handler.HandleAddDocumentAction, "doc.create")) + api.HandleAPIMethod(api.PUT, path.Join(esPrefix, "doc/:index/:docId"), handler.IndexRequired(handler.HandleUpdateDocumentAction, "doc.update")) + api.HandleAPIMethod(api.DELETE, path.Join(esPrefix, "doc/:index/:docId"), handler.IndexRequired(handler.HandleDeleteDocumentAction, "doc.delete")) api.HandleAPIMethod(api.GET, path.Join(esPrefix, "doc/_validate"), handler.ValidateDocIDAction) api.HandleAPIMethod(api.POST, path.Join(pathPrefix, "rebuild/*id"), handler.HandleReindexAction) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 7da35e88..9c5ead50 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -75,14 +75,6 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F }, } - if len(filter) > 0 { - timeAggs = util.MapStr{ - "filter_agg": util.MapStr{ - "filter": filter, - "aggs": timeAggs, - }, - } - } var rootAggs util.MapStr groups := rule.Metrics.Items[0].Group limit := rule.Metrics.Items[0].Limit @@ -116,6 +108,14 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F }else{ rootAggs = timeAggs } + if len(filter) > 0 { + rootAggs = util.MapStr{ + "filter_agg": util.MapStr{ + "filter": filter, + "aggs": rootAggs, + }, + } + } return util.MapStr{ "size": 0, @@ -405,7 +405,7 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.Fi return queryResult, err } metricData := []alerting.MetricData{} - collectMetricData(searchResult["aggregations"], "", &metricData) + CollectMetricData(searchResult["aggregations"], "", &metricData) //将 rate 求导数据 除以 bucket size (单位 /s) //statisticM := map[string] string{} //for _, mi := range rule.Metrics.Items { @@ -502,7 +502,7 @@ func (engine *Engine) GetTargetMetricData(rule *alerting.Rule, isFilterNaN bool, //sort conditions by severity desc before check , and then if condition is true, then continue check another group func (engine *Engine) CheckCondition(rule *alerting.Rule)(*alerting.ConditionResult, error){ var resultItems []alerting.ConditionResultItem - targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, false, nil) + targetMetricData, queryResult, err := engine.GetTargetMetricData(rule, true, nil) conditionResult := &alerting.ConditionResult{ QueryResult: queryResult, } @@ -970,6 +970,15 @@ func (engine *Engine) GenerateTask(rule alerting.Rule) func(ctx context.Context) } } +func CollectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData){ + if aggM, ok := agg.(map[string]interface{}); ok { + if targetAgg, ok := aggM["filter_agg"]; ok { + collectMetricData(targetAgg, groupValues, metricData) + }else{ + collectMetricData(aggM, groupValues, metricData) + } + } +} func collectMetricData(agg interface{}, groupValues string, metricData *[]alerting.MetricData){ if aggM, ok := agg.(map[string]interface{}); ok { @@ -986,40 +995,6 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti if k == "key" || k == "key_as_string" || k== "doc_count"{ continue } - //has filter - //if k == "filter_agg" { - // if filterM, ok := v.(map[string]interface{}); ok { - // for fk, fv := range filterM { - // if fk == "doc_count" { - // continue - // } - // if vm, ok := fv.(map[string]interface{}); ok { - // if metricVal, ok := vm["value"]; ok { - // md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], metricVal}) - // }else{ - // //percentiles agg type - // switch vm["values"].(type) { - // case []interface{}: - // for _, val := range vm["values"].([]interface{}) { - // if valM, ok := val.(map[string]interface{}); ok { - // md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], valM["value"]}) - // } - // break - // } - // case map[string]interface{}: - // for _, val := range vm["values"].(map[string]interface{}) { - // md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], val}) - // break - // } - // } - // - // } - // - // } - // } - // } - // continue - //} if vm, ok := v.(map[string]interface{}); ok { if metricVal, ok := vm["value"]; ok { md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal}) @@ -1065,12 +1040,8 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti if groupValues != "" { newGroupValues = fmt.Sprintf("%s*%s", groupValues, currentGroup) } - if filterAgg, ok := bkVal["filter_agg"].(map[string]interface{}); ok { - collectMetricData(filterAgg, newGroupValues, metricData) - }else{ - collectMetricData(bk, newGroupValues, metricData) - } + collectMetricData(bk, newGroupValues, metricData) } } diff --git a/service/alerting/elasticsearch/engine_test.go b/service/alerting/elasticsearch/engine_test.go index 6a0eccb2..377eed5c 100644 --- a/service/alerting/elasticsearch/engine_test.go +++ b/service/alerting/elasticsearch/engine_test.go @@ -65,8 +65,8 @@ func TestEngine( t *testing.T) { Conditions: alerting.Condition{ Operator: "any", Items: []alerting.ConditionItem{ - {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"10"}, Severity: "error", Message: "磁盘可用率小于10%"}, - {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"20"}, Severity: "warning", Message: "磁盘可用率小于20%"}, + {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"10"}, Severity: "error"}, + {MinimumPeriodMatch: 1, Operator: "lte", Values: []string{"20"}, Severity: "warning"}, }, }, @@ -212,7 +212,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { Conditions: alerting.Condition{ Operator: "any", Items: []alerting.ConditionItem{ - {MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"10"}, Severity: "warning", Message: "搜索延迟大于10ms"}, + {MinimumPeriodMatch: 1, Operator: "gte", Values: []string{"10"}, Severity: "warning"}, }, }, @@ -237,7 +237,7 @@ func TestGeneratePercentilesAggQuery(t *testing.T) { }, } eng := &Engine{} - q, err := eng.GenerateQuery(&rule) + q, err := eng.GenerateQuery(&rule, nil) if err != nil { t.Fatal(err) }