diff --git a/docs/content.en/docs/release-notes/_index.md b/docs/content.en/docs/release-notes/_index.md index 045433f1..93793e94 100644 --- a/docs/content.en/docs/release-notes/_index.md +++ b/docs/content.en/docs/release-notes/_index.md @@ -21,6 +21,7 @@ Information about release notes of INFINI Console is provided here. - Optimize UI of agent list when its columns are overflow. - Add loading to each row in overview table. - Adapter metrics query with cluster id and cluster uuid +- Optimize metric query bucket size (#59) - Add suggestion to chart in monitor if is no data because the time interval is less than the collection interval. diff --git a/modules/elastic/api/host.go b/modules/elastic/api/host.go index 9cdb5adf..f0ba251f 100644 --- a/modules/elastic/api/host.go +++ b/modules/elastic/api/host.go @@ -466,7 +466,7 @@ func (h *APIHandler) FetchHostInfo(w http.ResponseWriter, req *http.Request, ps return } - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 60, 15) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, "", "", 15) if err != nil { panic(err) return @@ -712,7 +712,7 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque } resBody := map[string]interface{}{} - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 60) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, "", "", 60) if err != nil { log.Error(err) resBody["error"] = err diff --git a/modules/elastic/api/index_overview.go b/modules/elastic/api/index_overview.go index 321d5f3c..2a5d3ecc 100644 --- a/modules/elastic/api/index_overview.go +++ b/modules/elastic/api/index_overview.go @@ -835,7 +835,7 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ }, http.StatusForbidden) return } - clusterUUID, err := adapter.GetClusterUUID(clusterID) + clusterUUID, err := h.getClusterUUID(clusterID) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) @@ -844,13 +844,6 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ shardID := h.GetParameterOrDefault(req, "shard_id", "") var must = []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.cluster_uuid": util.MapStr{ - "value": clusterUUID, - }, - }, - }, { "term": util.MapStr{ "metadata.category": util.MapStr{ @@ -883,7 +876,15 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ }) } resBody := map[string]interface{}{} - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 60) + metricKey := h.GetParameter(req, "key") + var metricType string + if metricKey == v1.IndexHealthMetricKey { + metricType = v1.MetricTypeClusterHealth + }else{ + //for agent mode + metricType = v1.MetricTypeNodeStats + } + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterID, metricType,60) if err != nil { log.Error(err) resBody["error"] = err @@ -893,7 +894,6 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ if bucketSize <= 60 { min = min - int64(2 * bucketSize * 1000) } - metricKey := h.GetParameter(req, "key") timeout := h.GetParameterOrDefault(req, "timeout", "60s") du, err := time.ParseDuration(timeout) if err != nil { @@ -907,6 +907,22 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ query["query"] = util.MapStr{ "bool": util.MapStr{ "must": must, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "metadata.labels.cluster_uuid": util.MapStr{ + "value": clusterUUID, + 
}, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.cluster_id": util.MapStr{ + "value": clusterID, + }, + }, + }, + }, "filter": []util.MapStr{ { "range": util.MapStr{ @@ -1009,6 +1025,16 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ h.WriteError(w, err, http.StatusInternalServerError) } } + if _, ok := metrics[metricKey]; ok { + if metrics[metricKey].HitsTotal > 0 { + minBucketSize, err := v1.GetMetricMinBucketSize(clusterID, metricType) + if err != nil { + log.Error(err) + }else{ + metrics[metricKey].MinBucketSize = int64(minBucketSize) + } + } + } resBody["metrics"] = metrics h.WriteJSON(w, resBody, http.StatusOK) @@ -1102,6 +1128,7 @@ func (h *APIHandler) getIndexShardsMetric(ctx context.Context, id, indexName str metricItem.Lines[0].Data = metricData metricItem.Lines[0].Type = common.GraphTypeBar metricItem.Request = string(queryDSL) + metricItem.HitsTotal = response.GetTotal() return metricItem, nil } diff --git a/modules/elastic/api/manage.go b/modules/elastic/api/manage.go index 1fb19019..b33c74b9 100644 --- a/modules/elastic/api/manage.go +++ b/modules/elastic/api/manage.go @@ -532,21 +532,30 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http return } key := h.GetParameter(req, "key") + var metricType string + switch key { + case v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey, CircuitBreakerMetricKey: + metricType = v1.MetricTypeNodeStats + case ClusterDocumentsMetricKey, + ClusterStorageMetricKey, + ClusterIndicesMetricKey, + ClusterNodeCountMetricKey: + metricType = v1.MetricTypeClusterStats + case ClusterHealthMetricKey: + metricType = v1.MetricTypeClusterStats + case ShardCountMetricKey: + metricType = v1.MetricTypeClusterHealth + default: + h.WriteError(w, "invalid metric key", http.StatusBadRequest) + return + } - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, metricType, 90) if err != nil { panic(err) return } - meta := elastic.GetMetadata(id) - if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.IndexStats.Enabled && meta.Config.MonitorConfigs.IndexStats.Interval != "" { - du, _ := time.ParseDuration(meta.Config.MonitorConfigs.IndexStats.Interval) - if bucketSize < int(du.Seconds()) { - bucketSize = int(du.Seconds()) - } - } - - var metrics interface{} + var metrics map[string]*common.MetricItem if bucketSize <= 60 { min = min - int64(2*bucketSize*1000) } @@ -569,6 +578,16 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http h.WriteError(w, err, http.StatusInternalServerError) return } + if _, ok := metrics[key]; ok { + if metrics[key].HitsTotal > 0 { + minBucketSize, err := v1.GetMetricMinBucketSize(id, metricType) + if err != nil { + log.Error(err) + }else{ + metrics[key].MinBucketSize = int64(minBucketSize) + } + } + } resBody["metrics"] = metrics @@ -582,7 +601,7 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http func (h *APIHandler) HandleNodeMetricsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { resBody := map[string]interface{}{} id := ps.ByName("id") - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, v1.MetricTypeNodeStats, 90) if err != nil { log.Error(err) resBody["error"] = err @@ -604,12 +623,23 @@ func (h *APIHandler) 
HandleNodeMetricsAction(w http.ResponseWriter, req *http.Re } ctx, cancel := context.WithTimeout(context.Background(), du) defer cancel() - resBody["metrics"], err = h.getNodeMetrics(ctx, id, bucketSize, min, max, nodeName, top, key) + metrics, err := h.getNodeMetrics(ctx, id, bucketSize, min, max, nodeName, top, key) if err != nil { log.Error(err) h.WriteError(w, err, http.StatusInternalServerError) return } + if _, ok := metrics[key]; ok { + if metrics[key].HitsTotal > 0 { + minBucketSize, err := v1.GetMetricMinBucketSize(id, v1.MetricTypeNodeStats) + if err != nil { + log.Error(err) + }else{ + metrics[key].MinBucketSize = int64(minBucketSize) + } + } + } + resBody["metrics"] = metrics ver := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).GetVersion() if ver.Distribution == "" { cr, err := util.VersionCompare(ver.Number, "6.1") @@ -634,7 +664,7 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R h.APIHandler.HandleIndexMetricsAction(w, req, ps) return } - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, v1.MetricTypeNodeStats, 90) if err != nil { log.Error(err) resBody["error"] = err @@ -729,6 +759,16 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R return } } + if _, ok := metrics[key]; ok { + if metrics[key].HitsTotal > 0 { + minBucketSize, err := v1.GetMetricMinBucketSize(id, v1.MetricTypeNodeStats) + if err != nil { + log.Error(err) + }else{ + metrics[key].MinBucketSize = int64(minBucketSize) + } + } + } resBody["metrics"] = metrics ver := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).GetVersion() if ver.Distribution == "" { @@ -749,7 +789,7 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R func (h *APIHandler) HandleQueueMetricsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { resBody := map[string]interface{}{} id := ps.ByName("id") - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, v1.MetricTypeNodeStats, 90) if err != nil { log.Error(err) resBody["error"] = err @@ -771,12 +811,23 @@ func (h *APIHandler) HandleQueueMetricsAction(w http.ResponseWriter, req *http.R } ctx, cancel := context.WithTimeout(context.Background(), du) defer cancel() - resBody["metrics"], err = h.getThreadPoolMetrics(ctx, id, bucketSize, min, max, nodeName, top, key) + metrics, err := h.getThreadPoolMetrics(ctx, id, bucketSize, min, max, nodeName, top, key) if err != nil { log.Error(err) h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + if _, ok := metrics[key]; ok { + if metrics[key].HitsTotal > 0 { + minBucketSize, err := v1.GetMetricMinBucketSize(id, v1.MetricTypeNodeStats) + if err != nil { + log.Error(err) + }else{ + metrics[key].MinBucketSize = int64(minBucketSize) + } + } + } + resBody["metrics"] = metrics ver := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).GetVersion() if ver.Distribution == "" { cr, err := util.VersionCompare(ver.Number, "6.1") @@ -1356,6 +1407,7 @@ func (h *APIHandler) getClusterStatusMetric(ctx context.Context, id string, min, metricItem.Lines[0].Data = metricData metricItem.Lines[0].Type = common.GraphTypeBar metricItem.Request = string(queryDSL) + metricItem.HitsTotal = response.GetTotal() return metricItem, nil } diff --git 
a/modules/elastic/api/metrics_util.go b/modules/elastic/api/metrics_util.go index 6f39a121..73441d69 100644 --- a/modules/elastic/api/metrics_util.go +++ b/modules/elastic/api/metrics_util.go @@ -27,7 +27,6 @@ import ( "context" "fmt" "infini.sh/framework/core/env" - "net/http" "strings" "time" @@ -197,6 +196,7 @@ func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{ result := map[string]*common.MetricItem{} + hitsTotal := response.GetTotal() for _, metricItem := range grpMetricItems { for _, line := range metricItem.MetricItem.Lines { line.TimeRange = common.TimeRange{Min: minDate, Max: maxDate} @@ -216,6 +216,7 @@ func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{ } } metricItem.MetricItem.Request = string(queryDSL) + metricItem.MetricItem.HitsTotal = hitsTotal result[metricItem.Key] = metricItem.MetricItem } return result, nil @@ -234,36 +235,6 @@ func GetMinBucketSize() int { return metricsCfg.MinBucketSizeInSeconds } -// defaultBucketSize 也就是每次聚合的时间间隔 -func (h *APIHandler) getMetricRangeAndBucketSize(req *http.Request, defaultBucketSize, defaultMetricCount int) (int, int64, int64, error) { - minBucketSizeInSeconds := GetMinBucketSize() - if defaultBucketSize <= 0 { - defaultBucketSize = minBucketSizeInSeconds - } - if defaultMetricCount <= 0 { - defaultMetricCount = 15 * 60 - } - bucketSize := defaultBucketSize - - bucketSizeStr := h.GetParameterOrDefault(req, "bucket_size", "") //默认 10,每个 bucket 的时间范围,单位秒 - if bucketSizeStr != "" { - du, err := util.ParseDuration(bucketSizeStr) - if err != nil { - return 0, 0, 0, err - } - bucketSize = int(du.Seconds()) - }else { - bucketSize = 0 - } - metricCount := h.GetIntOrDefault(req, "metric_count", defaultMetricCount) //默认 15分钟的区间,每分钟15个指标,也就是 15*6 个 bucket //90 - //min,max are unix nanoseconds - - minStr := h.Get(req, "min", "") - maxStr := h.Get(req, "max", "") - - return GetMetricRangeAndBucketSize(minStr, maxStr, bucketSize, metricCount) -} - func GetMetricRangeAndBucketSize(minStr string, maxStr string, bucketSize int, metricCount int) (int, int64, int64, error) { var min, max int64 var rangeFrom, rangeTo time.Time @@ -469,6 +440,7 @@ func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common } } metricItem.Request = string(queryDSL) + metricItem.HitsTotal = response.GetTotal() result[metricItem.Key] = metricItem } @@ -1213,6 +1185,7 @@ func parseSingleIndexMetrics(ctx context.Context, clusterID string, metricItems } } metricItem.Request = string(queryDSL) + metricItem.HitsTotal = response.GetTotal() result[metricItem.Key] = metricItem } diff --git a/modules/elastic/api/node_overview.go b/modules/elastic/api/node_overview.go index bd19a017..446de777 100644 --- a/modules/elastic/api/node_overview.go +++ b/modules/elastic/api/node_overview.go @@ -632,7 +632,7 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque }, } resBody := map[string]interface{}{} - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req,10,60) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req,clusterID, v1.MetricTypeNodeStats,60) if err != nil { log.Error(err) resBody["error"] = err @@ -803,6 +803,16 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque return } } + if _, ok := metrics[metricKey]; ok { + if metrics[metricKey].HitsTotal > 0 { + minBucketSize, err := v1.GetMetricMinBucketSize(clusterID, v1.MetricTypeNodeStats) + if err != nil { + log.Error(err) + }else{ + metrics[metricKey].MinBucketSize = 
int64(minBucketSize) + } + } + } resBody["metrics"] = metrics h.WriteJSON(w, resBody, http.StatusOK) diff --git a/modules/elastic/api/v1/cluster_overview.go b/modules/elastic/api/v1/cluster_overview.go index ec7afb77..6b65fe27 100644 --- a/modules/elastic/api/v1/cluster_overview.go +++ b/modules/elastic/api/v1/cluster_overview.go @@ -110,7 +110,7 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request, } //fetch cluster metrics - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 60, (15)) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterIDs[0], MetricTypeIndexStats, 15) if err != nil { panic(err) return diff --git a/modules/elastic/api/v1/index_overview.go b/modules/elastic/api/v1/index_overview.go index f9007ee7..c1f019ba 100644 --- a/modules/elastic/api/v1/index_overview.go +++ b/modules/elastic/api/v1/index_overview.go @@ -469,20 +469,13 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ }, } resBody := map[string]interface{}{} - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 60) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterID, MetricTypeIndexStats, 60) if err != nil { log.Error(err) resBody["error"] = err h.WriteJSON(w, resBody, http.StatusInternalServerError) return } - meta := elastic.GetMetadata(clusterID) - if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.IndexStats.Interval != "" { - du, _ := time.ParseDuration(meta.Config.MonitorConfigs.IndexStats.Interval) - if bucketSize < int(du.Seconds()) { - bucketSize = int(du.Seconds()) - } - } query := map[string]interface{}{} query["query"] = util.MapStr{ "bool": util.MapStr{ @@ -577,6 +570,16 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ return } } + if _, ok := metrics[metricKey]; ok { + if metrics[metricKey].HitsTotal > 0 { + minBucketSize, err := GetMetricMinBucketSize(clusterID, MetricTypeIndexStats) + if err != nil { + log.Error(err) + }else{ + metrics[metricKey].MinBucketSize = int64(minBucketSize) + } + } + } resBody["metrics"] = metrics h.WriteJSON(w, resBody, http.StatusOK) } @@ -669,6 +672,7 @@ func (h *APIHandler) GetIndexHealthMetric(ctx context.Context, id, indexName str metricItem.Lines[0].Data = metricData metricItem.Lines[0].Type = common.GraphTypeBar metricItem.Request = string(queryDSL) + metricItem.HitsTotal = response.GetTotal() return metricItem, nil } diff --git a/modules/elastic/api/v1/manage.go b/modules/elastic/api/v1/manage.go index 5536408d..de753919 100644 --- a/modules/elastic/api/v1/manage.go +++ b/modules/elastic/api/v1/manage.go @@ -501,19 +501,29 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http resBody := map[string]interface{}{} id := ps.ByName("id") key := h.GetParameter(req, "key") + var metricType string + switch key { + case IndexThroughputMetricKey, SearchThroughputMetricKey, IndexLatencyMetricKey, SearchLatencyMetricKey: + metricType = MetricTypeIndexStats + case ClusterDocumentsMetricKey, + ClusterStorageMetricKey, + ClusterIndicesMetricKey, + ClusterNodeCountMetricKey, ClusterHealthMetricKey: + metricType = MetricTypeClusterStats + case ShardCountMetricKey: + metricType = MetricTypeClusterHealth + case CircuitBreakerMetricKey: + metricType = MetricTypeNodeStats + default: + h.WriteError(w, "invalid metric key", http.StatusBadRequest) + return + } - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90) + bucketSize, min, max, err 
:= h.GetMetricRangeAndBucketSize(req, id, metricType, 90) if err != nil { panic(err) return } - meta := elastic.GetMetadata(id) - if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.IndexStats.Enabled && meta.Config.MonitorConfigs.IndexStats.Interval != "" { - du, _ := time.ParseDuration(meta.Config.MonitorConfigs.IndexStats.Interval) - if bucketSize < int(du.Seconds()) { - bucketSize = int(du.Seconds()) - } - } timeout := h.GetParameterOrDefault(req, "timeout", "60s") du, err := time.ParseDuration(timeout) @@ -524,24 +534,22 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http } ctx, cancel := context.WithTimeout(context.Background(), du) defer cancel() - var metrics interface{} + var metrics map[string]*common.MetricItem if util.StringInArray([]string{IndexThroughputMetricKey, SearchThroughputMetricKey, IndexLatencyMetricKey, SearchLatencyMetricKey}, key) { metrics, err = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key) } else { - if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.ClusterStats.Enabled && meta.Config.MonitorConfigs.ClusterStats.Interval != "" { - du, _ := time.ParseDuration(meta.Config.MonitorConfigs.ClusterStats.Interval) - if bucketSize < int(du.Seconds()) { - bucketSize = int(du.Seconds()) - } - } - if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.ClusterHealth.Enabled && meta.Config.MonitorConfigs.ClusterHealth.Interval != "" { - du, _ := time.ParseDuration(meta.Config.MonitorConfigs.ClusterStats.Interval) - if bucketSize < int(du.Seconds()) { - bucketSize = int(du.Seconds()) - } - } metrics, err = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key) } + if _, ok := metrics[key]; ok { + if metrics[key].HitsTotal > 0 { + minBucketSize, err := GetMetricMinBucketSize(id, metricType) + if err != nil { + log.Error(err) + }else{ + metrics[key].MinBucketSize = int64(minBucketSize) + } + } + } if err != nil { log.Error(err) h.WriteError(w, err, http.StatusInternalServerError) @@ -560,20 +568,13 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { resBody := map[string]interface{}{} id := ps.ByName("id") - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 10, 90) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, id, MetricTypeIndexStats, 90) if err != nil { log.Error(err) resBody["error"] = err h.WriteJSON(w, resBody, http.StatusInternalServerError) return } - meta := elastic.GetMetadata(id) - if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.IndexStats.Interval != "" { - du, _ := time.ParseDuration(meta.Config.MonitorConfigs.IndexStats.Interval) - if bucketSize < int(du.Seconds()) { - bucketSize = int(du.Seconds()) - } - } indexName := h.Get(req, "index_name", "") top := h.GetIntOrDefault(req, "top", 5) key := h.GetParameter(req, "key") @@ -654,6 +655,16 @@ func (h *APIHandler) HandleIndexMetricsAction(w http.ResponseWriter, req *http.R return } } + if _, ok := metrics[key]; ok { + if metrics[key].HitsTotal > 0 { + minBucketSize, err := GetMetricMinBucketSize(id, MetricTypeNodeStats) + if err != nil { + log.Error(err) + }else{ + metrics[key].MinBucketSize = int64(minBucketSize) + } + } + } resBody["metrics"] = metrics ver := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).GetVersion() @@ -1217,6 +1228,7 @@ 
func (h *APIHandler) getClusterStatusMetric(ctx context.Context, id string, min, metricItem.Lines[0].Data = metricData metricItem.Lines[0].Type = common.GraphTypeBar metricItem.Request = string(queryDSL) + metricItem.HitsTotal = response.GetTotal() return metricItem, nil } diff --git a/modules/elastic/api/v1/metrics_util.go b/modules/elastic/api/v1/metrics_util.go index 626d23e6..1fb3a744 100644 --- a/modules/elastic/api/v1/metrics_util.go +++ b/modules/elastic/api/v1/metrics_util.go @@ -197,6 +197,7 @@ func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{ result := map[string]*common.MetricItem{} + hitsTotal := response.GetTotal() for _, metricItem := range grpMetricItems { for _, line := range metricItem.MetricItem.Lines { line.TimeRange = common.TimeRange{Min: minDate, Max: maxDate} @@ -207,6 +208,7 @@ func (h *APIHandler) getMetrics(ctx context.Context, query map[string]interface{ line.Data = grpMetricData[dataKey][line.Metric.Label] } metricItem.MetricItem.Request = string(queryDSL) + metricItem.MetricItem.HitsTotal = hitsTotal result[metricItem.Key] = metricItem.MetricItem } return result, nil @@ -225,16 +227,56 @@ func GetMinBucketSize() int { return metricsCfg.MinBucketSizeInSeconds } -// defaultBucketSize 也就是每次聚合的时间间隔 -func (h *APIHandler) getMetricRangeAndBucketSize(req *http.Request, defaultBucketSize, defaultMetricCount int) (int, int64, int64, error) { - minBucketSizeInSeconds := GetMinBucketSize() - if defaultBucketSize <= 0 { - defaultBucketSize = minBucketSizeInSeconds +const ( + MetricTypeClusterHealth = "cluster_health" + MetricTypeClusterStats = "cluster_stats" + MetricTypeNodeStats = "node_stats" + MetricTypeIndexStats = "index_stats" +) +//GetMetricMinBucketSize returns twice the metrics collection interval based on the cluster ID and metric type +func GetMetricMinBucketSize(clusterID, metricType string) (int, error) { + meta := elastic.GetMetadata(clusterID) + if meta == nil { + return 0, fmt.Errorf("got empty metadata for cluster: %s", clusterID) } + var interval string + switch metricType { + case MetricTypeClusterHealth: + if meta.Config.MonitorConfigs != nil { + interval = meta.Config.MonitorConfigs.ClusterHealth.Interval + } + case MetricTypeClusterStats: + if meta.Config.MonitorConfigs != nil { + interval = meta.Config.MonitorConfigs.ClusterStats.Interval + } + case MetricTypeNodeStats: + if meta.Config.MonitorConfigs != nil { + interval = meta.Config.MonitorConfigs.NodeStats.Interval + } + case MetricTypeIndexStats: + if meta.Config.MonitorConfigs != nil { + interval = meta.Config.MonitorConfigs.IndexStats.Interval + } + default: + return 0, fmt.Errorf("invalid metric name: %s", metricType) + } + if interval != "" { + du, err := util.ParseDuration(interval) + if err != nil { + return 0, err + } + return int(du.Seconds()) * 2, nil + } + // default to 20 seconds + return 20, nil +} + +// defaultBucketSize 也就是每次聚合的时间间隔 +func (h *APIHandler) GetMetricRangeAndBucketSize(req *http.Request, clusterID, metricType string, defaultMetricCount int) (int, int64, int64, error) { if defaultMetricCount <= 0 { defaultMetricCount = 15 * 60 } - bucketSize := defaultBucketSize + bucketSize := 0 bucketSizeStr := h.GetParameterOrDefault(req, "bucket_size", "") //默认 10,每个 bucket 的时间范围,单位秒 if bucketSizeStr != "" { @@ -243,19 +285,31 @@ func (h *APIHandler) getMetricRangeAndBucketSize(req *http.Request, defaultBucke return 0, 0, 0, err } bucketSize = int(du.Seconds()) - }else { - bucketSize = 0 } metricCount := h.GetIntOrDefault(req, "metric_count", 
defaultMetricCount) //默认 15分钟的区间,每分钟15个指标,也就是 15*6 个 bucket //90 //min,max are unix nanoseconds minStr := h.Get(req, "min", "") maxStr := h.Get(req, "max", "") + var ( + minBucketSize = 0 + err error + ) + //clusterID may be empty when querying host metrics + if clusterID != "" { + minBucketSize, err = GetMetricMinBucketSize(clusterID, metricType) + if err != nil { + return 0, 0, 0, fmt.Errorf("failed to get min bucket size for cluster [%s]:%w", clusterID, err) + } + }else{ + //default to 20 + minBucketSize = 20 + } - return GetMetricRangeAndBucketSize(minStr, maxStr, bucketSize, metricCount) + return GetMetricRangeAndBucketSize(minStr, maxStr, bucketSize, metricCount, minBucketSize) } -func GetMetricRangeAndBucketSize(minStr string, maxStr string, bucketSize int, metricCount int) (int, int64, int64, error) { +func GetMetricRangeAndBucketSize(minStr string, maxStr string, bucketSize int, metricCount int, minBucketSize int) (int, int64, int64, error) { var min, max int64 var rangeFrom, rangeTo time.Time var err error @@ -324,6 +378,9 @@ func GetMetricRangeAndBucketSize(minStr string, maxStr string, bucketSize int, m } else if hours >= 30*24+1 { //>30days bucketSize = 60 * 60 * 24 //daily bucket } + if bucketSize < minBucketSize { + bucketSize = minBucketSize + } } return bucketSize, min, max, nil @@ -445,12 +502,14 @@ func (h *APIHandler) getSingleMetrics(ctx context.Context, metricItems []*common result := map[string]*common.MetricItem{} + hitsTotal := response.GetTotal() for _, metricItem := range metricItems { for _, line := range metricItem.Lines { line.TimeRange = common.TimeRange{Min: minDate, Max: maxDate} line.Data = metricData[line.Metric.GetDataKey()] } metricItem.Request = string(queryDSL) + metricItem.HitsTotal = hitsTotal result[metricItem.Key] = metricItem } diff --git a/modules/elastic/api/v1/node_overview.go b/modules/elastic/api/v1/node_overview.go index e8fcb2eb..a4048576 100644 --- a/modules/elastic/api/v1/node_overview.go +++ b/modules/elastic/api/v1/node_overview.go @@ -251,6 +251,11 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + if len(results.Result) == 0 { + h.WriteJSON(w, util.MapStr{}, http.StatusOK) + return + } + var clusterID string statusMap := map[string]interface{}{} for _, v := range results.Result { result, ok := v.(map[string]interface{}) @@ -288,7 +293,7 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps source["shard_info"] = shardInfo } if tempClusterID, ok := util.GetMapValueByKeys([]string{"metadata", "labels", "cluster_id"}, result); ok { - if clusterID, ok := tempClusterID.(string); ok { + if clusterID, ok = tempClusterID.(string); ok { if meta := elastic.GetMetadata(clusterID); meta != nil && meta.ClusterState != nil { source["is_master_node"] = meta.ClusterState.MasterNode == nodeID } @@ -305,7 +310,7 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 60, (15)) + bucketSize, min, max, err := h.GetMetricRangeAndBucketSize(req, clusterID, MetricTypeNodeStats, 15) if err != nil { panic(err) return @@ -607,20 +612,13 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque }, } resBody := map[string]interface{}{} - bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req,10,60) + bucketSize, min, max, err := 
h.GetMetricRangeAndBucketSize(req,clusterID, MetricTypeNodeStats,60) if err != nil { log.Error(err) resBody["error"] = err h.WriteJSON(w, resBody, http.StatusInternalServerError) return } - meta := elastic.GetMetadata(clusterID) - if meta != nil && meta.Config.MonitorConfigs != nil && meta.Config.MonitorConfigs.NodeStats.Interval != "" { - du, _ := time.ParseDuration(meta.Config.MonitorConfigs.NodeStats.Interval) - if bucketSize < int(du.Seconds()) { - bucketSize = int(du.Seconds()) - } - } query:=map[string]interface{}{} query["query"]=util.MapStr{ "bool": util.MapStr{
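
The core of this change is the new `GetMetricMinBucketSize` / `GetMetricRangeAndBucketSize` pair in `modules/elastic/api/v1/metrics_util.go`: the minimum aggregation bucket is derived from the cluster's monitor collection interval (twice the interval, with a 20-second fallback), and the auto-selected bucket size is clamped up to that floor. Below is a condensed, standalone sketch of that behaviour; it uses `time.ParseDuration` instead of the framework's `util.ParseDuration` and a plain string instead of reading `MonitorConfigs` via `elastic.GetMetadata`, so it illustrates the clamping logic rather than the production code path.

```go
package main

import (
	"fmt"
	"time"
)

// minBucketSize mirrors the idea behind GetMetricMinBucketSize: the
// smallest useful bucket is twice the metrics collection interval,
// falling back to 20 seconds when no interval is configured. The real
// code reads the interval from the cluster's MonitorConfigs; here it
// is passed in directly.
func minBucketSize(collectionInterval string) (int, error) {
	if collectionInterval == "" {
		return 20, nil
	}
	du, err := time.ParseDuration(collectionInterval)
	if err != nil {
		return 0, err
	}
	return int(du.Seconds()) * 2, nil
}

// clampBucketSize mirrors the new floor applied in
// GetMetricRangeAndBucketSize: an auto-derived bucket size is never
// allowed to drop below the per-cluster minimum.
func clampBucketSize(bucketSize, minBucket int) int {
	if bucketSize < minBucket {
		return minBucket
	}
	return bucketSize
}

func main() {
	// A cluster collecting node_stats every 30s gets a 60s floor,
	// so a 10s auto-selected bucket is widened to 60s.
	minBucket, _ := minBucketSize("30s")
	fmt.Println(clampBucketSize(10, minBucket))  // 60
	fmt.Println(clampBucketSize(120, minBucket)) // 120
}
```

From the visible hunk the clamp sits in the branch that derives the bucket size from the requested time range; whether an explicitly supplied `bucket_size` parameter is also floored is not shown in this excerpt.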
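
Each handler now also attaches `HitsTotal` to the `common.MetricItem` it builds and, when the metric actually returned data, a `MinBucketSize`. The sketch below shows how a consumer of these APIs might pair that with the release-note item about suggesting a coarser interval; the parameter names are placeholders, since the real wire field names depend on `common.MetricItem`'s struct tags.

```go
// needsIntervalHint sketches how a client of these metric APIs could
// use the new fields: when the metric has hits but the requested
// bucket is finer than the reported minimum, the chart can suggest
// widening the query interval instead of rendering sparse bars.
func needsIntervalHint(hitsTotal, requested, minBucket int64) bool {
	return hitsTotal > 0 && minBucket > 0 && requested < minBucket
}
```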
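
In `GetSingleIndexMetrics` the hard `term` filter on `metadata.labels.cluster_uuid` is replaced by a `should` pair that accepts either `cluster_uuid` or `cluster_id`, presumably so that documents labelled with either identifier (for example, agent-collected versus console-collected metrics) are matched. Below is a minimal, standalone sketch of that either/or pattern using plain maps instead of `util.MapStr`; it sets `minimum_should_match: 1` to make the intent explicit, while the excerpt above does not show whether the production query sets this or relies on the surrounding `must`/`filter` context.

```go
// eitherClusterLabelQuery is a standalone sketch of the should-pair
// introduced in GetSingleIndexMetrics: match documents labelled with
// either the cluster UUID or the cluster ID.
func eitherClusterLabelQuery(clusterUUID, clusterID string) map[string]interface{} {
	return map[string]interface{}{
		"bool": map[string]interface{}{
			"minimum_should_match": 1,
			"should": []map[string]interface{}{
				{"term": map[string]interface{}{
					"metadata.labels.cluster_uuid": map[string]interface{}{"value": clusterUUID},
				}},
				{"term": map[string]interface{}{
					"metadata.labels.cluster_id": map[string]interface{}{"value": clusterID},
				}},
			},
		},
	}
}
```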