diff --git a/plugin/api/insight/metric_util.go b/plugin/api/insight/metric_util.go index d44ff37a..367c6b00 100644 --- a/plugin/api/insight/metric_util.go +++ b/plugin/api/insight/metric_util.go @@ -6,17 +6,18 @@ package insight import ( "fmt" + "strconv" + "strings" + "infini.sh/framework/core/elastic" "infini.sh/framework/core/insight" "infini.sh/framework/core/util" - "strconv" - "strings" ) -func generateAgg(metricItem *insight.MetricItem) map[string]interface{}{ +func generateAgg(metricItem *insight.MetricItem, timeField string) map[string]interface{} { var ( aggType = "value_count" - field = metricItem.Field + field = metricItem.Field ) if field == "" || field == "*" { field = "_id" @@ -33,13 +34,28 @@ func generateAgg(metricItem *insight.MetricItem) map[string]interface{}{ isPipeline = true case "medium": // from es version 6.6 aggType = "median_absolute_deviation" - case "p99", "p95","p90","p80","p50": + case "p99", "p95", "p90", "p80", "p50": aggType = "percentiles" percentStr := strings.TrimPrefix(metricItem.Statistic, "p") percent, _ = strconv.ParseFloat(percentStr, 32) + case "latest": + aggType = "top_hits" } - aggValue := util.MapStr{ - "field": field, + aggValue := util.MapStr{} + if aggType != "top_hits" { + aggValue["field"] = field + } else { + aggValue["_source"] = util.MapStr{ + "includes": []string{field}, + } + aggValue["sort"] = []util.MapStr{ + util.MapStr{ + timeField: util.MapStr{ + "order": "desc", + }, + }, + } + aggValue["size"] = 1 } if aggType == "percentiles" { aggValue["percents"] = []interface{}{percent} @@ -49,7 +65,7 @@ func generateAgg(metricItem *insight.MetricItem) map[string]interface{}{ aggType: aggValue, }, } - if !isPipeline{ + if !isPipeline { return aggs } pipelineAggID := util.GetUUID() @@ -66,18 +82,18 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) { var timeBeforeGroup = metric.TimeBeforeGroup basicAggs := util.MapStr{} i := 0 - for _, metricItem := range metric.Items { + for _, metricItem := range metric.Items { if metricItem.Name == "" { metricItem.Name = string('a' + i) } - metricAggs := generateAgg(&metricItem) + metricAggs := generateAgg(&metricItem, metric.TimeField) if err := util.MergeFields(basicAggs, metricAggs, true); err != nil { return nil, err } } verInfo := elastic.GetClient(metric.ClusterId).GetVersion() - if verInfo.Number==""{ + if verInfo.Number == "" { panic("invalid version") } @@ -85,11 +101,11 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) { if err != nil { return nil, fmt.Errorf("get interval field error: %w", err) } - if metric.TimeField != "" && !timeBeforeGroup{ + if metric.BucketSize != "" && !timeBeforeGroup { basicAggs = util.MapStr{ "time_buckets": util.MapStr{ "date_histogram": util.MapStr{ - "field": metric.TimeField, + "field": metric.TimeField, intervalField: metric.BucketSize, }, "aggs": basicAggs, @@ -97,14 +113,13 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) { } } - var rootAggs util.MapStr groups := metric.Groups if grpLength := len(groups); grpLength > 0 { var lastGroupAgg util.MapStr - for i := grpLength-1; i>=0; i-- { + for i := grpLength - 1; i >= 0; i-- { limit := groups[i].Limit //top group 10 if limit <= 0 { @@ -113,7 +128,7 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) { groupAgg := util.MapStr{ "terms": util.MapStr{ "field": groups[i].Field, - "size": limit, + "size": limit, }, } groupID := util.GetUUID() @@ -121,35 +136,35 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) { groupAgg["aggs"] = util.MapStr{ groupID: lastGroupAgg, } - }else{ - groupAgg["aggs"] = basicAggs + } else { + groupAgg["aggs"] = basicAggs } lastGroupAgg = groupAgg } - if metric.TimeField == "" || (metric.TimeField != "" && !timeBeforeGroup) { + if metric.BucketSize == "" || (metric.BucketSize != "" && !timeBeforeGroup) { rootAggs = util.MapStr{ util.GetUUID(): lastGroupAgg, } - }else{ + } else { rootAggs = util.MapStr{ - "time_buckets": util.MapStr{ - "date_histogram": util.MapStr{ - "field": metric.TimeField, - intervalField: metric.BucketSize, - }, - "aggs": util.MapStr{ - util.GetUUID(): lastGroupAgg, - }, + "time_buckets": util.MapStr{ + "date_histogram": util.MapStr{ + "field": metric.TimeField, + intervalField: metric.BucketSize, }, + "aggs": util.MapStr{ + util.GetUUID(): lastGroupAgg, + }, + }, } } - }else{ - if metric.TimeField != "" && timeBeforeGroup{ + } else { + if metric.BucketSize != "" && timeBeforeGroup { basicAggs = util.MapStr{ "time_buckets": util.MapStr{ "date_histogram": util.MapStr{ - "field": metric.TimeField, + "field": metric.TimeField, intervalField: metric.BucketSize, }, "aggs": basicAggs, @@ -163,7 +178,7 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) { rootAggs = util.MapStr{ "filter_agg": util.MapStr{ "filter": metric.Filter, - "aggs": rootAggs, + "aggs": rootAggs, }, } } @@ -178,34 +193,34 @@ func GenerateQuery(metric *insight.Metric) (interface{}, error) { return queryDsl, nil } -func CollectMetricData(agg interface{}, timeBeforeGroup bool) []insight.MetricData{ +func CollectMetricData(agg interface{}, timeBeforeGroup bool) []insight.MetricData { metricData := []insight.MetricData{} if timeBeforeGroup { collectMetricDataOther(agg, "", &metricData, nil) - }else{ + } else { collectMetricData(agg, "", &metricData) } return metricData } -//timeBeforeGroup => false -func collectMetricData(agg interface{}, groupValues string, metricData *[]insight.MetricData){ +// timeBeforeGroup => false +func collectMetricData(agg interface{}, groupValues string, metricData *[]insight.MetricData) { if aggM, ok := agg.(map[string]interface{}); ok { if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok { if bks, ok := timeBks["buckets"].([]interface{}); ok { md := insight.MetricData{ - Data: map[string][]insight.MetricDataItem{}, + Data: map[string][]insight.MetricDataItem{}, Group: groupValues, } for _, bk := range bks { - if bkM, ok := bk.(map[string]interface{}); ok{ + if bkM, ok := bk.(map[string]interface{}); ok { for k, v := range bkM { - if k == "key" || k == "key_as_string" || k== "doc_count"{ + if k == "key" || k == "key_as_string" || k == "doc_count" { continue } - if vm, ok := v.(map[string]interface{}); ok && len(k) < 5{ + if vm, ok := v.(map[string]interface{}); ok && len(k) < 5 { collectMetricDataItem(k, vm, &md, bkM["key"]) } @@ -213,22 +228,22 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]insigh } } - *metricData = append(*metricData,md) + *metricData = append(*metricData, md) } - }else{ + } else { md := insight.MetricData{ - Data: map[string][]insight.MetricDataItem{}, + Data: map[string][]insight.MetricDataItem{}, Group: groupValues, } for k, v := range aggM { - if k == "key" || k== "doc_count"{ + if k == "key" || k == "doc_count" { continue } if vm, ok := v.(map[string]interface{}); ok { if bks, ok := vm["buckets"].([]interface{}); ok { for _, bk := range bks { - if bkVal, ok := bk.(map[string]interface{}); ok { + if bkVal, ok := bk.(map[string]interface{}); ok { var currentGroup = fmt.Sprintf("%v", bkVal["key"]) newGroupValues := currentGroup if groupValues != "" { @@ -237,59 +252,59 @@ func collectMetricData(agg interface{}, groupValues string, metricData *[]insigh collectMetricData(bk, newGroupValues, metricData) } } - }else{ + } else { //non time series metric data - if len(k) < 5 { + if len(k) < 5 { collectMetricDataItem(k, vm, &md, nil) } } } } if len(md.Data) > 0 { - *metricData = append(*metricData,md) + *metricData = append(*metricData, md) } } } } -//timeBeforeGroup => true -func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]insight.MetricData, timeKey interface{}){ +// timeBeforeGroup => true +func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]insight.MetricData, timeKey interface{}) { if aggM, ok := agg.(map[string]interface{}); ok { if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok { if bks, ok := timeBks["buckets"].([]interface{}); ok { md := insight.MetricData{ - Data: map[string][]insight.MetricDataItem{}, + Data: map[string][]insight.MetricDataItem{}, Group: groupValues, } for _, bk := range bks { - if bkM, ok := bk.(map[string]interface{}); ok{ + if bkM, ok := bk.(map[string]interface{}); ok { for k, v := range bkM { - if k == "key" || k == "key_as_string" || k== "doc_count"{ + if k == "key" || k == "key_as_string" || k == "doc_count" { continue } if vm, ok := v.(map[string]interface{}); ok { if vm["buckets"] != nil { collectMetricDataOther(vm, groupValues, metricData, bkM["key"]) - }else{ - collectMetricDataItem(k, vm, &md, bkM["key"]) + } else { + collectMetricDataItem(k, vm, &md, bkM["key"]) } } } } } if len(md.Data) > 0 { - *metricData = append(*metricData,md) + *metricData = append(*metricData, md) } } - }else{ + } else { md := insight.MetricData{ - Data: map[string][]insight.MetricDataItem{}, + Data: map[string][]insight.MetricDataItem{}, Group: groupValues, } if bks, ok := aggM["buckets"].([]interface{}); ok { for _, bk := range bks { - if bkVal, ok := bk.(map[string]interface{}); ok { + if bkVal, ok := bk.(map[string]interface{}); ok { currentGroup := bkVal["key"].(string) newGroupValues := currentGroup if groupValues != "" { @@ -298,13 +313,13 @@ func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]i collectMetricDataOther(bk, newGroupValues, metricData, timeKey) } } - }else{ + } else { //non time series metric data for k, v := range aggM { if vm, ok := v.(map[string]interface{}); ok { if vm["buckets"] != nil { collectMetricDataOther(vm, groupValues, metricData, timeKey) - }else{ + } else { collectMetricDataItem(k, vm, &md, timeKey) } } @@ -312,19 +327,34 @@ func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]i } if len(md.Data) > 0 { - *metricData = append(*metricData,md) + *metricData = append(*metricData, md) } } } } - func collectMetricDataItem(key string, vm map[string]interface{}, metricData *insight.MetricData, timeKey interface{}) { if val, ok := vm["value"]; ok { metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{ - Value: val, + Value: val, Timestamp: timeKey, }) + } else if hits, ok := vm["hits"]; ok { + if hits, ok := hits.(map[string]interface{}); ok { + // statistic: top_hits + if hits, ok := hits["hits"]; ok { + if hits, ok := hits.([]interface{}); ok { + for _, hit := range hits { + if hit, ok := hit.(map[string]interface{}); ok { + metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{ + Value: extractSomeValue(hit["_source"]), + Timestamp: timeKey, + }) + } + } + } + } + } } else { //percentiles agg type switch vm["values"].(type) { @@ -332,7 +362,7 @@ func collectMetricDataItem(key string, vm map[string]interface{}, metricData *in for _, val := range vm["values"].([]interface{}) { if valM, ok := val.(map[string]interface{}); ok { metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{ - Value: valM["value"], + Value: valM["value"], Timestamp: timeKey, }) } @@ -341,7 +371,7 @@ func collectMetricDataItem(key string, vm map[string]interface{}, metricData *in case map[string]interface{}: for _, val = range vm["values"].(map[string]interface{}) { metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{ - Value: val, + Value: val, Timestamp: timeKey, }) break @@ -350,3 +380,11 @@ func collectMetricDataItem(key string, vm map[string]interface{}, metricData *in } } +func extractSomeValue(v interface{}) interface{} { + if vm, ok := v.(map[string]interface{}); ok { + for _, v := range vm { + return extractSomeValue(v) + } + } + return v +}