diff --git a/modules/elastic/api/cluster_overview.go b/modules/elastic/api/cluster_overview.go
index 07916c8b..57277a82 100644
--- a/modules/elastic/api/cluster_overview.go
+++ b/modules/elastic/api/cluster_overview.go
@@ -50,6 +50,8 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
 		h.WriteJSON(w, util.MapStr{}, http.StatusOK)
 		return
 	}
+	//only query the first cluster info
+	clusterIDs = clusterIDs[0:1]
 
 	cids := make([]interface{}, 0, len(clusterIDs))
 	for _, clusterID := range clusterIDs {
@@ -62,7 +64,7 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
 	q1.Conds = orm.And(
 		orm.Eq("metadata.category", "elasticsearch"),
 		orm.Eq("metadata.name", "cluster_stats"),
-		orm.In("metadata.labels.cluster_id", cids),
+		orm.Eq("metadata.labels.cluster_id", cids[0]),
 	)
 	q1.Collapse("metadata.labels.cluster_id")
 	q1.AddSort("timestamp", orm.DESC)
@@ -173,8 +175,8 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
 			"bool": util.MapStr{
 				"must": []util.MapStr{
 					{
-						"terms": util.MapStr{
-							"metadata.labels.cluster_uuid": clusterUUIDs,
+						"term": util.MapStr{
+							"metadata.labels.cluster_uuid": clusterUUIDs[0],
 						},
 					},
 					{
@@ -252,7 +254,16 @@ func (h *APIHandler) FetchClusterInfo(w http.ResponseWriter, req *http.Request,
 			},
 		},
 	}
-	indexMetrics := h.getMetrics(context.Background(), query, indexMetricItems, bucketSize)
+	timeout := h.GetParameterOrDefault(req, "timeout", "60s")
+	du, err := time.ParseDuration(timeout)
+	if err != nil {
+		log.Error(err)
+		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), du)
+	defer cancel()
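+	// a request-scoped timeout (default 60s, overridable via the "timeout"
+	// parameter) bounds the metrics query below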
+	indexMetrics := h.getMetrics(ctx, query, indexMetricItems, bucketSize)
 	indexingMetricData := util.MapStr{}
 	for _, line := range indexMetrics["cluster_indexing"].Lines {
 		// remove first metric dot
@@ -716,6 +727,9 @@ func (h *APIHandler) GetRealtimeClusterNodes(w http.ResponseWriter, req *http.Re
 			info.IndexQPS = qps[nodeInfo.Id]["index"]
 			info.QueryQPS = qps[nodeInfo.Id]["query"]
 			info.IndexBytesQPS = qps[nodeInfo.Id]["index_bytes"]
+			if v, ok := qps[nodeInfo.Id]["latest_timestamp"].(float64); ok {
+				info.Timestamp = uint64(v)
+			}
 		}
 		nodeInfos = append(nodeInfos, info)
 	}
@@ -826,6 +840,7 @@ type RealtimeNodeInfo struct {
 	IndexQPS interface{} `json:"index_qps"`
 	QueryQPS interface{} `json:"query_qps"`
 	IndexBytesQPS interface{} `json:"index_bytes_qps"`
+	Timestamp uint64 `json:"timestamp"`
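+	// time of the newest node_stats sample behind the QPS values above, taken
+	// from the latest_timestamp aggregation (assumed to be epoch milliseconds)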
"group1", "payload.elasticsearch.shard_stats.indexing.delete_total", "max", bucketSizeStr, "doc/s", "num", "0,0.[00]", "0,0.[00]", false, true) } metricItems = append(metricItems, metricItem) - case SearchThroughputMetricKey: + case v1.SearchThroughputMetricKey: metricItem := newMetricItem("search_throughput", 2, OperationGroupKey) metricItem.AddAxi("searching", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, false) metricItem.AddLine("Search Rate", "Search Rate", "Number of search requests being executed.", "group1", "payload.elasticsearch.shard_stats.search.query_total", "max", bucketSizeStr, "query/s", "num", "0,0.[00]", "0,0.[00]", false, true) metricItems = append(metricItems, metricItem) - case IndexLatencyMetricKey: + case v1.IndexLatencyMetricKey: metricItem := newMetricItem("index_latency", 3, LatencyGroupKey) metricItem.AddAxi("indexing", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, true) if shardID == "" { //index level @@ -933,7 +961,7 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ return value / value2 } metricItems = append(metricItems, metricItem) - case SearchLatencyMetricKey: + case v1.SearchLatencyMetricKey: metricItem := newMetricItem("search_latency", 4, LatencyGroupKey) metricItem.AddAxi("searching", "group2", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, false) @@ -1052,261 +1080,6 @@ func (h *APIHandler) getIndexShardsMetric(ctx context.Context, id, indexName str return metricItem, nil } -func (h *APIHandler) getIndexHealthMetric(id, indexName string, min, max int64, bucketSize int)(*common.MetricItem, error){ - bucketSizeStr:=fmt.Sprintf("%vs",bucketSize) - intervalField, err := getDateHistogramIntervalField(global.MustLookupString(elastic.GlobalSystemElasticsearchID), bucketSizeStr) - if err != nil { - return nil, err - } - query := util.MapStr{ - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.labels.cluster_id": util.MapStr{ - "value": id, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.category": util.MapStr{ - "value": "elasticsearch", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.name": util.MapStr{ - "value": "index_stats", - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.index_name": util.MapStr{ - "value": indexName, - }, - }, - }, - }, - "filter": []util.MapStr{ - { - "range": util.MapStr{ - "timestamp": util.MapStr{ - "gte": min, - "lte": max, - }, - }, - }, - }, - }, - }, - "aggs": util.MapStr{ - "dates": util.MapStr{ - "date_histogram": util.MapStr{ - "field": "timestamp", - intervalField: bucketSizeStr, - }, - "aggs": util.MapStr{ - "groups": util.MapStr{ - "terms": util.MapStr{ - "field": "payload.elasticsearch.index_stats.index_info.health", - "size": 5, - }, - }, - }, - }, - }, - } - response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(getAllMetricsIndex(), util.MustToJSONBytes(query)) - if err != nil { - log.Error(err) - return nil, err - } - - metricItem:=newMetricItem("index_health", 1, "") - metricItem.AddLine("health","Health","","group1","payload.elasticsearch.index_stats.index_info.health","max",bucketSizeStr,"%","ratio","0.[00]","0.[00]",false,false) - - metricData := []interface{}{} - if response.StatusCode == 200 { - metricData, err = parseGroupMetricData(response.Aggregations["dates"].Buckets, true) - if err != nil { - return nil, err - } - } - metricItem.Lines[0].Data = metricData - metricItem.Lines[0].Type = 
 				},
 			},
 		},
diff --git a/modules/elastic/api/index_overview.go b/modules/elastic/api/index_overview.go
index 46bbffbb..fc8c703b 100644
--- a/modules/elastic/api/index_overview.go
+++ b/modules/elastic/api/index_overview.go
@@ -31,6 +31,7 @@ import (
 	"context"
 	"fmt"
 	log "github.com/cihub/seelog"
+	v1 "infini.sh/console/modules/elastic/api/v1"
 	httprouter "infini.sh/framework/core/api/router"
 	"infini.sh/framework/core/elastic"
 	"infini.sh/framework/core/event"
@@ -274,21 +275,42 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 		h.WriteJSON(w, util.MapStr{}, http.StatusOK)
 		return
 	}
+	indexIDs = indexIDs[0:1]
 	// map indexIDs(cluster_id:index_name => cluster_uuid:indexName)
 	var (
 		indexIDM = map[string]string{}
 		newIndexIDs []interface{}
 		clusterIndexNames = map[string][]string{}
 	)
-	for _, indexID := range indexIDs {
-		if v, ok := indexID.(string); ok {
-			parts := strings.Split(v, ":")
-			if len(parts) != 2 {
-				log.Warnf("got wrong index_id: %s", v)
-				continue
-			}
-			clusterIndexNames[parts[0]] = append(clusterIndexNames[parts[0]], parts[1])
+	indexID := indexIDs[0]
+	timeout := h.GetParameterOrDefault(req, "timeout", "60s")
+	du, err := time.ParseDuration(timeout)
+	if err != nil {
+		log.Error(err)
+		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), du)
+	defer cancel()
+	var (
+		firstClusterID string
+		firstIndexName string
+	)
+	if v, ok := indexID.(string); ok {
+		parts := strings.Split(v, ":")
+		if len(parts) != 2 {
+			h.WriteError(w, fmt.Sprintf("invalid index_id: %s", indexID), http.StatusInternalServerError)
+			return
 		}
+		firstClusterID, firstIndexName = parts[0], parts[1]
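+		// clusters whose monitor state is Console are delegated to the embedded
+		// v1 handler; all other clusters continue below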
+		if GetMonitorState(firstClusterID) == Console {
+			h.APIHandler.FetchIndexInfo(w, ctx, indexIDs)
+			return
+		}
+		clusterIndexNames[firstClusterID] = append(clusterIndexNames[firstClusterID], firstIndexName)
+	} else {
+		h.WriteError(w, fmt.Sprintf("invalid index_id: %v", indexID), http.StatusInternalServerError)
+		return
 	}
 	for clusterID, indexNames := range clusterIndexNames {
 		clusterUUID, err := adapter.GetClusterUUID(clusterID)
@@ -306,7 +328,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 	q1.Conds = orm.And(
 		orm.Eq("metadata.category", "elasticsearch"),
 		orm.Eq("metadata.name", "shard_stats"),
-		orm.In("metadata.labels.index_id", newIndexIDs),
+		orm.Eq("metadata.labels.index_id", newIndexIDs[0]),
 	)
 	q1.Collapse("metadata.labels.shard_id")
 	q1.AddSort("timestamp", orm.DESC)
@@ -358,7 +380,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 		}
 	}
 
-	statusMetric, err := getIndexStatusOfRecentDay(indexIDs)
+	statusMetric, err := h.GetIndexStatusOfRecentDay(firstClusterID, firstIndexName)
 	if err != nil {
 		log.Error(err)
 		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
@@ -416,8 +438,8 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 					},
 				},
 				{
-					"terms": util.MapStr{
-						"metadata.labels.index_id": newIndexIDs,
+					"term": util.MapStr{
+						"metadata.labels.index_id": newIndexIDs[0],
 					},
 				},
 			},
@@ -504,7 +526,7 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, p
 			},
 		},
 	}
-	metrics := h.getMetrics(context.Background(), query, nodeMetricItems, bucketSize)
+	metrics := h.getMetrics(ctx, query, nodeMetricItems, bucketSize)
 	indexMetrics := map[string]util.MapStr{}
 	for key, item := range metrics {
 		for _, line := range item.Lines {
@@ -890,9 +912,15 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
 			return
 		}
 		metrics["shard_state"] = shardStateMetric
-	}else {
+	} else if metricKey == v1.IndexHealthMetricKey {
+		healthMetric, err := h.GetIndexHealthMetric(ctx, clusterID, indexName, min, max, bucketSize)
+		if err != nil {
+			log.Error(err)
+		}
+		metrics["index_health"] = healthMetric
+	} else {
 		switch metricKey {
-		case IndexThroughputMetricKey:
+		case v1.IndexThroughputMetricKey:
 			metricItem := newMetricItem("index_throughput", 1, OperationGroupKey)
 			metricItem.AddAxi("indexing", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, true)
 			if shardID == "" {
@@ -905,14 +933,14 @@
 				metricItem.AddLine("Deleting Rate", "Deleting Rate", "Number of documents being deleted for node.",
 					"group1", "payload.elasticsearch.shard_stats.indexing.delete_total", "max", bucketSizeStr, "doc/s", "num", "0,0.[00]", "0,0.[00]", false, true)
 			}
 			metricItems = append(metricItems, metricItem)
-		case SearchThroughputMetricKey:
+		case v1.SearchThroughputMetricKey:
 			metricItem := newMetricItem("search_throughput", 2, OperationGroupKey)
 			metricItem.AddAxi("searching", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, false)
 			metricItem.AddLine("Search Rate", "Search Rate", "Number of search requests being executed.",
 				"group1", "payload.elasticsearch.shard_stats.search.query_total", "max", bucketSizeStr, "query/s", "num", "0,0.[00]", "0,0.[00]", false, true)
 			metricItems = append(metricItems, metricItem)
-		case IndexLatencyMetricKey:
+		case v1.IndexLatencyMetricKey:
 			metricItem := newMetricItem("index_latency", 3, LatencyGroupKey)
 			metricItem.AddAxi("indexing", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, true)
 			if shardID == "" { //index level
@@ -933,7 +961,7 @@
 				return value / value2
 			}
 			metricItems = append(metricItems, metricItem)
-		case SearchLatencyMetricKey:
+		case v1.SearchLatencyMetricKey:
 			metricItem := newMetricItem("search_latency", 4, LatencyGroupKey)
 			metricItem.AddAxi("searching", "group2", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, false)
@@ -1052,261 +1080,6 @@ func (h *APIHandler) getIndexShardsMetric(ctx context.Context, id, indexName str
 	return metricItem, nil
 }
 
-func (h *APIHandler) getIndexHealthMetric(id, indexName string, min, max int64, bucketSize int)(*common.MetricItem, error){
-	bucketSizeStr:=fmt.Sprintf("%vs",bucketSize)
-	intervalField, err := getDateHistogramIntervalField(global.MustLookupString(elastic.GlobalSystemElasticsearchID), bucketSizeStr)
-	if err != nil {
-		return nil, err
-	}
-	query := util.MapStr{
-		"query": util.MapStr{
-			"bool": util.MapStr{
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"metadata.labels.cluster_id": util.MapStr{
-								"value": id,
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.category": util.MapStr{
-								"value": "elasticsearch",
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.name": util.MapStr{
-								"value": "index_stats",
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.labels.index_name": util.MapStr{
-								"value": indexName,
-							},
-						},
-					},
-				},
-				"filter": []util.MapStr{
-					{
-						"range": util.MapStr{
-							"timestamp": util.MapStr{
-								"gte": min,
-								"lte": max,
-							},
-						},
-					},
-				},
-			},
-		},
-		"aggs": util.MapStr{
-			"dates": util.MapStr{
-				"date_histogram": util.MapStr{
-					"field": "timestamp",
-					intervalField: bucketSizeStr,
-				},
-				"aggs": util.MapStr{
-					"groups": util.MapStr{
-						"terms": util.MapStr{
-							"field": "payload.elasticsearch.index_stats.index_info.health",
-							"size": 5,
-						},
-					},
-				},
-			},
-		},
-	}
-	response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(getAllMetricsIndex(), util.MustToJSONBytes(query))
-	if err != nil {
-		log.Error(err)
-		return nil, err
-	}
-
-	metricItem:=newMetricItem("index_health", 1, "")
-	metricItem.AddLine("health","Health","","group1","payload.elasticsearch.index_stats.index_info.health","max",bucketSizeStr,"%","ratio","0.[00]","0.[00]",false,false)
-
-	metricData := []interface{}{}
-	if response.StatusCode == 200 {
-		metricData, err = parseGroupMetricData(response.Aggregations["dates"].Buckets, true)
-		if err != nil {
-			return nil, err
-		}
-	}
-	metricItem.Lines[0].Data = metricData
-	metricItem.Lines[0].Type = common.GraphTypeBar
-	return metricItem, nil
-}
-
-
-func getIndexStatusOfRecentDay(indexIDs []interface{})(map[string][]interface{}, error){
-	q := orm.Query{
-		WildcardIndex: true,
-	}
-	query := util.MapStr{
-		"aggs": util.MapStr{
-			"group_by_index_id": util.MapStr{
-				"terms": util.MapStr{
-					"field": "metadata.labels.index_id",
-					"size": 100,
-				},
-				"aggs": util.MapStr{
-					"time_histogram": util.MapStr{
-						"date_range": util.MapStr{
-							"field": "timestamp",
-							"format": "yyyy-MM-dd",
-							"time_zone": "+08:00",
-							"ranges": []util.MapStr{
-								{
-									"from": "now-13d/d",
-									"to": "now-12d/d",
-								}, {
-									"from": "now-12d/d",
-									"to": "now-11d/d",
-								},
-								{
-									"from": "now-11d/d",
-									"to": "now-10d/d",
-								},
-								{
-									"from": "now-10d/d",
-									"to": "now-9d/d",
-								}, {
-									"from": "now-9d/d",
-									"to": "now-8d/d",
-								},
-								{
-									"from": "now-8d/d",
-									"to": "now-7d/d",
-								},
-								{
-									"from": "now-7d/d",
-									"to": "now-6d/d",
-								},
-								{
-									"from": "now-6d/d",
-									"to": "now-5d/d",
-								}, {
-									"from": "now-5d/d",
-									"to": "now-4d/d",
-								},
-								{
-									"from": "now-4d/d",
-									"to": "now-3d/d",
-								},{
-									"from": "now-3d/d",
-									"to": "now-2d/d",
-								}, {
-									"from": "now-2d/d",
-									"to": "now-1d/d",
-								}, {
-									"from": "now-1d/d",
-									"to": "now/d",
-								},
-								{
-									"from": "now/d",
-									"to": "now",
-								},
-							},
-						},
-						"aggs": util.MapStr{
-							"term_health": util.MapStr{
-								"terms": util.MapStr{
-									"field": "payload.elasticsearch.index_stats.index_info.health",
-								},
-							},
-						},
-					},
-				},
-			},
-		},
-		"sort": []util.MapStr{
-			{
-				"timestamp": util.MapStr{
-					"order": "desc",
-				},
-			},
-		},
-		"size": 0,
-		"query": util.MapStr{
-			"bool": util.MapStr{
-				"filter": []util.MapStr{
-					{
-						"range": util.MapStr{
-							"timestamp": util.MapStr{
-								"gte": "now-15d",
-								"lte": "now",
-							},
-						},
-					},
-				},
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"metadata.name": util.MapStr{
-								"value": "index_stats",
-							},
-						},
-					},
-					{
-						"terms": util.MapStr{
-							"metadata.labels.index_id": indexIDs,
-						},
-					},
-				},
-			},
-		},
-	}
-	q.RawQuery = util.MustToJSONBytes(query)
-
-	err, res := orm.Search(&event.Event{}, &q)
-	if err != nil {
-		return nil, err
-	}
-
-	response := elastic.SearchResponse{}
-	util.FromJSONBytes(res.Raw, &response)
-	recentStatus := map[string][]interface{}{}
-	for _, bk := range response.Aggregations["group_by_index_id"].Buckets {
-		indexKey := bk["key"].(string)
-		recentStatus[indexKey] = []interface{}{}
-		if histogramAgg, ok := bk["time_histogram"].(map[string]interface{}); ok {
-			if bks, ok := histogramAgg["buckets"].([]interface{}); ok {
-				for _, bkItem := range bks {
-					if bkVal, ok := bkItem.(map[string]interface{}); ok {
-						if termHealth, ok := bkVal["term_health"].(map[string]interface{}); ok {
-							if healthBks, ok := termHealth["buckets"].([]interface{}); ok {
-								if len(healthBks) == 0 {
-									continue
-								}
-								healthMap := map[string]int{}
-								status := "unknown"
-								for _, hbkItem := range healthBks {
-									if hitem, ok := hbkItem.(map[string]interface{}); ok {
-										healthMap[hitem["key"].(string)] = 1
-									}
-								}
-								if _, ok = healthMap["red"]; ok {
-									status = "red"
-								}else if _, ok = healthMap["yellow"]; ok {
-									status = "yellow"
-								}else if _, ok = healthMap["green"]; ok {
-									status = "green"
-								}
-								recentStatus[indexKey] = append(recentStatus[indexKey], []interface{}{bkVal["key"], status})
-							}
-						}
-					}
-				}
-			}
-		}
-	}
-	return recentStatus, nil
-}
-
 func (h *APIHandler) getIndexNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	resBody := map[string] interface{}{}
 	id := ps.ByName("id")
diff --git a/modules/elastic/api/manage.go b/modules/elastic/api/manage.go
index 7e2ead62..4dc38a1c 100644
--- a/modules/elastic/api/manage.go
+++ b/modules/elastic/api/manage.go
@@ -560,7 +560,7 @@ func (h *APIHandler) HandleClusterMetricsAction(w http.ResponseWriter, req *http
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), du)
 	defer cancel()
-	if util.StringInArray([]string{IndexThroughputMetricKey, SearchThroughputMetricKey, IndexLatencyMetricKey, SearchLatencyMetricKey}, key) {
+	if util.StringInArray([]string{v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.IndexLatencyMetricKey, v1.SearchLatencyMetricKey}, key) {
 		metrics = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, key)
 	}else{
 		metrics = h.GetClusterMetrics(ctx, id, bucketSize, min, max, key)
@@ -912,7 +912,7 @@ func (h *APIHandler) GetClusterMetrics(ctx context.Context, id string, bucketSiz
 		ClusterIndicesMetricKey,
 		ClusterNodeCountMetricKey:
 		clusterMetricsResult = h.getClusterMetricsByKey(ctx, id, bucketSize, min, max, metricKey)
-	case IndexLatencyMetricKey, IndexThroughputMetricKey, SearchThroughputMetricKey, SearchLatencyMetricKey:
+	case v1.IndexLatencyMetricKey, v1.IndexThroughputMetricKey, v1.SearchThroughputMetricKey, v1.SearchLatencyMetricKey:
 		clusterMetricsResult = h.GetClusterIndexMetrics(ctx, id, bucketSize, min, max, metricKey)
 	case ClusterHealthMetricKey:
 		statusMetric, err := h.getClusterStatusMetric(ctx, id, min, max, bucketSize)
@@ -1024,31 +1024,24 @@ func (h *APIHandler) getClusterMetricsByKey(ctx context.Context, id string, buck
 	return h.getSingleMetrics(ctx, clusterMetricItems, query, bucketSize)
 }
 
-const (
-	IndexThroughputMetricKey = "index_throughput"
-	SearchThroughputMetricKey = "search_throughput"
-	IndexLatencyMetricKey = "index_latency"
-	SearchLatencyMetricKey = "search_latency"
-)
-
 func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, bucketSize int, min, max int64, metricKey string) map[string]*common.MetricItem {
 	bucketSizeStr := fmt.Sprintf("%vs", bucketSize)
 	metricItems := []*common.MetricItem{}
 	switch metricKey {
-	case IndexThroughputMetricKey:
-		metricItem := newMetricItem(IndexThroughputMetricKey, 2, OperationGroupKey)
+	case v1.IndexThroughputMetricKey:
+		metricItem := newMetricItem(v1.IndexThroughputMetricKey, 2, OperationGroupKey)
 		metricItem.AddAxi("indexing", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, true)
 		metricItem.AddLine("Indexing Rate", "Total Indexing", "Number of documents being indexed for primary and replica shards.", "group1", "payload.elasticsearch.node_stats.indices.indexing.index_total", "max", bucketSizeStr, "doc/s", "num", "0,0.[00]", "0,0.[00]", false, true)
 		metricItems = append(metricItems, metricItem)
-	case SearchThroughputMetricKey:
-		metricItem := newMetricItem(SearchThroughputMetricKey, 2, OperationGroupKey)
+	case v1.SearchThroughputMetricKey:
+		metricItem := newMetricItem(v1.SearchThroughputMetricKey, 2, OperationGroupKey)
 		metricItem.AddAxi("searching", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, false)
 		metricItem.AddLine("Search Rate", "Total Query", "Number of search requests being executed across primary and replica shards. A single search can run against multiple shards!", "group1", "payload.elasticsearch.node_stats.indices.search.query_total", "max", bucketSizeStr, "query/s", "num", "0,0.[00]", "0,0.[00]", false, true)
 		metricItems = append(metricItems, metricItem)
-	case IndexLatencyMetricKey:
-		metricItem := newMetricItem(IndexLatencyMetricKey, 3, LatencyGroupKey)
+	case v1.IndexLatencyMetricKey:
+		metricItem := newMetricItem(v1.IndexLatencyMetricKey, 3, LatencyGroupKey)
 		metricItem.AddAxi("indexing", "group1", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, true)
 		metricItem.AddLine("Indexing", "Indexing Latency", "Average latency for indexing documents.", "group1", "payload.elasticsearch.node_stats.indices.indexing.index_time_in_millis", "max", bucketSizeStr, "ms", "num", "0,0.[00]", "0,0.[00]", false, true)
@@ -1062,8 +1055,8 @@ func (h *APIHandler) GetClusterIndexMetrics(ctx context.Context, id string, buck
 			return value / value2
 		}
 		metricItems = append(metricItems, metricItem)
-	case SearchLatencyMetricKey:
-		metricItem := newMetricItem(SearchLatencyMetricKey, 3, LatencyGroupKey)
+	case v1.SearchLatencyMetricKey:
+		metricItem := newMetricItem(v1.SearchLatencyMetricKey, 3, LatencyGroupKey)
 		metricItem.AddAxi("searching", "group2", common.PositionLeft, "num", "0,0", "0,0.[00]", 5, false)
 		metricItem.AddLine("Searching", "Query Latency", "Average latency for searching query.", "group2", "payload.elasticsearch.node_stats.indices.search.query_time_in_millis", "max", bucketSizeStr, "ms", "num", "0,0.[00]", "0,0.[00]", false, true)
diff --git a/modules/elastic/api/node_overview.go b/modules/elastic/api/node_overview.go
index e1057716..b9ec3f51 100644
--- a/modules/elastic/api/node_overview.go
+++ b/modules/elastic/api/node_overview.go
@@ -31,6 +31,7 @@ import (
 	"context"
 	"fmt"
 	log "github.com/cihub/seelog"
+	v1 "infini.sh/console/modules/elastic/api/v1"
 	httprouter "infini.sh/framework/core/api/router"
 	"infini.sh/framework/core/elastic"
 	"infini.sh/framework/core/event"
@@ -204,6 +205,17 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
 		h.WriteJSON(w, util.MapStr{}, http.StatusOK)
 		return
 	}
+	//only query one node info for fast response
+	nodeIDs = nodeIDs[0:1]
+	timeout := h.GetParameterOrDefault(req, "timeout", "60s")
+	du, err := time.ParseDuration(timeout)
+	if err != nil {
+		log.Error(err)
+		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), du)
+	defer cancel()
 
 	q1 := orm.Query{WildcardIndex: true}
 	query := util.MapStr{
@@ -235,8 +247,8 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
 					},
 				},
 				{
-					"terms": util.MapStr{
-						"metadata.labels.node_id": nodeIDs,
+					"term": util.MapStr{
+						"metadata.labels.node_id": nodeIDs[0],
 					},
 				},
 			},
@@ -355,8 +367,8 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
 					},
 				},
 				{
-					"terms": util.MapStr{
-						"metadata.labels.node_id": nodeIDs,
+					"term": util.MapStr{
+						"metadata.labels.node_id": nodeIDs[0],
 					},
 				},
 			},
@@ -410,7 +422,7 @@ func (h *APIHandler) FetchNodeInfo(w http.ResponseWriter, req *http.Request, ps
 			},
 		},
 	}
-	metrics := h.getMetrics(context.Background(), query, nodeMetricItems, bucketSize)
+	metrics := h.getMetrics(ctx, query, nodeMetricItems, bucketSize)
 	indexMetrics := map[string]util.MapStr{}
 	for key, item := range metrics {
 		for _, line := range item.Lines {
@@ -722,19 +734,19 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque
 		metricItem.AddLine("Max Heap","Max Heap","JVM max Heap of node.","group1","payload.elasticsearch.node_stats.jvm.mem.heap_max_in_bytes","max",bucketSizeStr,"","bytes","0,0.[00]","0,0.[00]",false,false)
 		metricItem.AddLine("Used Heap","Used Heap","JVM used Heap of node.","group1","payload.elasticsearch.node_stats.jvm.mem.heap_used_in_bytes","max",bucketSizeStr,"","bytes","0,0.[00]","0,0.[00]",false,false)
 		metricItems=append(metricItems,metricItem)
-	case IndexThroughputMetricKey:
+	case v1.IndexThroughputMetricKey:
 		metricItem := newMetricItem("index_throughput", 3, OperationGroupKey)
 		metricItem.AddAxi("indexing","group1",common.PositionLeft,"num","0,0","0,0.[00]",5,true)
 		metricItem.AddLine("Indexing Rate","Total Shards","Number of documents being indexed for node.","group1","payload.elasticsearch.node_stats.indices.indexing.index_total","max",bucketSizeStr,"doc/s","num","0,0.[00]","0,0.[00]",false,true)
 		metricItems=append(metricItems,metricItem)
-	case SearchThroughputMetricKey:
+	case v1.SearchThroughputMetricKey:
 		metricItem := newMetricItem("search_throughput", 4, OperationGroupKey)
 		metricItem.AddAxi("searching","group1",common.PositionLeft,"num","0,0","0,0.[00]",5,false)
 		metricItem.AddLine("Search Rate","Total Shards",
 			"Number of search requests being executed.",
 			"group1","payload.elasticsearch.node_stats.indices.search.query_total","max",bucketSizeStr,"query/s","num","0,0.[00]","0,0.[00]",false,true)
 		metricItems=append(metricItems,metricItem)
-	case IndexLatencyMetricKey:
+	case v1.IndexLatencyMetricKey:
 		metricItem := newMetricItem("index_latency", 5, LatencyGroupKey)
 		metricItem.AddAxi("indexing","group1",common.PositionLeft,"num","0,0","0,0.[00]",5,true)
@@ -749,7 +761,7 @@ func (h *APIHandler) GetSingleNodeMetrics(w http.ResponseWriter, req *http.Reque
 			return value/value2
 		}
 		metricItems=append(metricItems,metricItem)
-	case SearchLatencyMetricKey:
+	case v1.SearchLatencyMetricKey:
 		metricItem := newMetricItem("search_latency", 6, LatencyGroupKey)
 		metricItem.AddAxi("searching","group2",common.PositionLeft,"num","0,0","0,0.[00]",5,false)
diff --git a/modules/elastic/api/v1/cluster_overview.go b/modules/elastic/api/v1/cluster_overview.go
index 121fcc98..ec7afb77 100644
--- a/modules/elastic/api/v1/cluster_overview.go
+++ b/modules/elastic/api/v1/cluster_overview.go
@@ -29,7 +29,6 @@ import (
 	"strings"
 	"time"
 
-	log "github.com/cihub/seelog"
 	httprouter "infini.sh/framework/core/api/router"
 	"infini.sh/framework/core/elastic"
 	"infini.sh/framework/core/event"
@@ -569,71 +568,6 @@ func (h *APIHandler) GetClusterNodes(w http.ResponseWriter, req *http.Request, p
 	h.WriteJSON(w, nodes, http.StatusOK)
 }
 
-func (h *APIHandler) GetRealtimeClusterNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
-	id := ps.ByName("id")
-	meta := elastic.GetMetadata(id)
-	if meta == nil || !meta.IsAvailable() {
-		log.Debugf("cluster [%s] is not available", id)
-		h.WriteJSON(w, []interface{}{}, http.StatusOK)
-		return
-	}
-	esClient := elastic.GetClient(id)
-	if esClient == nil {
-		h.WriteJSON(w, util.MapStr{
-			"error": "cluster not found",
-		}, http.StatusNotFound)
-		return
-	}
-	catNodesInfo, err := esClient.CatNodes("id,name,ip,port,master,heap.percent,disk.avail,cpu,load_1m,uptime")
-	if err != nil {
-		h.WriteJSON(w, util.MapStr{
-			"error": err.Error(),
-		}, http.StatusInternalServerError)
-		return
-	}
-	catShardsInfo, err := esClient.CatShards()
-	if err != nil {
-		log.Error(err)
-	}
-	shardCounts := map[string]int{}
-	nodeM := map[string]string{}
-	for _, shardInfo := range catShardsInfo {
-		nodeM[shardInfo.NodeName] = shardInfo.NodeID
-		if c, ok := shardCounts[shardInfo.NodeName]; ok {
-			shardCounts[shardInfo.NodeName] = c + 1
-		} else {
-			shardCounts[shardInfo.NodeName] = 1
-		}
-	}
-	qps, err := h.getNodeQPS(id, 20)
-	if err != nil {
-		h.WriteJSON(w, util.MapStr{
-			"error": err.Error(),
-		}, http.StatusInternalServerError)
-		return
-	}
-
-	nodeInfos := []RealtimeNodeInfo{}
-	for _, nodeInfo := range catNodesInfo {
-		if len(nodeInfo.Id) == 4 { //node short id, use nodeId from shard info isnstead
-			nodeInfo.Id = nodeM[nodeInfo.Name]
-		}
-		if c, ok := shardCounts[nodeInfo.Name]; ok {
-			nodeInfo.Shards = c
-		}
-		info := RealtimeNodeInfo{
-			CatNodeResponse: CatNodeResponse(nodeInfo),
-		}
-		if _, ok := qps[nodeInfo.Id]; ok {
-			info.IndexQPS = qps[nodeInfo.Id]["index"]
-			info.QueryQPS = qps[nodeInfo.Id]["query"]
-			info.IndexBytesQPS = qps[nodeInfo.Id]["index_bytes"]
-		}
-		nodeInfos = append(nodeInfos, info)
-	}
-	h.WriteJSON(w, nodeInfos, http.StatusOK)
-}
-
 func (h *APIHandler) GetClusterIndices(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	var (
 		//size = h.GetIntOrDefault(req, "size", 20)
@@ -824,97 +758,6 @@ func (h *APIHandler) getIndexQPS(clusterID string, bucketSizeInSeconds int) (map
 	return h.QueryQPS(query, bucketSizeInSeconds)
 }
 
-func (h *APIHandler) getNodeQPS(clusterID string, bucketSizeInSeconds int) (map[string]util.MapStr, error) {
-	ver := h.Client().GetVersion()
-	bucketSizeStr := fmt.Sprintf("%ds", bucketSizeInSeconds)
-	intervalField, err := elastic.GetDateHistogramIntervalField(ver.Distribution, ver.Number, bucketSizeStr)
-	if err != nil {
-		return nil, err
-	}
-	query := util.MapStr{
-		"size": 0,
-		"aggs": util.MapStr{
-			"term_node": util.MapStr{
-				"terms": util.MapStr{
-					"field": "metadata.labels.node_id",
-					"size": 1000,
-				},
-				"aggs": util.MapStr{
-					"date": util.MapStr{
-						"date_histogram": util.MapStr{
-							"field": "timestamp",
-							intervalField: "10s",
-						},
-						"aggs": util.MapStr{
-							"index_total": util.MapStr{
-								"max": util.MapStr{
-									"field": "payload.elasticsearch.node_stats.indices.indexing.index_total",
-								},
-							},
-							"index_bytes_total": util.MapStr{
-								"max": util.MapStr{
-									"field": "payload.elasticsearch.node_stats.indices.store.size_in_bytes",
-								},
-							},
-							"query_total": util.MapStr{
-								"max": util.MapStr{
-									"field": "payload.elasticsearch.node_stats.indices.search.query_total",
-								},
-							},
-							"index_rate": util.MapStr{
-								"derivative": util.MapStr{
-									"buckets_path": "index_total",
-								},
-							},
-							"index_bytes_rate": util.MapStr{
-								"derivative": util.MapStr{
-									"buckets_path": "index_bytes_total",
-								},
-							},
-							"query_rate": util.MapStr{
-								"derivative": util.MapStr{
-									"buckets_path": "query_total",
-								},
-							},
-						},
-					},
-				},
-			},
-		},
-		"query": util.MapStr{
-			"bool": util.MapStr{
-				"filter": []util.MapStr{
-					{
-						"range": util.MapStr{
-							"timestamp": util.MapStr{
-								"gte": "now-1m",
-								"lte": "now",
-							},
-						},
-					},
-				},
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"metadata.labels.cluster_id": util.MapStr{
-								"value": clusterID,
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.name": util.MapStr{
-								"value": "node_stats",
-							},
-						},
-					},
-				},
-			},
-		},
-	}
-	return h.QueryQPS(query, bucketSizeInSeconds)
-}
-
 func (h *APIHandler) QueryQPS(query util.MapStr, bucketSizeInSeconds int) (map[string]util.MapStr, error) {
 	esClient := h.Client()
 	searchRes, err := esClient.SearchWithRawQueryDSL(orm.GetWildcardIndexName(event.Event{}), util.MustToJSONBytes(query))
@@ -934,6 +777,7 @@ func (h *APIHandler) QueryQPS(query util.MapStr, bucketSizeInSeconds int) (map[s
 					maxIndexBytesRate float64
 					preIndexTotal float64
 					dropNext bool
+					maxTimestamp float64
 				)
 				for _, dateBk := range bks {
 					if dateBkVal, ok := dateBk.(map[string]interface{}); ok {
@@ -971,11 +815,17 @@ func (h *APIHandler) QueryQPS(query util.MapStr, bucketSizeInSeconds int) (map[s
 								maxIndexBytesRate = indexBytesRateVal
 							}
 						}
+						if latestTime, ok := dateBkVal["latest_timestamp"].(map[string]interface{}); ok {
+							if latestTimeVal, ok := latestTime["value"].(float64); ok && latestTimeVal > maxTimestamp {
+								maxTimestamp = latestTimeVal
+							}
+						}
 					}
 				}
 				indexQPS[k]["index"] = maxIndexRate / float64(bucketSizeInSeconds)
 				indexQPS[k]["query"] = maxQueryRate / float64(bucketSizeInSeconds)
 				indexQPS[k]["index_bytes"] = maxIndexBytesRate / float64(bucketSizeInSeconds)
+				indexQPS[k]["latest_timestamp"] = maxTimestamp
 			}
 		}
 	}
diff --git a/modules/elastic/api/v1/index_overview.go b/modules/elastic/api/v1/index_overview.go
index 20b2b555..a8420b92 100644
--- a/modules/elastic/api/v1/index_overview.go
+++ b/modules/elastic/api/v1/index_overview.go
@@ -43,252 +43,23 @@ import (
 	"time"
 )
 
-func (h *APIHandler) SearchIndexMetadata(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
-	resBody:=util.MapStr{}
-	reqBody := struct{
-		Keyword string `json:"keyword"`
-		Size int `json:"size"`
-		From int `json:"from"`
-		Aggregations []elastic.SearchAggParam `json:"aggs"`
-		Highlight elastic.SearchHighlightParam `json:"highlight"`
-		Filter elastic.SearchFilterParam `json:"filter"`
-		Sort []string `json:"sort"`
-		SearchField string `json:"search_field"`
-	}{}
-	err := h.DecodeJSON(req, &reqBody)
-	if err != nil {
-		resBody["error"] = err.Error()
-		h.WriteJSON(w,resBody, http.StatusInternalServerError )
-		return
-	}
-	aggs := elastic.BuildSearchTermAggregations(reqBody.Aggregations)
-	aggs["term_cluster_id"] = util.MapStr{
-		"terms": util.MapStr{
-			"field": "metadata.cluster_id",
-			"size": 1000,
-		},
-		"aggs": util.MapStr{
-			"term_cluster_name": util.MapStr{
-				"terms": util.MapStr{
-					"field": "metadata.cluster_name",
-					"size": 1,
-				},
-			},
-		},
-	}
-	filter := elastic.BuildSearchTermFilter(reqBody.Filter)
-	var should []util.MapStr
-	if reqBody.SearchField != ""{
-		should = []util.MapStr{
-			{
-				"prefix": util.MapStr{
-					reqBody.SearchField: util.MapStr{
-						"value": reqBody.Keyword,
-						"boost": 20,
-					},
-				},
-			},
-			{
-				"match": util.MapStr{
-					reqBody.SearchField: util.MapStr{
-						"query": reqBody.Keyword,
-						"fuzziness": "AUTO",
-						"max_expansions": 10,
-						"prefix_length": 2,
-						"fuzzy_transpositions": true,
-						"boost": 2,
-					},
-				},
-			},
-		}
-	}else{
-		if reqBody.Keyword != ""{
-			should = []util.MapStr{
-				{
-					"prefix": util.MapStr{
-						"metadata.index_name": util.MapStr{
-							"value": reqBody.Keyword,
-							"boost": 30,
-						},
-					},
-				},
-				{
-					"prefix": util.MapStr{
-						"metadata.aliases": util.MapStr{
-							"value": reqBody.Keyword,
-							"boost": 20,
-						},
-					},
-				},
-				{
-					"match": util.MapStr{
-						"search_text": util.MapStr{
-							"query": reqBody.Keyword,
-							"fuzziness": "AUTO",
-							"max_expansions": 10,
-							"prefix_length": 2,
-							"fuzzy_transpositions": true,
-							"boost": 2,
-						},
-					},
-				},
-				{
-					"query_string": util.MapStr{
-						"fields": []string{"*"},
-						"query": reqBody.Keyword,
-						"fuzziness": "AUTO",
-						"fuzzy_prefix_length": 2,
-						"fuzzy_max_expansions": 10,
-						"fuzzy_transpositions": true,
-						"allow_leading_wildcard": false,
-					},
-				},
-			}
-		}
-	}
-
-	must := []interface{}{
-	}
-	hasAllPrivilege, indexPrivilege := h.GetCurrentUserIndex(req)
-	if !hasAllPrivilege && len(indexPrivilege) == 0 {
-		h.WriteJSON(w, elastic.SearchResponse{
-
-		}, http.StatusOK)
-		return
-	}
-	if !hasAllPrivilege {
-		indexShould := make([]interface{}, 0, len(indexPrivilege))
-		for clusterID, indices := range indexPrivilege {
-			var (
-				wildcardIndices []string
-				normalIndices []string
-			)
-			for _, index := range indices {
-				if strings.Contains(index,"*") {
-					wildcardIndices = append(wildcardIndices, index)
-					continue
-				}
-				normalIndices = append(normalIndices, index)
-			}
-			subShould := []util.MapStr{}
-			if len(wildcardIndices) > 0 {
-				subShould = append(subShould, util.MapStr{
-					"query_string": util.MapStr{
-						"query": strings.Join(wildcardIndices, " "),
-						"fields": []string{"metadata.index_name"},
-						"default_operator": "OR",
-					},
-				})
-			}
-			if len(normalIndices) > 0 {
-				subShould = append(subShould, util.MapStr{
-					"terms": util.MapStr{
-						"metadata.index_name": normalIndices,
-					},
-				})
-			}
-			indexShould = append(indexShould, util.MapStr{
-				"bool": util.MapStr{
-					"must": []util.MapStr{
-						{
-							"wildcard": util.MapStr{
-								"metadata.cluster_id": util.MapStr{
-									"value": clusterID,
-								},
-							},
-						},
-						{
-							"bool": util.MapStr{
-								"minimum_should_match": 1,
-								"should": subShould,
-							},
-						},
-					},
-				},
-			})
-		}
-		indexFilter := util.MapStr{
-			"bool": util.MapStr{
-				"minimum_should_match": 1,
-				"should": indexShould,
-			},
-		}
-		must = append(must, indexFilter)
-	}
-	boolQuery := util.MapStr{
-		"must_not": []util.MapStr{
-			{
-				"term": util.MapStr{
-					"metadata.labels.index_status": "deleted",
-				},
-			},
-		},
-		"filter": filter,
-		"must": must,
-	}
-	if len(should) > 0 {
-		boolQuery["should"] = should
-		boolQuery["minimum_should_match"] = 1
-	}
-	query := util.MapStr{
-		"aggs": aggs,
-		"size": reqBody.Size,
-		"from": reqBody.From,
-		"highlight": elastic.BuildSearchHighlight(&reqBody.Highlight),
-		"query": util.MapStr{
-			"bool": boolQuery,
-		},
-		"sort": []util.MapStr{
-			{
-				"timestamp": util.MapStr{
-					"order": "desc",
-				},
-			},
-		},
-	}
-	if len(reqBody.Sort) > 1 {
-		query["sort"] = []util.MapStr{
-			{
-				reqBody.Sort[0]: util.MapStr{
-					"order": reqBody.Sort[1],
-				},
-			},
-		}
-	}
-	dsl := util.MustToJSONBytes(query)
-	response, err := elastic.GetClient(global.MustLookupString(elastic.GlobalSystemElasticsearchID)).SearchWithRawQueryDSL(orm.GetIndexName(elastic.IndexConfig{}), dsl)
-	if err != nil {
-		resBody["error"] = err.Error()
-		h.WriteJSON(w,resBody, http.StatusInternalServerError )
-		return
-	}
-	w.Write(util.MustToJSONBytes(response))
-
-}
-func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
-	defer func() {
-		if err := recover(); err != nil {
-			log.Error(err)
-		}
-	}()
-	var indexIDs []interface{}
-
-
-	h.DecodeJSON(req, &indexIDs)
+func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, ctx context.Context, indexIDs []interface{}) {
 	if len(indexIDs) == 0 {
 		h.WriteJSON(w, util.MapStr{}, http.StatusOK)
 		return
 	}
+	//only query first index
+	indexIDs = indexIDs[0:1]
 	q1 := orm.Query{WildcardIndex: true}
 	q1.Conds = orm.And(
 		orm.Eq("metadata.category", "elasticsearch"),
 		orm.Eq("metadata.name", "index_stats"),
-		orm.In("metadata.labels.index_id", indexIDs),
+		orm.Eq("metadata.labels.index_id", indexIDs[0]),
 	)
-	q1.Collapse("metadata.labels.index_id")
+	//q1.Collapse("metadata.labels.index_id")
 	q1.AddSort("timestamp", orm.DESC)
-	q1.Size = len(indexIDs) + 1
+	q1.Size = 1
 
 	err, results := orm.Search(&event.Event{}, &q1)
 	if err != nil {
@@ -304,10 +75,23 @@ func (h *APIHandler) FetchIndexInfo(w http.ResponseWriter, p
 		if indexID, ok := util.GetMapValueByKeys([]string{"metadata", "labels", "index_id"}, result); ok {
 			summary := map[string]interface{}{}
 			if docs, ok := util.GetMapValueByKeys([]string{"payload", "elasticsearch", "index_stats", "total", "docs"}, result); ok {
-				summary["docs"] = docs
+				if docsM, ok := docs.(map[string]interface{}); ok {
+					summary["docs_deleted"] = docsM["docs_deleted"]
+					summary["docs_count"] = docsM["count"]
+				}
 			}
 			if indexInfo, ok := util.GetMapValueByKeys([]string{"payload", "elasticsearch", "index_stats", "index_info"}, result); ok {
-				summary["index_info"] = indexInfo
+				if indexInfoM, ok := indexInfo.(map[string]interface{}); ok {
+					summary["index"] = indexInfoM["index"]
+					summary["status"] = indexInfoM["status"]
+					summary["shards"] = indexInfoM["shards"]
+					summary["replicas"] = indexInfoM["replicas"]
+					storeInBytes, _ := util.ToBytes(indexInfoM["store_size"].(string))
+					summary["store_in_bytes"] = storeInBytes
+					priStoreInBytes, _ := util.ToBytes(indexInfoM["pri_store_size"].(string))
+					summary["pri_store_in_bytes"] = priStoreInBytes
+					summary["timestamp"] = result["timestamp"]
+				}
 			}
 			if shardInfo, ok := util.GetMapValueByKeys([]string{"payload", "elasticsearch", "index_stats", "shard_info"}, result); ok {
 				if sinfo, ok := shardInfo.([]interface{}); ok {
@@ -328,18 +112,30 @@
 		}
 	}
 
-	statusMetric, err := getIndexStatusOfRecentDay(indexIDs)
+	indexID := indexIDs[0]
+	var firstClusterID, firstIndexName string
+	if v, ok := indexID.(string); ok {
+		parts := strings.Split(v, ":")
+		if len(parts) != 2 {
+			h.WriteError(w, fmt.Sprintf("invalid index_id: %s", indexID), http.StatusInternalServerError)
+			return
+		}
+		firstClusterID, firstIndexName = parts[0], parts[1]
+	} else {
+		h.WriteError(w, fmt.Sprintf("invalid index_id: %v", indexID), http.StatusInternalServerError)
+		return
+	}
+	statusMetric, err := h.GetIndexStatusOfRecentDay(firstClusterID, firstIndexName)
 	if err != nil {
 		log.Error(err)
 		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
-	bucketSize, min, max, err := h.getMetricRangeAndBucketSize(req, 60, (15))
-	if err != nil {
-		panic(err)
-		return
+	bucketSize := GetMinBucketSize()
+	if bucketSize < 60 {
+		bucketSize = 60
 	}
+	var metricLen = 15
 	// indexing rate
 	indexMetric:=newMetricItem("indexing", 1, OperationGroupKey)
 	indexMetric.AddAxi("indexing rate","group1",common.PositionLeft,"num","0,0","0,0.[00]",5,true)
@@ -394,8 +190,7 @@
 				{
 					"range": util.MapStr{
 						"timestamp": util.MapStr{
-							"gte": min,
-							"lte": max,
+							"gte": fmt.Sprintf("now-%ds", metricLen * bucketSize),
 						},
 					},
 				},
@@ -441,7 +236,7 @@
 			},
 		},
 	}
-	metrics := h.getMetrics(context.Background(), query, nodeMetricItems, bucketSize)
+	metrics := h.getMetrics(ctx, query, nodeMetricItems, bucketSize)
 	indexMetrics := map[string]util.MapStr{}
 	for key, item := range metrics {
 		for _, line := range item.Lines {
@@ -714,7 +509,7 @@ func (h *APIHandler) GetSingleIndexMetrics(w http.ResponseWriter, req *http.Requ
 	defer cancel()
 	metrics := map[string]*common.MetricItem{}
 	if metricKey == IndexHealthMetricKey {
-		healthMetric, err := h.getIndexHealthMetric(ctx, clusterID, indexName, min, max, bucketSize)
+		healthMetric, err := h.GetIndexHealthMetric(ctx, clusterID, indexName, min, max, bucketSize)
 		if err != nil {
 			log.Error(err)
 		}
@@ -776,7 +571,7 @@
 	h.WriteJSON(w, resBody, http.StatusOK)
 }
 
-func (h *APIHandler) getIndexHealthMetric(ctx context.Context, id, indexName string, min, max int64, bucketSize int)(*common.MetricItem, error){
+func (h *APIHandler) GetIndexHealthMetric(ctx context.Context, id, indexName string, min, max int64, bucketSize int)(*common.MetricItem, error){
 	bucketSizeStr:=fmt.Sprintf("%vs",bucketSize)
 	intervalField, err := getDateHistogramIntervalField(global.MustLookupString(elastic.GlobalSystemElasticsearchID), bucketSizeStr)
 	if err != nil {
@@ -803,7 +598,7 @@ func (h *APIHandler) getIndexHealthMetric(ctx context.Context, id, indexName str
 				{
 					"term": util.MapStr{
 						"metadata.name": util.MapStr{
-							"value": "index_stats",
+							"value": "index_health",
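+							// health is now read from the dedicated
+							// index_health events instead of index_stats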
"now-3d/d", - },{ - "from": "now-3d/d", - "to": "now-2d/d", - }, { - "from": "now-2d/d", - "to": "now-1d/d", - }, { - "from": "now-1d/d", - "to": "now/d", - }, - { - "from": "now/d", - "to": "now", - }, - }, - }, - "aggs": util.MapStr{ - "term_health": util.MapStr{ - "terms": util.MapStr{ - "field": "payload.elasticsearch.index_stats.index_info.health", - }, - }, + "term_health": util.MapStr{ + "terms": util.MapStr{ + "field": "payload.elasticsearch.index_health.status", }, }, }, @@ -973,13 +760,18 @@ func getIndexStatusOfRecentDay(indexIDs []interface{})(map[string][]interface{}, { "term": util.MapStr{ "metadata.name": util.MapStr{ - "value": "index_stats", + "value": "index_health", }, }, }, { - "terms": util.MapStr{ - "metadata.labels.index_id": indexIDs, + "term": util.MapStr{ + "metadata.labels.cluster_id": clusterID, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.index_name": indexName, }, }, }, @@ -996,37 +788,28 @@ func getIndexStatusOfRecentDay(indexIDs []interface{})(map[string][]interface{}, response := elastic.SearchResponse{} util.FromJSONBytes(res.Raw, &response) recentStatus := map[string][]interface{}{} - for _, bk := range response.Aggregations["group_by_index_id"].Buckets { - indexKey := bk["key"].(string) - recentStatus[indexKey] = []interface{}{} - if histogramAgg, ok := bk["time_histogram"].(map[string]interface{}); ok { - if bks, ok := histogramAgg["buckets"].([]interface{}); ok { - for _, bkItem := range bks { - if bkVal, ok := bkItem.(map[string]interface{}); ok { - if termHealth, ok := bkVal["term_health"].(map[string]interface{}); ok { - if healthBks, ok := termHealth["buckets"].([]interface{}); ok { - if len(healthBks) == 0 { - continue - } - healthMap := map[string]int{} - status := "unknown" - for _, hbkItem := range healthBks { - if hitem, ok := hbkItem.(map[string]interface{}); ok { - healthMap[hitem["key"].(string)] = 1 - } - } - if _, ok = healthMap["red"]; ok { - status = "red" - }else if _, ok = healthMap["yellow"]; ok { - status = "yellow" - }else if _, ok = healthMap["green"]; ok { - status = "green" - } - recentStatus[indexKey] = append(recentStatus[indexKey], []interface{}{bkVal["key"], status}) - } - } + for _, bkVal := range response.Aggregations["time_histogram"].Buckets { + if termHealth, ok := bkVal["term_health"].(map[string]interface{}); ok { + if healthBks, ok := termHealth["buckets"].([]interface{}); ok { + if len(healthBks) == 0 { + continue + } + healthMap := map[string]int{} + status := "unknown" + for _, hbkItem := range healthBks { + if hitem, ok := hbkItem.(map[string]interface{}); ok { + healthMap[hitem["key"].(string)] = 1 } } + if _, ok = healthMap["red"]; ok { + status = "red" + }else if _, ok = healthMap["yellow"]; ok { + status = "yellow" + }else if _, ok = healthMap["green"]; ok { + status = "green" + } + key := fmt.Sprintf("%s:%s", clusterID, indexName) + recentStatus[key] = append(recentStatus[key], []interface{}{bkVal["key"], status}) } } }