update overview api

This commit is contained in:
liugq 2022-02-21 17:59:56 +08:00
parent 3c229077f8
commit aa680c2796
1 changed files with 61 additions and 28 deletions

View File

@ -15,38 +15,56 @@ func (handler APIHandler) ElasticsearchOverviewAction(w http.ResponseWriter, req
var ( var (
totalNode int totalNode int
totalStoreSize int totalStoreSize int
clusterIDs []interface{}
) )
elastic.WalkConfigs(func(key, value interface{})bool{ elastic.WalkConfigs(func(key, value interface{})bool{
if handler.Config.Elasticsearch == key { if handler.Config.Elasticsearch == key {
return true return true
} }
data, err := handler.getLatestClusterMonitorData(key) clusterIDs = append(clusterIDs, key)
if err != nil{ return true
log.Error(err) })
}
val, err := data.GetValue("payload.elasticsearch.cluster_stats.nodes.count.total") res, err := handler.getLatestClusterMonitorData(clusterIDs)
if err != nil { if err != nil {
log.Warn(err) log.Error(err)
} handler.WriteJSON(w, util.MapStr{
if num, ok := val.(float64); ok { "error": err.Error(),
totalNode += int(num) }, http.StatusInternalServerError)
} return
val, err = data.GetValue("payload.elasticsearch.cluster_stats.indices.store.size_in_bytes") }
for _, info := range res.Hits.Hits {
data := util.MapStr(info.Source)
//val, err := data.GetValue("payload.elasticsearch.cluster_stats.nodes.count.total")
//if err != nil {
// log.Warn(err)
//}
//if num, ok := val.(float64); ok {
// totalNode += int(num)
//}
val, err := data.GetValue("payload.elasticsearch.cluster_stats.indices.store.size_in_bytes")
if err != nil { if err != nil {
log.Warn(err) log.Warn(err)
} }
if num, ok := val.(float64); ok { if num, ok := val.(float64); ok {
totalStoreSize += int(num) totalStoreSize += int(num)
} }
return true }
})
hostCount, err := handler.getLastActiveHostCount() hostCount, err := handler.getMetricCount(orm.GetIndexName(elastic.HostMetadata{}), "metadata.host")
if err != nil{ if err != nil{
log.Error(err) log.Error(err)
} }
nodeCount, err := handler.getMetricCount(orm.GetIndexName(elastic.NodeMetadata{}), "node_id")
if err != nil{
log.Error(err)
}
if v, ok := nodeCount.(float64); ok {
totalNode = int(v)
}
resBody := util.MapStr{ resBody := util.MapStr{
"nodes_count": totalNode, "nodes_count": totalNode,
"cluster_count": len(clusterIDs),
"total_used_store_in_bytes": totalStoreSize, "total_used_store_in_bytes": totalStoreSize,
"hosts_count": hostCount, "hosts_count": hostCount,
//"hosts": hosts, //"hosts": hosts,
@ -54,18 +72,16 @@ func (handler APIHandler) ElasticsearchOverviewAction(w http.ResponseWriter, req
handler.WriteJSON(w, resBody, http.StatusOK) handler.WriteJSON(w, resBody, http.StatusOK)
} }
func (handler APIHandler) getLatestClusterMonitorData(clusterID interface{}) (util.MapStr, error){ func (handler APIHandler) getLatestClusterMonitorData(clusterIDs []interface{}) (*elastic.SearchResponse, error){
client := elastic.GetClient(handler.Config.Elasticsearch) client := elastic.GetClient(handler.Config.Elasticsearch)
queryDSLTpl := `{ queryDSLTpl := `{
"size": 1, "size": %d,
"query": { "query": {
"bool": { "bool": {
"must": [ "must": [
{ {
"term": { "terms": {
"metadata.labels.cluster_id": { "metadata.labels.cluster_id": %s
"value": "%s"
}
} }
}, },
{ {
@ -85,6 +101,9 @@ func (handler APIHandler) getLatestClusterMonitorData(clusterID interface{}) (ut
] ]
} }
}, },
"collapse": {
"field": "metadata.labels.cluster_id"
},
"sort": [ "sort": [
{ {
"timestamp": { "timestamp": {
@ -93,16 +112,30 @@ func (handler APIHandler) getLatestClusterMonitorData(clusterID interface{}) (ut
} }
] ]
}` }`
queryDSL := fmt.Sprintf(queryDSLTpl, clusterID) queryDSL := fmt.Sprintf(queryDSLTpl, len(clusterIDs), util.MustToJSONBytes(clusterIDs))
searchRes, err := client.SearchWithRawQueryDSL(orm.GetIndexName(event.Event{}), []byte(queryDSL)) return client.SearchWithRawQueryDSL(orm.GetWildcardIndexName(event.Event{}), []byte(queryDSL))
}
func (handler APIHandler) getMetricCount(indexName, field string) (interface{}, error){
client := elastic.GetClient(handler.Config.Elasticsearch)
queryDSL := `{
"size": 0,
"aggs": {
"field_count": {
"cardinality": {
"field": "%s"
}
}
}
}`
queryDSL = fmt.Sprintf(queryDSL, field)
searchRes, err := client.SearchWithRawQueryDSL(indexName, []byte(queryDSL))
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return nil, err return 0, err
} }
if len(searchRes.Hits.Hits) == 0 { return searchRes.Aggregations["field_count"].Value, nil
return nil, nil
}
return searchRes.Hits.Hits[0].Source, nil
} }
func (handler APIHandler) getLastActiveHostCount() (int, error){ func (handler APIHandler) getLastActiveHostCount() (int, error){