chore: moving primary shard filter from aggregations to query (#31)

This commit is contained in:
silenceqi 2024-12-14 11:05:29 +08:00 committed by GitHub
parent a08e33a570
commit e018f15f73
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 36 additions and 46 deletions

View File

@ -119,30 +119,6 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu
}
query:=map[string]interface{}{}
query["query"]=util.MapStr{
"bool": util.MapStr{
"must": must,
"must_not": []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.index_name": util.MapStr{
"value": "_all",
},
},
},
},
"filter": []util.MapStr{
{
"range": util.MapStr{
"timestamp": util.MapStr{
"gte": min,
"lte": max,
},
},
},
},
},
}
indexMetricItems := []GroupMetricItem{}
switch metricKey {
case v1.IndexStorageMetricKey:
@ -617,7 +593,6 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu
aggs:=map[string]interface{}{}
sumAggs := util.MapStr{}
var filterSubAggs = util.MapStr{}
for _,metricItem:=range indexMetricItems {
leafAgg := util.MapStr{
@ -626,22 +601,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu
},
}
var sumBucketPath = "term_shard>"+ metricItem.ID
if metricItem.MetricItem.OnlyPrimary {
filterSubAggs[metricItem.ID] = leafAgg
aggs["filter_pri"]=util.MapStr{
"filter": util.MapStr{
"term": util.MapStr{
"payload.elasticsearch.shard_stats.routing.primary": util.MapStr{
"value": true,
},
},
},
"aggs": filterSubAggs,
}
sumBucketPath = "term_shard>filter_pri>"+ metricItem.ID
}else{
aggs[metricItem.ID]= leafAgg
}
aggs[metricItem.ID]= leafAgg
sumAggs[metricItem.ID] = util.MapStr{
"sum_bucket": util.MapStr{
@ -655,11 +615,7 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu
"field": metricItem.Field2,
},
}
if metricItem.MetricItem.OnlyPrimary {
filterSubAggs[metricItem.ID+"_field2"] = leafAgg2
}else{
aggs[metricItem.ID+"_field2"] = leafAgg2
}
aggs[metricItem.ID+"_field2"] = leafAgg2
sumAggs[metricItem.ID + "_field2"] = util.MapStr{
"sum_bucket": util.MapStr{
"buckets_path": sumBucketPath + "_field2",
@ -695,6 +651,40 @@ func (h *APIHandler) getIndexMetrics(ctx context.Context, req *http.Request, clu
panic(err)
}
//we can do this because we are querying one metric once time
if indexMetricItems[0].MetricItem.OnlyPrimary {
must = append(must, util.MapStr{
"term": util.MapStr{
"payload.elasticsearch.shard_stats.routing.primary": util.MapStr{
"value": true,
},
},
})
}
query["query"]=util.MapStr{
"bool": util.MapStr{
"must": must,
"must_not": []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.index_name": util.MapStr{
"value": "_all",
},
},
},
},
"filter": []util.MapStr{
{
"range": util.MapStr{
"timestamp": util.MapStr{
"gte": min,
"lte": max,
},
},
},
},
},
}
query["size"]=0
query["aggs"]= util.MapStr{
"group_by_level": util.MapStr{