[plugin][api] visualization/data supports top_hits

This commit is contained in:
Kassian Sun 2023-03-22 10:10:30 +08:00
parent 157fc43ee8
commit 17abd00e20
1 changed files with 104 additions and 66 deletions

View File

@ -6,17 +6,18 @@ package insight
import ( import (
"fmt" "fmt"
"strconv"
"strings"
"infini.sh/framework/core/elastic" "infini.sh/framework/core/elastic"
"infini.sh/framework/core/insight" "infini.sh/framework/core/insight"
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
"strconv"
"strings"
) )
func generateAgg(metricItem *insight.MetricItem) map[string]interface{}{ func generateAgg(metricItem *insight.MetricItem, timeField string) map[string]interface{} {
var ( var (
aggType = "value_count" aggType = "value_count"
field = metricItem.Field field = metricItem.Field
) )
if field == "" || field == "*" { if field == "" || field == "*" {
field = "_id" field = "_id"
@ -33,13 +34,28 @@ func generateAgg(metricItem *insight.MetricItem) map[string]interface{}{
isPipeline = true isPipeline = true
case "medium": // from es version 6.6 case "medium": // from es version 6.6
aggType = "median_absolute_deviation" aggType = "median_absolute_deviation"
case "p99", "p95","p90","p80","p50": case "p99", "p95", "p90", "p80", "p50":
aggType = "percentiles" aggType = "percentiles"
percentStr := strings.TrimPrefix(metricItem.Statistic, "p") percentStr := strings.TrimPrefix(metricItem.Statistic, "p")
percent, _ = strconv.ParseFloat(percentStr, 32) percent, _ = strconv.ParseFloat(percentStr, 32)
case "latest":
aggType = "top_hits"
} }
aggValue := util.MapStr{ aggValue := util.MapStr{}
"field": field, if aggType != "top_hits" {
aggValue["field"] = field
} else {
aggValue["_source"] = util.MapStr{
"includes": []string{field},
}
aggValue["sort"] = []util.MapStr{
util.MapStr{
timeField: util.MapStr{
"order": "desc",
},
},
}
aggValue["size"] = 1
} }
if aggType == "percentiles" { if aggType == "percentiles" {
aggValue["percents"] = []interface{}{percent} aggValue["percents"] = []interface{}{percent}
@ -49,7 +65,7 @@ func generateAgg(metricItem *insight.MetricItem) map[string]interface{}{
aggType: aggValue, aggType: aggValue,
}, },
} }
if !isPipeline{ if !isPipeline {
return aggs return aggs
} }
pipelineAggID := util.GetUUID() pipelineAggID := util.GetUUID()
@ -66,18 +82,18 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
var timeBeforeGroup = metric.TimeBeforeGroup var timeBeforeGroup = metric.TimeBeforeGroup
basicAggs := util.MapStr{} basicAggs := util.MapStr{}
i := 0 i := 0
for _, metricItem := range metric.Items { for _, metricItem := range metric.Items {
if metricItem.Name == "" { if metricItem.Name == "" {
metricItem.Name = string('a' + i) metricItem.Name = string('a' + i)
} }
metricAggs := generateAgg(&metricItem) metricAggs := generateAgg(&metricItem, metric.TimeField)
if err := util.MergeFields(basicAggs, metricAggs, true); err != nil { if err := util.MergeFields(basicAggs, metricAggs, true); err != nil {
return nil, err return nil, err
} }
} }
verInfo := elastic.GetClient(metric.ClusterId).GetVersion() verInfo := elastic.GetClient(metric.ClusterId).GetVersion()
if verInfo.Number==""{ if verInfo.Number == "" {
panic("invalid version") panic("invalid version")
} }
@ -85,11 +101,11 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("get interval field error: %w", err) return nil, fmt.Errorf("get interval field error: %w", err)
} }
if metric.TimeField != "" && !timeBeforeGroup{ if metric.BucketSize != "" && !timeBeforeGroup {
basicAggs = util.MapStr{ basicAggs = util.MapStr{
"time_buckets": util.MapStr{ "time_buckets": util.MapStr{
"date_histogram": util.MapStr{ "date_histogram": util.MapStr{
"field": metric.TimeField, "field": metric.TimeField,
intervalField: metric.BucketSize, intervalField: metric.BucketSize,
}, },
"aggs": basicAggs, "aggs": basicAggs,
@ -97,14 +113,13 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
} }
} }
var rootAggs util.MapStr var rootAggs util.MapStr
groups := metric.Groups groups := metric.Groups
if grpLength := len(groups); grpLength > 0 { if grpLength := len(groups); grpLength > 0 {
var lastGroupAgg util.MapStr var lastGroupAgg util.MapStr
for i := grpLength-1; i>=0; i-- { for i := grpLength - 1; i >= 0; i-- {
limit := groups[i].Limit limit := groups[i].Limit
//top group 10 //top group 10
if limit <= 0 { if limit <= 0 {
@ -113,7 +128,7 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
groupAgg := util.MapStr{ groupAgg := util.MapStr{
"terms": util.MapStr{ "terms": util.MapStr{
"field": groups[i].Field, "field": groups[i].Field,
"size": limit, "size": limit,
}, },
} }
groupID := util.GetUUID() groupID := util.GetUUID()
@ -121,35 +136,35 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
groupAgg["aggs"] = util.MapStr{ groupAgg["aggs"] = util.MapStr{
groupID: lastGroupAgg, groupID: lastGroupAgg,
} }
}else{ } else {
groupAgg["aggs"] = basicAggs groupAgg["aggs"] = basicAggs
} }
lastGroupAgg = groupAgg lastGroupAgg = groupAgg
} }
if metric.TimeField == "" || (metric.TimeField != "" && !timeBeforeGroup) { if metric.BucketSize == "" || (metric.BucketSize != "" && !timeBeforeGroup) {
rootAggs = util.MapStr{ rootAggs = util.MapStr{
util.GetUUID(): lastGroupAgg, util.GetUUID(): lastGroupAgg,
} }
}else{ } else {
rootAggs = util.MapStr{ rootAggs = util.MapStr{
"time_buckets": util.MapStr{ "time_buckets": util.MapStr{
"date_histogram": util.MapStr{ "date_histogram": util.MapStr{
"field": metric.TimeField, "field": metric.TimeField,
intervalField: metric.BucketSize, intervalField: metric.BucketSize,
},
"aggs": util.MapStr{
util.GetUUID(): lastGroupAgg,
},
}, },
"aggs": util.MapStr{
util.GetUUID(): lastGroupAgg,
},
},
} }
} }
}else{ } else {
if metric.TimeField != "" && timeBeforeGroup{ if metric.BucketSize != "" && timeBeforeGroup {
basicAggs = util.MapStr{ basicAggs = util.MapStr{
"time_buckets": util.MapStr{ "time_buckets": util.MapStr{
"date_histogram": util.MapStr{ "date_histogram": util.MapStr{
"field": metric.TimeField, "field": metric.TimeField,
intervalField: metric.BucketSize, intervalField: metric.BucketSize,
}, },
"aggs": basicAggs, "aggs": basicAggs,
@ -163,7 +178,7 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
rootAggs = util.MapStr{ rootAggs = util.MapStr{
"filter_agg": util.MapStr{ "filter_agg": util.MapStr{
"filter": metric.Filter, "filter": metric.Filter,
"aggs": rootAggs, "aggs": rootAggs,
}, },
} }
} }
@ -178,34 +193,34 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
return queryDsl, nil return queryDsl, nil
} }
func CollectMetricData(agg interface{}, timeBeforeGroup bool) []insight.MetricData{ func CollectMetricData(agg interface{}, timeBeforeGroup bool) []insight.MetricData {
metricData := []insight.MetricData{} metricData := []insight.MetricData{}
if timeBeforeGroup { if timeBeforeGroup {
collectMetricDataOther(agg, "", &metricData, nil) collectMetricDataOther(agg, "", &metricData, nil)
}else{ } else {
collectMetricData(agg, "", &metricData) collectMetricData(agg, "", &metricData)
} }
return metricData return metricData
} }
//timeBeforeGroup => false // timeBeforeGroup => false
func collectMetricData(agg interface{}, groupValues string, metricData *[]insight.MetricData){ func collectMetricData(agg interface{}, groupValues string, metricData *[]insight.MetricData) {
if aggM, ok := agg.(map[string]interface{}); ok { if aggM, ok := agg.(map[string]interface{}); ok {
if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok { if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok {
if bks, ok := timeBks["buckets"].([]interface{}); ok { if bks, ok := timeBks["buckets"].([]interface{}); ok {
md := insight.MetricData{ md := insight.MetricData{
Data: map[string][]insight.MetricDataItem{}, Data: map[string][]insight.MetricDataItem{},
Group: groupValues, Group: groupValues,
} }
for _, bk := range bks { for _, bk := range bks {
if bkM, ok := bk.(map[string]interface{}); ok{ if bkM, ok := bk.(map[string]interface{}); ok {
for k, v := range bkM { for k, v := range bkM {
if k == "key" || k == "key_as_string" || k== "doc_count"{ if k == "key" || k == "key_as_string" || k == "doc_count" {
continue continue
} }
if vm, ok := v.(map[string]interface{}); ok && len(k) < 5{ if vm, ok := v.(map[string]interface{}); ok && len(k) < 5 {
collectMetricDataItem(k, vm, &md, bkM["key"]) collectMetricDataItem(k, vm, &md, bkM["key"])
} }
@ -213,22 +228,22 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]insigh
} }
} }
*metricData = append(*metricData,md) *metricData = append(*metricData, md)
} }
}else{ } else {
md := insight.MetricData{ md := insight.MetricData{
Data: map[string][]insight.MetricDataItem{}, Data: map[string][]insight.MetricDataItem{},
Group: groupValues, Group: groupValues,
} }
for k, v := range aggM { for k, v := range aggM {
if k == "key" || k== "doc_count"{ if k == "key" || k == "doc_count" {
continue continue
} }
if vm, ok := v.(map[string]interface{}); ok { if vm, ok := v.(map[string]interface{}); ok {
if bks, ok := vm["buckets"].([]interface{}); ok { if bks, ok := vm["buckets"].([]interface{}); ok {
for _, bk := range bks { for _, bk := range bks {
if bkVal, ok := bk.(map[string]interface{}); ok { if bkVal, ok := bk.(map[string]interface{}); ok {
var currentGroup = fmt.Sprintf("%v", bkVal["key"]) var currentGroup = fmt.Sprintf("%v", bkVal["key"])
newGroupValues := currentGroup newGroupValues := currentGroup
if groupValues != "" { if groupValues != "" {
@ -237,59 +252,59 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]insigh
collectMetricData(bk, newGroupValues, metricData) collectMetricData(bk, newGroupValues, metricData)
} }
} }
}else{ } else {
//non time series metric data //non time series metric data
if len(k) < 5 { if len(k) < 5 {
collectMetricDataItem(k, vm, &md, nil) collectMetricDataItem(k, vm, &md, nil)
} }
} }
} }
} }
if len(md.Data) > 0 { if len(md.Data) > 0 {
*metricData = append(*metricData,md) *metricData = append(*metricData, md)
} }
} }
} }
} }
//timeBeforeGroup => true // timeBeforeGroup => true
func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]insight.MetricData, timeKey interface{}){ func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]insight.MetricData, timeKey interface{}) {
if aggM, ok := agg.(map[string]interface{}); ok { if aggM, ok := agg.(map[string]interface{}); ok {
if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok { if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok {
if bks, ok := timeBks["buckets"].([]interface{}); ok { if bks, ok := timeBks["buckets"].([]interface{}); ok {
md := insight.MetricData{ md := insight.MetricData{
Data: map[string][]insight.MetricDataItem{}, Data: map[string][]insight.MetricDataItem{},
Group: groupValues, Group: groupValues,
} }
for _, bk := range bks { for _, bk := range bks {
if bkM, ok := bk.(map[string]interface{}); ok{ if bkM, ok := bk.(map[string]interface{}); ok {
for k, v := range bkM { for k, v := range bkM {
if k == "key" || k == "key_as_string" || k== "doc_count"{ if k == "key" || k == "key_as_string" || k == "doc_count" {
continue continue
} }
if vm, ok := v.(map[string]interface{}); ok { if vm, ok := v.(map[string]interface{}); ok {
if vm["buckets"] != nil { if vm["buckets"] != nil {
collectMetricDataOther(vm, groupValues, metricData, bkM["key"]) collectMetricDataOther(vm, groupValues, metricData, bkM["key"])
}else{ } else {
collectMetricDataItem(k, vm, &md, bkM["key"]) collectMetricDataItem(k, vm, &md, bkM["key"])
} }
} }
} }
} }
} }
if len(md.Data) > 0 { if len(md.Data) > 0 {
*metricData = append(*metricData,md) *metricData = append(*metricData, md)
} }
} }
}else{ } else {
md := insight.MetricData{ md := insight.MetricData{
Data: map[string][]insight.MetricDataItem{}, Data: map[string][]insight.MetricDataItem{},
Group: groupValues, Group: groupValues,
} }
if bks, ok := aggM["buckets"].([]interface{}); ok { if bks, ok := aggM["buckets"].([]interface{}); ok {
for _, bk := range bks { for _, bk := range bks {
if bkVal, ok := bk.(map[string]interface{}); ok { if bkVal, ok := bk.(map[string]interface{}); ok {
currentGroup := bkVal["key"].(string) currentGroup := bkVal["key"].(string)
newGroupValues := currentGroup newGroupValues := currentGroup
if groupValues != "" { if groupValues != "" {
@ -298,13 +313,13 @@ func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]i
collectMetricDataOther(bk, newGroupValues, metricData, timeKey) collectMetricDataOther(bk, newGroupValues, metricData, timeKey)
} }
} }
}else{ } else {
//non time series metric data //non time series metric data
for k, v := range aggM { for k, v := range aggM {
if vm, ok := v.(map[string]interface{}); ok { if vm, ok := v.(map[string]interface{}); ok {
if vm["buckets"] != nil { if vm["buckets"] != nil {
collectMetricDataOther(vm, groupValues, metricData, timeKey) collectMetricDataOther(vm, groupValues, metricData, timeKey)
}else{ } else {
collectMetricDataItem(k, vm, &md, timeKey) collectMetricDataItem(k, vm, &md, timeKey)
} }
} }
@ -312,19 +327,34 @@ func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]i
} }
if len(md.Data) > 0 { if len(md.Data) > 0 {
*metricData = append(*metricData,md) *metricData = append(*metricData, md)
} }
} }
} }
} }
func collectMetricDataItem(key string, vm map[string]interface{}, metricData *insight.MetricData, timeKey interface{}) { func collectMetricDataItem(key string, vm map[string]interface{}, metricData *insight.MetricData, timeKey interface{}) {
if val, ok := vm["value"]; ok { if val, ok := vm["value"]; ok {
metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{ metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{
Value: val, Value: val,
Timestamp: timeKey, Timestamp: timeKey,
}) })
} else if hits, ok := vm["hits"]; ok {
if hits, ok := hits.(map[string]interface{}); ok {
// statistic: top_hits
if hits, ok := hits["hits"]; ok {
if hits, ok := hits.([]interface{}); ok {
for _, hit := range hits {
if hit, ok := hit.(map[string]interface{}); ok {
metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{
Value: extractSomeValue(hit["_source"]),
Timestamp: timeKey,
})
}
}
}
}
}
} else { } else {
//percentiles agg type //percentiles agg type
switch vm["values"].(type) { switch vm["values"].(type) {
@ -332,7 +362,7 @@ func collectMetricDataItem(key string, vm map[string]interface{}, metricData *in
for _, val := range vm["values"].([]interface{}) { for _, val := range vm["values"].([]interface{}) {
if valM, ok := val.(map[string]interface{}); ok { if valM, ok := val.(map[string]interface{}); ok {
metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{ metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{
Value: valM["value"], Value: valM["value"],
Timestamp: timeKey, Timestamp: timeKey,
}) })
} }
@ -341,7 +371,7 @@ func collectMetricDataItem(key string, vm map[string]interface{}, metricData *in
case map[string]interface{}: case map[string]interface{}:
for _, val = range vm["values"].(map[string]interface{}) { for _, val = range vm["values"].(map[string]interface{}) {
metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{ metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{
Value: val, Value: val,
Timestamp: timeKey, Timestamp: timeKey,
}) })
break break
@ -350,3 +380,11 @@ func collectMetricDataItem(key string, vm map[string]interface{}, metricData *in
} }
} }
func extractSomeValue(v interface{}) interface{} {
if vm, ok := v.(map[string]interface{}); ok {
for _, v := range vm {
return extractSomeValue(v)
}
}
return v
}