Merge remote-tracking branch 'origin/master'

Former-commit-id: 9905536031594c23188d32e3d26c783e795b8458
This commit is contained in:
zhouqunjie 2023-11-22 18:13:08 +08:00
commit 70698b5fa5
17 changed files with 322 additions and 103 deletions

View File

@ -8,6 +8,21 @@ info(
) )
/******************find datasetList start*************************/ /******************find datasetList start*************************/
// ControllerMetricsReq is the query-string request of GET /cloud/controller/Metrics.
type ControllerMetricsReq {
// ParticipantId selects which participant's Prometheus client is queried.
ParticipantId int64 `form:"participantId"`
// Namespace is matched exactly (namespace="...") in the generated PromQL.
Namespace string `form:"namespace"`
// Pods is used as a regular expression (pod=~"..."); several pod names can be joined with "|".
Pods string `form:"pods"`
// Steps is declared but not read by the controller-metrics logic, which uses a fixed 10-minute step.
Steps string `form:"steps"`
// Start is the range start as a base-10 Unix timestamp in seconds.
Start string `form:"start"`
// End is the range end as a base-10 Unix timestamp in seconds.
End string `form:"end"`
}
// ControllerMetricsResp is the response of GET /cloud/controller/Metrics.
type ControllerMetricsResp {
// Data carries the metric list returned by the Prometheus range query.
Data interface{} `json:"data"`
}
type ApplyReq { type ApplyReq {
YamlString string `json:"yamlString" copier:"yamlString"` YamlString string `json:"yamlString" copier:"yamlString"`
} }

View File

@ -141,6 +141,10 @@ service pcm {
@doc "yaml删除" @doc "yaml删除"
@handler deleteYamlHandler @handler deleteYamlHandler
get /cloud/DeleteYaml (ApplyReq) returns (DeleteResp) get /cloud/DeleteYaml (ApplyReq) returns (DeleteResp)
@doc "控制器监控"
@handler controllerMetricsHandler
get /cloud/controller/Metrics (ControllerMetricsReq) returns (ControllerMetricsResp)
} }
//智算二级接口 //智算二级接口

View File

@ -2,15 +2,12 @@ NacosConfig:
DataId: pcm-core-api.yaml DataId: pcm-core-api.yaml
Group: DEFAULT_GROUP Group: DEFAULT_GROUP
ServerConfigs: ServerConfigs:
# - IpAddr: 127.0.0.1 # - IpAddr: 127.0.0.1
# Port: 8848 # Port: 8848
# - IpAddr: 10.101.15.7 - IpAddr: nacos.jcce.dev
# Port: 8848
- IpAddr: 119.45.100.73
Port: 8848 Port: 8848
ClientConfig: ClientConfig:
NamespaceId: tzwang NamespaceId: test
# NamespaceId: test
TimeoutMs: 5000 TimeoutMs: 5000
NotLoadCacheAtStart: true NotLoadCacheAtStart: true
LogDir: LogDir:

View File

@ -24,7 +24,7 @@ func AddCronGroup(svc *svc.ServiceContext) {
SyncParticipantRpc(svc) SyncParticipantRpc(svc)
}) })
// 删除三天前的监控信息 // 删除三天前的监控信息
svc.Cron.AddFunc("*/5 * * * * ?", func() { svc.Cron.AddFunc("0 0 0 ? * ? ", func() {
ClearMetricsData(svc) ClearMetricsData(svc)
}) })

View File

@ -19,6 +19,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker"
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient" "gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient"
) )
@ -30,14 +31,22 @@ func SyncParticipantRpc(svc *svc.ServiceContext) {
} }
for _, participant := range participants { for _, participant := range participants {
// 初始化p端rpc客户端
if len(participant.RpcAddress) != 0 && svc.K8sRpc[participant.Id] == nil { if len(participant.RpcAddress) != 0 && svc.K8sRpc[participant.Id] == nil {
switch participant.Type { switch participant.Type {
case constants.CLOUD: case constants.CLOUD:
// 初始化p端rpc客户端
svc.K8sRpc[participant.Id] = kubernetesclient.NewKubernetes(zrpc.MustNewClient(zrpc.RpcClientConf{ svc.K8sRpc[participant.Id] = kubernetesclient.NewKubernetes(zrpc.MustNewClient(zrpc.RpcClientConf{
Endpoints: []string{participant.RpcAddress}, Endpoints: []string{participant.RpcAddress},
NonBlock: true, NonBlock: true,
})) }))
// 初始化p端prometheus client
promClient, err := tracker.NewPrometheus(participant.MetricsUrl)
if err != nil {
return
}
svc.PromClient[participant.Id] = promClient
} }
} }
} }

View File

@ -0,0 +1,25 @@
package cloud
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/cloud"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
)
// ControllerMetricsHandler returns the HTTP handler for the controller
// metrics endpoint. It decodes the request into a
// types.ControllerMetricsReq, runs ControllerMetricsLogic.ControllerMetrics
// and writes the outcome through result.HttpResult. A request that cannot be
// parsed is answered with httpx.ErrorCtx and never reaches the logic layer.
func ControllerMetricsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		req := new(types.ControllerMetricsReq)
		if parseErr := httpx.Parse(r, req); parseErr != nil {
			httpx.ErrorCtx(r.Context(), w, parseErr)
			return
		}

		logic := cloud.NewControllerMetricsLogic(r.Context(), svcCtx)
		resp, err := logic.ControllerMetrics(req)
		result.HttpResult(r, w, resp, err)
	}
}

View File

@ -162,6 +162,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/cloud/DeleteYaml", Path: "/cloud/DeleteYaml",
Handler: cloud.DeleteYamlHandler(serverCtx), Handler: cloud.DeleteYamlHandler(serverCtx),
}, },
{
Method: http.MethodGet,
Path: "/cloud/controller/Metrics",
Handler: cloud.ControllerMetricsHandler(serverCtx),
},
}, },
rest.WithPrefix("/pcm/v1"), rest.WithPrefix("/pcm/v1"),
) )

View File

@ -0,0 +1,35 @@
package cloud
import (
	"context"
	"fmt"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
	"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
	"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker"
)
// ControllerMetricsLogic holds the per-request state used to serve the
// controller metrics endpoint.
type ControllerMetricsLogic struct {
// Embedded logger; NewControllerMetricsLogic builds it from the request context.
logx.Logger
// ctx is the context of the HTTP request being served.
ctx context.Context
// svcCtx gives access to shared service dependencies such as the PromClient map.
svcCtx *svc.ServiceContext
}
// NewControllerMetricsLogic builds a ControllerMetricsLogic bound to ctx and
// svcCtx. The embedded logger is derived from ctx via logx.WithContext.
func NewControllerMetricsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ControllerMetricsLogic {
	logic := new(ControllerMetricsLogic)
	logic.Logger = logx.WithContext(ctx)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	return logic
}
// ControllerMetrics returns the "pod_cpu_usage" and
// "pod_memory_usage_wo_cache" series for the pods selected by req.Pods in
// req.Namespace, between req.Start and req.End, sampled with a fixed
// 10-minute step. req.Steps is not consulted.
//
// The query is sent to the Prometheus client registered for
// req.ParticipantId. If no client is registered for that participant an
// error is returned and resp is nil.
func (l *ControllerMetricsLogic) ControllerMetrics(req *types.ControllerMetricsReq) (resp *types.ControllerMetricsResp, err error) {
	// Indexing the map directly would hand back a zero-value
	// tracker.Prometheus for an unknown participant; its internal client is
	// nil, so issuing a query through it would panic. Fail fast instead.
	promClient, ok := l.svcCtx.PromClient[req.ParticipantId]
	if !ok {
		return nil, fmt.Errorf("prometheus client not found for participant %d", req.ParticipantId)
	}
	// NOTE(review): PromClient appears to be written by the participant sync
	// cron job while handlers read it here — confirm whether access needs to
	// be synchronized.

	metrics := promClient.GetNamedMetricsByTime(
		[]string{"pod_cpu_usage", "pod_memory_usage_wo_cache"},
		req.Start,
		req.End,
		10*time.Minute,
		tracker.ControllerOption{
			PodsName:  req.Pods,
			Namespace: req.Namespace,
		},
	)

	return &types.ControllerMetricsResp{Data: metrics}, nil
}

View File

@ -26,6 +26,7 @@ import (
"github.com/zeromicro/go-zero/zrpc" "github.com/zeromicro/go-zero/zrpc"
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient" "gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient"
@ -56,6 +57,7 @@ type ServiceContext struct {
Downloader *s3manager.Downloader Downloader *s3manager.Downloader
Uploader *s3manager.Uploader Uploader *s3manager.Uploader
K8sRpc map[int64]kubernetesclient.Kubernetes K8sRpc map[int64]kubernetesclient.Kubernetes
PromClient map[int64]tracker.Prometheus
ParticipantRpc participantservice.ParticipantService ParticipantRpc participantservice.ParticipantService
} }
@ -103,6 +105,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
K8sRpc: make(map[int64]kubernetesclient.Kubernetes), K8sRpc: make(map[int64]kubernetesclient.Kubernetes),
PromClient: make(map[int64]tracker.Prometheus),
ParticipantRpc: participantservice.NewParticipantService(zrpc.MustNewClient(c.PcmCoreRpcConf)), ParticipantRpc: participantservice.NewParticipantService(zrpc.MustNewClient(c.PcmCoreRpcConf)),
DockerClient: dockerClient, DockerClient: dockerClient,
Downloader: downloader, Downloader: downloader,

View File

@ -3325,6 +3325,19 @@ type ShowNodeDetailsResp struct {
ErrorMsg string `json:"errorMsg,omitempty"` ErrorMsg string `json:"errorMsg,omitempty"`
} }
// ControllerMetricsReq is the form-encoded request of the controller metrics endpoint.
type ControllerMetricsReq struct {
// ParticipantId is the key used to look up the participant's Prometheus client.
ParticipantId int64 `form:"participantId"`
// Namespace is matched exactly (namespace="...") in the generated PromQL.
Namespace string `form:"namespace"`
// Pods is used as a regular expression (pod=~"..."); several pod names can be joined with "|".
Pods string `form:"pods"`
// Steps is declared but not read by ControllerMetricsLogic.ControllerMetrics, which uses a fixed 10-minute step.
Steps string `form:"steps"`
// Start is the range start as a base-10 Unix timestamp in seconds.
Start string `form:"start"`
// End is the range end as a base-10 Unix timestamp in seconds.
End string `form:"end"`
}
// ControllerMetricsResp is the response body of the controller metrics endpoint.
type ControllerMetricsResp struct {
// Data carries the metric list produced by the Prometheus range query.
Data interface{} `json:"data"`
}
type ApplyReq struct { type ApplyReq struct {
YamlString string `json:"yamlString" copier:"yamlString"` YamlString string `json:"yamlString" copier:"yamlString"`
} }

View File

@ -17,7 +17,6 @@ package scheduler
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo"
@ -38,10 +37,15 @@ func NewCloudScheduler() *cloudScheduler {
} }
func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) { func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) {
//参数为空返回 nil ////参数为空,返回 nil
if len(providers) == 0 || task == nil { //if len(providers) == 0 || task == nil {
return nil, errors.New("算法获取参数为空") // return nil, errors.New("算法获取参数为空")
} //}
//
////仅有一个provider返回nil
//if len(providers) == 1 {
// return nil, nil
//}
//调度算法 //调度算法
strategy := algo.NewK8sStrategy(task, providers...) strategy := algo.NewK8sStrategy(task, providers...)

View File

@ -83,6 +83,9 @@ func (s *scheduler) AssignAndSchedule() error {
// ParticipantIds 返回唯一值 // ParticipantIds 返回唯一值
if len(s.participantIds) == 1 { if len(s.participantIds) == 1 {
if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) {
return errors.Errorf("集群 %d 不可用", s.participantIds[0])
}
s.task.ParticipantId = s.participantIds[0] s.task.ParticipantId = s.participantIds[0]
return nil return nil
} }
@ -93,16 +96,21 @@ func (s *scheduler) AssignAndSchedule() error {
return err return err
} }
//集群数量不满足,指定到标签匹配后第一个集群
if len(providerList) < 2 {
if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) {
return errors.Errorf("集群 %d 不可用", s.participantIds[0])
}
s.task.ParticipantId = s.participantIds[0]
return nil
}
//调度算法
strategy, err := s.scheduleService.pickOptimalStrategy(task, providerList...) strategy, err := s.scheduleService.pickOptimalStrategy(task, providerList...)
if err != nil { if err != nil {
return err return err
} }
if strategy == nil {
s.task.ParticipantId = s.participantIds[0]
return nil
}
//调度结果 //调度结果
err = s.assignReplicasToResult(strategy, providerList) err = s.assignReplicasToResult(strategy, providerList)
if err != nil { if err != nil {
@ -131,23 +139,38 @@ func (s *scheduler) SaveToDb() error {
func (s *scheduler) obtainParamsforStrategy() (*algo.Task, []*algo.Provider, error) { func (s *scheduler) obtainParamsforStrategy() (*algo.Task, []*algo.Provider, error) {
task, providerList := s.scheduleService.genTaskAndProviders(s.task, s.dbEngin) task, providerList := s.scheduleService.genTaskAndProviders(s.task, s.dbEngin)
// 查询集群是否可用 // 过滤可用集群
err := s.checkAvailableParticipants(&providerList) err := s.filterAvailableProviders(&providerList)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
//可用集群为0
if len(providerList) == 0 {
return nil, nil, errors.New("未能获取可用集群")
}
return task, providerList, nil return task, providerList, nil
} }
func (s *scheduler) checkAvailableParticipants(providerList *[]*algo.Provider) error { func (s *scheduler) checkIfParticipantAvailable(id ParticipantId) bool {
workingIds, err := s.getAvailableParticipantIds()
if err != nil {
return false
}
return contains(workingIds, int64(id))
}
func (s *scheduler) getAvailableParticipantIds() ([]int64, error) {
resp, err := s.participantRpc.ListParticipant(context.Background(), nil) resp, err := s.participantRpc.ListParticipant(context.Background(), nil)
if err != nil { if err != nil {
return err return nil, err
} }
if resp.Code != 200 { if resp.Code != 200 {
return errors.New("集群列表查询失败") return nil, errors.New("集群列表查询失败")
} }
var workingIds []int64 var workingIds []int64
@ -158,9 +181,19 @@ func (s *scheduler) checkAvailableParticipants(providerList *[]*algo.Provider) e
workingIds = append(workingIds, e.ParticipantId) workingIds = append(workingIds, e.ParticipantId)
} }
return workingIds, nil
}
func (s *scheduler) filterAvailableProviders(providerList *[]*algo.Provider) error {
workingIds, err := s.getAvailableParticipantIds()
if err != nil {
return err
}
var tempList []*algo.Provider var tempList []*algo.Provider
for _, provider := range *providerList { for _, provider := range *providerList {
if contains(workingIds, provider.Pid) { if contains(workingIds, provider.Pid) && contains(s.participantIds, provider.Pid) {
tempList = append(tempList, provider) tempList = append(tempList, provider)
} }
} }

View File

@ -20,7 +20,7 @@ type Interface interface {
//GetMetric(expr string, time time.Time) Metric //GetMetric(expr string, time time.Time) Metric
//GetMetricOverTime(expr string, start, end time.Time, step time.Duration) Metric //GetMetricOverTime(expr string, start, end time.Time, step time.Duration) Metric
GetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []Metric GetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []Metric
//GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt QueryOption) []Metric GetNamedMetricsByTime(metrics []string, start, end string, step time.Duration, opt QueryOption) []Metric
//GetMetadata(namespace string) []Metadata //GetMetadata(namespace string) []Metadata
//GetMetricLabelSet(expr string, start, end time.Time) []map[string]string //GetMetricLabelSet(expr string, start, end time.Time) []map[string]string
// //

View File

@ -202,7 +202,7 @@ var promQLTemplates = map[string]string{
"workload_statefulset_unavailable_replicas_ratio": `namespace:statefulset_unavailable_replicas:ratio{$1}`, "workload_statefulset_unavailable_replicas_ratio": `namespace:statefulset_unavailable_replicas:ratio{$1}`,
// pod // pod
"pod_cpu_usage": `round(sum by (namespace, pod) (irate(container_cpu_usage_seconds_total{job="kubelet", pod!="", image!=""}[5m])) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}, 0.001)`, "pod_cpu_usage": `round(sum by (namespace, pod) (irate(container_cpu_usage_seconds_total{job="kubelet", pod!="", image!=""}[5m])) * on (namespace, pod) group_left(owner_kind,owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}, 0.001)`,
"pod_memory_usage": `sum by (namespace, pod) (container_memory_usage_bytes{job="kubelet", pod!="", image!=""}) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`, "pod_memory_usage": `sum by (namespace, pod) (container_memory_usage_bytes{job="kubelet", pod!="", image!=""}) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`,
"pod_memory_usage_wo_cache": `sum by (namespace, pod) (container_memory_working_set_bytes{job="kubelet", pod!="", image!=""}) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`, "pod_memory_usage_wo_cache": `sum by (namespace, pod) (container_memory_working_set_bytes{job="kubelet", pod!="", image!=""}) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`,
"pod_net_bytes_transmitted": `sum by (namespace, pod) (irate(container_network_transmit_bytes_total{pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m])) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`, "pod_net_bytes_transmitted": `sum by (namespace, pod) (irate(container_network_transmit_bytes_total{pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m])) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`,
@ -274,8 +274,8 @@ func makeExpr(metric string, opts QueryOptions) string {
return makeWorkspaceMetricExpr(tmpl, opts) return makeWorkspaceMetricExpr(tmpl, opts)
case LevelNamespace: case LevelNamespace:
return makeNamespaceMetricExpr(tmpl, opts) return makeNamespaceMetricExpr(tmpl, opts)
case LevelWorkload: case LevelController:
return makeWorkloadMetricExpr(metric, tmpl, opts) return makeControllerMetricExpr(tmpl, opts)
case LevelPod: case LevelPod:
return makePodMetricExpr(tmpl, opts) return makePodMetricExpr(tmpl, opts)
case LevelContainer: case LevelContainer:
@ -324,40 +324,20 @@ func makeNamespaceMetricExpr(tmpl string, o QueryOptions) string {
// For monitoring the specific namespaces // For monitoring the specific namespaces
// GET /namespaces/{namespace} or // GET /namespaces/{namespace} or
// GET /namespaces // GET /namespaces
if o.NamespaceName != "" { if o.Namespace != "" {
namespaceSelector = fmt.Sprintf(`namespace="%s"`, o.NamespaceName) namespaceSelector = fmt.Sprintf(`namespace="%s"`, o.Namespace)
} else { } else {
namespaceSelector = fmt.Sprintf(`namespace=~"%s"`, o.ResourceFilter) namespaceSelector = fmt.Sprintf(`namespace=~"%s"`, o.ResourceFilter)
} }
return strings.Replace(tmpl, "$1", namespaceSelector, -1) return strings.Replace(tmpl, "$1", namespaceSelector, -1)
} }
func makeWorkloadMetricExpr(metric, tmpl string, o QueryOptions) string { func makeControllerMetricExpr(tmpl string, o QueryOptions) string {
var kindSelector, workloadSelector string var namespace, podName string
switch o.WorkloadKind { namespace = fmt.Sprintf(`namespace="%s"`, o.Namespace)
case "deployment": podName = fmt.Sprintf(`pod=~"%s"`, o.PodName)
o.WorkloadKind = Deployment return strings.NewReplacer("$1", namespace, "$2", podName).Replace(tmpl)
case "statefulset":
o.WorkloadKind = StatefulSet
case "daemonset":
o.WorkloadKind = DaemonSet
default:
o.WorkloadKind = ".*"
}
workloadSelector = fmt.Sprintf(`namespace="%s", workload=~"%s:(%s)"`, o.NamespaceName, o.WorkloadKind, o.ResourceFilter)
if strings.Contains(metric, "deployment") {
kindSelector = fmt.Sprintf(`namespace="%s", deployment!="", deployment=~"%s"`, o.NamespaceName, o.ResourceFilter)
}
if strings.Contains(metric, "statefulset") {
kindSelector = fmt.Sprintf(`namespace="%s", statefulset!="", statefulset=~"%s"`, o.NamespaceName, o.ResourceFilter)
}
if strings.Contains(metric, "daemonset") {
kindSelector = fmt.Sprintf(`namespace="%s", daemonset!="", daemonset=~"%s"`, o.NamespaceName, o.ResourceFilter)
}
return strings.NewReplacer("$1", workloadSelector, "$2", kindSelector).Replace(tmpl)
} }
func makePodMetricExpr(tmpl string, o QueryOptions) string { func makePodMetricExpr(tmpl string, o QueryOptions) string {
@ -365,26 +345,16 @@ func makePodMetricExpr(tmpl string, o QueryOptions) string {
// For monitoriong pods of the specific workload // For monitoriong pods of the specific workload
// GET /namespaces/{namespace}/workloads/{kind}/{workload}/pods // GET /namespaces/{namespace}/workloads/{kind}/{workload}/pods
if o.WorkloadName != "" {
switch o.WorkloadKind {
case "deployment":
workloadSelector = fmt.Sprintf(`owner_kind="ReplicaSet", owner_name=~"^%s-[^-]{1,10}$"`, o.WorkloadName)
case "statefulset":
workloadSelector = fmt.Sprintf(`owner_kind="StatefulSet", owner_name="%s"`, o.WorkloadName)
case "daemonset":
workloadSelector = fmt.Sprintf(`owner_kind="DaemonSet", owner_name="%s"`, o.WorkloadName)
}
}
// For monitoring pods in the specific namespace // For monitoring pods in the specific namespace
// GET /namespaces/{namespace}/workloads/{kind}/{workload}/pods or // GET /namespaces/{namespace}/workloads/{kind}/{workload}/pods or
// GET /namespaces/{namespace}/pods/{pod} or // GET /namespaces/{namespace}/pods/{pod} or
// GET /namespaces/{namespace}/pods // GET /namespaces/{namespace}/pods
if o.NamespaceName != "" { if o.Namespace != "" {
if o.PodName != "" { if o.PodName != "" {
podSelector = fmt.Sprintf(`pod="%s", namespace="%s"`, o.PodName, o.NamespaceName) podSelector = fmt.Sprintf(`pod="%s", namespace="%s"`, o.PodName, o.Namespace)
} else { } else {
podSelector = fmt.Sprintf(`pod=~"%s", namespace="%s"`, o.ResourceFilter, o.NamespaceName) podSelector = fmt.Sprintf(`pod=~"%s", namespace="%s"`, o.ResourceFilter, o.Namespace)
} }
} else { } else {
var namespaces, pods []string var namespaces, pods []string
@ -445,9 +415,9 @@ func makePodMetricExpr(tmpl string, o QueryOptions) string {
func makeContainerMetricExpr(tmpl string, o QueryOptions) string { func makeContainerMetricExpr(tmpl string, o QueryOptions) string {
var containerSelector string var containerSelector string
if o.ContainerName != "" { if o.ContainerName != "" {
containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container="%s"`, o.PodName, o.NamespaceName, o.ContainerName) containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container="%s"`, o.PodName, o.Namespace, o.ContainerName)
} else { } else {
containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container=~"%s"`, o.PodName, o.NamespaceName, o.ResourceFilter) containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container=~"%s"`, o.PodName, o.Namespace, o.ResourceFilter)
} }
return strings.Replace(tmpl, "$1", containerSelector, -1) return strings.Replace(tmpl, "$1", containerSelector, -1)
} }
@ -458,11 +428,11 @@ func makePVCMetricExpr(tmpl string, o QueryOptions) string {
// For monitoring persistentvolumeclaims in the specific namespace // For monitoring persistentvolumeclaims in the specific namespace
// GET /namespaces/{namespace}/persistentvolumeclaims/{persistentvolumeclaim} or // GET /namespaces/{namespace}/persistentvolumeclaims/{persistentvolumeclaim} or
// GET /namespaces/{namespace}/persistentvolumeclaims // GET /namespaces/{namespace}/persistentvolumeclaims
if o.NamespaceName != "" { if o.Namespace != "" {
if o.PersistentVolumeClaimName != "" { if o.PersistentVolumeClaimName != "" {
pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim="%s"`, o.NamespaceName, o.PersistentVolumeClaimName) pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim="%s"`, o.Namespace, o.PersistentVolumeClaimName)
} else { } else {
pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim=~"%s"`, o.NamespaceName, o.ResourceFilter) pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim=~"%s"`, o.Namespace, o.ResourceFilter)
} }
return strings.Replace(tmpl, "$1", pvcSelector, -1) return strings.Replace(tmpl, "$1", pvcSelector, -1)
} }

View File

@ -29,7 +29,7 @@ const (
LevelNamespace LevelNamespace
LevelApplication LevelApplication
LevelOpenpitrix LevelOpenpitrix
LevelWorkload LevelController
LevelService LevelService
LevelPod LevelPod
LevelContainer LevelContainer
@ -44,7 +44,7 @@ var MeteringLevelMap = map[string]int{
"LevelWorkspace": LevelWorkspace, "LevelWorkspace": LevelWorkspace,
"LevelNamespace": LevelNamespace, "LevelNamespace": LevelNamespace,
"LevelApplication": LevelApplication, "LevelApplication": LevelApplication,
"LevelWorkload": LevelWorkload, "LevelController": LevelController,
"LevelService": LevelService, "LevelService": LevelService,
"LevelPod": LevelPod, "LevelPod": LevelPod,
"LevelContainer": LevelContainer, "LevelContainer": LevelContainer,
@ -70,10 +70,11 @@ type QueryOptions struct {
ResourceFilter string ResourceFilter string
NodeName string NodeName string
WorkspaceName string WorkspaceName string
NamespaceName string Namespace string
WorkloadKind string WorkloadKind string
WorkloadName string OwnerName string
PodName string PodName string
PodsName string
ContainerName string ContainerName string
StorageClassName string StorageClassName string
PersistentVolumeClaimName string PersistentVolumeClaimName string
@ -140,7 +141,7 @@ func (no NamespaceOption) Apply(o *QueryOptions) {
o.Level = LevelNamespace o.Level = LevelNamespace
o.ResourceFilter = no.ResourceFilter o.ResourceFilter = no.ResourceFilter
o.WorkspaceName = no.WorkspaceName o.WorkspaceName = no.WorkspaceName
o.NamespaceName = no.NamespaceName o.Namespace = no.NamespaceName
o.PVCFilter = no.PVCFilter o.PVCFilter = no.PVCFilter
o.StorageClassName = no.StorageClassName o.StorageClassName = no.StorageClassName
} }
@ -180,16 +181,16 @@ type ApplicationOption struct {
func (ao ApplicationOption) Apply(o *QueryOptions) { func (ao ApplicationOption) Apply(o *QueryOptions) {
o.Level = LevelApplication o.Level = LevelApplication
o.NamespaceName = ao.NamespaceName o.Namespace = ao.NamespaceName
o.ApplicationName = ao.Application o.ApplicationName = ao.Application
o.StorageClassName = ao.StorageClassName o.StorageClassName = ao.StorageClassName
app_components := strings.Join(ao.ApplicationComponents[:], "|") app_components := strings.Join(ao.ApplicationComponents[:], "|")
if len(app_components) > 0 { if len(app_components) > 0 {
o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.NamespaceName, app_components) o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.Namespace, app_components)
} else { } else {
o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.NamespaceName, ".*") o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.Namespace, ".*")
} }
} }
@ -200,9 +201,9 @@ type WorkloadOption struct {
} }
func (wo WorkloadOption) Apply(o *QueryOptions) { func (wo WorkloadOption) Apply(o *QueryOptions) {
o.Level = LevelWorkload o.Level = LevelController
o.ResourceFilter = wo.ResourceFilter o.ResourceFilter = wo.ResourceFilter
o.NamespaceName = wo.NamespaceName o.Namespace = wo.NamespaceName
o.WorkloadKind = wo.WorkloadKind o.WorkloadKind = wo.WorkloadKind
} }
@ -226,15 +227,15 @@ type ServiceOption struct {
func (so ServiceOption) Apply(o *QueryOptions) { func (so ServiceOption) Apply(o *QueryOptions) {
o.Level = LevelService o.Level = LevelService
o.NamespaceName = so.NamespaceName o.Namespace = so.NamespaceName
o.ServiceName = so.ServiceName o.ServiceName = so.ServiceName
pod_names := strings.Join(so.PodNames, "|") pod_names := strings.Join(so.PodNames, "|")
if len(pod_names) > 0 { if len(pod_names) > 0 {
o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, pod_names, o.NamespaceName) o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, pod_names, o.Namespace)
} else { } else {
o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, ".*", o.NamespaceName) o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, ".*", o.Namespace)
} }
} }
@ -248,17 +249,33 @@ type PodOption struct {
PodName string PodName string
} }
// ControllerOption is a query option that selects controller-level metrics
// (LevelController) for a set of pods inside one namespace.
type ControllerOption struct {
// PodsName is copied into QueryOptions.PodName and used as a pod=~"..." regular expression.
PodsName string
// Namespace is copied into QueryOptions.Namespace and matched exactly.
Namespace string
// Kind is copied into QueryOptions.WorkloadKind.
Kind string
// OwnerName is copied into QueryOptions.OwnerName.
OwnerName string
}
func (po PodOption) Apply(o *QueryOptions) { func (po PodOption) Apply(o *QueryOptions) {
o.Level = LevelPod o.Level = LevelPod
o.NamespacedResourcesFilter = po.NamespacedResourcesFilter o.NamespacedResourcesFilter = po.NamespacedResourcesFilter
o.ResourceFilter = po.ResourceFilter o.ResourceFilter = po.ResourceFilter
o.NodeName = po.NodeName o.NodeName = po.NodeName
o.NamespaceName = po.NamespaceName o.Namespace = po.NamespaceName
o.WorkloadKind = po.WorkloadKind o.WorkloadKind = po.WorkloadKind
o.WorkloadName = po.WorkloadName o.OwnerName = po.WorkloadName
o.PodName = po.PodName o.PodName = po.PodName
} }
// Apply marks o as a controller-level query and copies the option's
// namespace, pod selector, workload kind and owner name into it.
func (co ControllerOption) Apply(o *QueryOptions) {
	o.Level = LevelController
	o.Namespace, o.PodName = co.Namespace, co.PodsName
	o.WorkloadKind, o.OwnerName = co.Kind, co.OwnerName
}
type ContainerOption struct { type ContainerOption struct {
ResourceFilter string ResourceFilter string
NamespaceName string NamespaceName string
@ -269,7 +286,7 @@ type ContainerOption struct {
func (co ContainerOption) Apply(o *QueryOptions) { func (co ContainerOption) Apply(o *QueryOptions) {
o.Level = LevelContainer o.Level = LevelContainer
o.ResourceFilter = co.ResourceFilter o.ResourceFilter = co.ResourceFilter
o.NamespaceName = co.NamespaceName o.Namespace = co.NamespaceName
o.PodName = co.PodName o.PodName = co.PodName
o.ContainerName = co.ContainerName o.ContainerName = co.ContainerName
} }
@ -284,7 +301,7 @@ type PVCOption struct {
func (po PVCOption) Apply(o *QueryOptions) { func (po PVCOption) Apply(o *QueryOptions) {
o.Level = LevelPVC o.Level = LevelPVC
o.ResourceFilter = po.ResourceFilter o.ResourceFilter = po.ResourceFilter
o.NamespaceName = po.NamespaceName o.Namespace = po.NamespaceName
o.StorageClassName = po.StorageClassName o.StorageClassName = po.StorageClassName
o.PersistentVolumeClaimName = po.PersistentVolumeClaimName o.PersistentVolumeClaimName = po.PersistentVolumeClaimName
@ -304,7 +321,7 @@ type IngressOption struct {
func (no IngressOption) Apply(o *QueryOptions) { func (no IngressOption) Apply(o *QueryOptions) {
o.Level = LevelIngress o.Level = LevelIngress
o.ResourceFilter = no.ResourceFilter o.ResourceFilter = no.ResourceFilter
o.NamespaceName = no.NamespaceName o.Namespace = no.NamespaceName
o.Ingress = no.Ingress o.Ingress = no.Ingress
o.Job = no.Job o.Job = no.Job
o.PodName = no.Pod o.PodName = no.Pod

View File

@ -19,26 +19,109 @@ import (
"github.com/prometheus/client_golang/api" "github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1" v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model" "github.com/prometheus/common/model"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
) )
type prometheus struct { type Prometheus struct {
client v1.API prometheus Interface
client v1.API
} }
// NewPrometheus 初始化Prometheus客户端 // NewPrometheus 初始化Prometheus客户端
func NewPrometheus(address string) (Interface, error) { func NewPrometheus(address string) (Prometheus, error) {
cfg := api.Config{ cfg := api.Config{
Address: address, Address: address,
} }
client, err := api.NewClient(cfg) client, err := api.NewClient(cfg)
return prometheus{client: v1.NewAPI(client)}, err return Prometheus{client: v1.NewAPI(client)}, err
} }
func (p prometheus) GetNamedMetrics(metrics []string, ts time.Time, o QueryOption) []Metric { func ParseTime(timestamp string) (time.Time, error) {
// Parse time params
startInt, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
return time.Now(), err
}
return time.Unix(startInt, 0), nil
}
// GetNamedMetricsByTime runs one Prometheus range query per entry of metrics
// and returns one Metric per entry, in the same order as metrics.
//
// start and end are base-10 Unix timestamps in seconds (see ParseTime) and
// step is the query resolution. o is applied to a fresh QueryOptions to build
// each PromQL expression.
//
// Failures are reported through Metric.Error rather than a return value:
// if start or end cannot be parsed, every returned Metric carries that
// parse error and no query is sent; if an individual query fails, only that
// Metric carries the error. A nil slice is returned when metrics is empty.
func (p Prometheus) GetNamedMetricsByTime(metrics []string, start, end string, step time.Duration, o QueryOption) []Metric {
	if len(metrics) == 0 {
		return nil
	}

	opts := NewQueryOptions()
	o.Apply(opts)

	// One pre-allocated slot per metric: every goroutine writes only its own
	// index, so no mutex is needed and the output order is deterministic.
	res := make([]Metric, len(metrics))
	for i, metric := range metrics {
		res[i].MetricName = metric
	}

	// The bounds are identical for every metric, so validate them once up
	// front instead of inside each goroutine.
	var (
		startTime, endTime time.Time
		err                error
	)
	if startTime, err = ParseTime(start); err == nil {
		endTime, err = ParseTime(end)
	}
	if err != nil {
		for i := range res {
			res[i].Error = err.Error()
		}
		return res
	}

	timeRange := v1.Range{
		Start: startTime,
		End:   endTime,
		Step:  step,
	}

	var wg sync.WaitGroup
	for i, metric := range metrics {
		wg.Add(1)
		go func(i int, metric string) {
			// Deferred so the WaitGroup is released on every exit path;
			// a missed Done would block wg.Wait forever.
			defer wg.Done()

			value, _, err := p.client.QueryRange(context.Background(), makeExpr(metric, *opts), timeRange)
			if err != nil {
				res[i].Error = err.Error()
				return
			}
			res[i].MetricData = parseQueryRangeResp(value, genMetricFilter(o))
		}(i, metric)
	}
	wg.Wait()

	return res
}
// parseQueryRangeResp converts the result of a Prometheus range query into a
// MetricData of type MetricTypeMatrix.
//
// value is expected to be a model.Matrix; any other concrete type produces a
// MetricData without values. When metricFilter is non-nil, series whose label
// set it rejects are skipped. For each kept series the labels are copied into
// Metadata and every sample becomes a Point of (timestamp / 1000, value).
func parseQueryRangeResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData {
	result := MetricData{MetricType: MetricTypeMatrix}

	matrix, _ := value.(model.Matrix)
	for _, stream := range matrix {
		if metricFilter != nil && !metricFilter(stream.Metric) {
			continue
		}

		labels := make(map[string]string)
		for name, val := range stream.Metric {
			labels[string(name)] = string(val)
		}
		entry := MetricValue{Metadata: labels}

		for _, sample := range stream.Values {
			// model.Time counts milliseconds; dividing by 1000 yields seconds.
			point := Point{float64(sample.Timestamp) / 1000, float64(sample.Value)}
			entry.Series = append(entry.Series, point)
		}

		result.MetricValues = append(result.MetricValues, entry)
	}

	return result
}
func (p Prometheus) GetNamedMetrics(metrics []string, ts time.Time, o QueryOption) []Metric {
var res []Metric var res []Metric
var mtx sync.Mutex var mtx sync.Mutex
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -20,12 +20,18 @@ import (
) )
func TestGetNamedMetrics(t *testing.T) { func TestGetNamedMetrics(t *testing.T) {
client, _ := NewPrometheus("http://10.101.15.3:32585") //client, _ := NewPrometheus("http://10.101.15.3:32585")
result := client.GetNamedMetrics([]string{"pod_cpu_resource_limits"}, time.Now(), PodOption{ //result := client.GetNamedMetrics([]string{"pod_cpu_resource_limits"}, time.Now(), PodOption{
//
// PodName: "prometheus-k8s-0",
// NamespaceName: "monitoring-system",
//})
//println("zzz", result[0].MetricValues[0].Sample.Value())
PodName: "prometheus-k8s-0", client, _ := NewPrometheus("http://10.105.20.4:30766")
NamespaceName: "monitoring-system", result := client.GetNamedMetricsByTime([]string{"pod_cpu_usage", "pod_memory_usage_wo_cache"}, "1700521446", "1700551446", 10*time.Minute, ControllerOption{
PodsName: "notification-manager-deployment-78664576cb-vkptn|notification-manager-deployment-78664576cb-5m6mt",
Namespace: "kubesphere-monitoring-system",
}) })
println("zzz", result[0].MetricValues[0].Sample.Value()) println("zzz", result)
} }