chore: moving primary shard filter from aggregations to query with single index metric (#32)

* chore: moving primary shard filter from aggregations to query

* chore: moving primary shard filter from aggregations to query with single index metric
This commit is contained in:
silenceqi 2024-12-14 15:25:22 +08:00 committed by GitHub
parent e2a6b29b53
commit aaa4dcee6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 13 additions and 24 deletions

View File

@ -1052,7 +1052,6 @@ func (h *APIHandler) getSingleIndexMetrics(ctx context.Context, metricItems []*c
aggs := util.MapStr{} aggs := util.MapStr{}
metricItemsMap := map[string]*common.MetricLine{} metricItemsMap := map[string]*common.MetricLine{}
sumAggs := util.MapStr{} sumAggs := util.MapStr{}
var filterSubAggs = util.MapStr{}
for _, metricItem := range metricItems { for _, metricItem := range metricItems {
for _, line := range metricItem.Lines { for _, line := range metricItem.Lines {
@ -1065,23 +1064,7 @@ func (h *APIHandler) getSingleIndexMetrics(ctx context.Context, metricItems []*c
}, },
} }
var sumBucketPath = "term_shard>"+ line.Metric.ID var sumBucketPath = "term_shard>"+ line.Metric.ID
if line.Metric.OnlyPrimary {
filterSubAggs[line.Metric.ID] = leafAgg
aggs["filter_pri"]=util.MapStr{
"filter": util.MapStr{
"term": util.MapStr{
"payload.elasticsearch.shard_stats.routing.primary": util.MapStr{
"value": true,
},
},
},
"aggs": filterSubAggs,
}
sumBucketPath = "term_shard>filter_pri>"+ line.Metric.ID
}else{
aggs[line.Metric.ID] = leafAgg aggs[line.Metric.ID] = leafAgg
}
sumAggs[line.Metric.ID] = util.MapStr{ sumAggs[line.Metric.ID] = util.MapStr{
"sum_bucket": util.MapStr{ "sum_bucket": util.MapStr{
"buckets_path": sumBucketPath, "buckets_path": sumBucketPath,
@ -1093,11 +1076,7 @@ func (h *APIHandler) getSingleIndexMetrics(ctx context.Context, metricItems []*c
"field": line.Metric.Field2, "field": line.Metric.Field2,
}, },
} }
if line.Metric.OnlyPrimary {
filterSubAggs[line.Metric.ID+"_field2"] = leafAgg2
}else{
aggs[line.Metric.ID+"_field2"] = leafAgg2 aggs[line.Metric.ID+"_field2"] = leafAgg2
}
sumAggs[line.Metric.ID + "_field2"] = util.MapStr{ sumAggs[line.Metric.ID + "_field2"] = util.MapStr{
"sum_bucket": util.MapStr{ "sum_bucket": util.MapStr{
@ -1136,7 +1115,17 @@ func (h *APIHandler) getSingleIndexMetrics(ctx context.Context, metricItems []*c
clusterID := global.MustLookupString(elastic.GlobalSystemElasticsearchID) clusterID := global.MustLookupString(elastic.GlobalSystemElasticsearchID)
intervalField, err := getDateHistogramIntervalField(clusterID, bucketSizeStr) intervalField, err := getDateHistogramIntervalField(clusterID, bucketSizeStr)
if err != nil { if err != nil {
panic(err) return nil, err
}
if len(metricItems) > 0 && len(metricItems[0].Lines) > 0 && metricItems[0].Lines[0].Metric.OnlyPrimary {
query["query"] = util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
query["query"].(util.MapStr),
{"term": util.MapStr{"payload.elasticsearch.shard_stats.routing.primary": true}},
},
},
}
} }
query["size"] = 0 query["size"] = 0
query["aggs"] = util.MapStr{ query["aggs"] = util.MapStr{