diff --git a/plugin/api/index_management/elasticsearch.go b/plugin/api/index_management/elasticsearch.go index 7af5b107..07410591 100644 --- a/plugin/api/index_management/elasticsearch.go +++ b/plugin/api/index_management/elasticsearch.go @@ -15,38 +15,56 @@ func (handler APIHandler) ElasticsearchOverviewAction(w http.ResponseWriter, req var ( totalNode int totalStoreSize int + clusterIDs []interface{} ) elastic.WalkConfigs(func(key, value interface{})bool{ if handler.Config.Elasticsearch == key { return true } - data, err := handler.getLatestClusterMonitorData(key) - if err != nil{ - log.Error(err) - } - val, err := data.GetValue("payload.elasticsearch.cluster_stats.nodes.count.total") - if err != nil { - log.Warn(err) - } - if num, ok := val.(float64); ok { - totalNode += int(num) - } - val, err = data.GetValue("payload.elasticsearch.cluster_stats.indices.store.size_in_bytes") + clusterIDs = append(clusterIDs, key) + return true + }) + + res, err := handler.getLatestClusterMonitorData(clusterIDs) + if err != nil { + log.Error(err) + handler.WriteJSON(w, util.MapStr{ + "error": err.Error(), + }, http.StatusInternalServerError) + return + } + for _, info := range res.Hits.Hits { + data := util.MapStr(info.Source) + //val, err := data.GetValue("payload.elasticsearch.cluster_stats.nodes.count.total") + //if err != nil { + // log.Warn(err) + //} + //if num, ok := val.(float64); ok { + // totalNode += int(num) + //} + val, err := data.GetValue("payload.elasticsearch.cluster_stats.indices.store.size_in_bytes") if err != nil { log.Warn(err) } if num, ok := val.(float64); ok { totalStoreSize += int(num) } - return true - }) + } - hostCount, err := handler.getLastActiveHostCount() + hostCount, err := handler.getMetricCount(orm.GetIndexName(elastic.HostMetadata{}), "metadata.host") if err != nil{ log.Error(err) } + nodeCount, err := handler.getMetricCount(orm.GetIndexName(elastic.NodeMetadata{}), "node_id") + if err != nil{ + log.Error(err) + } + if v, ok := nodeCount.(float64); ok { + totalNode = int(v) + } resBody := util.MapStr{ "nodes_count": totalNode, + "cluster_count": len(clusterIDs), "total_used_store_in_bytes": totalStoreSize, "hosts_count": hostCount, //"hosts": hosts, @@ -54,18 +72,16 @@ func (handler APIHandler) ElasticsearchOverviewAction(w http.ResponseWriter, req handler.WriteJSON(w, resBody, http.StatusOK) } -func (handler APIHandler) getLatestClusterMonitorData(clusterID interface{}) (util.MapStr, error){ +func (handler APIHandler) getLatestClusterMonitorData(clusterIDs []interface{}) (*elastic.SearchResponse, error){ client := elastic.GetClient(handler.Config.Elasticsearch) queryDSLTpl := `{ - "size": 1, + "size": %d, "query": { "bool": { "must": [ { - "term": { - "metadata.labels.cluster_id": { - "value": "%s" - } + "terms": { + "metadata.labels.cluster_id": %s } }, { @@ -85,6 +101,9 @@ func (handler APIHandler) getLatestClusterMonitorData(clusterID interface{}) (ut ] } }, + "collapse": { + "field": "metadata.labels.cluster_id" + }, "sort": [ { "timestamp": { @@ -93,16 +112,30 @@ func (handler APIHandler) getLatestClusterMonitorData(clusterID interface{}) (ut } ] }` - queryDSL := fmt.Sprintf(queryDSLTpl, clusterID) - searchRes, err := client.SearchWithRawQueryDSL(orm.GetIndexName(event.Event{}), []byte(queryDSL)) + queryDSL := fmt.Sprintf(queryDSLTpl, len(clusterIDs), util.MustToJSONBytes(clusterIDs)) + return client.SearchWithRawQueryDSL(orm.GetWildcardIndexName(event.Event{}), []byte(queryDSL)) + +} + +func (handler APIHandler) getMetricCount(indexName, field string) (interface{}, error){ + client := elastic.GetClient(handler.Config.Elasticsearch) + queryDSL := `{ + "size": 0, + "aggs": { + "field_count": { + "cardinality": { + "field": "%s" + } + } + } +}` + queryDSL = fmt.Sprintf(queryDSL, field) + searchRes, err := client.SearchWithRawQueryDSL(indexName, []byte(queryDSL)) if err != nil { log.Error(err) - return nil, err + return 0, err } - if len(searchRes.Hits.Hits) == 0 { - return nil, nil - } - return searchRes.Hits.Hits[0].Source, nil + return searchRes.Aggregations["field_count"].Value, nil } func (handler APIHandler) getLastActiveHostCount() (int, error){