return query DSL after search error

liugq 2022-06-01 12:40:41 +08:00
parent eaaf44bd7a
commit ef3aa89a9c
1 changed file with 86 additions and 56 deletions

@@ -56,14 +56,6 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
return nil, err
}
}
if len(filter) > 0 {
basicAggs = util.MapStr{
"filter_agg": util.MapStr{
"filter": filter,
"aggs": basicAggs,
},
}
}
targetESVersion := elastic.GetMetadata(rule.Resource.ID).Config.Version
intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion)
if err != nil {
@@ -74,11 +66,22 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
periodInterval = filterParam.BucketSize
}
timeAggs := util.MapStr{
"date_histogram": util.MapStr{
"field": rule.Resource.TimeField,
intervalField: periodInterval,
"time_buckets": util.MapStr{
"date_histogram": util.MapStr{
"field": rule.Resource.TimeField,
intervalField: periodInterval,
},
"aggs": basicAggs,
},
"aggs": basicAggs,
}
if len(filter) > 0 {
timeAggs = util.MapStr{
"filter_agg": util.MapStr{
"filter": filter,
"aggs": timeAggs,
},
}
}
var rootAggs util.MapStr
groups := rule.Metrics.Items[0].Group
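
The filter, when present, now wraps the whole time_buckets aggregation instead of only the metric sub-aggregations (the block removed in the first hunk), so the filter restricts the date_histogram buckets themselves. A minimal sketch of the structure GenerateQuery now builds; the term filter and interval values are illustrative, and basicAggs is the per-metric aggregation map assembled earlier in the function:

// sketch only: the nesting produced by the new code path
timeAggs = util.MapStr{
	"filter_agg": util.MapStr{
		"filter": util.MapStr{ // illustrative filter clause
			"term": util.MapStr{"metadata.labels.cluster_id": "c0"},
		},
		"aggs": util.MapStr{
			"time_buckets": util.MapStr{
				"date_histogram": util.MapStr{
					"field":    "timestamp", // rule.Resource.TimeField
					"interval": "1m",        // intervalField: periodInterval
				},
				"aggs": basicAggs,
			},
		},
	},
}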
@@ -103,9 +106,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
groupID: lastGroupAgg,
}
}else{
groupAgg["aggs"] = util.MapStr{
"time_buckets": timeAggs,
}
groupAgg["aggs"] = timeAggs
}
lastGroupAgg = groupAgg
}
@@ -113,9 +114,7 @@ func (engine *Engine) GenerateQuery(rule *alerting.Rule, filterParam *alerting.F
util.GetUUID(): lastGroupAgg,
}
}else{
rootAggs = util.MapStr{
"time_buckets": timeAggs,
}
rootAggs = timeAggs
}
return util.MapStr{
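
Because timeAggs now already carries the "time_buckets" key (and the optional "filter_agg" wrapper), the group-by and root branches above assign it directly rather than wrapping it a second time. For one terms group the nesting ends up as in this sketch, where the group field and size are illustrative:

groupAgg := util.MapStr{
	"terms": util.MapStr{
		"field": "metadata.labels.node_id", // hypothetical group field
		"size":  500,
	},
	"aggs": timeAggs, // previously: util.MapStr{"time_buckets": timeAggs}
}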
@@ -310,8 +309,8 @@ func (engine *Engine) generateTimeFilter(rule *alerting.Rule, filterParam *alert
if err != nil {
return nil, err
}
timeStart = time.Now().Add(-duration).Format(time.RFC3339Nano)
timeEnd = time.Now().Format(time.RFC3339Nano)
timeStart = time.Now().Add(-duration).UnixMilli() //.Format(time.RFC3339Nano)
timeEnd = time.Now().UnixMilli()
}
timeQuery := util.MapStr{
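
generateTimeFilter now emits epoch milliseconds rather than RFC3339 strings for the range bounds, which Elasticsearch date fields accept by default (epoch_millis). A sketch of the resulting range clause, assuming a 10-minute period; the field name and bound keys illustrate a standard range query rather than quoting this file:

duration := 10 * time.Minute // parsed from rule.Metrics.PeriodInterval upstream
timeQuery := util.MapStr{
	"range": util.MapStr{
		"timestamp": util.MapStr{ // rule.Resource.TimeField
			"gte": time.Now().Add(-duration).UnixMilli(),
			"lte": time.Now().UnixMilli(),
		},
	},
}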
@@ -397,16 +396,42 @@ func (engine *Engine) ExecuteQuery(rule *alerting.Rule, filterParam *alerting.Fi
queryResult.Nodata = true
}
if searchRes.StatusCode != 200 {
return nil, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body))
return queryResult, fmt.Errorf("search error: %s", string(searchRes.RawResult.Body))
}
queryResult.Raw = string(searchRes.RawResult.Body)
searchResult := map[string]interface{}{}
err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult)
if err != nil {
return nil, err
return queryResult, err
}
metricData := []alerting.MetricData{}
collectMetricData(searchResult["aggregations"], "", &metricData)
//divide the rate (derivative) values by the bucket size (unit: /s)
//statisticM := map[string] string{}
//for _, mi := range rule.Metrics.Items {
// statisticM[mi.Name] = mi.Statistic
//}
//var periodInterval = rule.Metrics.PeriodInterval
//if filterParam != nil && filterParam.BucketSize != "" {
// periodInterval = filterParam.BucketSize
//}
//interval, err := time.ParseDuration(periodInterval)
//if err != nil {
// log.Error(err)
//}
//for i, _ := range metricData {
// for k, d := range metricData[i].Data {
// if statisticM[k] == "rate" {
// for _, td := range d {
// if len(td) > 1 {
// if v, ok := td[1].(float64); ok {
// td[1] = v / interval.Seconds()
// }
// }
// }
// }
// }
//}
queryResult.MetricData = metricData
return queryResult, nil
}
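
This hunk is what gives the commit its title: ExecuteQuery returns the partially populated queryResult alongside the error instead of nil, so a caller can still surface the generated query DSL (and, on the JSON-decode path, the raw response body) when the search fails. A hedged sketch of the calling pattern this enables; the logging call is an assumption, not code from this repository:

queryResult, err := engine.ExecuteQuery(rule, filterParam)
if err != nil {
	if queryResult != nil {
		// non-nil even on failure, so the query DSL it carries
		// can be attached to the alert's error detail
		log.Errorf("execute query failed: %v", err)
	}
	return err
}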
@@ -960,39 +985,39 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti
continue
}
//has filter
if k == "filter_agg" {
if filterM, ok := v.(map[string]interface{}); ok {
for fk, fv := range filterM {
if fk == "doc_count" {
continue
}
if vm, ok := fv.(map[string]interface{}); ok {
if metricVal, ok := vm["value"]; ok {
md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], metricVal})
}else{
//percentiles agg type
switch vm["values"].(type) {
case []interface{}:
for _, val := range vm["values"].([]interface{}) {
if valM, ok := val.(map[string]interface{}); ok {
md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], valM["value"]})
}
break
}
case map[string]interface{}:
for _, val := range vm["values"].(map[string]interface{}) {
md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], val})
break
}
}
}
}
}
}
continue
}
//if k == "filter_agg" {
// if filterM, ok := v.(map[string]interface{}); ok {
// for fk, fv := range filterM {
// if fk == "doc_count" {
// continue
// }
// if vm, ok := fv.(map[string]interface{}); ok {
// if metricVal, ok := vm["value"]; ok {
// md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], metricVal})
// }else{
// //percentiles agg type
// switch vm["values"].(type) {
// case []interface{}:
// for _, val := range vm["values"].([]interface{}) {
// if valM, ok := val.(map[string]interface{}); ok {
// md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], valM["value"]})
// }
// break
// }
// case map[string]interface{}:
// for _, val := range vm["values"].(map[string]interface{}) {
// md.Data[fk] = append(md.Data[fk], alerting.TimeMetricData{bkM["key"], val})
// break
// }
// }
//
// }
//
// }
// }
// }
// continue
//}
if vm, ok := v.(map[string]interface{}); ok {
if metricVal, ok := vm["value"]; ok {
md.Data[k] = append(md.Data[k], alerting.TimeMetricData{bkM["key"], metricVal})
@@ -1038,7 +1063,12 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]alerti
if groupValues != "" {
newGroupValues = fmt.Sprintf("%s*%s", groupValues, currentGroup)
}
collectMetricData(bk, newGroupValues, metricData)
if filterAgg, ok := bkVal["filter_agg"].(map[string]interface{}); ok {
collectMetricData(filterAgg, newGroupValues, metricData)
}else{
collectMetricData(bk, newGroupValues, metricData)
}
}
}
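
With grouping plus a filter, each terms bucket in the response now contains a filter_agg node whose children hold the time_buckets, so the recursion unwraps it before descending; the inline bucket-level handling commented out above is thereby obsolete. A sketch of the group-bucket shape the new branch expects, with illustrative keys and values:

bk := map[string]interface{}{
	"key":       "node-1", // group value (illustrative)
	"doc_count": float64(42),
	"filter_agg": map[string]interface{}{ // present only when the rule has a filter
		"doc_count": float64(10),
		"time_buckets": map[string]interface{}{
			"buckets": []interface{}{}, // per-interval metric buckets
		},
	},
}
// collectMetricData(filterAgg, newGroupValues, metricData) then walks
// time_buckets exactly as in the unfiltered case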