diff --git a/plugin/api/index_management/elasticsearch.go b/plugin/api/index_management/elasticsearch.go index 9bddc632..1cc95478 100644 --- a/plugin/api/index_management/elasticsearch.go +++ b/plugin/api/index_management/elasticsearch.go @@ -7,6 +7,7 @@ import ( "infini.sh/framework/core/elastic" "infini.sh/framework/core/event" "infini.sh/framework/core/global" + "infini.sh/framework/core/graph" "infini.sh/framework/core/host" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" @@ -322,3 +323,155 @@ func (handler APIHandler) getGroupMetric(indexName, field string, filter interfa } return groups, nil } + +func (h *APIHandler) ClusterOverTreeMap(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + + clusterID := ps.ByName("id") + + queryLatency := util.MapStr{ + "size": 0, + "aggs": util.MapStr{ + "indices": util.MapStr{ + "terms": util.MapStr{ + "field": "metadata.labels.index_name", + "size": 1000, + }, + "aggs": util.MapStr{ + "recent_15m": util.MapStr{ + "auto_date_histogram": util.MapStr{ + "field": "timestamp", + "minimum_interval": "minute", //es7.3 and above + "buckets": 12, + }, + "aggs": util.MapStr{ + "max_query_count": util.MapStr{ + "max": util.MapStr{ + "field": "payload.elasticsearch.index_stats.primaries.search.query_total", + }, + }, + "query_count_deriv": util.MapStr{ + "derivative": util.MapStr{ + "buckets_path": "max_query_count", + }, + }, + "max_query_time": util.MapStr{ + "max": util.MapStr{ + "field": "payload.elasticsearch.index_stats.primaries.search.query_time_in_millis", + }, + }, + "query_time_deriv": util.MapStr{ + "derivative": util.MapStr{ + "buckets_path": "max_query_time", + }, + }, + "query_latency": util.MapStr{ + "bucket_script": util.MapStr{ + "buckets_path": util.MapStr{ + "my_var1": "query_time_deriv", + "my_var2": "query_count_deriv", + }, + "script": "params.my_var1 / params.my_var2", + }, + }, + }, + }, + "max_query_latency": util.MapStr{ + "max_bucket": util.MapStr{ + "buckets_path": "recent_15m>query_latency", + }, + }, + "sort": util.MapStr{ + "bucket_sort": util.MapStr{ + "sort": []util.MapStr{ + { + "max_query_latency": util.MapStr{ + "order": "desc", + }, + }, + }, + }, + }, + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must_not": []util.MapStr{{ + "term": util.MapStr{ + "metadata.labels.index_name": util.MapStr{ + "value": "_all", + }, + }, + }, + }, + "must": []util.MapStr{ + { + "match": util.MapStr{ + "metadata.name": "index_stats", + }}, + util.MapStr{ + "term": util.MapStr{ + "metadata.labels.cluster_id": util.MapStr{ + "value": clusterID, + }, + }, + }, + }, + "filter": []util.MapStr{ + { + "range": util.MapStr{ + "timestamp": util.MapStr{ + "gte": "now-7d", + "lte": "now", + }, + }, + }, + }, + }}, + } + + q := orm.Query{WildcardIndex: true} + q.AddQueryArgs("filter_path", "aggregations.indices.buckets.key,aggregations.indices.buckets.max_query_latency") + q.RawQuery = util.MustToJSONBytes(queryLatency) + err, searchR1 := orm.Search(&event.Event{}, &q) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + searchResponse := elastic.SearchResponse{} + err = util.FromJSONBytes(searchR1.Raw, &searchResponse) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + root := graph.NestedNode{Name: "root"} + + indices, ok := searchResponse.Aggregations["indices"] + if ok { + buckets := indices.Buckets + for _, item := range buckets { + indexName := item["key"] + latencyObj, ok := item["max_query_latency"].(map[string]interface{}) + if ok { + v := latencyObj["value"] + date := latencyObj["keys"].([]interface{}) + root.Add(indexName.(string), date[0].(string), v.(float64)) + } + } + } + + result := util.MapStr{ + "_index": ".infini-graph", + "_type": "_doc", + "_id": "graph-1", + "_source": util.MapStr{ + "name": "Avg search latency by index", + "unit": "ms", + "data": root, + }, + } + + h.Write(w, util.MustToJSONBytes(result)) +} \ No newline at end of file diff --git a/plugin/api/init.go b/plugin/api/init.go index 8b020da4..281f3e27 100644 --- a/plugin/api/init.go +++ b/plugin/api/init.go @@ -53,7 +53,7 @@ func Init(cfg *config.AppConfig) { api.HandleAPIMethod(api.GET, path.Join(pathPrefix, "elasticsearch/command"), handler.RequirePermission(handler.HandleQueryCommonCommandAction, enum.PermissionCommandRead)) api.HandleAPIMethod(api.DELETE, path.Join(pathPrefix, "elasticsearch/command/:cid"), handler.RequirePermission(handler.HandleDeleteCommonCommandAction, enum.PermissionCommandWrite)) api.HandleAPIMethod(api.GET, "/elasticsearch/overview/status", handler.RequireLogin(handler.ElasticsearchStatusSummaryAction)) - + api.HandleAPIMethod(api.GET, "/elasticsearch/:id/overview/treemap", handler.RequireClusterPermission(handler.RequirePermission(handler.ClusterOverTreeMap, enum.PermissionElasticsearchMetricRead))) //task.RegisterScheduleTask(task.ScheduleTask{ // Description: "sync reindex task result", // Task: func() {