Merge pull request '[plugin][api] visualization/data supports latest statistics' (#36) from feature/top_hits into master
This commit is contained in:
commit
cd56a4ff24
|
@ -6,17 +6,18 @@ package insight
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"infini.sh/framework/core/elastic"
|
||||
"infini.sh/framework/core/insight"
|
||||
"infini.sh/framework/core/util"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func generateAgg(metricItem *insight.MetricItem) map[string]interface{}{
|
||||
func generateAgg(metricItem *insight.MetricItem, timeField string) map[string]interface{} {
|
||||
var (
|
||||
aggType = "value_count"
|
||||
field = metricItem.Field
|
||||
field = metricItem.Field
|
||||
)
|
||||
if field == "" || field == "*" {
|
||||
field = "_id"
|
||||
|
@ -33,13 +34,28 @@ func generateAgg(metricItem *insight.MetricItem) map[string]interface{}{
|
|||
isPipeline = true
|
||||
case "medium": // from es version 6.6
|
||||
aggType = "median_absolute_deviation"
|
||||
case "p99", "p95","p90","p80","p50":
|
||||
case "p99", "p95", "p90", "p80", "p50":
|
||||
aggType = "percentiles"
|
||||
percentStr := strings.TrimPrefix(metricItem.Statistic, "p")
|
||||
percent, _ = strconv.ParseFloat(percentStr, 32)
|
||||
case "latest":
|
||||
aggType = "top_hits"
|
||||
}
|
||||
aggValue := util.MapStr{
|
||||
"field": field,
|
||||
aggValue := util.MapStr{}
|
||||
if aggType != "top_hits" {
|
||||
aggValue["field"] = field
|
||||
} else {
|
||||
aggValue["_source"] = util.MapStr{
|
||||
"includes": []string{field},
|
||||
}
|
||||
aggValue["sort"] = []util.MapStr{
|
||||
util.MapStr{
|
||||
timeField: util.MapStr{
|
||||
"order": "desc",
|
||||
},
|
||||
},
|
||||
}
|
||||
aggValue["size"] = 1
|
||||
}
|
||||
if aggType == "percentiles" {
|
||||
aggValue["percents"] = []interface{}{percent}
|
||||
|
@ -49,7 +65,7 @@ func generateAgg(metricItem *insight.MetricItem) map[string]interface{}{
|
|||
aggType: aggValue,
|
||||
},
|
||||
}
|
||||
if !isPipeline{
|
||||
if !isPipeline {
|
||||
return aggs
|
||||
}
|
||||
pipelineAggID := util.GetUUID()
|
||||
|
@ -66,18 +82,18 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
|
|||
var timeBeforeGroup = metric.TimeBeforeGroup
|
||||
basicAggs := util.MapStr{}
|
||||
i := 0
|
||||
for _, metricItem := range metric.Items {
|
||||
for _, metricItem := range metric.Items {
|
||||
if metricItem.Name == "" {
|
||||
metricItem.Name = string('a' + i)
|
||||
}
|
||||
metricAggs := generateAgg(&metricItem)
|
||||
metricAggs := generateAgg(&metricItem, metric.TimeField)
|
||||
if err := util.MergeFields(basicAggs, metricAggs, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
verInfo := elastic.GetClient(metric.ClusterId).GetVersion()
|
||||
|
||||
if verInfo.Number==""{
|
||||
if verInfo.Number == "" {
|
||||
panic("invalid version")
|
||||
}
|
||||
|
||||
|
@ -85,11 +101,11 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("get interval field error: %w", err)
|
||||
}
|
||||
if metric.TimeField != "" && !timeBeforeGroup{
|
||||
if metric.BucketSize != "" && !timeBeforeGroup {
|
||||
basicAggs = util.MapStr{
|
||||
"time_buckets": util.MapStr{
|
||||
"date_histogram": util.MapStr{
|
||||
"field": metric.TimeField,
|
||||
"field": metric.TimeField,
|
||||
intervalField: metric.BucketSize,
|
||||
},
|
||||
"aggs": basicAggs,
|
||||
|
@ -97,14 +113,13 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
var rootAggs util.MapStr
|
||||
groups := metric.Groups
|
||||
|
||||
if grpLength := len(groups); grpLength > 0 {
|
||||
var lastGroupAgg util.MapStr
|
||||
|
||||
for i := grpLength-1; i>=0; i-- {
|
||||
for i := grpLength - 1; i >= 0; i-- {
|
||||
limit := groups[i].Limit
|
||||
//top group 10
|
||||
if limit <= 0 {
|
||||
|
@ -113,7 +128,7 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
|
|||
groupAgg := util.MapStr{
|
||||
"terms": util.MapStr{
|
||||
"field": groups[i].Field,
|
||||
"size": limit,
|
||||
"size": limit,
|
||||
},
|
||||
}
|
||||
groupID := util.GetUUID()
|
||||
|
@ -121,35 +136,35 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
|
|||
groupAgg["aggs"] = util.MapStr{
|
||||
groupID: lastGroupAgg,
|
||||
}
|
||||
}else{
|
||||
groupAgg["aggs"] = basicAggs
|
||||
} else {
|
||||
groupAgg["aggs"] = basicAggs
|
||||
}
|
||||
lastGroupAgg = groupAgg
|
||||
}
|
||||
if metric.TimeField == "" || (metric.TimeField != "" && !timeBeforeGroup) {
|
||||
if metric.BucketSize == "" || (metric.BucketSize != "" && !timeBeforeGroup) {
|
||||
rootAggs = util.MapStr{
|
||||
util.GetUUID(): lastGroupAgg,
|
||||
}
|
||||
}else{
|
||||
} else {
|
||||
rootAggs = util.MapStr{
|
||||
"time_buckets": util.MapStr{
|
||||
"date_histogram": util.MapStr{
|
||||
"field": metric.TimeField,
|
||||
intervalField: metric.BucketSize,
|
||||
},
|
||||
"aggs": util.MapStr{
|
||||
util.GetUUID(): lastGroupAgg,
|
||||
},
|
||||
"time_buckets": util.MapStr{
|
||||
"date_histogram": util.MapStr{
|
||||
"field": metric.TimeField,
|
||||
intervalField: metric.BucketSize,
|
||||
},
|
||||
"aggs": util.MapStr{
|
||||
util.GetUUID(): lastGroupAgg,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
}else{
|
||||
if metric.TimeField != "" && timeBeforeGroup{
|
||||
} else {
|
||||
if metric.BucketSize != "" && timeBeforeGroup {
|
||||
basicAggs = util.MapStr{
|
||||
"time_buckets": util.MapStr{
|
||||
"date_histogram": util.MapStr{
|
||||
"field": metric.TimeField,
|
||||
"field": metric.TimeField,
|
||||
intervalField: metric.BucketSize,
|
||||
},
|
||||
"aggs": basicAggs,
|
||||
|
@ -163,7 +178,7 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
|
|||
rootAggs = util.MapStr{
|
||||
"filter_agg": util.MapStr{
|
||||
"filter": metric.Filter,
|
||||
"aggs": rootAggs,
|
||||
"aggs": rootAggs,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -178,34 +193,34 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) {
|
|||
return queryDsl, nil
|
||||
}
|
||||
|
||||
func CollectMetricData(agg interface{}, timeBeforeGroup bool) []insight.MetricData{
|
||||
func CollectMetricData(agg interface{}, timeBeforeGroup bool) []insight.MetricData {
|
||||
metricData := []insight.MetricData{}
|
||||
if timeBeforeGroup {
|
||||
collectMetricDataOther(agg, "", &metricData, nil)
|
||||
}else{
|
||||
} else {
|
||||
collectMetricData(agg, "", &metricData)
|
||||
}
|
||||
return metricData
|
||||
}
|
||||
|
||||
//timeBeforeGroup => false
|
||||
func collectMetricData(agg interface{}, groupValues string, metricData *[]insight.MetricData){
|
||||
// timeBeforeGroup => false
|
||||
func collectMetricData(agg interface{}, groupValues string, metricData *[]insight.MetricData) {
|
||||
if aggM, ok := agg.(map[string]interface{}); ok {
|
||||
if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok {
|
||||
if bks, ok := timeBks["buckets"].([]interface{}); ok {
|
||||
md := insight.MetricData{
|
||||
Data: map[string][]insight.MetricDataItem{},
|
||||
Data: map[string][]insight.MetricDataItem{},
|
||||
Group: groupValues,
|
||||
}
|
||||
for _, bk := range bks {
|
||||
if bkM, ok := bk.(map[string]interface{}); ok{
|
||||
if bkM, ok := bk.(map[string]interface{}); ok {
|
||||
|
||||
for k, v := range bkM {
|
||||
if k == "key" || k == "key_as_string" || k== "doc_count"{
|
||||
if k == "key" || k == "key_as_string" || k == "doc_count" {
|
||||
continue
|
||||
}
|
||||
|
||||
if vm, ok := v.(map[string]interface{}); ok && len(k) < 5{
|
||||
if vm, ok := v.(map[string]interface{}); ok && len(k) < 5 {
|
||||
collectMetricDataItem(k, vm, &md, bkM["key"])
|
||||
}
|
||||
|
||||
|
@ -213,22 +228,22 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]insigh
|
|||
}
|
||||
|
||||
}
|
||||
*metricData = append(*metricData,md)
|
||||
*metricData = append(*metricData, md)
|
||||
}
|
||||
|
||||
}else{
|
||||
} else {
|
||||
md := insight.MetricData{
|
||||
Data: map[string][]insight.MetricDataItem{},
|
||||
Data: map[string][]insight.MetricDataItem{},
|
||||
Group: groupValues,
|
||||
}
|
||||
for k, v := range aggM {
|
||||
if k == "key" || k== "doc_count"{
|
||||
if k == "key" || k == "doc_count" {
|
||||
continue
|
||||
}
|
||||
if vm, ok := v.(map[string]interface{}); ok {
|
||||
if bks, ok := vm["buckets"].([]interface{}); ok {
|
||||
for _, bk := range bks {
|
||||
if bkVal, ok := bk.(map[string]interface{}); ok {
|
||||
if bkVal, ok := bk.(map[string]interface{}); ok {
|
||||
var currentGroup = fmt.Sprintf("%v", bkVal["key"])
|
||||
newGroupValues := currentGroup
|
||||
if groupValues != "" {
|
||||
|
@ -237,59 +252,59 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]insigh
|
|||
collectMetricData(bk, newGroupValues, metricData)
|
||||
}
|
||||
}
|
||||
}else{
|
||||
} else {
|
||||
//non time series metric data
|
||||
if len(k) < 5 {
|
||||
if len(k) < 5 {
|
||||
collectMetricDataItem(k, vm, &md, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(md.Data) > 0 {
|
||||
*metricData = append(*metricData,md)
|
||||
*metricData = append(*metricData, md)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//timeBeforeGroup => true
|
||||
func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]insight.MetricData, timeKey interface{}){
|
||||
// timeBeforeGroup => true
|
||||
func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]insight.MetricData, timeKey interface{}) {
|
||||
if aggM, ok := agg.(map[string]interface{}); ok {
|
||||
if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok {
|
||||
if bks, ok := timeBks["buckets"].([]interface{}); ok {
|
||||
md := insight.MetricData{
|
||||
Data: map[string][]insight.MetricDataItem{},
|
||||
Data: map[string][]insight.MetricDataItem{},
|
||||
Group: groupValues,
|
||||
}
|
||||
for _, bk := range bks {
|
||||
if bkM, ok := bk.(map[string]interface{}); ok{
|
||||
if bkM, ok := bk.(map[string]interface{}); ok {
|
||||
for k, v := range bkM {
|
||||
if k == "key" || k == "key_as_string" || k== "doc_count"{
|
||||
if k == "key" || k == "key_as_string" || k == "doc_count" {
|
||||
continue
|
||||
}
|
||||
if vm, ok := v.(map[string]interface{}); ok {
|
||||
if vm["buckets"] != nil {
|
||||
collectMetricDataOther(vm, groupValues, metricData, bkM["key"])
|
||||
}else{
|
||||
collectMetricDataItem(k, vm, &md, bkM["key"])
|
||||
} else {
|
||||
collectMetricDataItem(k, vm, &md, bkM["key"])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(md.Data) > 0 {
|
||||
*metricData = append(*metricData,md)
|
||||
*metricData = append(*metricData, md)
|
||||
}
|
||||
}
|
||||
}else{
|
||||
} else {
|
||||
md := insight.MetricData{
|
||||
Data: map[string][]insight.MetricDataItem{},
|
||||
Data: map[string][]insight.MetricDataItem{},
|
||||
Group: groupValues,
|
||||
}
|
||||
|
||||
if bks, ok := aggM["buckets"].([]interface{}); ok {
|
||||
for _, bk := range bks {
|
||||
if bkVal, ok := bk.(map[string]interface{}); ok {
|
||||
if bkVal, ok := bk.(map[string]interface{}); ok {
|
||||
currentGroup := bkVal["key"].(string)
|
||||
newGroupValues := currentGroup
|
||||
if groupValues != "" {
|
||||
|
@ -298,13 +313,13 @@ func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]i
|
|||
collectMetricDataOther(bk, newGroupValues, metricData, timeKey)
|
||||
}
|
||||
}
|
||||
}else{
|
||||
} else {
|
||||
//non time series metric data
|
||||
for k, v := range aggM {
|
||||
if vm, ok := v.(map[string]interface{}); ok {
|
||||
if vm["buckets"] != nil {
|
||||
collectMetricDataOther(vm, groupValues, metricData, timeKey)
|
||||
}else{
|
||||
} else {
|
||||
collectMetricDataItem(k, vm, &md, timeKey)
|
||||
}
|
||||
}
|
||||
|
@ -312,19 +327,34 @@ func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]i
|
|||
}
|
||||
|
||||
if len(md.Data) > 0 {
|
||||
*metricData = append(*metricData,md)
|
||||
*metricData = append(*metricData, md)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func collectMetricDataItem(key string, vm map[string]interface{}, metricData *insight.MetricData, timeKey interface{}) {
|
||||
if val, ok := vm["value"]; ok {
|
||||
metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{
|
||||
Value: val,
|
||||
Value: val,
|
||||
Timestamp: timeKey,
|
||||
})
|
||||
} else if hits, ok := vm["hits"]; ok {
|
||||
if hits, ok := hits.(map[string]interface{}); ok {
|
||||
// statistic: top_hits
|
||||
if hits, ok := hits["hits"]; ok {
|
||||
if hits, ok := hits.([]interface{}); ok {
|
||||
for _, hit := range hits {
|
||||
if hit, ok := hit.(map[string]interface{}); ok {
|
||||
metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{
|
||||
Value: extractSomeValue(hit["_source"]),
|
||||
Timestamp: timeKey,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
//percentiles agg type
|
||||
switch vm["values"].(type) {
|
||||
|
@ -332,7 +362,7 @@ func collectMetricDataItem(key string, vm map[string]interface{}, metricData *in
|
|||
for _, val := range vm["values"].([]interface{}) {
|
||||
if valM, ok := val.(map[string]interface{}); ok {
|
||||
metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{
|
||||
Value: valM["value"],
|
||||
Value: valM["value"],
|
||||
Timestamp: timeKey,
|
||||
})
|
||||
}
|
||||
|
@ -341,7 +371,7 @@ func collectMetricDataItem(key string, vm map[string]interface{}, metricData *in
|
|||
case map[string]interface{}:
|
||||
for _, val = range vm["values"].(map[string]interface{}) {
|
||||
metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{
|
||||
Value: val,
|
||||
Value: val,
|
||||
Timestamp: timeKey,
|
||||
})
|
||||
break
|
||||
|
@ -350,3 +380,12 @@ func collectMetricDataItem(key string, vm map[string]interface{}, metricData *in
|
|||
}
|
||||
}
|
||||
|
||||
func extractSomeValue(v interface{}) interface{} {
|
||||
if vm, ok := v.(map[string]interface{}); ok {
|
||||
for _, v := range vm {
|
||||
return extractSomeValue(v)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue