From 6b5796088259a275f9fbe65f02f69771838b90ac Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 26 Jul 2022 18:49:05 +0800 Subject: [PATCH] add insight api --- config/permission.json | 10 +- main.go | 5 +- plugin/api/insight/api.go | 30 +++ plugin/api/insight/dashboard.go | 180 ++++++++++++++ plugin/api/insight/metadata.go | 355 ++++++++++++++++++++++++++++ plugin/api/insight/metric_util.go | 347 +++++++++++++++++++++++++++ plugin/api/insight/visualization.go | 163 +++++++++++++ 7 files changed, 1086 insertions(+), 4 deletions(-) create mode 100644 plugin/api/insight/api.go create mode 100644 plugin/api/insight/dashboard.go create mode 100644 plugin/api/insight/metadata.go create mode 100644 plugin/api/insight/metric_util.go create mode 100644 plugin/api/insight/visualization.go diff --git a/config/permission.json b/config/permission.json index e4489e2e..613c7f51 100644 --- a/config/permission.json +++ b/config/permission.json @@ -204,7 +204,7 @@ "path": "/:index_name/_close" }, {"name": "indices.get_field_mapping", "methods": ["get"], - "path": "/:index_name/_mapping/field" + "path": "/:index_name/_mapping/:fields" }, {"name": "indices.delete_alias", "methods": ["delete"], "path": "/:index_name/_alias/:alias" @@ -354,6 +354,9 @@ {"name": "reindex.*", "methods": ["*"], "path": "/_reindex/*" }, + {"name": "reindex", "methods": ["post"], + "path": "/_reindex" + }, {"name": "reindex.rethrottle", "methods": ["post"], "path": "/_reindex/:rid/_rethrottle" } @@ -363,8 +366,11 @@ {"name": "render_search_template.*", "methods": ["*"], "path": "/_render/template" }, - {"name": "render_search_template.create", "methods": ["post"], + {"name": "render_search_template", "methods": ["post", "get"], "path": "/_render/template" + }, + {"name": "render_search_template_by_id", "methods": ["post", "get"], + "path": "/_render/template/:tid" } ], "scripts": [ diff --git a/main.go b/main.go index 8dc19083..e901cdff 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "infini.sh/framework" "infini.sh/framework/core/elastic" "infini.sh/framework/core/env" + "infini.sh/framework/core/insight" _ "infini.sh/framework/core/log" "infini.sh/framework/core/module" "infini.sh/framework/core/orm" @@ -112,8 +113,8 @@ func main() { orm.RegisterSchemaWithIndexName(alerting.Alert{}, "alert-history") orm.RegisterSchemaWithIndexName(alerting.AlertMessage{}, "alert-message") orm.RegisterSchemaWithIndexName(alerting.Channel{}, "channel") - //orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") - //orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") + orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") + orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") api.RegisterSchema() go func() { diff --git a/plugin/api/insight/api.go b/plugin/api/insight/api.go new file mode 100644 index 00000000..01f00960 --- /dev/null +++ b/plugin/api/insight/api.go @@ -0,0 +1,30 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package insight + +import "infini.sh/framework/core/api" + +type InsightAPI struct { + api.Handler +} + +func init() { + insight := InsightAPI{} + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/visualization/metadata", insight.HandleGetMetadata) + api.HandleAPIMethod(api.POST, "/elasticsearch/:id/visualization/data", insight.HandleGetMetricData) + + api.HandleAPIMethod(api.GET, "/insight/visualization/:visualization_id", insight.getVisualization) + api.HandleAPIMethod(api.POST, "/insight/visualization", insight.createVisualization) + api.HandleAPIMethod(api.PUT, "/insight/visualization/:visualization_id", insight.updateVisualization) + api.HandleAPIMethod(api.DELETE, "/insight/visualization/:visualization_id", insight.deleteVisualization) + api.HandleAPIMethod(api.GET, "/insight/visualization/_search", insight.searchVisualization) + + api.HandleAPIMethod(api.GET, "/insight/dashboard/:dashboard_id", insight.getDashboard) + api.HandleAPIMethod(api.POST, "/insight/dashboard", insight.createDashboard) + api.HandleAPIMethod(api.PUT, "/insight/dashboard/:dashboard_id", insight.updateDashboard) + api.HandleAPIMethod(api.DELETE, "/insight/dashboard/:dashboard_id", insight.deleteDashboard) + api.HandleAPIMethod(api.GET, "/insight/dashboard/_search", insight.searchDashboard) + +} diff --git a/plugin/api/insight/dashboard.go b/plugin/api/insight/dashboard.go new file mode 100644 index 00000000..fe83e963 --- /dev/null +++ b/plugin/api/insight/dashboard.go @@ -0,0 +1,180 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package insight + +import ( + "fmt" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/insight" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + log "src/github.com/cihub/seelog" + "strconv" + "strings" +) + +func (h *InsightAPI) createDashboard(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var obj = &insight.Dashboard{} + err := h.DecodeJSON(req, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + err = orm.Create(obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "created", + }, 200) + +} + +func (h *InsightAPI) getDashboard(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("dashboard_id") + + obj := insight.Dashboard{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + query := util.MapStr{ + "query": util.MapStr{ + "terms": util.MapStr{ + "id": obj.Visualizations, + }, + }, + } + q := &orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } + err, result := orm.Search(insight.Visualization{}, q) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + obj.Visualizations = result.Result + + h.WriteJSON(w, util.MapStr{ + "found": true, + "_id": id, + "_source": obj, + }, 200) +} + +func (h *InsightAPI) updateDashboard(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("dashboard_id") + obj := insight.Dashboard{} + + obj.ID = id + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + + id = obj.ID + create := obj.Created + obj = insight.Dashboard{} + err = h.DecodeJSON(req, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + //protect + obj.ID = id + obj.Created = create + err = orm.Update(&obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "updated", + }, 200) +} + +func (h *InsightAPI) deleteDashboard(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("dashboard_id") + + obj := insight.Dashboard{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + + err = orm.Delete(&obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "deleted", + }, 200) +} + +func (h *InsightAPI) searchDashboard(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + var ( + keyword = h.GetParameterOrDefault(req, "keyword", "") + queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + mustBuilder = &strings.Builder{} + ) + if keyword != "" { + mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword)) + } + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + q := orm.Query{} + queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from) + q.RawQuery = []byte(queryDSL) + + err, res := orm.Search(&insight.Dashboard{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.Write(w, res.Raw) +} \ No newline at end of file diff --git a/plugin/api/insight/metadata.go b/plugin/api/insight/metadata.go new file mode 100644 index 00000000..07f05a62 --- /dev/null +++ b/plugin/api/insight/metadata.go @@ -0,0 +1,355 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package insight + +import ( + "github.com/Knetic/govaluate" + log "github.com/cihub/seelog" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/insight" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "math" + "net/http" +) + +func (h *InsightAPI) HandleGetMetadata(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + clusterID := ps.MustGetParameter("id") + reqBody := struct { + IndexPattern string `json:"index_pattern"` + ViewID string `json:"view_id"` + TimeField string `json:"time_field"` + Filter interface{} `json:"filter"` + }{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + var fieldsFormat map[string]string + if reqBody.ViewID != "" { + view := elastic.View{ + ID: reqBody.ViewID, + } + exists, err := orm.Get(&view) + if err != nil || !exists { + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusNotFound) + return + } + reqBody.IndexPattern = view.Title + clusterID = view.ClusterID + reqBody.TimeField = view.TimeFieldName + fieldsFormat, err = parseFieldsFormat(view.FieldFormatMap) + if err != nil { + log.Error(err) + } + + } + + fieldsMeta, err := getMetadataByIndexPattern(clusterID, reqBody.IndexPattern, reqBody.TimeField, reqBody.Filter, fieldsFormat) + if err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + h.WriteJSON(w, fieldsMeta, http.StatusOK) +} + +func (h *InsightAPI) HandleGetMetricData(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ + reqBody := insight.Metric{} + err := h.DecodeJSON(req, &reqBody) + if err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + clusterID := ps.MustGetParameter("id") + reqBody.ClusterId = clusterID + metricData, err := getMetricData(&reqBody) + if err != nil { + log.Error(err) + h.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + + h.WriteJSON(w, metricData, http.StatusOK) +} + +func getMetricData(metric *insight.Metric) (interface{}, error) { + query, err := GenerateQuery(metric) + if err != nil { + return nil, err + } + esClient := elastic.GetClient(metric.ClusterId) + //log.Error(string(util.MustToJSONBytes(query))) + searchRes, err := esClient.SearchWithRawQueryDSL(metric.IndexPattern, util.MustToJSONBytes(query)) + if err != nil { + return nil, err + } + searchResult := map[string]interface{}{} + err = util.FromJSONBytes(searchRes.RawResult.Body, &searchResult) + if err != nil { + return nil, err + } + + var agg = searchResult["aggregations"] + if metric.Filter != nil { + if aggM, ok := agg.(map[string]interface{}); ok { + agg = aggM["filter_agg"] + } + } + metricData := CollectMetricData(agg, metric.TimeBeforeGroup) + + var targetMetricData []insight.MetricData + if len(metric.Items) == 1 { + targetMetricData = metricData + }else { + for _, md := range metricData { + targetData := insight.MetricData{ + Group: md.Group, + Data: map[string][]insight.MetricDataItem{}, + } + expression, err := govaluate.NewEvaluableExpression(metric.Formula) + if err != nil { + return nil, err + } + dataLength := 0 + for _, v := range md.Data { + dataLength = len(v) + break + } + DataLoop: + for i := 0; i < dataLength; i++ { + parameters := map[string]interface{}{} + var timestamp interface{} + hasValidData := false + for k, v := range md.Data { + if len(k) == 20 { + continue + } + if len(v) < dataLength { + continue + } + if _, ok := v[i].Value.(float64); !ok { + continue DataLoop + } + hasValidData = true + parameters[k] = v[i].Value + timestamp = v[i].Timestamp + } + //todo return error? + if !hasValidData { + continue + } + result, err := expression.Evaluate(parameters) + if err != nil { + return nil, err + } + if r, ok := result.(float64); ok { + if math.IsNaN(r) || math.IsInf(r, 0) { + //if !isFilterNaN { + // targetData.Data["result"] = append(targetData.Data["result"], []interface{}{timestamp, math.NaN()}) + //} + continue + } + } + + targetData.Data["result"] = append(targetData.Data["result"], insight.MetricDataItem{Timestamp: timestamp, Value: result}) + } + targetMetricData = append(targetMetricData, targetData) + } + } + result := []insight.MetricDataItem{} + for _, md := range targetMetricData { + for _, v := range md.Data { + for _, mitem := range v { + mitem.Group = md.Group + result = append(result, mitem) + } + } + } + return result, nil +} + +func getMetadataByIndexPattern(clusterID, indexPattern, timeField string, filter interface{}, fieldsFormat map[string]string) (interface{}, error){ + fieldsMeta, err := getFieldsMetadata(indexPattern, clusterID) + if err != nil { + return nil, err + } + var ( + metas []insight.Visualization + seriesType string + options = map[string]interface{}{ + "yField": "value", + } + aggTypes []string + ) + if timeField != "" { + options["xAxis"] = util.MapStr{ + "type": "time", + } + options["xField"] = "timestamp" + } + var fieldNames []string + for fieldName := range fieldsMeta.Aggregatable { + fieldNames = append(fieldNames, fieldName) + } + length := len(fieldNames) + step := 50 + for i := 0; i length { + end = length + } + counts, err := countFieldValue(fieldNames[i:end], clusterID, indexPattern, filter) + if err != nil { + return nil, err + } + for fieldName, count := range counts { + if count <= 1 { + continue + } + seriesType = "line" + aggField := fieldsMeta.Aggregatable[fieldName] + if count <= 10 { + if timeField == "" { + seriesType = "pie" + }else { + if aggField.Type == "string"{ + seriesType = "column" + options["seriesField"] = "group" + } + } + } + var defaultAggType string + if aggField.Type == "string" { + aggTypes = []string{"count", "terms"} + defaultAggType = "count" + } else { + aggTypes = []string{"min", "max", "avg", "sum", "medium", "rate"} + defaultAggType = "avg" + } + + if fieldsFormat != nil { + if ft, ok := fieldsFormat[aggField.Name]; ok { + options["yAxis"] = util.MapStr{ + "formatter": ft, + } + } + } + seriesItem := insight.SeriesItem{ + Type: seriesType, + Options: options, + Metric: insight.Metric{ + Items: []insight.MetricItem{ + { + Name: "a", + Field: aggField.Name, + FieldType: aggField.Type, + Statistic: defaultAggType, + + }, + }, + AggTypes: aggTypes, + }} + if seriesType == "column" { + seriesItem.Metric.Groups = []insight.MetricGroupItem{ + {aggField.Name, 10}, + } + } + fieldVis := insight.Visualization{ + Series: []insight.SeriesItem{ + seriesItem, + }, + } + fieldVis.Title, _ = fieldVis.Series[0].Metric.GenerateExpression() + metas = append(metas, fieldVis) + } + } + return metas, nil +} + +func countFieldValue(fields []string, clusterID, indexPattern string, filter interface{}) (map[string]float64, error){ + aggs := util.MapStr{} + for _, field := range fields { + aggs[field] = util.MapStr{ + "cardinality": util.MapStr{ + "field": field, + }, + } + } + queryDsl := util.MapStr{ + "size": 0, + "aggs": aggs, + } + if filter != nil { + queryDsl["query"] = filter + } + esClient := elastic.GetClient(clusterID) + searchRes, err := esClient.SearchWithRawQueryDSL(indexPattern, util.MustToJSONBytes(queryDsl)) + if err != nil { + return nil, err + } + fieldsCount := map[string] float64{} + for key, agg := range searchRes.Aggregations { + if count, ok := agg.Value.(float64); ok { + fieldsCount[key] = count + } + } + return fieldsCount, nil +} + +type FieldsMetadata struct { + Aggregatable map[string]elastic.ElasticField + Dates map[string]elastic.ElasticField +} + +func getFieldsMetadata(indexPattern string, clusterID string) (*FieldsMetadata, error){ + fields, err := elastic.GetFieldCaps(clusterID, indexPattern, nil) + if err != nil { + return nil, err + } + var fieldsMeta = &FieldsMetadata{ + Aggregatable: map[string]elastic.ElasticField{}, + Dates: map[string]elastic.ElasticField{}, + } + for _, field := range fields { + if field.Type == "date" { + fieldsMeta.Dates[field.Name] = field + continue + } + if field.Aggregatable { + fieldsMeta.Aggregatable[field.Name] = field + } + } + return fieldsMeta, nil +} + +func parseFieldsFormat(formatMap string) (map[string]string, error) { + formatObj := map[string]util.MapStr{} + err := util.FromJSONBytes([]byte(formatMap), &formatObj) + if err != nil { + return nil, err + } + fieldsFormat := map[string]string{} + for field, format := range formatObj { + if fv, ok := format["id"].(string); ok { + fieldsFormat[field] = fv + } + } + return fieldsFormat, nil +} \ No newline at end of file diff --git a/plugin/api/insight/metric_util.go b/plugin/api/insight/metric_util.go new file mode 100644 index 00000000..740ace35 --- /dev/null +++ b/plugin/api/insight/metric_util.go @@ -0,0 +1,347 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package insight + +import ( + "fmt" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/insight" + "infini.sh/framework/core/util" + "strconv" + "strings" +) + +func generateAgg(metricItem *insight.MetricItem) map[string]interface{}{ + var ( + aggType = "value_count" + field = metricItem.Field + ) + if field == "" || field == "*" { + field = "_id" + } + var percent = 0.0 + var isPipeline = false + switch metricItem.Statistic { + case "max", "min", "sum", "avg": + aggType = metricItem.Statistic + case "count", "value_count": + aggType = "value_count" + case "derivative": + aggType = "max" + isPipeline = true + case "medium": // from es version 6.6 + aggType = "median_absolute_deviation" + case "p99", "p95","p90","p80","p50": + aggType = "percentiles" + percentStr := strings.TrimPrefix(metricItem.Statistic, "p") + percent, _ = strconv.ParseFloat(percentStr, 32) + } + aggValue := util.MapStr{ + "field": field, + } + if aggType == "percentiles" { + aggValue["percents"] = []interface{}{percent} + } + aggs := util.MapStr{ + metricItem.Name: util.MapStr{ + aggType: aggValue, + }, + } + if !isPipeline{ + return aggs + } + pipelineAggID := util.GetUUID() + aggs[pipelineAggID] = aggs[metricItem.Name] + aggs[metricItem.Name] = util.MapStr{ + "derivative": util.MapStr{ + "buckets_path": pipelineAggID, + }, + } + return aggs +} + +func GenerateQuery(metric *insight.Metric) (interface{}, error) { + var timeBeforeGroup = metric.TimeBeforeGroup + basicAggs := util.MapStr{} + i := 0 + for _, metricItem := range metric.Items { + if metricItem.Name == "" { + metricItem.Name = string('a' + i) + } + metricAggs := generateAgg(&metricItem) + if err := util.MergeFields(basicAggs, metricAggs, true); err != nil { + return nil, err + } + } + targetESVersion := elastic.GetMetadata(metric.ClusterId).Config.Version + intervalField, err := elastic.GetDateHistogramIntervalField(targetESVersion) + if err != nil { + return nil, fmt.Errorf("get interval field error: %w", err) + } + if metric.TimeField != "" && !timeBeforeGroup{ + basicAggs = util.MapStr{ + "time_buckets": util.MapStr{ + "date_histogram": util.MapStr{ + "field": metric.TimeField, + intervalField: metric.BucketSize, + }, + "aggs": basicAggs, + }, + } + } + + + var rootAggs util.MapStr + groups := metric.Groups + + if grpLength := len(groups); grpLength > 0 { + var lastGroupAgg util.MapStr + + for i := grpLength-1; i>=0; i-- { + limit := groups[i].Limit + //top group 10 + if limit <= 0 { + limit = 10 + } + groupAgg := util.MapStr{ + "terms": util.MapStr{ + "field": groups[i].Field, + "size": limit, + }, + } + groupID := util.GetUUID() + if lastGroupAgg != nil { + groupAgg["aggs"] = util.MapStr{ + groupID: lastGroupAgg, + } + }else{ + groupAgg["aggs"] = basicAggs + } + lastGroupAgg = groupAgg + } + if metric.TimeField == "" || (metric.TimeField != "" && !timeBeforeGroup) { + rootAggs = util.MapStr{ + util.GetUUID(): lastGroupAgg, + } + }else{ + rootAggs = util.MapStr{ + "time_buckets": util.MapStr{ + "date_histogram": util.MapStr{ + "field": metric.TimeField, + intervalField: metric.BucketSize, + }, + "aggs": util.MapStr{ + util.GetUUID(): lastGroupAgg, + }, + }, + } + } + + }else{ + if metric.TimeField != "" && timeBeforeGroup{ + basicAggs = util.MapStr{ + "time_buckets": util.MapStr{ + "date_histogram": util.MapStr{ + "field": metric.TimeField, + intervalField: metric.BucketSize, + }, + "aggs": basicAggs, + }, + } + } + rootAggs = basicAggs + } + + if metric.Filter != nil { + rootAggs = util.MapStr{ + "filter_agg": util.MapStr{ + "filter": metric.Filter, + "aggs": rootAggs, + }, + } + } + queryDsl := util.MapStr{ + "size": 0, + "aggs": rootAggs, + } + if metric.TimeFilter != nil { + queryDsl["query"] = metric.TimeFilter + } + + return queryDsl, nil +} + +func CollectMetricData(agg interface{}, timeBeforeGroup bool) []insight.MetricData{ + metricData := []insight.MetricData{} + if timeBeforeGroup { + collectMetricDataOther(agg, "", &metricData, nil) + }else{ + collectMetricData(agg, "", &metricData) + } + return metricData +} + +//timeBeforeGroup => false +func collectMetricData(agg interface{}, groupValues string, metricData *[]insight.MetricData){ + if aggM, ok := agg.(map[string]interface{}); ok { + if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok { + if bks, ok := timeBks["buckets"].([]interface{}); ok { + md := insight.MetricData{ + Data: map[string][]insight.MetricDataItem{}, + Group: groupValues, + } + for _, bk := range bks { + if bkM, ok := bk.(map[string]interface{}); ok{ + + for k, v := range bkM { + if k == "key" || k == "key_as_string" || k== "doc_count"{ + continue + } + + if vm, ok := v.(map[string]interface{}); ok && len(k) < 5{ + collectMetricDataItem(k, vm, &md, bkM["key"]) + } + + } + } + + } + *metricData = append(*metricData,md) + } + + }else{ + md := insight.MetricData{ + Data: map[string][]insight.MetricDataItem{}, + Group: groupValues, + } + for k, v := range aggM { + if k == "key" || k== "doc_count"{ + continue + } + if vm, ok := v.(map[string]interface{}); ok { + if bks, ok := vm["buckets"].([]interface{}); ok { + for _, bk := range bks { + if bkVal, ok := bk.(map[string]interface{}); ok { + currentGroup := bkVal["key"].(string) + newGroupValues := currentGroup + if groupValues != "" { + newGroupValues = fmt.Sprintf("%s-%s", groupValues, currentGroup) + } + collectMetricData(bk, newGroupValues, metricData) + } + } + }else{ + //non time series metric data + if len(k) < 5 { + collectMetricDataItem(k, vm, &md, nil) + } + } + } + } + if len(md.Data) > 0 { + *metricData = append(*metricData,md) + } + } + } +} + +//timeBeforeGroup => true +func collectMetricDataOther(agg interface{}, groupValues string, metricData *[]insight.MetricData, timeKey interface{}){ + if aggM, ok := agg.(map[string]interface{}); ok { + if timeBks, ok := aggM["time_buckets"].(map[string]interface{}); ok { + if bks, ok := timeBks["buckets"].([]interface{}); ok { + md := insight.MetricData{ + Data: map[string][]insight.MetricDataItem{}, + Group: groupValues, + } + for _, bk := range bks { + if bkM, ok := bk.(map[string]interface{}); ok{ + for k, v := range bkM { + if k == "key" || k == "key_as_string" || k== "doc_count"{ + continue + } + if vm, ok := v.(map[string]interface{}); ok { + if vm["buckets"] != nil { + collectMetricDataOther(vm, groupValues, metricData, bkM["key"]) + }else{ + collectMetricDataItem(k, vm, &md, bkM["key"]) + } + } + } + } + } + if len(md.Data) > 0 { + *metricData = append(*metricData,md) + } + } + }else{ + md := insight.MetricData{ + Data: map[string][]insight.MetricDataItem{}, + Group: groupValues, + } + + if bks, ok := aggM["buckets"].([]interface{}); ok { + for _, bk := range bks { + if bkVal, ok := bk.(map[string]interface{}); ok { + currentGroup := bkVal["key"].(string) + newGroupValues := currentGroup + if groupValues != "" { + newGroupValues = fmt.Sprintf("%s-%s", groupValues, currentGroup) + } + collectMetricDataOther(bk, newGroupValues, metricData, timeKey) + } + } + }else{ + //non time series metric data + for k, v := range aggM { + if vm, ok := v.(map[string]interface{}); ok { + if vm["buckets"] != nil { + collectMetricDataOther(vm, groupValues, metricData, timeKey) + }else{ + collectMetricDataItem(k, vm, &md, timeKey) + } + } + } + } + + if len(md.Data) > 0 { + *metricData = append(*metricData,md) + } + } + } +} + + +func collectMetricDataItem(key string, vm map[string]interface{}, metricData *insight.MetricData, timeKey interface{}) { + if val, ok := vm["value"]; ok { + metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{ + Value: val, + Timestamp: timeKey, + }) + } else { + //percentiles agg type + switch vm["values"].(type) { + case []interface{}: + for _, val := range vm["values"].([]interface{}) { + if valM, ok := val.(map[string]interface{}); ok { + metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{ + Value: valM["value"], + Timestamp: timeKey, + }) + } + break + } + case map[string]interface{}: + for _, val = range vm["values"].(map[string]interface{}) { + metricData.Data[key] = append(metricData.Data[key], insight.MetricDataItem{ + Value: val, + Timestamp: timeKey, + }) + break + } + } + } +} + diff --git a/plugin/api/insight/visualization.go b/plugin/api/insight/visualization.go new file mode 100644 index 00000000..a1defd49 --- /dev/null +++ b/plugin/api/insight/visualization.go @@ -0,0 +1,163 @@ +/* Copyright © INFINI Ltd. All rights reserved. + * web: https://infinilabs.com + * mail: hello#infini.ltd */ + +package insight + +import ( + "fmt" + "infini.sh/framework/core/insight" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/util" + "net/http" + log "src/github.com/cihub/seelog" + "strconv" + "strings" +) + +func (h *InsightAPI) createVisualization(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var obj = &insight.Visualization{} + err := h.DecodeJSON(req, obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + err = orm.Create(obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "created", + }, 200) + +} + +func (h *InsightAPI) getVisualization(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("visualization_id") + + obj := insight.Visualization{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + + h.WriteJSON(w, util.MapStr{ + "found": true, + "_id": id, + "_source": obj, + }, 200) +} + +func (h *InsightAPI) updateVisualization(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("visualization_id") + obj := insight.Visualization{} + + obj.ID = id + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + + id = obj.ID + create := obj.Created + obj = insight.Visualization{} + err = h.DecodeJSON(req, &obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + //protect + obj.ID = id + obj.Created = create + err = orm.Update(&obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "updated", + }, 200) +} + +func (h *InsightAPI) deleteVisualization(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("visualization_id") + + obj := insight.Visualization{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + + err = orm.Delete(&obj) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "deleted", + }, 200) +} + +func (h *InsightAPI) searchVisualization(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + var ( + keyword = h.GetParameterOrDefault(req, "keyword", "") + queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + mustBuilder = &strings.Builder{} + ) + if keyword != "" { + mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword)) + } + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + q := orm.Query{} + queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from) + q.RawQuery = []byte(queryDSL) + + err, res := orm.Search(&insight.Visualization{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.Write(w, res.Raw) +} \ No newline at end of file