From e018f15f736a48540ed4a41d0194fa82eb9dcd5d Mon Sep 17 00:00:00 2001 From: silenceqi Date: Sat, 14 Dec 2024 11:05:29 +0800 Subject: [PATCH] chore: moving primary shard filter from aggregations to query (#31) --- modules/elastic/api/index_metrics.go | 82 ++++++++++++---------------- 1 file changed, 36 insertions(+), 46 deletions(-) diff --git a/modules/elastic/api/index_metrics.go b/modules/elastic/api/index_metrics.go index f3002596..587ae2a9 100644 --- a/modules/elastic/api/index_metrics.go +++ b/modules/elastic/api/index_metrics.go @@ -119,30 +119,6 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu } query:=map[string]interface{}{} - query["query"]=util.MapStr{ - "bool": util.MapStr{ - "must": must, - "must_not": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.index_name": util.MapStr{ - "value": "_all", - }, - }, - }, - }, - "filter": []util.MapStr{ - { - "range": util.MapStr{ - "timestamp": util.MapStr{ - "gte": min, - "lte": max, - }, - }, - }, - }, - }, - } indexMetricItems := []GroupMetricItem{} switch metricKey { case v1.IndexStorageMetricKey: @@ -617,7 +593,6 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu aggs:=map[string]interface{}{} sumAggs := util.MapStr{} - var filterSubAggs = util.MapStr{} for _,metricItem:=range indexMetricItems { leafAgg := util.MapStr{ @@ -626,22 +601,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu }, } var sumBucketPath = "term_shard>"+ metricItem.ID - if metricItem.MetricItem.OnlyPrimary { - filterSubAggs[metricItem.ID] = leafAgg - aggs["filter_pri"]=util.MapStr{ - "filter": util.MapStr{ - "term": util.MapStr{ - "payload.elasticsearch.shard_stats.routing.primary": util.MapStr{ - "value": true, - }, - }, - }, - "aggs": filterSubAggs, - } - sumBucketPath = "term_shard>filter_pri>"+ metricItem.ID - }else{ - aggs[metricItem.ID]= leafAgg - } + aggs[metricItem.ID]= leafAgg sumAggs[metricItem.ID] = util.MapStr{ "sum_bucket": util.MapStr{ @@ -655,11 +615,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu "field": metricItem.Field2, }, } - if metricItem.MetricItem.OnlyPrimary { - filterSubAggs[metricItem.ID+"_field2"] = leafAgg2 - }else{ - aggs[metricItem.ID+"_field2"] = leafAgg2 - } + aggs[metricItem.ID+"_field2"] = leafAgg2 sumAggs[metricItem.ID + "_field2"] = util.MapStr{ "sum_bucket": util.MapStr{ "buckets_path": sumBucketPath + "_field2", @@ -695,6 +651,40 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu panic(err) } + //we can do this because we are querying one metric once time + if indexMetricItems[0].MetricItem.OnlyPrimary { + must = append(must, util.MapStr{ + "term": util.MapStr{ + "payload.elasticsearch.shard_stats.routing.primary": util.MapStr{ + "value": true, + }, + }, + }) + } + query["query"]=util.MapStr{ + "bool": util.MapStr{ + "must": must, + "must_not": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.index_name": util.MapStr{ + "value": "_all", + }, + }, + }, + }, + "filter": []util.MapStr{ + { + "range": util.MapStr{ + "timestamp": util.MapStr{ + "gte": min, + "lte": max, + }, + }, + }, + }, + }, + } query["size"]=0 query["aggs"]= util.MapStr{ "group_by_level": util.MapStr{