Optimize overview (#23)
* chore: splitting overview info request
* fix: no index health metric with agent mode
* feat: add query param `timeout` for overview info api
* fix: index overview info format
This commit is contained in:
parent ccd44c9b7d
commit 217008fd2f
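The hunks below repeat one pattern across the cluster, node, and index overview handlers: read an optional `timeout` query parameter (default `60s`), parse it as a duration, and bound the metric queries with a cancellable context. A minimal standalone sketch of that pattern, using only the standard library — `handleOverview` and the route are hypothetical stand-ins for the real handlers (`FetchClusterInfo`, `FetchNodeInfo`, `FetchIndexInfo`), whose `GetParameterOrDefault`/`WriteError` helpers belong to the framework's APIHandler:

    package main

    import (
        "context"
        "net/http"
        "time"
    )

    // handleOverview is a hypothetical stand-in for the overview handlers
    // patched in this commit.
    func handleOverview(w http.ResponseWriter, req *http.Request) {
        // Equivalent of h.GetParameterOrDefault(req, "timeout", "60s").
        timeout := req.URL.Query().Get("timeout")
        if timeout == "" {
            timeout = "60s"
        }
        du, err := time.ParseDuration(timeout)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        // The bounded context is handed to every metric query, so one slow
        // aggregation can no longer stall the whole overview response.
        ctx, cancel := context.WithTimeout(context.Background(), du)
        defer cancel()
        _ = ctx // the real handlers pass ctx into h.getMetrics(ctx, ...)
    }

    func main() {
        http.HandleFunc("/overview/info", handleOverview) // illustrative route
        _ = http.ListenAndServe(":8080", nil)
    }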
@@ -50,6 +50,8 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
 		h.WriteJSON(w, util.MapStr{}, http.StatusOK)
 		return
 	}
+	//only query the first cluster info
+	clusterIDs = clusterIDs[0:1]
 
 	cids := make([]interface{}, 0, len(clusterIDs))
 	for _, clusterID := range clusterIDs {
@@ -62,7 +64,7 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
 	q1.Conds = orm.And(
 		orm.Eq("metadata.category", "elasticsearch"),
 		orm.Eq("metadata.name", "cluster_stats"),
-		orm.In("metadata.labels.cluster_id", cids),
+		orm.Eq("metadata.labels.cluster_id", cids[0]),
 	)
 	q1.Collapse("metadata.labels.cluster_id")
 	q1.AddSort("timestamp", orm.DESC)
@@ -173,8 +175,8 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
 			"bool": util.MapStr{
 				"must": []util.MapStr{
 					{
-						"terms": util.MapStr{
-							"metadata.labels.cluster_uuid": clusterUUIDs,
+						"term": util.MapStr{
+							"metadata.labels.cluster_uuid": clusterUUIDs[0],
 						},
 					},
 					{
@@ -252,7 +254,16 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
 			},
 		},
 	}
-	indexMetrics := h.getMetrics(context.Background(), query, indexMetricItems, bucketSize)
+	timeout := h.GetParameterOrDefault(req, "timeout", "60s")
+	du, err := time.ParseDuration(timeout)
+	if err != nil {
+		log.Error(err)
+		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), du)
+	defer cancel()
+	indexMetrics := h.getMetrics(ctx, query, indexMetricItems, bucketSize)
 	indexingMetricData := util.MapStr{}
 	for _, line := range indexMetrics["cluster_indexing"].Lines {
 		// remove first metric dot
@@ -716,6 +727,9 @@ func (h *APIHandler) GetRealtimeClusterNodes(w http.ResponseWriter, req *http.Re
 			info.IndexQPS = qps[nodeInfo.Id]["index"]
 			info.QueryQPS = qps[nodeInfo.Id]["query"]
 			info.IndexBytesQPS = qps[nodeInfo.Id]["index_bytes"]
+			if v, ok := qps[nodeInfo.Id]["latest_timestamp"].(float64); ok {
+				info.Timestamp = uint64(v)
+			}
 		}
 		nodeInfos = append(nodeInfos, info)
 	}
@@ -826,6 +840,7 @@ type RealtimeNodeInfo struct {
 	IndexQPS interface{} `json:"index_qps"`
 	QueryQPS interface{} `json:"query_qps"`
 	IndexBytesQPS interface{} `json:"index_bytes_qps"`
+	Timestamp uint64 `json:"timestamp"`
 	CatNodeResponse
 }
 
@@ -1122,6 +1137,11 @@ func (h *APIHandler) getNodeQPS(clusterID string, bucketSizeInSeconds int) (map[
 						"buckets_path": "query_total",
 					},
 				},
+				"latest_timestamp": util.MapStr{
+					"max": util.MapStr{
+						"field": "timestamp",
+					},
+				},
 			},
 		},
 	},
@@ -31,6 +31,7 @@ import (
 	"context"
 	"fmt"
 	log "github.com/cihub/seelog"
+	v1 "infini.sh/console/modules/elastic/api/v1"
 	httprouter "infini.sh/framework/core/api/router"
 	"infini.sh/framework/core/elastic"
 	"infini.sh/framework/core/event"
@@ -274,21 +275,42 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 		h.WriteJSON(w, util.MapStr{}, http.StatusOK)
 		return
 	}
+	indexIDs = indexIDs[0:1]
 	// map indexIDs(cluster_id:index_name => cluster_uuid:indexName)
 	var (
 		indexIDM = map[string]string{}
 		newIndexIDs []interface{}
 		clusterIndexNames = map[string][]string{}
 	)
-	for _, indexID := range indexIDs {
-		if v, ok := indexID.(string); ok {
-			parts := strings.Split(v, ":")
-			if len(parts) != 2 {
-				log.Warnf("got wrong index_id: %s", v)
-				continue
-			}
-			clusterIndexNames[parts[0]] = append(clusterIndexNames[parts[0]], parts[1])
-		}
+	indexID := indexIDs[0]
+	timeout := h.GetParameterOrDefault(req, "timeout", "60s")
+	du, err := time.ParseDuration(timeout)
+	if err != nil {
+		log.Error(err)
+		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), du)
+	defer cancel()
+	var (
+		firstClusterID string
+		firstIndexName string
+	)
+	if v, ok := indexID.(string); ok {
+		parts := strings.Split(v, ":")
+		if len(parts) != 2 {
+			h.WriteError(w, fmt.Sprintf("invalid index_id: %s", indexID), http.StatusInternalServerError)
+			return
+		}
+		firstClusterID, firstIndexName = parts[0], parts[1]
+		if GetMonitorState(firstClusterID) == Console {
+			h.APIHandler.FetchIndexInfo(w, ctx, indexIDs)
+			return
+		}
+		clusterIndexNames[firstClusterID] = append(clusterIndexNames[firstClusterID], firstIndexName)
+	}else{
+		h.WriteError(w, fmt.Sprintf("invalid index_id: %v", indexID), http.StatusInternalServerError)
+		return
 	}
 	for clusterID, indexNames := range clusterIndexNames {
 		clusterUUID, err := adapter.GetClusterUUID(clusterID)
@@ -306,7 +328,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 	q1.Conds = orm.And(
 		orm.Eq("metadata.category", "elasticsearch"),
 		orm.Eq("metadata.name", "shard_stats"),
-		orm.In("metadata.labels.index_id", newIndexIDs),
+		orm.Eq("metadata.labels.index_id", newIndexIDs[0]),
 	)
 	q1.Collapse("metadata.labels.shard_id")
 	q1.AddSort("timestamp", orm.DESC)
@@ -358,7 +380,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 		}
 	}
 
-	statusMetric, err := getIndexStatusOfRecentDay(indexIDs)
+	statusMetric, err := h.GetIndexStatusOfRecentDay(firstClusterID, firstIndexName)
 	if err != nil {
 		log.Error(err)
 		h.WriteError(w, err.Error(), http.StatusInternalServerError)
@@ -416,8 +438,8 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 					},
 				},
 				{
-					"terms": util.MapStr{
-						"metadata.labels.index_id": newIndexIDs,
+					"term": util.MapStr{
+						"metadata.labels.index_id": newIndexIDs[0],
 					},
 				},
 			},
@@ -504,7 +526,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 			},
 		},
 	}
-	metrics := h.getMetrics(context.Background(), query, nodeMetricItems, bucketSize)
+	metrics := h.getMetrics(ctx, query, nodeMetricItems, bucketSize)
 	indexMetrics := map[string]util.MapStr{}
 	for key, item := range metrics {
 		for _, line := range item.Lines {
@@ -890,9 +912,15 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
 			return
 		}
 		metrics["shard_state"] = shardStateMetric
-	}else {
+	}else if metricKey == v1.IndexHealthMetricKey {
+		healthMetric, err := h.GetIndexHealthMetric(ctx, clusterID, indexName, min, max, bucketSize)
+		if err != nil {
+			log.Error(err)
+		}
+		metrics["index_health"] = healthMetric
+	} else {
 		switch metricKey {
-		case IndexThroughputMetricKey:
+		case v1.IndexThroughputMetricKey:
 			metricItem := newMetricItem("index_throughput", 1, OperationGroupKey)
 			metricItem.AddAxi("indexing", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, true)
 			if shardID == "" {
@@ -905,14 +933,14 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
 				metricItem.AddLine("Deleting Rate", "Deleting Rate", "Number of documents being deleted for node.", "group1", "payload.elasticsearch.shard_stats.indexing.delete_total", "max", bucketSizeStr, "doc/s", "num", "0,0.[00]", "0,0.[00]", false, true)
 			}
 			metricItems = append(metricItems, metricItem)
-		case SearchThroughputMetricKey:
+		case v1.SearchThroughputMetricKey:
 			metricItem := newMetricItem("search_throughput", 2, OperationGroupKey)
 			metricItem.AddAxi("searching", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, false)
 			metricItem.AddLine("Search Rate", "Search Rate",
 				"Number of search requests being executed.",
 				"group1", "payload.elasticsearch.shard_stats.search.query_total", "max", bucketSizeStr, "query/s", "num", "0,0.[00]", "0,0.[00]", false, true)
 			metricItems = append(metricItems, metricItem)
-		case IndexLatencyMetricKey:
+		case v1.IndexLatencyMetricKey:
 			metricItem := newMetricItem("index_latency", 3, LatencyGroupKey)
 			metricItem.AddAxi("indexing", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, true)
 			if shardID == "" { //index level
@@ -933,7 +961,7 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
 				return value / value2
 			}
 			metricItems = append(metricItems, metricItem)
-		case SearchLatencyMetricKey:
+		case v1.SearchLatencyMetricKey:
 			metricItem := newMetricItem("search_latency", 4, LatencyGroupKey)
 			metricItem.AddAxi("searching", "group2", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, false)
 
@@ -1052,261 +1080,6 @@ func (h *APIHandler) getIndexShardsMetric(ctx context.Context, id, indexName str
 	return metricItem, nil
 }
 
-func (h *APIHandler) getIndexHealthMetric(id, indexName string, min, max int64, bucketSize int)(*common.MetricItem, error){
-	bucketSizeStr:=fmt.Sprintf("%vs",bucketSize)
-	intervalField, err := getDateHistogramIntervalField(global.MustLookupString(elastic.GlobalSystemElasticsearchID), bucketSizeStr)
-	if err != nil {
-		return nil, err
-	}
-	query := util.MapStr{
-		"query": util.MapStr{
-			"bool": util.MapStr{
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"metadata.labels.cluster_id": util.MapStr{
-								"value": id,
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.category": util.MapStr{
-								"value": "elasticsearch",
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.name": util.MapStr{
-								"value": "index_stats",
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.labels.index_name": util.MapStr{
-								"value": indexName,
-							},
-						},
-					},
-				},
-				"filter": []util.MapStr{
-					{
-						"range": util.MapStr{
-							"timestamp": util.MapStr{
-								"gte": min,
-								"lte": max,
-							},
-						},
-					},
-				},
-			},
-		},
-		"aggs": util.MapStr{
-			"dates": util.MapStr{
-				"date_histogram": util.MapStr{
-					"field": "timestamp",
-					intervalField: bucketSizeStr,
-				},
-				"aggs": util.MapStr{
-					"groups": util.MapStr{
-						"terms": util.MapStr{
-							"field": "payload.elasticsearch.index_stats.index_info.health",
-							"size": 5,
-						},
-					},
-				},
-			},
-		},
-	}
-	response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(getAllMetricsIndex(), util.MustToJSONBytes(query))
-	if err != nil {
-		log.Error(err)
-		return nil, err
-	}
-
-	metricItem:=newMetricItem("index_health", 1, "")
-	metricItem.AddLine("health","Health","","group1","payload.elasticsearch.index_stats.index_info.health","max",bucketSizeStr,"%","ratio","0.[00]","0.[00]",false,false)
-
-	metricData := []interface{}{}
-	if response.StatusCode == 200 {
-		metricData, err = parseGroupMetricData(response.Aggregations["dates"].Buckets, true)
-		if err != nil {
-			return nil, err
-		}
-	}
-	metricItem.Lines[0].Data = metricData
-	metricItem.Lines[0].Type = common.GraphTypeBar
-	return metricItem, nil
-}
-
-
-func getIndexStatusOfRecentDay(indexIDs []interface{})(map[string][]interface{}, error){
-	q := orm.Query{
-		WildcardIndex: true,
-	}
-	query := util.MapStr{
-		"aggs": util.MapStr{
-			"group_by_index_id": util.MapStr{
-				"terms": util.MapStr{
-					"field": "metadata.labels.index_id",
-					"size": 100,
-				},
-				"aggs": util.MapStr{
-					"time_histogram": util.MapStr{
-						"date_range": util.MapStr{
-							"field": "timestamp",
-							"format": "yyyy-MM-dd",
-							"time_zone": "+08:00",
-							"ranges": []util.MapStr{
-								{
-									"from": "now-13d/d",
-									"to": "now-12d/d",
-								}, {
-									"from": "now-12d/d",
-									"to": "now-11d/d",
-								},
-								{
-									"from": "now-11d/d",
-									"to": "now-10d/d",
-								},
-								{
-									"from": "now-10d/d",
-									"to": "now-9d/d",
-								}, {
-									"from": "now-9d/d",
-									"to": "now-8d/d",
-								},
-								{
-									"from": "now-8d/d",
-									"to": "now-7d/d",
-								},
-								{
-									"from": "now-7d/d",
-									"to": "now-6d/d",
-								},
-								{
-									"from": "now-6d/d",
-									"to": "now-5d/d",
-								}, {
-									"from": "now-5d/d",
-									"to": "now-4d/d",
-								},
-								{
-									"from": "now-4d/d",
-									"to": "now-3d/d",
-								},{
-									"from": "now-3d/d",
-									"to": "now-2d/d",
-								}, {
-									"from": "now-2d/d",
-									"to": "now-1d/d",
-								}, {
-									"from": "now-1d/d",
-									"to": "now/d",
-								},
-								{
-									"from": "now/d",
-									"to": "now",
-								},
-							},
-						},
-						"aggs": util.MapStr{
-							"term_health": util.MapStr{
-								"terms": util.MapStr{
-									"field": "payload.elasticsearch.index_stats.index_info.health",
-								},
-							},
-						},
-					},
-				},
-			},
-		},
-		"sort": []util.MapStr{
-			{
-				"timestamp": util.MapStr{
-					"order": "desc",
-				},
-			},
-		},
-		"size": 0,
-		"query": util.MapStr{
-			"bool": util.MapStr{
-				"filter": []util.MapStr{
-					{
-						"range": util.MapStr{
-							"timestamp": util.MapStr{
-								"gte": "now-15d",
-								"lte": "now",
-							},
-						},
-					},
-				},
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"metadata.name": util.MapStr{
-								"value": "index_stats",
-							},
-						},
-					},
-					{
-						"terms": util.MapStr{
-							"metadata.labels.index_id": indexIDs,
-						},
-					},
-				},
-			},
-		},
-	}
-	q.RawQuery = util.MustToJSONBytes(query)
-
-	err, res := orm.Search(&event.Event{}, &q)
-	if err != nil {
-		return nil, err
-	}
-
-	response := elastic.SearchResponse{}
-	util.FromJSONBytes(res.Raw, &response)
-	recentStatus := map[string][]interface{}{}
-	for _, bk := range response.Aggregations["group_by_index_id"].Buckets {
-		indexKey := bk["key"].(string)
-		recentStatus[indexKey] = []interface{}{}
-		if histogramAgg, ok := bk["time_histogram"].(map[string]interface{}); ok {
-			if bks, ok := histogramAgg["buckets"].([]interface{}); ok {
-				for _, bkItem := range bks {
-					if bkVal, ok := bkItem.(map[string]interface{}); ok {
-						if termHealth, ok := bkVal["term_health"].(map[string]interface{}); ok {
-							if healthBks, ok := termHealth["buckets"].([]interface{}); ok {
-								if len(healthBks) == 0 {
-									continue
-								}
-								healthMap := map[string]int{}
-								status := "unknown"
-								for _, hbkItem := range healthBks {
-									if hitem, ok := hbkItem.(map[string]interface{}); ok {
-										healthMap[hitem["key"].(string)] = 1
-									}
-								}
-								if _, ok = healthMap["red"]; ok {
-									status = "red"
-								}else if _, ok = healthMap["yellow"]; ok {
-									status = "yellow"
-								}else if _, ok = healthMap["green"]; ok {
-									status = "green"
-								}
-								recentStatus[indexKey] = append(recentStatus[indexKey], []interface{}{bkVal["key"], status})
-							}
-						}
-					}
-				}
-			}
-		}
-	}
-	return recentStatus, nil
-}
-
 func (h *APIHandler) getIndexNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	resBody := map[string] interface{}{}
 	id := ps.ByName("id")
@@ -560,7 +560,7 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), du)
 	defer cancel()
-	if util.StringInArray([]string{IndexThroughputMetricKey, SearchThroughputMetricKey, IndexLatencyMetricKey, SearchLatencyMetricKey}, key) {
+	if util.StringInArray([]string{v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey}, key) {
 		metrics = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key)
 	}else{
 		metrics = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key)
@@ -912,7 +912,7 @@ func (h *APIHandler) GetClusterMetrics(ctx context.Context, id string, bucketSiz
 		ClusterIndicesMetricKey,
 		ClusterNodeCountMetricKey:
 		clusterMetricsResult = h.getClusterMetricsByKey(ctx, id, bucketSize, min, max, metricKey)
-	case IndexLatencyMetricKey, IndexThroughputMetricKey, SearchThroughputMetricKey, SearchLatencyMetricKey:
+	case v1.IndexLatencyMetricKey, v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.SearchLatencyMetricKey:
 		clusterMetricsResult = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, metricKey)
 	case ClusterHealthMetricKey:
 		statusMetric, err := h.getClusterStatusMetric(ctx, id, min, max, bucketSize)
@@ -1024,31 +1024,24 @@ func (h *APIHandler) getClusterMetricsByKey(ctx context.Context, id string, buck
 	return h.getSingleMetrics(ctx, clusterMetricItems, query, bucketSize)
 }
 
-const (
-	IndexThroughputMetricKey = "index_throughput"
-	SearchThroughputMetricKey = "search_throughput"
-	IndexLatencyMetricKey = "index_latency"
-	SearchLatencyMetricKey = "search_latency"
-)
-
 func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem {
 	bucketSizeStr := fmt.Sprintf("%vs", bucketSize)
 	metricItems := []*common.MetricItem{}
 	switch metricKey {
-	case IndexThroughputMetricKey:
-		metricItem := newMetricItem(IndexThroughputMetricKey, 2, OperationGroupKey)
+	case v1.IndexThroughputMetricKey:
+		metricItem := newMetricItem(v1.IndexThroughputMetricKey, 2, OperationGroupKey)
 		metricItem.AddAxi("indexing", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, true)
 		metricItem.AddLine("Indexing Rate", "Total Indexing", "Number of documents being indexed for primary and replica shards.", "group1", "payload.elasticsearch.node_stats.indices.indexing.index_total", "max", bucketSizeStr, "doc/s", "num", "0,0.[00]", "0,0.[00]", false, true)
 		metricItems = append(metricItems, metricItem)
-	case SearchThroughputMetricKey:
-		metricItem := newMetricItem(SearchThroughputMetricKey, 2, OperationGroupKey)
+	case v1.SearchThroughputMetricKey:
+		metricItem := newMetricItem(v1.SearchThroughputMetricKey, 2, OperationGroupKey)
 		metricItem.AddAxi("searching", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, false)
 		metricItem.AddLine("Search Rate", "Total Query",
 			"Number of search requests being executed across primary and replica shards. A single search can run against multiple shards!",
 			"group1", "payload.elasticsearch.node_stats.indices.search.query_total", "max", bucketSizeStr, "query/s", "num", "0,0.[00]", "0,0.[00]", false, true)
 		metricItems = append(metricItems, metricItem)
-	case IndexLatencyMetricKey:
-		metricItem := newMetricItem(IndexLatencyMetricKey, 3, LatencyGroupKey)
+	case v1.IndexLatencyMetricKey:
+		metricItem := newMetricItem(v1.IndexLatencyMetricKey, 3, LatencyGroupKey)
 		metricItem.AddAxi("indexing", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, true)
 
 		metricItem.AddLine("Indexing", "Indexing Latency", "Average latency for indexing documents.", "group1", "payload.elasticsearch.node_stats.indices.indexing.index_time_in_millis", "max", bucketSizeStr, "ms", "num", "0,0.[00]", "0,0.[00]", false, true)
@@ -1062,8 +1055,8 @@ func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, buck
 			return value / value2
 		}
 		metricItems = append(metricItems, metricItem)
-	case SearchLatencyMetricKey:
-		metricItem := newMetricItem(SearchLatencyMetricKey, 3, LatencyGroupKey)
+	case v1.SearchLatencyMetricKey:
+		metricItem := newMetricItem(v1.SearchLatencyMetricKey, 3, LatencyGroupKey)
 		metricItem.AddAxi("searching", "group2", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, false)
 
 		metricItem.AddLine("Searching", "Query Latency", "Average latency for searching query.", "group2", "payload.elasticsearch.node_stats.indices.search.query_time_in_millis", "max", bucketSizeStr, "ms", "num", "0,0.[00]", "0,0.[00]", false, true)
@@ -31,6 +31,7 @@ import (
 	"context"
 	"fmt"
 	log "github.com/cihub/seelog"
+	v1 "infini.sh/console/modules/elastic/api/v1"
 	httprouter "infini.sh/framework/core/api/router"
 	"infini.sh/framework/core/elastic"
 	"infini.sh/framework/core/event"
@@ -204,6 +205,17 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
 		h.WriteJSON(w, util.MapStr{}, http.StatusOK)
 		return
 	}
+	//only query one node info for fast response
+	nodeIDs = nodeIDs[0:1]
+	timeout := h.GetParameterOrDefault(req, "timeout", "60s")
+	du, err := time.ParseDuration(timeout)
+	if err != nil {
+		log.Error(err)
+		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), du)
+	defer cancel()
 
 	q1 := orm.Query{WildcardIndex: true}
 	query := util.MapStr{
@@ -235,8 +247,8 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
 				},
 			},
 			{
-				"terms": util.MapStr{
-					"metadata.labels.node_id": nodeIDs,
+				"term": util.MapStr{
+					"metadata.labels.node_id": nodeIDs[0],
 				},
 			},
 		},
@@ -355,8 +367,8 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
 				},
 			},
 			{
-				"terms": util.MapStr{
-					"metadata.labels.node_id": nodeIDs,
+				"term": util.MapStr{
+					"metadata.labels.node_id": nodeIDs[0],
 				},
 			},
 		},
@@ -410,7 +422,7 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
 			},
 		},
 	}
-	metrics := h.getMetrics(context.Background(), query, nodeMetricItems, bucketSize)
+	metrics := h.getMetrics(ctx, query, nodeMetricItems, bucketSize)
 	indexMetrics := map[string]util.MapStr{}
 	for key, item := range metrics {
 		for _, line := range item.Lines {
@@ -722,19 +734,19 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque
 		metricItem.AddLine("Max Heap","Max Heap","JVM max Heap of node.","group1","payload.elasticsearch.node_stats.jvm.mem.heap_max_in_bytes","max",bucketSizeStr,"","bytes","0,0.[00]","0,0.[00]",false,false)
 		metricItem.AddLine("Used Heap","Used Heap","JVM used Heap of node.","group1","payload.elasticsearch.node_stats.jvm.mem.heap_used_in_bytes","max",bucketSizeStr,"","bytes","0,0.[00]","0,0.[00]",false,false)
 		metricItems=append(metricItems,metricItem)
-	case IndexThroughputMetricKey:
+	case v1.IndexThroughputMetricKey:
 		metricItem := newMetricItem("index_throughput", 3, OperationGroupKey)
 		metricItem.AddAxi("indexing","group1",common.PositionLeft,"num","0,0","0,0.[00]",5,true)
 		metricItem.AddLine("Indexing Rate","Total Shards","Number of documents being indexed for node.","group1","payload.elasticsearch.node_stats.indices.indexing.index_total","max",bucketSizeStr,"doc/s","num","0,0.[00]","0,0.[00]",false,true)
 		metricItems=append(metricItems,metricItem)
-	case SearchThroughputMetricKey:
+	case v1.SearchThroughputMetricKey:
 		metricItem := newMetricItem("search_throughput", 4, OperationGroupKey)
 		metricItem.AddAxi("searching","group1",common.PositionLeft,"num","0,0","0,0.[00]",5,false)
 		metricItem.AddLine("Search Rate","Total Shards",
 			"Number of search requests being executed.",
 			"group1","payload.elasticsearch.node_stats.indices.search.query_total","max",bucketSizeStr,"query/s","num","0,0.[00]","0,0.[00]",false,true)
 		metricItems=append(metricItems,metricItem)
-	case IndexLatencyMetricKey:
+	case v1.IndexLatencyMetricKey:
 		metricItem := newMetricItem("index_latency", 5, LatencyGroupKey)
 		metricItem.AddAxi("indexing","group1",common.PositionLeft,"num","0,0","0,0.[00]",5,true)
 
@@ -749,7 +761,7 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque
 			return value/value2
 		}
 		metricItems=append(metricItems,metricItem)
-	case SearchLatencyMetricKey:
+	case v1.SearchLatencyMetricKey:
 		metricItem := newMetricItem("search_latency", 6, LatencyGroupKey)
 		metricItem.AddAxi("searching","group2",common.PositionLeft,"num","0,0","0,0.[00]",5,false)
 
@@ -29,7 +29,6 @@ import (
 	"strings"
 	"time"
 
-	log "github.com/cihub/seelog"
 	httprouter "infini.sh/framework/core/api/router"
 	"infini.sh/framework/core/elastic"
 	"infini.sh/framework/core/event"
@@ -569,71 +568,6 @@ func (h *APIHandler) GetClusterNodes(w http.ResponseWriter, req *http.Request, p
 	h.WriteJSON(w, nodes, http.StatusOK)
 }
 
-func (h *APIHandler) GetRealtimeClusterNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
-	id := ps.ByName("id")
-	meta := elastic.GetMetadata(id)
-	if meta == nil || !meta.IsAvailable() {
-		log.Debugf("cluster [%s] is not available", id)
-		h.WriteJSON(w, []interface{}{}, http.StatusOK)
-		return
-	}
-	esClient := elastic.GetClient(id)
-	if esClient == nil {
-		h.WriteJSON(w, util.MapStr{
-			"error": "cluster not found",
-		}, http.StatusNotFound)
-		return
-	}
-	catNodesInfo, err := esClient.CatNodes("id,name,ip,port,master,heap.percent,disk.avail,cpu,load_1m,uptime")
-	if err != nil {
-		h.WriteJSON(w, util.MapStr{
-			"error": err.Error(),
-		}, http.StatusInternalServerError)
-		return
-	}
-	catShardsInfo, err := esClient.CatShards()
-	if err != nil {
-		log.Error(err)
-	}
-	shardCounts := map[string]int{}
-	nodeM := map[string]string{}
-	for _, shardInfo := range catShardsInfo {
-		nodeM[shardInfo.NodeName] = shardInfo.NodeID
-		if c, ok := shardCounts[shardInfo.NodeName]; ok {
-			shardCounts[shardInfo.NodeName] = c + 1
-		} else {
-			shardCounts[shardInfo.NodeName] = 1
-		}
-	}
-	qps, err := h.getNodeQPS(id, 20)
-	if err != nil {
-		h.WriteJSON(w, util.MapStr{
-			"error": err.Error(),
-		}, http.StatusInternalServerError)
-		return
-	}
-
-	nodeInfos := []RealtimeNodeInfo{}
-	for _, nodeInfo := range catNodesInfo {
-		if len(nodeInfo.Id) == 4 { //node short id, use nodeId from shard info isnstead
-			nodeInfo.Id = nodeM[nodeInfo.Name]
-		}
-		if c, ok := shardCounts[nodeInfo.Name]; ok {
-			nodeInfo.Shards = c
-		}
-		info := RealtimeNodeInfo{
-			CatNodeResponse: CatNodeResponse(nodeInfo),
-		}
-		if _, ok := qps[nodeInfo.Id]; ok {
-			info.IndexQPS = qps[nodeInfo.Id]["index"]
-			info.QueryQPS = qps[nodeInfo.Id]["query"]
-			info.IndexBytesQPS = qps[nodeInfo.Id]["index_bytes"]
-		}
-		nodeInfos = append(nodeInfos, info)
-	}
-	h.WriteJSON(w, nodeInfos, http.StatusOK)
-}
-
 func (h *APIHandler) GetClusterIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	var (
 		//size = h.GetIntOrDefault(req, "size", 20)
@@ -824,97 +758,6 @@ func (h *APIHandler) getIndexQPS(clusterID string, bucketSizeInSeconds int) (map
 	return h.QueryQPS(query, bucketSizeInSeconds)
 }
 
-func (h *APIHandler) getNodeQPS(clusterID string, bucketSizeInSeconds int) (map[string]util.MapStr, error) {
-	ver := h.Client().GetVersion()
-	bucketSizeStr := fmt.Sprintf("%ds", bucketSizeInSeconds)
-	intervalField, err := elastic.GetDateHistogramIntervalField(ver.Distribution, ver.Number, bucketSizeStr)
-	if err != nil {
-		return nil, err
-	}
-	query := util.MapStr{
-		"size": 0,
-		"aggs": util.MapStr{
-			"term_node": util.MapStr{
-				"terms": util.MapStr{
-					"field": "metadata.labels.node_id",
-					"size": 1000,
-				},
-				"aggs": util.MapStr{
-					"date": util.MapStr{
-						"date_histogram": util.MapStr{
-							"field": "timestamp",
-							intervalField: "10s",
-						},
-						"aggs": util.MapStr{
-							"index_total": util.MapStr{
-								"max": util.MapStr{
-									"field": "payload.elasticsearch.node_stats.indices.indexing.index_total",
-								},
-							},
-							"index_bytes_total": util.MapStr{
-								"max": util.MapStr{
-									"field": "payload.elasticsearch.node_stats.indices.store.size_in_bytes",
-								},
-							},
-							"query_total": util.MapStr{
-								"max": util.MapStr{
-									"field": "payload.elasticsearch.node_stats.indices.search.query_total",
-								},
-							},
-							"index_rate": util.MapStr{
-								"derivative": util.MapStr{
-									"buckets_path": "index_total",
-								},
-							},
-							"index_bytes_rate": util.MapStr{
-								"derivative": util.MapStr{
-									"buckets_path": "index_bytes_total",
-								},
-							},
-							"query_rate": util.MapStr{
-								"derivative": util.MapStr{
-									"buckets_path": "query_total",
-								},
-							},
-						},
-					},
-				},
-			},
-		},
-		"query": util.MapStr{
-			"bool": util.MapStr{
-				"filter": []util.MapStr{
-					{
-						"range": util.MapStr{
-							"timestamp": util.MapStr{
-								"gte": "now-1m",
-								"lte": "now",
-							},
-						},
-					},
-				},
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"metadata.labels.cluster_id": util.MapStr{
-								"value": clusterID,
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.name": util.MapStr{
-								"value": "node_stats",
-							},
-						},
-					},
-				},
-			},
-		},
-	}
-	return h.QueryQPS(query, bucketSizeInSeconds)
-}
-
 func (h *APIHandler) QueryQPS(query util.MapStr, bucketSizeInSeconds int) (map[string]util.MapStr, error) {
 	esClient := h.Client()
 	searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(event.Event{}), util.MustToJSONBytes(query))
@@ -934,6 +777,7 @@ func (h *APIHandler) QueryQPS(query util.MapStr, bucketSizeInSeconds int) (map[s
 			maxIndexBytesRate float64
 			preIndexTotal float64
 			dropNext bool
+			maxTimestamp float64
 		)
 		for _, dateBk := range bks {
 			if dateBkVal, ok := dateBk.(map[string]interface{}); ok {
@@ -971,11 +815,17 @@ func (h *APIHandler) QueryQPS(query util.MapStr, bucketSizeInSeconds int) (map[s
 						maxIndexBytesRate = indexBytesRateVal
 					}
 				}
+				if latestTime, ok := dateBkVal["latest_timestamp"].(map[string]interface{}); ok {
+					if latestTimeVal, ok := latestTime["value"].(float64); ok && latestTimeVal > maxTimestamp {
+						maxTimestamp = latestTimeVal
+					}
+				}
 			}
 		}
 		indexQPS[k]["index"] = maxIndexRate / float64(bucketSizeInSeconds)
 		indexQPS[k]["query"] = maxQueryRate / float64(bucketSizeInSeconds)
 		indexQPS[k]["index_bytes"] = maxIndexBytesRate / float64(bucketSizeInSeconds)
+		indexQPS[k]["latest_timestamp"] = maxTimestamp
 	}
 	}
 }
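For reference, the new `latest_timestamp` handling in QueryQPS reduces to a max-over-buckets scan of the aggregation response. A hedged sketch of just that step, walking the same map-of-interface bucket shape the handler uses above (the helper name is illustrative, not part of the patch):

    // maxLatestTimestamp mirrors the new QueryQPS logic: scan each
    // date-histogram bucket for the "latest_timestamp" max-aggregation
    // and keep the largest value seen.
    func maxLatestTimestamp(buckets []interface{}) float64 {
        var maxTimestamp float64
        for _, b := range buckets {
            bv, ok := b.(map[string]interface{})
            if !ok {
                continue
            }
            if lt, ok := bv["latest_timestamp"].(map[string]interface{}); ok {
                if v, ok := lt["value"].(float64); ok && v > maxTimestamp {
                    maxTimestamp = v
                }
            }
        }
        return maxTimestamp
    }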
@@ -43,252 +43,23 @@ import (
 	"time"
 )
 
-func (h *APIHandler) SearchIndexMetadata(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
-	resBody:=util.MapStr{}
-	reqBody := struct{
-		Keyword string `json:"keyword"`
-		Size int `json:"size"`
-		From int `json:"from"`
-		Aggregations []elastic.SearchAggParam `json:"aggs"`
-		Highlight elastic.SearchHighlightParam `json:"highlight"`
-		Filter elastic.SearchFilterParam `json:"filter"`
-		Sort []string `json:"sort"`
-		SearchField string `json:"search_field"`
-	}{}
-	err := h.DecodeJSON(req, &reqBody)
-	if err != nil {
-		resBody["error"] = err.Error()
-		h.WriteJSON(w,resBody, http.StatusInternalServerError )
-		return
-	}
-	aggs := elastic.BuildSearchTermAggregations(reqBody.Aggregations)
-	aggs["term_cluster_id"] = util.MapStr{
-		"terms": util.MapStr{
-			"field": "metadata.cluster_id",
-			"size": 1000,
-		},
-		"aggs": util.MapStr{
-			"term_cluster_name": util.MapStr{
-				"terms": util.MapStr{
-					"field": "metadata.cluster_name",
-					"size": 1,
-				},
-			},
-		},
-	}
-	filter := elastic.BuildSearchTermFilter(reqBody.Filter)
-	var should []util.MapStr
-	if reqBody.SearchField != ""{
-		should = []util.MapStr{
-			{
-				"prefix": util.MapStr{
-					reqBody.SearchField: util.MapStr{
-						"value": reqBody.Keyword,
-						"boost": 20,
-					},
-				},
-			},
-			{
-				"match": util.MapStr{
-					reqBody.SearchField: util.MapStr{
-						"query": reqBody.Keyword,
-						"fuzziness": "AUTO",
-						"max_expansions": 10,
-						"prefix_length": 2,
-						"fuzzy_transpositions": true,
-						"boost": 2,
-					},
-				},
-			},
-		}
-	}else{
-		if reqBody.Keyword != ""{
-			should = []util.MapStr{
-				{
-					"prefix": util.MapStr{
-						"metadata.index_name": util.MapStr{
-							"value": reqBody.Keyword,
-							"boost": 30,
-						},
-					},
-				},
-				{
-					"prefix": util.MapStr{
-						"metadata.aliases": util.MapStr{
-							"value": reqBody.Keyword,
-							"boost": 20,
-						},
-					},
-				},
-				{
-					"match": util.MapStr{
-						"search_text": util.MapStr{
-							"query": reqBody.Keyword,
-							"fuzziness": "AUTO",
-							"max_expansions": 10,
-							"prefix_length": 2,
-							"fuzzy_transpositions": true,
-							"boost": 2,
-						},
-					},
-				},
-				{
-					"query_string": util.MapStr{
-						"fields": []string{"*"},
-						"query": reqBody.Keyword,
-						"fuzziness": "AUTO",
-						"fuzzy_prefix_length": 2,
-						"fuzzy_max_expansions": 10,
-						"fuzzy_transpositions": true,
-						"allow_leading_wildcard": false,
-					},
-				},
-			}
-		}
-	}
-
-	must := []interface{}{
-	}
-	hasAllPrivilege, indexPrivilege := h.GetCurrentUserIndex(req)
-	if !hasAllPrivilege && len(indexPrivilege) == 0 {
-		h.WriteJSON(w, elastic.SearchResponse{
-
-		}, http.StatusOK)
-		return
-	}
-	if !hasAllPrivilege {
-		indexShould := make([]interface{}, 0, len(indexPrivilege))
-		for clusterID, indices := range indexPrivilege {
-			var (
-				wildcardIndices []string
-				normalIndices []string
-			)
-			for _, index := range indices {
-				if strings.Contains(index,"*") {
-					wildcardIndices = append(wildcardIndices, index)
-					continue
-				}
-				normalIndices = append(normalIndices, index)
-			}
-			subShould := []util.MapStr{}
-			if len(wildcardIndices) > 0 {
-				subShould = append(subShould, util.MapStr{
-					"query_string": util.MapStr{
-						"query": strings.Join(wildcardIndices, " "),
-						"fields": []string{"metadata.index_name"},
-						"default_operator": "OR",
-					},
-				})
-			}
-			if len(normalIndices) > 0 {
-				subShould = append(subShould, util.MapStr{
-					"terms": util.MapStr{
-						"metadata.index_name": normalIndices,
-					},
-				})
-			}
-			indexShould = append(indexShould, util.MapStr{
-				"bool": util.MapStr{
-					"must": []util.MapStr{
-						{
-							"wildcard": util.MapStr{
-								"metadata.cluster_id": util.MapStr{
-									"value": clusterID,
-								},
-							},
-						},
-						{
-							"bool": util.MapStr{
-								"minimum_should_match": 1,
-								"should": subShould,
-							},
-						},
-					},
-				},
-			})
-		}
-		indexFilter := util.MapStr{
-			"bool": util.MapStr{
-				"minimum_should_match": 1,
-				"should": indexShould,
-			},
-		}
-		must = append(must, indexFilter)
-	}
-	boolQuery := util.MapStr{
-		"must_not": []util.MapStr{
-			{
-				"term": util.MapStr{
-					"metadata.labels.index_status": "deleted",
-				},
-			},
-		},
-		"filter": filter,
-		"must": must,
-	}
-	if len(should) > 0 {
-		boolQuery["should"] = should
-		boolQuery["minimum_should_match"] = 1
-	}
-	query := util.MapStr{
-		"aggs": aggs,
-		"size": reqBody.Size,
-		"from": reqBody.From,
-		"highlight": elastic.BuildSearchHighlight(&reqBody.Highlight),
-		"query": util.MapStr{
-			"bool": boolQuery,
-		},
-		"sort": []util.MapStr{
-			{
-				"timestamp": util.MapStr{
-					"order": "desc",
-				},
-			},
-		},
-	}
-	if len(reqBody.Sort) > 1 {
-		query["sort"] = []util.MapStr{
-			{
-				reqBody.Sort[0]: util.MapStr{
-					"order": reqBody.Sort[1],
-				},
-			},
-		}
-	}
-	dsl := util.MustToJSONBytes(query)
-	response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(orm.GetIndexName(elastic.IndexConfig{}), dsl)
-	if err != nil {
-		resBody["error"] = err.Error()
-		h.WriteJSON(w,resBody, http.StatusInternalServerError )
-		return
-	}
-	w.Write(util.MustToJSONBytes(response))
-
-}
-func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
-	defer func() {
-		if err := recover(); err != nil {
-			log.Error(err)
-		}
-	}()
-	var indexIDs []interface{}
-
-
-	h.DecodeJSON(req, &indexIDs)
-
+func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, ctx context.Context, indexIDs []interface{}) {
 	if len(indexIDs) == 0 {
 		h.WriteJSON(w, util.MapStr{}, http.StatusOK)
 		return
 	}
+	//only query first index
+	indexIDs = indexIDs[0:1]
 	q1 := orm.Query{WildcardIndex: true}
 	q1.Conds = orm.And(
 		orm.Eq("metadata.category", "elasticsearch"),
 		orm.Eq("metadata.name", "index_stats"),
-		orm.In("metadata.labels.index_id", indexIDs),
+		orm.Eq("metadata.labels.index_id", indexIDs[0]),
 	)
-	q1.Collapse("metadata.labels.index_id")
+	//q1.Collapse("metadata.labels.index_id")
 	q1.AddSort("timestamp", orm.DESC)
-	q1.Size = len(indexIDs) + 1
+	q1.Size = 1
 
 	err, results := orm.Search(&event.Event{}, &q1)
 	if err != nil {
@@ -304,10 +75,23 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 		if indexID, ok := util.GetMapValueByKeys([]string{"metadata", "labels", "index_id"}, result); ok {
 			summary := map[string]interface{}{}
 			if docs, ok := util.GetMapValueByKeys([]string{"payload", "elasticsearch", "index_stats", "total", "docs"}, result); ok {
-				summary["docs"] = docs
+				if docsM, ok := docs.(map[string]interface{}); ok {
+					summary["docs_deleted"] = docsM["docs_deleted"]
+					summary["docs_count"] = docsM["count"]
+				}
 			}
 			if indexInfo, ok := util.GetMapValueByKeys([]string{"payload", "elasticsearch", "index_stats", "index_info"}, result); ok {
-				summary["index_info"] = indexInfo
+				if indexInfoM, ok := indexInfo.(map[string]interface{}); ok {
+					summary["index"] = indexInfoM["index"]
+					summary["status"] = indexInfoM["status"]
+					summary["shards"] = indexInfoM["shards"]
+					summary["replicas"] = indexInfoM["replicas"]
+					storeInBytes, _ := util.ToBytes(indexInfoM["store_size"].(string))
+					summary["store_in_bytes"] = storeInBytes
+					priStoreInBytes, _ := util.ToBytes(indexInfoM["pri_store_size"].(string))
+					summary["pri_store_in_bytes"] = priStoreInBytes
+					summary["timestamp"] = result["timestamp"]
+				}
 			}
 			if shardInfo, ok := util.GetMapValueByKeys([]string{"payload", "elasticsearch", "index_stats", "shard_info"}, result); ok {
 				if sinfo, ok := shardInfo.([]interface{}); ok {
@@ -328,18 +112,30 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 		}
 	}
 
-	statusMetric, err := getIndexStatusOfRecentDay(indexIDs)
+	indexID := indexIDs[0]
+	var firstClusterID, firstIndexName string
+	if v, ok := indexID.(string); ok {
+		parts := strings.Split(v, ":")
+		if len(parts) != 2 {
+			h.WriteError(w, fmt.Sprintf("invalid index_id: %s", indexID), http.StatusInternalServerError)
+			return
+		}
+		firstClusterID, firstIndexName = parts[0], parts[1]
+	} else {
+		h.WriteError(w, fmt.Sprintf("invalid index_id: %v", indexID), http.StatusInternalServerError)
+		return
+	}
+	statusMetric, err := h.GetIndexStatusOfRecentDay(firstClusterID, firstIndexName)
 	if err != nil {
 		log.Error(err)
 		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
-	bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 60, (15))
-	if err != nil {
-		panic(err)
-		return
+	bucketSize := GetMinBucketSize()
+	if bucketSize < 60 {
+		bucketSize = 60
 	}
+	var metricLen = 15
 	// indexing rate
 	indexMetric := newMetricItem("indexing", 1, OperationGroupKey)
 	indexMetric.AddAxi("indexing rate", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, true)
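Note: the new code only charts the first selected index and derives the cluster and index name from a composite `index_id` of the form `cluster_id:index_name`. A self-contained sketch of that split; the `parseIndexID` helper and the sample value are hypothetical:

package main

import (
	"fmt"
	"strings"
)

// parseIndexID mirrors the validation in the hunk above: an index_id is
// expected to be exactly "<cluster_id>:<index_name>".
func parseIndexID(indexID string) (clusterID, indexName string, err error) {
	parts := strings.Split(indexID, ":")
	if len(parts) != 2 {
		return "", "", fmt.Errorf("invalid index_id: %s", indexID)
	}
	return parts[0], parts[1], nil
}

func main() {
	cluster, index, err := parseIndexID("my-cluster-id:my-index") // hypothetical value
	if err != nil {
		panic(err)
	}
	fmt.Println(cluster, index) // my-cluster-id my-index
}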
@@ -394,8 +190,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 				{
 					"range": util.MapStr{
 						"timestamp": util.MapStr{
-							"gte": min,
-							"lte": max,
+							"gte": fmt.Sprintf("now-%ds", metricLen * bucketSize),
 						},
 					},
 				},
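Note: with the absolute `min`/`max` window gone, the lower bound becomes date math relative to `now`, sized to cover `metricLen` buckets; Elasticsearch resolves the expression server-side, so no upper bound is needed. A tiny sketch of the computed bound, with illustrative values:

package main

import "fmt"

func main() {
	bucketSize := 60 // seconds; clamped to at least 60 in the hunk above
	metricLen := 15  // number of buckets the chart should cover

	// The range filter keeps only the last metricLen buckets worth of data,
	// e.g. "now-900s" for fifteen one-minute buckets.
	gte := fmt.Sprintf("now-%ds", metricLen*bucketSize)
	fmt.Println(gte) // now-900s
}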
@@ -441,7 +236,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 			},
 		},
 	}
-	metrics := h.getMetrics(context.Background(), query, nodeMetricItems, bucketSize)
+	metrics := h.getMetrics(ctx, query, nodeMetricItems, bucketSize)
 	indexMetrics := map[string]util.MapStr{}
 	for key, item := range metrics {
 		for _, line := range item.Lines {
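Note: replacing `context.Background()` with the request-scoped `ctx` means the metrics search inherits the caller's deadline and can be cancelled instead of running unbounded. A generic sketch of the pattern, with `fetchMetrics` standing in for `h.getMetrics` (not the project's actual signature):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fetchMetrics honors ctx, so the caller's deadline aborts slow work.
func fetchMetrics(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second): // simulated slow aggregation
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()
	if err := fetchMetrics(ctx); errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("metrics query timed out:", err)
	}
}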
@@ -714,7 +509,7 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
 	defer cancel()
 	metrics := map[string]*common.MetricItem{}
 	if metricKey == IndexHealthMetricKey {
-		healthMetric, err := h.getIndexHealthMetric(ctx, clusterID, indexName, min, max, bucketSize)
+		healthMetric, err := h.GetIndexHealthMetric(ctx, clusterID, indexName, min, max, bucketSize)
 		if err != nil {
 			log.Error(err)
 		}
@@ -776,7 +571,7 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
 	h.WriteJSON(w, resBody, http.StatusOK)
 }
 
-func (h *APIHandler) getIndexHealthMetric(ctx context.Context, id, indexName string, min, max int64, bucketSize int) (*common.MetricItem, error) {
+func (h *APIHandler) GetIndexHealthMetric(ctx context.Context, id, indexName string, min, max int64, bucketSize int) (*common.MetricItem, error) {
 	bucketSizeStr := fmt.Sprintf("%vs", bucketSize)
 	intervalField, err := getDateHistogramIntervalField(global.MustLookupString(elastic.GlobalSystemElasticsearchID), bucketSizeStr)
 	if err != nil {
@@ -803,7 +598,7 @@ func (h *APIHandler) getIndexHealthMetric(ctx context.Context, id, indexName str
 				{
 					"term": util.MapStr{
 						"metadata.name": util.MapStr{
-							"value": "index_stats",
+							"value": "index_health",
 						},
 					},
 				},
@@ -836,7 +631,7 @@ func (h *APIHandler) getIndexHealthMetric(ctx context.Context, id, indexName str
 		"aggs": util.MapStr{
 			"group_status": util.MapStr{
 				"terms": util.MapStr{
-					"field": "payload.elasticsearch.index_stats.index_info.health",
+					"field": "payload.elasticsearch.index_health.status",
 					"size": 5,
 				},
 			},
@@ -852,7 +647,7 @@ func (h *APIHandler) getIndexHealthMetric(ctx context.Context, id, indexName str
 	}
 
 	metricItem := newMetricItem("index_health", 1, "")
-	metricItem.AddLine("health", "Health", "", "group1", "payload.elasticsearch.index_stats.index_info.health", "max", bucketSizeStr, "%", "ratio", "0.[00]", "0.[00]", false, false)
+	metricItem.AddLine("health", "Health", "", "group1", "payload.elasticsearch.index_health.status", "max", bucketSizeStr, "%", "ratio", "0.[00]", "0.[00]", false, false)
 
 	metricData := []interface{}{}
 	if response.StatusCode == 200 {
@@ -868,82 +663,74 @@ func (h *APIHandler) getIndexHealthMetric(ctx context.Context, id, indexName str
 }
 
 
-func getIndexStatusOfRecentDay(indexIDs []interface{}) (map[string][]interface{}, error) {
+func (h *APIHandler) GetIndexStatusOfRecentDay(clusterID, indexName string) (map[string][]interface{}, error) {
 	q := orm.Query{
 		WildcardIndex: true,
 	}
 	query := util.MapStr{
 		"aggs": util.MapStr{
-			"group_by_index_id": util.MapStr{
-				"terms": util.MapStr{
-					"field": "metadata.labels.index_id",
-					"size": 100,
-				},
-				"aggs": util.MapStr{
-					"time_histogram": util.MapStr{
-						"date_range": util.MapStr{
-							"field": "timestamp",
-							"format": "yyyy-MM-dd",
-							"time_zone": "+08:00",
-							"ranges": []util.MapStr{
-								{"from": "now-13d/d", "to": "now-12d/d"},
-								{"from": "now-12d/d", "to": "now-11d/d"},
-								{"from": "now-11d/d", "to": "now-10d/d"},
-								{"from": "now-10d/d", "to": "now-9d/d"},
-								{"from": "now-9d/d", "to": "now-8d/d"},
-								{"from": "now-8d/d", "to": "now-7d/d"},
-								{"from": "now-7d/d", "to": "now-6d/d"},
-								{"from": "now-6d/d", "to": "now-5d/d"},
-								{"from": "now-5d/d", "to": "now-4d/d"},
-								{"from": "now-4d/d", "to": "now-3d/d"},
-								{"from": "now-3d/d", "to": "now-2d/d"},
-								{"from": "now-2d/d", "to": "now-1d/d"},
-								{"from": "now-1d/d", "to": "now/d"},
-								{"from": "now/d", "to": "now"},
-							},
-						},
-						"aggs": util.MapStr{
-							"term_health": util.MapStr{
-								"terms": util.MapStr{
-									"field": "payload.elasticsearch.index_stats.index_info.health",
-								},
-							},
-						},
-					},
-				},
-			},
+			"time_histogram": util.MapStr{
+				"date_range": util.MapStr{
+					"field": "timestamp",
+					"format": "yyyy-MM-dd",
+					"time_zone": "+08:00",
+					"ranges": []util.MapStr{
+						{"from": "now-13d/d", "to": "now-12d/d"},
+						{"from": "now-12d/d", "to": "now-11d/d"},
+						{"from": "now-11d/d", "to": "now-10d/d"},
+						{"from": "now-10d/d", "to": "now-9d/d"},
+						{"from": "now-9d/d", "to": "now-8d/d"},
+						{"from": "now-8d/d", "to": "now-7d/d"},
+						{"from": "now-7d/d", "to": "now-6d/d"},
+						{"from": "now-6d/d", "to": "now-5d/d"},
+						{"from": "now-5d/d", "to": "now-4d/d"},
+						{"from": "now-4d/d", "to": "now-3d/d"},
+						{"from": "now-3d/d", "to": "now-2d/d"},
+						{"from": "now-2d/d", "to": "now-1d/d"},
+						{"from": "now-1d/d", "to": "now/d"},
+						{"from": "now/d", "to": "now"},
+					},
+				},
+				"aggs": util.MapStr{
+					"term_health": util.MapStr{
+						"terms": util.MapStr{
+							"field": "payload.elasticsearch.index_health.status",
+						},
+					},
+				},
+			},
 		},
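Note: the fourteen hardcoded day buckets above could just as well be generated; a sketch that produces the same `date_range` payload, with `dayRanges` as a hypothetical helper (plain maps stand in for `util.MapStr`):

package main

import "fmt"

// dayRanges builds the same day-by-day buckets the query above hardcodes:
// now-13d/d..now-12d/d, ..., now-1d/d..now/d, plus a final now/d..now bucket.
func dayRanges(days int) []map[string]string {
	ranges := make([]map[string]string, 0, days+1)
	for i := days; i >= 1; i-- {
		to := fmt.Sprintf("now-%dd/d", i-1)
		if i == 1 {
			to = "now/d"
		}
		ranges = append(ranges, map[string]string{
			"from": fmt.Sprintf("now-%dd/d", i),
			"to":   to,
		})
	}
	return append(ranges, map[string]string{"from": "now/d", "to": "now"})
}

func main() {
	for _, r := range dayRanges(13) {
		fmt.Println(r["from"], "->", r["to"])
	}
}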
@@ -973,13 +760,18 @@ func getIndexStatusOfRecentDay(indexIDs []interface{})(map[string][]interface{},
 				{
 					"term": util.MapStr{
 						"metadata.name": util.MapStr{
-							"value": "index_stats",
+							"value": "index_health",
 						},
 					},
 				},
 				{
-					"terms": util.MapStr{
-						"metadata.labels.index_id": indexIDs,
+					"term": util.MapStr{
+						"metadata.labels.cluster_id": clusterID,
+					},
+				},
+				{
+					"term": util.MapStr{
+						"metadata.labels.index_name": indexName,
 					},
 				},
 			},
@@ -996,37 +788,28 @@ func getIndexStatusOfRecentDay(indexIDs []interface{})(map[string][]interface{},
 	response := elastic.SearchResponse{}
 	util.FromJSONBytes(res.Raw, &response)
 	recentStatus := map[string][]interface{}{}
-	for _, bk := range response.Aggregations["group_by_index_id"].Buckets {
-		indexKey := bk["key"].(string)
-		recentStatus[indexKey] = []interface{}{}
-		if histogramAgg, ok := bk["time_histogram"].(map[string]interface{}); ok {
-			if bks, ok := histogramAgg["buckets"].([]interface{}); ok {
-				for _, bkItem := range bks {
-					if bkVal, ok := bkItem.(map[string]interface{}); ok {
-						if termHealth, ok := bkVal["term_health"].(map[string]interface{}); ok {
-							if healthBks, ok := termHealth["buckets"].([]interface{}); ok {
-								if len(healthBks) == 0 {
-									continue
-								}
-								healthMap := map[string]int{}
-								status := "unknown"
-								for _, hbkItem := range healthBks {
-									if hitem, ok := hbkItem.(map[string]interface{}); ok {
-										healthMap[hitem["key"].(string)] = 1
-									}
-								}
-								if _, ok = healthMap["red"]; ok {
-									status = "red"
-								} else if _, ok = healthMap["yellow"]; ok {
-									status = "yellow"
-								} else if _, ok = healthMap["green"]; ok {
-									status = "green"
-								}
-								recentStatus[indexKey] = append(recentStatus[indexKey], []interface{}{bkVal["key"], status})
-							}
-						}
-					}
-				}
+	for _, bkVal := range response.Aggregations["time_histogram"].Buckets {
+		if termHealth, ok := bkVal["term_health"].(map[string]interface{}); ok {
+			if healthBks, ok := termHealth["buckets"].([]interface{}); ok {
+				if len(healthBks) == 0 {
+					continue
+				}
+				healthMap := map[string]int{}
+				status := "unknown"
+				for _, hbkItem := range healthBks {
+					if hitem, ok := hbkItem.(map[string]interface{}); ok {
+						healthMap[hitem["key"].(string)] = 1
+					}
+				}
+				if _, ok = healthMap["red"]; ok {
+					status = "red"
+				} else if _, ok = healthMap["yellow"]; ok {
+					status = "yellow"
+				} else if _, ok = healthMap["green"]; ok {
+					status = "green"
+				}
+				key := fmt.Sprintf("%s:%s", clusterID, indexName)
+				recentStatus[key] = append(recentStatus[key], []interface{}{bkVal["key"], status})
 			}
 		}
 	}
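Note: the `red`/`yellow`/`green` cascade above amounts to picking the worst health status observed within a day bucket. A compact sketch of that precedence, with `worstStatus` as an illustrative helper:

package main

import "fmt"

// worstStatus mirrors the precedence in the hunk above: red beats yellow
// beats green; anything else (or no data) degrades to "unknown".
func worstStatus(seen map[string]int) string {
	for _, s := range []string{"red", "yellow", "green"} {
		if _, ok := seen[s]; ok {
			return s
		}
	}
	return "unknown"
}

func main() {
	day := map[string]int{"green": 1, "yellow": 1}
	fmt.Println(worstStatus(day)) // yellow
}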