optimize metric query bucket size (#59)

* feat: ensure the automatically calculated bucket size is at least twice the metrics collection interval

* feat: return the hit count and the minimum bucket size when querying metrics

* chore: update release notes

---------

Co-authored-by: Hardy <luohoufu@163.com>
silenceqi 2024-12-26 16:50:57 +08:00, committed by GitHub
parent b8b24f8fab
commit f11a565948
11 changed files with 253 additions and 117 deletions
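In short: instead of each handler passing a hard-coded default bucket size, the bucket size is now derived per cluster and clamped to at least twice the metrics collection interval. A minimal sketch of the rule with simplified names (the real logic lives in GetMetricMinBucketSize and GetMetricRangeAndBucketSize below; clampBucketSize itself is not part of this commit):

func clampBucketSize(autoBucketSize, collectionIntervalSeconds int) int {
    // at least twice the collection interval, so each bucket
    // is expected to contain two or more samples
    minBucketSize := 2 * collectionIntervalSeconds
    if autoBucketSize < minBucketSize {
        return minBucketSize
    }
    return autoBucketSize
}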


@@ -21,6 +21,7 @@ Information about release notes of INFINI Console is provided here.
- Optimize UI of agent list when its columns overflow.
- Add loading to each row in overview table.
- Adapt metrics query to work with both cluster id and cluster uuid
- Optimize metric query bucket size (#59)
- Add suggestion to chart in monitor if there is no data because the time interval is less than the collection interval.


@@ -466,7 +466,7 @@ func (h *APIHandler) FetchHostInfo(w http.ResponseWriter, req *http.Request, ps
return
}
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 60, 15)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, "", "", 15)
if err != nil {
panic(err)
return
@@ -712,7 +712,7 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
}
resBody := map[string]interface{}{}
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 60)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, "", "", 60)
if err != nil {
log.Error(err)
resBody["error"] = err


@@ -835,7 +835,7 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
}, http.StatusForbidden)
return
}
clusterUUID, err := adapter.GetClusterUUID(clusterID)
clusterUUID, err := h.getClusterUUID(clusterID)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
@@ -844,13 +844,6 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
shardID := h.GetParameterOrDefault(req, "shard_id", "")
var must = []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.cluster_uuid": util.MapStr{
"value": clusterUUID,
},
},
},
{
"term": util.MapStr{
"metadata.category": util.MapStr{
@@ -883,7 +876,15 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
})
}
resBody := map[string]interface{}{}
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 60)
metricKey := h.GetParameter(req, "key")
var metricType string
if metricKey == v1.IndexHealthMetricKey {
metricType = v1.MetricTypeClusterHealth
} else {
// for agent mode
metricType = v1.MetricTypeNodeStats
}
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterID, metricType, 60)
if err != nil {
log.Error(err)
resBody["error"] = err
@@ -893,7 +894,6 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
if bucketSize <= 60 {
min = min - int64(2 * bucketSize * 1000)
}
metricKey := h.GetParameter(req, "key")
timeout := h.GetParameterOrDefault(req, "timeout", "60s")
du, err := time.ParseDuration(timeout)
if err != nil {
@@ -907,6 +907,22 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
query["query"] = util.MapStr{
"bool": util.MapStr{
"must": must,
"should": []util.MapStr{
{
"term": util.MapStr{
"metadata.labels.cluster_uuid": util.MapStr{
"value": clusterUUID,
},
},
},
{
"term": util.MapStr{
"metadata.labels.cluster_id": util.MapStr{
"value": clusterID,
},
},
},
},
"filter": []util.MapStr{
{
"range": util.MapStr{
@@ -1009,6 +1025,16 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
h.WriteError(w, err, http.StatusInternalServerError)
}
}
if _, ok := metrics[metricKey]; ok {
if metrics[metricKey].HitsTotal > 0 {
minBucketSize, err := v1.GetMetricMinBucketSize(clusterID, metricType)
if err != nil {
log.Error(err)
} else {
metrics[metricKey].MinBucketSize = int64(minBucketSize)
}
}
}
resBody["metrics"] = metrics
h.WriteJSON(w, resBody, http.StatusOK)
@@ -1102,6 +1128,7 @@ func (h *APIHandler) getIndexShardsMetric(ctx context.Context, id, indexName str
metricItem.Lines[0].Data = metricData
metricItem.Lines[0].Type = common.GraphTypeBar
metricItem.Request = string(queryDSL)
metricItem.HitsTotal = response.GetTotal()
return metricItem, nil
}


@@ -532,21 +532,30 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
return
}
key := h.GetParameter(req, "key")
var metricType string
switch key {
case v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey, CircuitBreakerMetricKey:
metricType = v1.MetricTypeNodeStats
case ClusterDocumentsMetricKey,
ClusterStorageMetricKey,
ClusterIndicesMetricKey,
ClusterNodeCountMetricKey:
metricType = v1.MetricTypeClusterStats
case ClusterHealthMetricKey:
metricType = v1.MetricTypeClusterStats
case ShardCountMetricKey:
metricType = v1.MetricTypeClusterHealth
default:
h.WriteError(w, "invalid metric key", http.StatusBadRequest)
return
}
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, metricType, 90)
if err != nil {
panic(err)
return
}
meta := elastic.GetMetadata(id)
if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.IndexStats.Enabled && meta.Config.MonitorConfigs.IndexStats.Interval != "" {
du, _ := time.ParseDuration(meta.Config.MonitorConfigs.IndexStats.Interval)
if bucketSize < int(du.Seconds()) {
bucketSize = int(du.Seconds())
}
}
var metrics interface{}
var metrics map[string]*common.MetricItem
if bucketSize <= 60 {
min = min - int64(2*bucketSize*1000)
}
@@ -569,6 +578,16 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
h.WriteError(w, err, http.StatusInternalServerError)
return
}
if _, ok := metrics[key]; ok {
if metrics[key].HitsTotal > 0 {
minBucketSize, err := v1.GetMetricMinBucketSize(id, metricType)
if err != nil {
log.Error(err)
} else {
metrics[key].MinBucketSize = int64(minBucketSize)
}
}
}
resBody["metrics"] = metrics
@@ -582,7 +601,7 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
func (h *APIHandler) HandleNodeMetricsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
resBody := map[string]interface{}{}
id := ps.ByName("id")
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, v1.MetricTypeNodeStats, 90)
if err != nil {
log.Error(err)
resBody["error"] = err
@@ -604,12 +623,23 @@
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
resBody["metrics"], err = h.getNodeMetrics(ctx, id, bucketSize, min, max, nodeName, top, key)
metrics, err := h.getNodeMetrics(ctx, id, bucketSize, min, max, nodeName, top, key)
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
return
}
if _, ok := metrics[key]; ok {
if metrics[key].HitsTotal > 0 {
minBucketSize, err := v1.GetMetricMinBucketSize(id, v1.MetricTypeNodeStats)
if err != nil {
log.Error(err)
} else {
metrics[key].MinBucketSize = int64(minBucketSize)
}
}
}
resBody["metrics"] = metrics
ver := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).GetVersion()
if ver.Distribution == "" {
cr, err := util.VersionCompare(ver.Number, "6.1")
@@ -634,7 +664,7 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R
h.APIHandler.HandleIndexMetricsAction(w, req, ps)
return
}
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, v1.MetricTypeNodeStats, 90)
if err != nil {
log.Error(err)
resBody["error"] = err
@@ -729,6 +759,16 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R
return
}
}
if _, ok := metrics[key]; ok {
if metrics[key].HitsTotal > 0 {
minBucketSize, err := v1.GetMetricMinBucketSize(id, v1.MetricTypeNodeStats)
if err != nil {
log.Error(err)
} else {
metrics[key].MinBucketSize = int64(minBucketSize)
}
}
}
resBody["metrics"] = metrics
ver := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).GetVersion()
if ver.Distribution == "" {
@@ -749,7 +789,7 @@ func (h *APIHandler) HandleQueueMetricsAction(w http.ResponseWriter, req *http.R
func (h *APIHandler) HandleQueueMetricsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
resBody := map[string]interface{}{}
id := ps.ByName("id")
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, v1.MetricTypeNodeStats, 90)
if err != nil {
log.Error(err)
resBody["error"] = err
@@ -771,12 +811,23 @@ func (h *APIHandler) HandleQueueMetricsAction(w http.ResponseWriter, req *http.R
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
resBody["metrics"], err = h.getThreadPoolMetrics(ctx, id, bucketSize, min, max, nodeName, top, key)
metrics, err := h.getThreadPoolMetrics(ctx, id, bucketSize, min, max, nodeName, top, key)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if _, ok := metrics[key]; ok {
if metrics[key].HitsTotal > 0 {
minBucketSize, err := v1.GetMetricMinBucketSize(id, v1.MetricTypeNodeStats)
if err != nil {
log.Error(err)
} else {
metrics[key].MinBucketSize = int64(minBucketSize)
}
}
}
resBody["metrics"] = metrics
ver := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).GetVersion()
if ver.Distribution == "" {
cr, err := util.VersionCompare(ver.Number, "6.1")
@@ -1356,6 +1407,7 @@ func (h *APIHandler) getClusterStatusMetric(ctx context.Context, id string, min,
metricItem.Lines[0].Data = metricData
metricItem.Lines[0].Type = common.GraphTypeBar
metricItem.Request = string(queryDSL)
metricItem.HitsTotal = response.GetTotal()
return metricItem, nil
}
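The HitsTotal/MinBucketSize handling above is a pattern this commit repeats across the metric handlers: whenever a query for a metric key returns hits, the response is annotated with the minimum sensible bucket size so the frontend can suggest a coarser interval. A hypothetical helper that condenses the repeated inline blocks (enrichWithMinBucketSize is illustrative only, not code from this commit):

func enrichWithMinBucketSize(metrics map[string]*common.MetricItem, key, clusterID, metricType string) {
    item, ok := metrics[key]
    if !ok || item.HitsTotal == 0 {
        // no data points for this metric key; nothing to annotate
        return
    }
    // GetMetricMinBucketSize returns twice the configured collection interval
    minBucketSize, err := v1.GetMetricMinBucketSize(clusterID, metricType)
    if err != nil {
        log.Error(err)
        return
    }
    item.MinBucketSize = int64(minBucketSize)
}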


@@ -27,7 +27,6 @@ import (
"context"
"fmt"
"infini.sh/framework/core/env"
"net/http"
"strings"
"time"
@@ -197,6 +196,7 @@ func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{
result := map[string]*common.MetricItem{}
hitsTotal := response.GetTotal()
for _, metricItem := range grpMetricItems {
for _, line := range metricItem.MetricItem.Lines {
line.TimeRange = common.TimeRange{Min: minDate, Max: maxDate}
@@ -216,6 +216,7 @@
}
}
metricItem.MetricItem.Request = string(queryDSL)
metricItem.MetricItem.HitsTotal = hitsTotal
result[metricItem.Key] = metricItem.MetricItem
}
return result, nil
@@ -234,36 +235,6 @@ func GetMinBucketSize() int {
return metricsCfg.MinBucketSizeInSeconds
}
// defaultBucketSize is the time interval of each aggregation bucket
func (h *APIHandler) getMetricRangeAndBucketSize(req *http.Request, defaultBucketSize, defaultMetricCount int) (int, int64, int64, error) {
minBucketSizeInSeconds := GetMinBucketSize()
if defaultBucketSize <= 0 {
defaultBucketSize = minBucketSizeInSeconds
}
if defaultMetricCount <= 0 {
defaultMetricCount = 15 * 60
}
bucketSize := defaultBucketSize
bucketSizeStr := h.GetParameterOrDefault(req, "bucket_size", "") // defaults to 10; the time span of each bucket, in seconds
if bucketSizeStr != "" {
du, err := util.ParseDuration(bucketSizeStr)
if err != nil {
return 0, 0, 0, err
}
bucketSize = int(du.Seconds())
} else {
bucketSize = 0
}
metricCount := h.GetIntOrDefault(req, "metric_count", defaultMetricCount) // defaults to a 15-minute range, i.e. 15*6 = 90 buckets
//min,max are unix nanoseconds
minStr := h.Get(req, "min", "")
maxStr := h.Get(req, "max", "")
return GetMetricRangeAndBucketSize(minStr, maxStr, bucketSize, metricCount)
}
func GetMetricRangeAndBucketSize(minStr string, maxStr string, bucketSize int, metricCount int) (int, int64, int64, error) {
var min, max int64
var rangeFrom, rangeTo time.Time
@@ -469,6 +440,7 @@ func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common
}
}
metricItem.Request = string(queryDSL)
metricItem.HitsTotal = response.GetTotal()
result[metricItem.Key] = metricItem
}
@@ -1213,6 +1185,7 @@ func parseSingleIndexMetrics(ctx context.Context, clusterID string, metricItems
}
}
metricItem.Request = string(queryDSL)
metricItem.HitsTotal = response.GetTotal()
result[metricItem.Key] = metricItem
}


@@ -632,7 +632,7 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque
},
}
resBody := map[string]interface{}{}
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req,10,60)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterID, v1.MetricTypeNodeStats, 60)
if err != nil {
log.Error(err)
resBody["error"] = err
@@ -803,6 +803,16 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque
return
}
}
if _, ok := metrics[metricKey]; ok {
if metrics[metricKey].HitsTotal > 0 {
minBucketSize, err := v1.GetMetricMinBucketSize(clusterID, v1.MetricTypeNodeStats)
if err != nil {
log.Error(err)
} else {
metrics[metricKey].MinBucketSize = int64(minBucketSize)
}
}
}
resBody["metrics"] = metrics
h.WriteJSON(w, resBody, http.StatusOK)


@@ -110,7 +110,7 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
}
//fetch cluster metrics
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 60, (15))
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterIDs[0], MetricTypeIndexStats, 15)
if err != nil {
panic(err)
return


@@ -469,20 +469,13 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
},
}
resBody := map[string]interface{}{}
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 60)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterID, MetricTypeIndexStats, 60)
if err != nil {
log.Error(err)
resBody["error"] = err
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}
meta := elastic.GetMetadata(clusterID)
if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.IndexStats.Interval != "" {
du, _ := time.ParseDuration(meta.Config.MonitorConfigs.IndexStats.Interval)
if bucketSize < int(du.Seconds()) {
bucketSize = int(du.Seconds())
}
}
query := map[string]interface{}{}
query["query"] = util.MapStr{
"bool": util.MapStr{
@@ -577,6 +570,16 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
return
}
}
if _, ok := metrics[metricKey]; ok {
if metrics[metricKey].HitsTotal > 0 {
minBucketSize, err := GetMetricMinBucketSize(clusterID, MetricTypeIndexStats)
if err != nil {
log.Error(err)
} else {
metrics[metricKey].MinBucketSize = int64(minBucketSize)
}
}
}
resBody["metrics"] = metrics
h.WriteJSON(w, resBody, http.StatusOK)
}
@@ -669,6 +672,7 @@ func (h *APIHandler) GetIndexHealthMetric(ctx context.Context, id, indexName str
metricItem.Lines[0].Data = metricData
metricItem.Lines[0].Type = common.GraphTypeBar
metricItem.Request = string(queryDSL)
metricItem.HitsTotal = response.GetTotal()
return metricItem, nil
}


@@ -501,19 +501,29 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
resBody := map[string]interface{}{}
id := ps.ByName("id")
key := h.GetParameter(req, "key")
var metricType string
switch key {
case IndexThroughputMetricKey, SearchThroughputMetricKey, IndexLatencyMetricKey, SearchLatencyMetricKey:
metricType = MetricTypeIndexStats
case ClusterDocumentsMetricKey,
ClusterStorageMetricKey,
ClusterIndicesMetricKey,
ClusterNodeCountMetricKey, ClusterHealthMetricKey:
metricType = MetricTypeClusterStats
case ShardCountMetricKey:
metricType = MetricTypeClusterHealth
case CircuitBreakerMetricKey:
metricType = MetricTypeNodeStats
default:
h.WriteError(w, "invalid metric key", http.StatusBadRequest)
return
}
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, metricType, 90)
if err != nil {
panic(err)
return
}
meta := elastic.GetMetadata(id)
if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.IndexStats.Enabled && meta.Config.MonitorConfigs.IndexStats.Interval != "" {
du, _ := time.ParseDuration(meta.Config.MonitorConfigs.IndexStats.Interval)
if bucketSize < int(du.Seconds()) {
bucketSize = int(du.Seconds())
}
}
timeout := h.GetParameterOrDefault(req, "timeout", "60s")
du, err := time.ParseDuration(timeout)
@@ -524,24 +534,22 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
var metrics interface{}
var metrics map[string]*common.MetricItem
if util.StringInArray([]string{IndexThroughputMetricKey, SearchThroughputMetricKey, IndexLatencyMetricKey, SearchLatencyMetricKey}, key) {
metrics, err = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key)
} else {
if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.ClusterStats.Enabled && meta.Config.MonitorConfigs.ClusterStats.Interval != "" {
du, _ := time.ParseDuration(meta.Config.MonitorConfigs.ClusterStats.Interval)
if bucketSize < int(du.Seconds()) {
bucketSize = int(du.Seconds())
}
}
if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.ClusterHealth.Enabled && meta.Config.MonitorConfigs.ClusterHealth.Interval != "" {
du, _ := time.ParseDuration(meta.Config.MonitorConfigs.ClusterStats.Interval)
if bucketSize < int(du.Seconds()) {
bucketSize = int(du.Seconds())
}
}
metrics, err = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key)
}
if _, ok := metrics[key]; ok {
if metrics[key].HitsTotal > 0 {
minBucketSize, err := GetMetricMinBucketSize(id, metricType)
if err != nil {
log.Error(err)
} else {
metrics[key].MinBucketSize = int64(minBucketSize)
}
}
}
if err != nil {
log.Error(err)
h.WriteError(w, err, http.StatusInternalServerError)
@@ -560,20 +568,13 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
resBody := map[string]interface{}{}
id := ps.ByName("id")
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, MetricTypeIndexStats, 90)
if err != nil {
log.Error(err)
resBody["error"] = err
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}
meta := elastic.GetMetadata(id)
if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.IndexStats.Interval != "" {
du, _ := time.ParseDuration(meta.Config.MonitorConfigs.IndexStats.Interval)
if bucketSize < int(du.Seconds()) {
bucketSize = int(du.Seconds())
}
}
indexName := h.Get(req, "index_name", "")
top := h.GetIntOrDefault(req, "top", 5)
key := h.GetParameter(req, "key")
@@ -654,6 +655,16 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R
return
}
}
if _, ok := metrics[key]; ok {
if metrics[key].HitsTotal > 0 {
minBucketSize, err := GetMetricMinBucketSize(id, MetricTypeNodeStats)
if err != nil {
log.Error(err)
} else {
metrics[key].MinBucketSize = int64(minBucketSize)
}
}
}
resBody["metrics"] = metrics
ver := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).GetVersion()
@@ -1217,6 +1228,7 @@ func (h *APIHandler) getClusterStatusMetric(ctx context.Context, id string, min,
metricItem.Lines[0].Data = metricData
metricItem.Lines[0].Type = common.GraphTypeBar
metricItem.Request = string(queryDSL)
metricItem.HitsTotal = response.GetTotal()
return metricItem, nil
}


@@ -197,6 +197,7 @@ func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{
result := map[string]*common.MetricItem{}
hitsTotal := response.GetTotal()
for _, metricItem := range grpMetricItems {
for _, line := range metricItem.MetricItem.Lines {
line.TimeRange = common.TimeRange{Min: minDate, Max: maxDate}
@@ -207,6 +208,7 @@ func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{
line.Data = grpMetricData[dataKey][line.Metric.Label]
}
metricItem.MetricItem.Request = string(queryDSL)
metricItem.MetricItem.HitsTotal = hitsTotal
result[metricItem.Key] = metricItem.MetricItem
}
return result, nil
@@ -225,16 +227,56 @@ func GetMinBucketSize() int {
return metricsCfg.MinBucketSizeInSeconds
}
// defaultBucketSize is the time interval of each aggregation bucket
func (h *APIHandler) getMetricRangeAndBucketSize(req *http.Request, defaultBucketSize, defaultMetricCount int) (int, int64, int64, error) {
minBucketSizeInSeconds := GetMinBucketSize()
if defaultBucketSize <= 0 {
defaultBucketSize = minBucketSizeInSeconds
const (
MetricTypeClusterHealth = "cluster_health"
MetricTypeClusterStats = "cluster_stats"
MetricTypeNodeStats = "node_stats"
MetricTypeIndexStats = "index_stats"
)
// GetMetricMinBucketSize returns twice the metrics collection interval for the given cluster ID and metric type
func GetMetricMinBucketSize(clusterID, metricType string) (int, error) {
meta := elastic.GetMetadata(clusterID)
if meta == nil {
return 0, fmt.Errorf("got empty metadata for cluster: %s", clusterID)
}
var interval string
switch metricType {
case MetricTypeClusterHealth:
if meta.Config.MonitorConfigs != nil {
interval = meta.Config.MonitorConfigs.ClusterHealth.Interval
}
case MetricTypeClusterStats:
if meta.Config.MonitorConfigs != nil {
interval = meta.Config.MonitorConfigs.ClusterStats.Interval
}
case MetricTypeNodeStats:
if meta.Config.MonitorConfigs != nil {
interval = meta.Config.MonitorConfigs.NodeStats.Interval
}
case MetricTypeIndexStats:
if meta.Config.MonitorConfigs != nil {
interval = meta.Config.MonitorConfigs.IndexStats.Interval
}
default:
return 0, fmt.Errorf("invalid metric name: %s", metricType)
}
if interval != "" {
du, err := util.ParseDuration(interval)
if err != nil {
return 0, err
}
return int(du.Seconds()) * 2, nil
}
// default to 20 seconds
return 20, nil
}
// defaultBucketSize is the time interval of each aggregation bucket
func (h *APIHandler) GetMetricRangeAndBucketSize(req *http.Request, clusterID, metricType string, defaultMetricCount int) (int, int64, int64, error) {
if defaultMetricCount <= 0 {
defaultMetricCount = 15 * 60
}
bucketSize := defaultBucketSize
bucketSize := 0
bucketSizeStr := h.GetParameterOrDefault(req, "bucket_size", "") // defaults to 10; the time span of each bucket, in seconds
if bucketSizeStr != "" {
@@ -243,19 +285,31 @@ func (h *APIHandler) getMetricRangeAndBucketSize(req *http.Request, defaultBucke
return 0, 0, 0, err
}
bucketSize = int(du.Seconds())
} else {
bucketSize = 0
}
metricCount := h.GetIntOrDefault(req, "metric_count", defaultMetricCount) // defaults to a 15-minute range, i.e. 15*6 = 90 buckets
//min,max are unix nanoseconds
minStr := h.Get(req, "min", "")
maxStr := h.Get(req, "max", "")
var (
minBucketSize = 0
err error
)
// clusterID may be empty when querying host metrics
if clusterID != "" {
minBucketSize, err = GetMetricMinBucketSize(clusterID, metricType)
if err != nil {
return 0, 0, 0, fmt.Errorf("failed to get min bucket size for cluster [%s]: %w", clusterID, err)
}
} else {
// default to 20 seconds
minBucketSize = 20
}
return GetMetricRangeAndBucketSize(minStr, maxStr, bucketSize, metricCount)
return GetMetricRangeAndBucketSize(minStr, maxStr, bucketSize, metricCount, minBucketSize)
}
func GetMetricRangeAndBucketSize(minStr string, maxStr string, bucketSize int, metricCount int) (int, int64, int64, error) {
func GetMetricRangeAndBucketSize(minStr string, maxStr string, bucketSize int, metricCount int, minBucketSize int) (int, int64, int64, error) {
var min, max int64
var rangeFrom, rangeTo time.Time
var err error
@@ -324,6 +378,9 @@ func GetMetricRangeAndBucketSize(minStr string, maxStr string, bucketSize int, m
} else if hours >= 30*24+1 { //>30days
bucketSize = 60 * 60 * 24 //daily bucket
}
if bucketSize < minBucketSize {
bucketSize = minBucketSize
}
}
return bucketSize, min, max, nil
@@ -445,12 +502,14 @@ func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common
result := map[string]*common.MetricItem{}
hitsTotal := response.GetTotal()
for _, metricItem := range metricItems {
for _, line := range metricItem.Lines {
line.TimeRange = common.TimeRange{Min: minDate, Max: maxDate}
line.Data = metricData[line.Metric.GetDataKey()]
}
metricItem.Request = string(queryDSL)
metricItem.HitsTotal = hitsTotal
result[metricItem.Key] = metricItem
}
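A worked example under assumed configuration: if a cluster's node_stats collection interval is "10s", GetMetricMinBucketSize returns 20, so a query whose auto-calculated bucket size would have been 10 seconds now gets 20-second buckets. An explicitly supplied bucket_size parameter appears to bypass the clamp, since the minBucketSize check sits inside the auto-calculation branch.

// hypothetical call; values are for illustration only
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterID, MetricTypeNodeStats, 90)
// with interval "10s": minBucketSize = 20, so bucketSize >= 20 whenever it is auto-calculated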


@@ -251,6 +251,11 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
if len(results.Result) == 0 {
h.WriteJSON(w, util.MapStr{}, http.StatusOK)
return
}
var clusterID string
statusMap := map[string]interface{}{}
for _, v := range results.Result {
result, ok := v.(map[string]interface{})
@@ -288,7 +293,7 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
source["shard_info"] = shardInfo
}
if tempClusterID, ok := util.GetMapValueByKeys([]string{"metadata", "labels", "cluster_id"}, result); ok {
if clusterID, ok := tempClusterID.(string); ok {
if clusterID, ok = tempClusterID.(string); ok {
if meta := elastic.GetMetadata(clusterID); meta != nil && meta.ClusterState != nil {
source["is_master_node"] = meta.ClusterState.MasterNode == nodeID
}
@@ -305,7 +310,7 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 60, (15))
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterID, MetricTypeNodeStats, 15)
if err != nil {
panic(err)
return
@@ -607,20 +612,13 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque
},
}
resBody := map[string]interface{}{}
bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req,10,60)
bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterID, MetricTypeNodeStats, 60)
if err != nil {
log.Error(err)
resBody["error"] = err
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}
meta := elastic.GetMetadata(clusterID)
if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.NodeStats.Interval != "" {
du, _ := time.ParseDuration(meta.Config.MonitorConfigs.NodeStats.Interval)
if bucketSize < int(du.Seconds()) {
bucketSize = int(du.Seconds())
}
}
query := map[string]interface{}{}
query["query"] = util.MapStr{
"bool": util.MapStr{