From ef3aa89a9c80a83ec73b714d34053956d9a3d6fc Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 1 Jun 2022 12:40:41 +0800 Subject: [PATCH] return query dsl after search error --- service/alerting/elasticsearch/engine.go | 142 ++++++++++++++--------- 1 file changed, 86 insertions(+), 56 deletions(-) diff --git a/service/alerting/elasticsearch/engine.go b/service/alerting/elasticsearch/engine.go index 1ca25cb2..0d44db3d 100644 --- a/service/alerting/elasticsearch/engine.go +++ b/service/alerting/elasticsearch/engine.go @@ -56,14 +56,6 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F return nil, err } } - if len(filter) > 0 { - basicAggs = util.MapStr{ - "filter_agg": util.MapStr{ - "filter": filter, - "aggs": basicAggs, - }, - } - } targetESVersion := elastic.GetMetadata(rule.Resource.ID).Config.Version intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion) if err != nil { @@ -74,11 +66,22 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F periodInterval = filterParam.BucketSize } timeAggs := util.MapStr{ - "date_histogram": util.MapStr{ - "field": rule.Resource.TimeField, - intervalField: periodInterval, + "time_buckets": util.MapStr{ + "date_histogram": util.MapStr{ + "field": rule.Resource.TimeField, + intervalField: periodInterval, + }, + "aggs": basicAggs, }, - "aggs": basicAggs, + } + + if len(filter) > 0 { + timeAggs = util.MapStr{ + "filter_agg": util.MapStr{ + "filter": filter, + "aggs": timeAggs, + }, + } } var rootAggs util.MapStr groups := rule.Metrics.Items[0].Group @@ -103,9 +106,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F groupID: lastGroupAgg, } }else{ - groupAgg["aggs"] = util.MapStr{ - "time_buckets": timeAggs, - } + groupAgg["aggs"] = timeAggs } lastGroupAgg = groupAgg } @@ -113,9 +114,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F util.GetUUID(): lastGroupAgg, } }else{ - rootAggs = util.MapStr{ - "time_buckets": timeAggs, - } + rootAggs = timeAggs } return util.MapStr{ @@ -310,8 +309,8 @@ func (engine *Engine) generateTimeFilter(rule *alerting.Rule, filterParam *alert if err != nil { return nil, err } - timeStart = time.Now().Add(-duration).Format(time.RFC3339Nano) - timeEnd = time.Now().Format(time.RFC3339Nano) + timeStart = time.Now().Add(-duration).UnixMilli() //.Format(time.RFC3339Nano) + timeEnd = time.Now().UnixMilli() } timeQuery := util.MapStr{ @@ -397,16 +396,42 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.Fi queryResult.Nodata = true } if searchRes.StatusCode != 200 { - return nil, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body)) + return queryResult, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body)) } queryResult.Raw = string(searchRes.RawResult.Body) searchResult := map[string]interface{}{} err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult) if err != nil { - return nil, err + return queryResult, err } metricData := []alerting.MetricData{} collectMetricData(searchResult["aggregations"], "", &metricData) + //将 rate 求导数据 除以 bucket size (单位 /s) + //statisticM := map[string] string{} + //for _, mi := range rule.Metrics.Items { + // statisticM[mi.Name] = mi.Statistic + //} + //var periodInterval = rule.Metrics.PeriodInterval + //if filterParam != nil && filterParam.BucketSize != "" { + // periodInterval = filterParam.BucketSize + //} + //interval, err := time.ParseDuration(periodInterval) + //if err != nil { + // log.Error(err) + //} + //for i, _ := range metricData { + // for k, d := range metricData[i].Data { + // if statisticM[k] == "rate" { + // for _, td := range d { + // if len(td) > 1 { + // if v, ok := td[1].(float64); ok { + // td[1] = v / interval.Seconds() + // } + // } + // } + // } + // } + //} queryResult.MetricData = metricData return queryResult, nil } @@ -960,39 +985,39 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti continue } //has filter - if k == "filter_agg" { - if filterM, ok := v.(map[string]interface{}); ok { - for fk, fv := range filterM { - if fk == "doc_count" { - continue - } - if vm, ok := fv.(map[string]interface{}); ok { - if metricVal, ok := vm["value"]; ok { - md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], metricVal}) - }else{ - //percentiles agg type - switch vm["values"].(type) { - case []interface{}: - for _, val := range vm["values"].([]interface{}) { - if valM, ok := val.(map[string]interface{}); ok { - md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], valM["value"]}) - } - break - } - case map[string]interface{}: - for _, val := range vm["values"].(map[string]interface{}) { - md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], val}) - break - } - } - - } - - } - } - } - continue - } + //if k == "filter_agg" { + // if filterM, ok := v.(map[string]interface{}); ok { + // for fk, fv := range filterM { + // if fk == "doc_count" { + // continue + // } + // if vm, ok := fv.(map[string]interface{}); ok { + // if metricVal, ok := vm["value"]; ok { + // md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], metricVal}) + // }else{ + // //percentiles agg type + // switch vm["values"].(type) { + // case []interface{}: + // for _, val := range vm["values"].([]interface{}) { + // if valM, ok := val.(map[string]interface{}); ok { + // md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], valM["value"]}) + // } + // break + // } + // case map[string]interface{}: + // for _, val := range vm["values"].(map[string]interface{}) { + // md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], val}) + // break + // } + // } + // + // } + // + // } + // } + // } + // continue + //} if vm, ok := v.(map[string]interface{}); ok { if metricVal, ok := vm["value"]; ok { md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal}) @@ -1038,7 +1063,12 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti if groupValues != "" { newGroupValues = fmt.Sprintf("%s*%s", groupValues, currentGroup) } - collectMetricData(bk, newGroupValues, metricData) + if filterAgg, ok := bkVal["filter_agg"].(map[string]interface{}); ok { + collectMetricData(filterAgg, newGroupValues, metricData) + }else{ + collectMetricData(bk, newGroupValues, metricData) + } + } }