feat: add cluster-level shard state metrics (#78)

This commit is contained in:
silenceqi 2025-01-11 17:22:28 +08:00 committed by GitHub
parent c8977b18d5
commit 243d64cc3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 68 additions and 2 deletions

View File

@ -534,7 +534,7 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
key := h.GetParameter(req, "key")
var metricType string
switch key {
case v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey, CircuitBreakerMetricKey:
case v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey, CircuitBreakerMetricKey,ShardStateMetricKey:
metricType = v1.MetricTypeNodeStats
case ClusterDocumentsMetricKey,
ClusterStorageMetricKey,
@ -570,7 +570,73 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
defer cancel()
if util.StringInArray([]string{v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey}, key) {
metrics, err = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key)
}else{
} else if key == ShardStateMetricKey {
clusterUUID, err := h.getClusterUUID(id)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
query := util.MapStr{
"size": 0,
"query": util.MapStr{
"bool": util.MapStr{
"minimum_should_match": 1,
"should": []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.cluster_id": util.MapStr{
"value": id,
},
},
},
{
"term": util.MapStr{
"metadata.labels.cluster_uuid": util.MapStr{
"value": clusterUUID,
},
},
},
},
"must": []util.MapStr{
{
"term": util.MapStr{
"metadata.category": util.MapStr{
"value": "elasticsearch",
},
},
},
{
"term": util.MapStr{
"metadata.name": util.MapStr{
"value": "shard_stats",
},
},
},
},
"filter": []util.MapStr{
{
"range": util.MapStr{
"timestamp": util.MapStr{
"gte": min,
"lte": max,
},
},
},
},
},
},
}
shardStateMetric, err := getNodeShardStateMetric(ctx, query, bucketSize)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
metrics = map[string]*common.MetricItem{
ShardStateMetricKey: shardStateMetric,
}
} else {
metrics, err = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key)
}
if err != nil {