feat: splitting host metrics query (#14)

silenceqi 2024-12-07 17:10:35 +08:00 committed by GitHub
parent b2401bb31f
commit 852420281d
3 changed files with 136 additions and 108 deletions
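This change splits the single-host metrics query so each chart is fetched on demand: GetSingleHostMetrics now reads a "key" query parameter plus an optional "timeout" (default 60s), builds only the requested metric item, and threads a timeout-scoped context through getSingleHostMetric, getGroupHostMetrics, and getGroupHostMetric. Below is an illustrative client sketch (not part of this commit) that requests each metric key separately; the base URL, port, and route path are assumptions for the example, while the metric keys and the key/timeout parameters come from the diff.

// Illustrative client sketch, not part of this commit: fetch each host metric
// chart with its own request, mirroring the per-key split introduced here.
// The base URL and route are assumptions; keys and parameters come from the diff.
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"
)

func main() {
	hostID := "your-host-id" // placeholder for the host_id path parameter
	base := fmt.Sprintf("http://localhost:9000/host/%s/metrics", hostID) // assumed route

	// One request per metric key instead of one request that renders every chart.
	keys := []string{
		"cpu_used_percent", "memory_used_percent", "disk_used_percent",
		"system_load", "cpu_iowait", "swap_memory_used_percent",
		"network_summary", "network_packets_summary",
		"disk_read_rate", "disk_write_rate",
		"disk_partition_usage", "network_interface_output_rate",
	}

	// Client timeout sits slightly above the server-side default of 60s.
	client := &http.Client{Timeout: 70 * time.Second}
	for _, key := range keys {
		q := url.Values{}
		q.Set("key", key)       // selects which metric item the handler builds
		q.Set("timeout", "60s") // bounds the backend query via context.WithTimeout
		resp, err := client.Get(base + "?" + q.Encode())
		if err != nil {
			fmt.Println(key, "request failed:", err)
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Printf("%s -> %d (%d bytes)\n", key, resp.StatusCode, len(body))
	}
}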

View File

@@ -31,6 +31,7 @@ import (
"infini.sh/console/plugin/api/email"
"infini.sh/console/plugin/audit_log"
"infini.sh/framework/core/api"
"infini.sh/framework/core/host"
model2 "infini.sh/framework/core/model"
elastic2 "infini.sh/framework/modules/elastic"
_ "time/tzdata"
@@ -156,6 +157,7 @@ func main() {
orm.RegisterSchemaWithIndexName(model2.Instance{}, "instance")
orm.RegisterSchemaWithIndexName(api3.RemoteConfig{}, "configs")
orm.RegisterSchemaWithIndexName(model.AuditLog{}, "audit-logs")
orm.RegisterSchemaWithIndexName(host.HostInfo{}, "host")
module.Start()

View File

@@ -495,7 +495,7 @@ func (h *APIHandler) FetchHostInfo(w http.ResponseWriter, req *http.Request, ps
Units: "/s",
},
}
hostMetrics := h.getGroupHostMetric(agentIDs, min, max, bucketSize, hostMetricItems, "agent.id")
hostMetrics := h.getGroupHostMetric(context.Background(), agentIDs, min, max, bucketSize, hostMetricItems, "agent.id")
networkMetrics := map[string]util.MapStr{}
for key, item := range hostMetrics {
@@ -572,7 +572,7 @@ func (h *APIHandler) GetHostInfo(w http.ResponseWriter, req *http.Request, ps ht
}
func (h *APIHandler) getSingleHostMetric(agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) map[string]*common.MetricItem {
func (h *APIHandler) getSingleHostMetric(ctx context.Context, agentID string, min, max int64, bucketSize int, metricItems []*common.MetricItem) map[string]*common.MetricItem {
var must = []util.MapStr{
{
"term": util.MapStr{
@@ -605,10 +605,10 @@ func (h *APIHandler) getSingleHostMetric(agentID string, min, max int64, bucketS
},
},
}
return h.getSingleMetrics(context.Background(), metricItems, query, bucketSize)
return h.getSingleMetrics(ctx, metricItems, query, bucketSize)
}
func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int) map[string]*common.MetricItem {
func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem {
var must = []util.MapStr{
{
"term": util.MapStr{
@@ -652,17 +652,19 @@ func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID str
bucketSizeStr := fmt.Sprintf("%vs", bucketSize)
metricItems := []*common.MetricItem{}
metricItem := newMetricItem("cpu_used_percent", 1, SystemGroupKey)
switch metricKey {
case OSCPUUsedPercentMetricKey:
metricItem := newMetricItem(OSCPUUsedPercentMetricKey, 1, SystemGroupKey)
metricItem.AddAxi("cpu", "group1", common.PositionLeft, "ratio", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("CPU Used Percent", "CPU", "cpu used percent of host.", "group1", "payload.elasticsearch.node_stats.os.cpu.percent", "max", bucketSizeStr, "%", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItems = append(metricItems, metricItem)
metricItem = newMetricItem("memory_used_percent", 1, SystemGroupKey)
case MemoryUsedPercentMetricKey:
metricItem := newMetricItem(MemoryUsedPercentMetricKey, 1, SystemGroupKey)
metricItem.AddAxi("Memory", "group1", common.PositionLeft, "ratio", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Memory Used Percent", "Memory Used Percent", "memory used percent of host.", "group1", "payload.elasticsearch.node_stats.os.mem.used_percent", "max", bucketSizeStr, "%", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItems = append(metricItems, metricItem)
metricItem = newMetricItem("disk_used_percent", 1, SystemGroupKey)
case DiskUsedPercentMetricKey:
metricItem := newMetricItem(DiskUsedPercentMetricKey, 1, SystemGroupKey)
metricItem.AddAxi("disk", "group1", common.PositionLeft, "ratio", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Disk Used Percent", "Disk Used Percent", "disk used percent of host.", "group1", "payload.elasticsearch.node_stats.fs.total.free_in_bytes", "max", bucketSizeStr, "%", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItem.Lines[0].Metric.Field2 = "payload.elasticsearch.node_stats.fs.total.total_in_bytes"
@@ -670,9 +672,25 @@ func (h *APIHandler) getSingleHostMetricFromNode(ctx context.Context, nodeID str
return 100 - value*100/value2
}
metricItems = append(metricItems, metricItem)
}
return h.getSingleMetrics(ctx, metricItems, query, bucketSize)
}
const (
OSCPUUsedPercentMetricKey = "cpu_used_percent"
MemoryUsedPercentMetricKey = "memory_used_percent"
DiskUsedPercentMetricKey = "disk_used_percent"
SystemLoadMetricKey = "system_load"
CPUIowaitMetricKey = "cpu_iowait"
SwapMemoryUsedPercentMetricKey = "swap_memory_used_percent"
NetworkSummaryMetricKey = "network_summary"
NetworkPacketsSummaryMetricKey = "network_packets_summary"
DiskReadRateMetricKey = "disk_read_rate"
DiskWriteRateMetricKey = "disk_write_rate"
DiskPartitionUsageMetricKey = "disk_partition_usage"
NetworkInterfaceOutputRateMetricKey = "network_interface_output_rate"
)
func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
hostID := ps.MustGetParameter("host_id")
hostInfo := &host.HostInfo{}
@@ -696,90 +714,97 @@ func (h *APIHandler) GetSingleHostMetrics(w http.ResponseWriter, req *http.Reque
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}
key := h.GetParameter(req, "key")
timeout := h.GetParameterOrDefault(req, "timeout", "60s")
du, err := time.ParseDuration(timeout)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
ctx, cancel := context.WithTimeout(context.Background(), du)
defer cancel()
if hostInfo.AgentID == "" {
resBody["metrics"] = h.getSingleHostMetricFromNode(context.Background(), hostInfo.NodeID, min, max, bucketSize)
resBody["metrics"] = h.getSingleHostMetricFromNode(ctx, hostInfo.NodeID, min, max, bucketSize, key)
h.WriteJSON(w, resBody, http.StatusOK)
return
}
isOverview := h.GetIntOrDefault(req, "overview", 0)
bucketSizeStr := fmt.Sprintf("%vs", bucketSize)
metricItems := []*common.MetricItem{}
switch key {
case OSCPUUsedPercentMetricKey:
metricItem := newMetricItem("cpu_used_percent", 1, SystemGroupKey)
metricItem.AddAxi("cpu", "group1", common.PositionLeft, "ratio", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("CPU Used Percent", "CPU", "cpu used percent of host.", "group1", "payload.host.cpu.used_percent", "max", bucketSizeStr, "%", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItems = append(metricItems, metricItem)
if isOverview == 0 {
metricItem = newMetricItem("system_load", 1, SystemGroupKey)
case MemoryUsedPercentMetricKey:
metricItem := newMetricItem("memory_used_percent", 1, SystemGroupKey)
metricItem.AddAxi("Memory", "group1", common.PositionLeft, "ratio", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Memory Used Percent", "Memory Used Percent", "memory used percent of host.", "group1", "payload.host.memory.used.percent", "max", bucketSizeStr, "%", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItems = append(metricItems, metricItem)
case DiskUsedPercentMetricKey:
metricItem := newMetricItem("disk_used_percent", 1, SystemGroupKey)
metricItem.AddAxi("disk", "group1", common.PositionLeft, "ratio", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Disk Used Percent", "Disk Used Percent", "disk used percent of host.", "group1", "payload.host.filesystem_summary.used.percent", "max", bucketSizeStr, "%", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItems = append(metricItems, metricItem)
case SystemLoadMetricKey:
metricItem := newMetricItem("system_load", 1, SystemGroupKey)
metricItem.AddAxi("system_load", "group1", common.PositionLeft, "num", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Load1", "Load1", "system load1.", "group1", "payload.host.cpu.load.load1", "max", bucketSizeStr, "", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItem.AddLine("Load5", "Load5", "system load5.", "group1", "payload.host.cpu.load.load5", "max", bucketSizeStr, "", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItem.AddLine("Load15", "Load15", "system load15.", "group1", "payload.host.cpu.load.load15", "max", bucketSizeStr, "", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItems = append(metricItems, metricItem)
metricItem = newMetricItem("cpu_iowait", 1, SystemGroupKey)
case CPUIowaitMetricKey:
metricItem := newMetricItem("cpu_iowait", 1, SystemGroupKey)
metricItem.AddAxi("cpu_iowait", "group1", common.PositionLeft, "num", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("iowait", "iowait", "cpu iowait.", "group1", "payload.host.cpu.iowait", "max", bucketSizeStr, "", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItems = append(metricItems, metricItem)
}
metricItem = newMetricItem("memory_used_percent", 1, SystemGroupKey)
metricItem.AddAxi("Memory", "group1", common.PositionLeft, "ratio", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Memory Used Percent", "Memory Used Percent", "memory used percent of host.", "group1", "payload.host.memory.used.percent", "max", bucketSizeStr, "%", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItems = append(metricItems, metricItem)
if isOverview == 0 {
metricItem = newMetricItem("swap_memory_used_percent", 1, SystemGroupKey)
case SwapMemoryUsedPercentMetricKey:
metricItem := newMetricItem("swap_memory_used_percent", 1, SystemGroupKey)
metricItem.AddAxi("Swap Memory", "group1", common.PositionLeft, "ratio", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Swap Memory Used Percent", "Swap Memory Used Percent", "swap memory used percent of host.", "group1", "payload.host.memory_swap.used_percent", "max", bucketSizeStr, "%", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItems = append(metricItems, metricItem)
}
metricItem = newMetricItem("network_summary", 1, SystemGroupKey)
case NetworkSummaryMetricKey:
metricItem := newMetricItem("network_summary", 1, SystemGroupKey)
metricItem.AddAxi("network_rate", "group1", common.PositionLeft, "bytes", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Network In Rate", "Network In Rate", "network in rate of host.", "group1", "payload.host.network_summary.in.bytes", "max", bucketSizeStr, "/s", "bytes", "0,0.[00]", "0,0.[00]", false, true)
metricItem.AddLine("Network Out Rate", "Network Out Rate", "network out rate of host.", "group1", "payload.host.network_summary.out.bytes", "max", bucketSizeStr, "/s", "bytes", "0,0.[00]", "0,0.[00]", false, true)
metricItems = append(metricItems, metricItem)
if isOverview == 0 {
metricItem = newMetricItem("network_packets_summary", 1, SystemGroupKey)
case NetworkPacketsSummaryMetricKey:
metricItem := newMetricItem("network_packets_summary", 1, SystemGroupKey)
metricItem.AddAxi("network_packets_rate", "group1", common.PositionLeft, "bytes", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Network Packets In Rate", "Network Packets In Rate", "network packets in rate of host.", "group1", "payload.host.network_summary.in.packets", "max", bucketSizeStr, "packets/s", "num", "0,0.[00]", "0,0.[00]", false, true)
metricItem.AddLine("Network Packets Out Rate", "Network Packets Out Rate", "network packets out rate of host.", "group1", "payload.host.network_summary.out.packets", "max", bucketSizeStr, "packets/s", "num", "0,0.[00]", "0,0.[00]", false, true)
metricItems = append(metricItems, metricItem)
}
metricItem = newMetricItem("disk_used_percent", 1, SystemGroupKey)
metricItem.AddAxi("disk", "group1", common.PositionLeft, "ratio", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Disk Used Percent", "Disk Used Percent", "disk used percent of host.", "group1", "payload.host.filesystem_summary.used.percent", "max", bucketSizeStr, "%", "num", "0,0.[00]", "0,0.[00]", false, false)
metricItems = append(metricItems, metricItem)
metricItem = newMetricItem("disk_read_rate", 1, SystemGroupKey)
case DiskReadRateMetricKey:
metricItem := newMetricItem("disk_read_rate", 1, SystemGroupKey)
metricItem.AddAxi("disk_read_rate", "group1", common.PositionLeft, "bytes", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Disk Read Rate", "Disk Read Rate", "Disk read rate of host.", "group1", "payload.host.diskio_summary.read.bytes", "max", bucketSizeStr, "%", "bytes", "0,0.[00]", "0,0.[00]", false, true)
metricItems = append(metricItems, metricItem)
metricItem = newMetricItem("disk_write_rate", 1, SystemGroupKey)
case DiskWriteRateMetricKey:
metricItem := newMetricItem("disk_write_rate", 1, SystemGroupKey)
metricItem.AddAxi("disk_write_rate", "group1", common.PositionLeft, "bytes", "0.[0]", "0.[0]", 5, true)
metricItem.AddLine("Disk Write Rate", "Disk Write Rate", "network write rate of host.", "group1", "payload.host.diskio_summary.write.bytes", "max", bucketSizeStr, "%", "bytes", "0,0.[00]", "0,0.[00]", false, true)
metricItems = append(metricItems, metricItem)
hostMetrics := h.getSingleHostMetric(hostInfo.AgentID, min, max, bucketSize, metricItems)
if isOverview == 0 {
groupMetrics := h.getGroupHostMetrics(hostInfo.AgentID, min, max, bucketSize)
if hostMetrics == nil {
hostMetrics = map[string]*common.MetricItem{}
}
for k, v := range groupMetrics {
hostMetrics[k] = v
}
case DiskPartitionUsageMetricKey, NetworkInterfaceOutputRateMetricKey:
groupMetrics := h.getGroupHostMetrics(ctx, hostInfo.AgentID, min, max, bucketSize, key)
resBody["metrics"] = groupMetrics
h.WriteJSON(w, resBody, http.StatusOK)
return
}
hostMetrics := h.getSingleHostMetric(ctx, hostInfo.AgentID, min, max, bucketSize, metricItems)
resBody["metrics"] = hostMetrics
h.WriteJSON(w, resBody, http.StatusOK)
}
func (h *APIHandler) getGroupHostMetrics(agentID string, min, max int64, bucketSize int) map[string]*common.MetricItem {
diskPartitionMetric := newMetricItem("disk_partition_usage", 2, SystemGroupKey)
func (h *APIHandler) getGroupHostMetrics(ctx context.Context, agentID string, min, max int64, bucketSize int, metricKey string) map[string]*common.MetricItem {
var metrics = make(map[string]*common.MetricItem)
switch metricKey {
case DiskPartitionUsageMetricKey:
diskPartitionMetric := newMetricItem(DiskPartitionUsageMetricKey, 2, SystemGroupKey)
diskPartitionMetric.AddAxi("Disk Partition Usage", "group1", common.PositionLeft, "ratio", "0.[0]", "0.[0]", 5, true)
hostMetricItems := []GroupMetricItem{
{
@@ -792,10 +817,11 @@ func (h *APIHandler) getGroupHostMetrics(agentID string, min, max int64, bucketS
Units: "%",
},
}
hostMetrics := h.getGroupHostMetric([]string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition")
networkOutputMetric := newMetricItem("network_interface_output_rate", 2, SystemGroupKey)
metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.disk_partition_usage.partition")
case NetworkInterfaceOutputRateMetricKey:
networkOutputMetric := newMetricItem(NetworkInterfaceOutputRateMetricKey, 2, SystemGroupKey)
networkOutputMetric.AddAxi("Network interface output rate", "group1", common.PositionLeft, "bytes", "0.[0]", "0.[0]", 5, true)
hostMetricItems = []GroupMetricItem{
hostMetricItems := []GroupMetricItem{
{
Key: "network_interface_output_rate",
Field: "payload.host.network_interface.output_in_bytes",
@@ -806,14 +832,13 @@ func (h *APIHandler) getGroupHostMetrics(agentID string, min, max int64, bucketS
Units: "",
},
}
networkOutMetrics := h.getGroupHostMetric([]string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name")
if networkOutMetrics != nil {
hostMetrics["network_interface_output_rate"] = networkOutMetrics["network_interface_output_rate"]
}
return hostMetrics
metrics = h.getGroupHostMetric(ctx, []string{agentID}, min, max, bucketSize, hostMetricItems, "payload.host.network_interface.name")
}
func (h *APIHandler) getGroupHostMetric(agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) map[string]*common.MetricItem {
return metrics
}
func (h *APIHandler) getGroupHostMetric(ctx context.Context, agentIDs []string, min, max int64, bucketSize int, hostMetricItems []GroupMetricItem, groupField string) map[string]*common.MetricItem {
var must = []util.MapStr{
{
"term": util.MapStr{
@@ -867,7 +892,7 @@ func (h *APIHandler) getGroupHostMetric(agentIDs []string, min, max int64, bucke
},
},
}
return h.getMetrics(context.Background(), query, hostMetricItems, bucketSize)
return h.getMetrics(ctx, query, hostMetricItems, bucketSize)
}
func getHost(hostID string) (*host.HostInfo, error) {

View File

@@ -1083,6 +1083,7 @@ func (h *APIHandler) getNodeMetrics(ctx context.Context, clusterID string, bucke
Units: "times/s",
})
case ModelInferenceBreakerMetricKey:
//Elasticsearch 8.6+ Model Inference Breaker
modelInferenceBreakerMetric := newMetricItem(ModelInferenceBreakerMetricKey, 6, CircuitBreakerGroupKey)
modelInferenceBreakerMetric.AddAxi("Model Inference Breaker","group1",common.PositionLeft,"num","0.[0]","0.[0]",5,true)
nodeMetricItems = append(nodeMetricItems, GroupMetricItem{