From e5b02d4d322f1e0b3ecd59e299cc7c24e7a2c376 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Wed, 22 Nov 2023 14:37:26 +0800 Subject: [PATCH] =?UTF-8?q?pod=E7=9B=91=E6=8E=A7=E5=9B=BE=E8=A1=A8?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 2e7cfdfda57c7db575870bacd21b4bfa6d14bd4d --- api/desc/cloud/pcm-cloud.api | 15 +++ api/desc/pcm.api | 4 + api/internal/cron/cron.go | 2 +- api/internal/cron/participant.go | 11 ++- .../handler/cloud/controllermetricshandler.go | 25 +++++ api/internal/handler/routes.go | 5 + .../logic/cloud/controllermetricslogic.go | 35 +++++++ api/internal/svc/servicecontext.go | 3 + api/internal/types/types.go | 13 +++ pkg/tracker/interface.go | 2 +- pkg/tracker/promql.go | 66 ++++--------- pkg/tracker/queryoptions.go | 53 +++++++---- pkg/tracker/tracker.go | 93 ++++++++++++++++++- pkg/tracker/tracker_test.go | 18 ++-- 14 files changed, 265 insertions(+), 80 deletions(-) create mode 100644 api/internal/handler/cloud/controllermetricshandler.go create mode 100644 api/internal/logic/cloud/controllermetricslogic.go diff --git a/api/desc/cloud/pcm-cloud.api b/api/desc/cloud/pcm-cloud.api index a6364322..7cc0434f 100644 --- a/api/desc/cloud/pcm-cloud.api +++ b/api/desc/cloud/pcm-cloud.api @@ -8,6 +8,21 @@ info( ) /******************find datasetList start*************************/ +type ControllerMetricsReq { + + ParticipantId int64 `form:"participantId"` + Namespace string `form:"namespace"` + Pods string `form:"pods"` + Steps string `form:"steps"` + Start string `form:"start"` + End string `form:"end"` +} + + +type ControllerMetricsResp { + Data interface{} `json:"data"` +} + type ApplyReq { YamlString string `json:"yamlString" copier:"yamlString"` } diff --git a/api/desc/pcm.api b/api/desc/pcm.api index 69ab4ceb..fc717ff4 100644 --- a/api/desc/pcm.api +++ b/api/desc/pcm.api @@ -141,6 +141,10 @@ service pcm { @doc "yaml删除" @handler deleteYamlHandler get /cloud/DeleteYaml (ApplyReq) returns (DeleteResp) + + @doc "控制器监控" + @handler controllerMetricsHandler + get /cloud/controller/Metrics (ControllerMetricsReq) returns (ControllerMetricsResp) } //智算二级接口 diff --git a/api/internal/cron/cron.go b/api/internal/cron/cron.go index 9d8e0155..fbf3351b 100644 --- a/api/internal/cron/cron.go +++ b/api/internal/cron/cron.go @@ -24,7 +24,7 @@ func AddCronGroup(svc *svc.ServiceContext) { SyncParticipantRpc(svc) }) // 删除三天前的监控信息 - svc.Cron.AddFunc("*/5 * * * * ?", func() { + svc.Cron.AddFunc("0 0 0 ? * ? 
", func() { ClearMetricsData(svc) }) diff --git a/api/internal/cron/participant.go b/api/internal/cron/participant.go index ab3b9bf2..e8fd5b63 100644 --- a/api/internal/cron/participant.go +++ b/api/internal/cron/participant.go @@ -19,6 +19,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker" "gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient" ) @@ -30,14 +31,22 @@ func SyncParticipantRpc(svc *svc.ServiceContext) { } for _, participant := range participants { - // 初始化p端rpc客户端 + if len(participant.RpcAddress) != 0 && svc.K8sRpc[participant.Id] == nil { switch participant.Type { case constants.CLOUD: + // 初始化p端rpc客户端 svc.K8sRpc[participant.Id] = kubernetesclient.NewKubernetes(zrpc.MustNewClient(zrpc.RpcClientConf{ Endpoints: []string{participant.RpcAddress}, NonBlock: true, })) + + // 初始化p端prometheus client + promClient, err := tracker.NewPrometheus(participant.MetricsUrl) + if err != nil { + return + } + svc.PromClient[participant.Id] = promClient } } } diff --git a/api/internal/handler/cloud/controllermetricshandler.go b/api/internal/handler/cloud/controllermetricshandler.go new file mode 100644 index 00000000..8b2c8e7a --- /dev/null +++ b/api/internal/handler/cloud/controllermetricshandler.go @@ -0,0 +1,25 @@ +package cloud + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result" + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/cloud" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" +) + +func ControllerMetricsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.ControllerMetricsReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := cloud.NewControllerMetricsLogic(r.Context(), svcCtx) + resp, err := l.ControllerMetrics(&req) + result.HttpResult(r, w, resp, err) + } +} diff --git a/api/internal/handler/routes.go b/api/internal/handler/routes.go index 292ed1d1..939f7c88 100644 --- a/api/internal/handler/routes.go +++ b/api/internal/handler/routes.go @@ -162,6 +162,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/cloud/DeleteYaml", Handler: cloud.DeleteYamlHandler(serverCtx), }, + { + Method: http.MethodGet, + Path: "/cloud/controller/Metrics", + Handler: cloud.ControllerMetricsHandler(serverCtx), + }, }, rest.WithPrefix("/pcm/v1"), ) diff --git a/api/internal/logic/cloud/controllermetricslogic.go b/api/internal/logic/cloud/controllermetricslogic.go new file mode 100644 index 00000000..87c941d8 --- /dev/null +++ b/api/internal/logic/cloud/controllermetricslogic.go @@ -0,0 +1,35 @@ +package cloud + +import ( + "context" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker" + "time" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ControllerMetricsLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewControllerMetricsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ControllerMetricsLogic { + return &ControllerMetricsLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, 
+ } +} + +func (l *ControllerMetricsLogic) ControllerMetrics(req *types.ControllerMetricsReq) (resp *types.ControllerMetricsResp, err error) { + resp = &types.ControllerMetricsResp{} + metrics := l.svcCtx.PromClient[req.ParticipantId].GetNamedMetricsByTime([]string{"pod_cpu_usage", "pod_memory_usage_wo_cache"}, req.Start, req.End, 10*time.Minute, tracker.ControllerOption{ + PodsName: req.Pods, + Namespace: req.Namespace, + }) + resp.Data = metrics + return resp, nil +} diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index 2a20ab4c..aab8a913 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -26,6 +26,7 @@ import ( "github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient" @@ -56,6 +57,7 @@ type ServiceContext struct { Downloader *s3manager.Downloader Uploader *s3manager.Uploader K8sRpc map[int64]kubernetesclient.Kubernetes + PromClient map[int64]tracker.Prometheus ParticipantRpc participantservice.ParticipantService } @@ -103,6 +105,7 @@ func NewServiceContext(c config.Config) *ServiceContext { OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), K8sRpc: make(map[int64]kubernetesclient.Kubernetes), + PromClient: make(map[int64]tracker.Prometheus), ParticipantRpc: participantservice.NewParticipantService(zrpc.MustNewClient(c.PcmCoreRpcConf)), DockerClient: dockerClient, Downloader: downloader, diff --git a/api/internal/types/types.go b/api/internal/types/types.go index fd533cd2..0f4e49cc 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -3325,6 +3325,19 @@ type ShowNodeDetailsResp struct { ErrorMsg string `json:"errorMsg,omitempty"` } +type ControllerMetricsReq struct { + ParticipantId int64 `form:"participantId"` + Namespace string `form:"namespace"` + Pods string `form:"pods"` + Steps string `form:"steps"` + Start string `form:"start"` + End string `form:"end"` +} + +type ControllerMetricsResp struct { + Data interface{} `json:"data"` +} + type ApplyReq struct { YamlString string `json:"yamlString" copier:"yamlString"` } diff --git a/pkg/tracker/interface.go b/pkg/tracker/interface.go index b607c862..66cdb44b 100644 --- a/pkg/tracker/interface.go +++ b/pkg/tracker/interface.go @@ -20,7 +20,7 @@ type Interface interface { //GetMetric(expr string, time time.Time) Metric //GetMetricOverTime(expr string, start, end time.Time, step time.Duration) Metric GetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []Metric - //GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt QueryOption) []Metric + GetNamedMetricsByTime(metrics []string, start, end string, step time.Duration, opt QueryOption) []Metric //GetMetadata(namespace string) []Metadata //GetMetricLabelSet(expr string, start, end time.Time) []map[string]string // diff --git a/pkg/tracker/promql.go b/pkg/tracker/promql.go index fc90f29a..b45ac894 100644 --- a/pkg/tracker/promql.go +++ b/pkg/tracker/promql.go @@ -202,7 +202,7 @@ var promQLTemplates = map[string]string{ "workload_statefulset_unavailable_replicas_ratio": 
`namespace:statefulset_unavailable_replicas:ratio{$1}`, // pod - "pod_cpu_usage": `round(sum by (namespace, pod) (irate(container_cpu_usage_seconds_total{job="kubelet", pod!="", image!=""}[5m])) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}, 0.001)`, + "pod_cpu_usage": `round(sum by (namespace, pod) (irate(container_cpu_usage_seconds_total{job="kubelet", pod!="", image!=""}[5m])) * on (namespace, pod) group_left(owner_kind,owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}, 0.001)`, "pod_memory_usage": `sum by (namespace, pod) (container_memory_usage_bytes{job="kubelet", pod!="", image!=""}) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`, "pod_memory_usage_wo_cache": `sum by (namespace, pod) (container_memory_working_set_bytes{job="kubelet", pod!="", image!=""}) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`, "pod_net_bytes_transmitted": `sum by (namespace, pod) (irate(container_network_transmit_bytes_total{pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m])) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`, @@ -274,8 +274,8 @@ func makeExpr(metric string, opts QueryOptions) string { return makeWorkspaceMetricExpr(tmpl, opts) case LevelNamespace: return makeNamespaceMetricExpr(tmpl, opts) - case LevelWorkload: - return makeWorkloadMetricExpr(metric, tmpl, opts) + case LevelController: + return makeControllerMetricExpr(tmpl, opts) case LevelPod: return makePodMetricExpr(tmpl, opts) case LevelContainer: @@ -324,40 +324,20 @@ func makeNamespaceMetricExpr(tmpl string, o QueryOptions) string { // For monitoring the specific namespaces // GET /namespaces/{namespace} or // GET /namespaces - if o.NamespaceName != "" { - namespaceSelector = fmt.Sprintf(`namespace="%s"`, o.NamespaceName) + if o.Namespace != "" { + namespaceSelector = fmt.Sprintf(`namespace="%s"`, o.Namespace) } else { namespaceSelector = fmt.Sprintf(`namespace=~"%s"`, o.ResourceFilter) } return strings.Replace(tmpl, "$1", namespaceSelector, -1) } -func makeWorkloadMetricExpr(metric, tmpl string, o QueryOptions) string { - var kindSelector, workloadSelector string +func makeControllerMetricExpr(tmpl string, o QueryOptions) string { + var namespace, podName string - switch o.WorkloadKind { - case "deployment": - o.WorkloadKind = Deployment - case "statefulset": - o.WorkloadKind = StatefulSet - case "daemonset": - o.WorkloadKind = DaemonSet - default: - o.WorkloadKind = ".*" - } - workloadSelector = fmt.Sprintf(`namespace="%s", workload=~"%s:(%s)"`, o.NamespaceName, o.WorkloadKind, o.ResourceFilter) - - if strings.Contains(metric, "deployment") { - kindSelector = fmt.Sprintf(`namespace="%s", deployment!="", deployment=~"%s"`, o.NamespaceName, o.ResourceFilter) - } - if strings.Contains(metric, "statefulset") { - kindSelector = fmt.Sprintf(`namespace="%s", statefulset!="", statefulset=~"%s"`, o.NamespaceName, o.ResourceFilter) - } - if strings.Contains(metric, "daemonset") { - kindSelector = fmt.Sprintf(`namespace="%s", daemonset!="", daemonset=~"%s"`, o.NamespaceName, o.ResourceFilter) - } - - return strings.NewReplacer("$1", workloadSelector, "$2", kindSelector).Replace(tmpl) + namespace = 
fmt.Sprintf(`namespace="%s"`, o.Namespace) + podName = fmt.Sprintf(`pod=~"%s"`, o.PodName) + return strings.NewReplacer("$1", namespace, "$2", podName).Replace(tmpl) } func makePodMetricExpr(tmpl string, o QueryOptions) string { @@ -365,26 +345,16 @@ func makePodMetricExpr(tmpl string, o QueryOptions) string { // For monitoriong pods of the specific workload // GET /namespaces/{namespace}/workloads/{kind}/{workload}/pods - if o.WorkloadName != "" { - switch o.WorkloadKind { - case "deployment": - workloadSelector = fmt.Sprintf(`owner_kind="ReplicaSet", owner_name=~"^%s-[^-]{1,10}$"`, o.WorkloadName) - case "statefulset": - workloadSelector = fmt.Sprintf(`owner_kind="StatefulSet", owner_name="%s"`, o.WorkloadName) - case "daemonset": - workloadSelector = fmt.Sprintf(`owner_kind="DaemonSet", owner_name="%s"`, o.WorkloadName) - } - } // For monitoring pods in the specific namespace // GET /namespaces/{namespace}/workloads/{kind}/{workload}/pods or // GET /namespaces/{namespace}/pods/{pod} or // GET /namespaces/{namespace}/pods - if o.NamespaceName != "" { + if o.Namespace != "" { if o.PodName != "" { - podSelector = fmt.Sprintf(`pod="%s", namespace="%s"`, o.PodName, o.NamespaceName) + podSelector = fmt.Sprintf(`pod="%s", namespace="%s"`, o.PodName, o.Namespace) } else { - podSelector = fmt.Sprintf(`pod=~"%s", namespace="%s"`, o.ResourceFilter, o.NamespaceName) + podSelector = fmt.Sprintf(`pod=~"%s", namespace="%s"`, o.ResourceFilter, o.Namespace) } } else { var namespaces, pods []string @@ -445,9 +415,9 @@ func makePodMetricExpr(tmpl string, o QueryOptions) string { func makeContainerMetricExpr(tmpl string, o QueryOptions) string { var containerSelector string if o.ContainerName != "" { - containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container="%s"`, o.PodName, o.NamespaceName, o.ContainerName) + containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container="%s"`, o.PodName, o.Namespace, o.ContainerName) } else { - containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container=~"%s"`, o.PodName, o.NamespaceName, o.ResourceFilter) + containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container=~"%s"`, o.PodName, o.Namespace, o.ResourceFilter) } return strings.Replace(tmpl, "$1", containerSelector, -1) } @@ -458,11 +428,11 @@ func makePVCMetricExpr(tmpl string, o QueryOptions) string { // For monitoring persistentvolumeclaims in the specific namespace // GET /namespaces/{namespace}/persistentvolumeclaims/{persistentvolumeclaim} or // GET /namespaces/{namespace}/persistentvolumeclaims - if o.NamespaceName != "" { + if o.Namespace != "" { if o.PersistentVolumeClaimName != "" { - pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim="%s"`, o.NamespaceName, o.PersistentVolumeClaimName) + pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim="%s"`, o.Namespace, o.PersistentVolumeClaimName) } else { - pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim=~"%s"`, o.NamespaceName, o.ResourceFilter) + pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim=~"%s"`, o.Namespace, o.ResourceFilter) } return strings.Replace(tmpl, "$1", pvcSelector, -1) } diff --git a/pkg/tracker/queryoptions.go b/pkg/tracker/queryoptions.go index 2aa218ac..c290d4aa 100644 --- a/pkg/tracker/queryoptions.go +++ b/pkg/tracker/queryoptions.go @@ -29,7 +29,7 @@ const ( LevelNamespace LevelApplication LevelOpenpitrix - LevelWorkload + LevelController LevelService LevelPod LevelContainer @@ -44,7 +44,7 @@ var MeteringLevelMap = map[string]int{ 
"LevelWorkspace": LevelWorkspace, "LevelNamespace": LevelNamespace, "LevelApplication": LevelApplication, - "LevelWorkload": LevelWorkload, + "LevelController": LevelController, "LevelService": LevelService, "LevelPod": LevelPod, "LevelContainer": LevelContainer, @@ -70,10 +70,11 @@ type QueryOptions struct { ResourceFilter string NodeName string WorkspaceName string - NamespaceName string + Namespace string WorkloadKind string - WorkloadName string + OwnerName string PodName string + PodsName string ContainerName string StorageClassName string PersistentVolumeClaimName string @@ -140,7 +141,7 @@ func (no NamespaceOption) Apply(o *QueryOptions) { o.Level = LevelNamespace o.ResourceFilter = no.ResourceFilter o.WorkspaceName = no.WorkspaceName - o.NamespaceName = no.NamespaceName + o.Namespace = no.NamespaceName o.PVCFilter = no.PVCFilter o.StorageClassName = no.StorageClassName } @@ -180,16 +181,16 @@ type ApplicationOption struct { func (ao ApplicationOption) Apply(o *QueryOptions) { o.Level = LevelApplication - o.NamespaceName = ao.NamespaceName + o.Namespace = ao.NamespaceName o.ApplicationName = ao.Application o.StorageClassName = ao.StorageClassName app_components := strings.Join(ao.ApplicationComponents[:], "|") if len(app_components) > 0 { - o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.NamespaceName, app_components) + o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.Namespace, app_components) } else { - o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.NamespaceName, ".*") + o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.Namespace, ".*") } } @@ -200,9 +201,9 @@ type WorkloadOption struct { } func (wo WorkloadOption) Apply(o *QueryOptions) { - o.Level = LevelWorkload + o.Level = LevelController o.ResourceFilter = wo.ResourceFilter - o.NamespaceName = wo.NamespaceName + o.Namespace = wo.NamespaceName o.WorkloadKind = wo.WorkloadKind } @@ -226,15 +227,15 @@ type ServiceOption struct { func (so ServiceOption) Apply(o *QueryOptions) { o.Level = LevelService - o.NamespaceName = so.NamespaceName + o.Namespace = so.NamespaceName o.ServiceName = so.ServiceName pod_names := strings.Join(so.PodNames, "|") if len(pod_names) > 0 { - o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, pod_names, o.NamespaceName) + o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, pod_names, o.Namespace) } else { - o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, ".*", o.NamespaceName) + o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, ".*", o.Namespace) } } @@ -248,17 +249,33 @@ type PodOption struct { PodName string } +type ControllerOption struct { + PodsName string + Namespace string + Kind string + OwnerName string +} + func (po PodOption) Apply(o *QueryOptions) { o.Level = LevelPod o.NamespacedResourcesFilter = po.NamespacedResourcesFilter o.ResourceFilter = po.ResourceFilter o.NodeName = po.NodeName - o.NamespaceName = po.NamespaceName + o.Namespace = po.NamespaceName o.WorkloadKind = po.WorkloadKind - o.WorkloadName = po.WorkloadName + o.OwnerName = po.WorkloadName o.PodName = po.PodName } +func (co ControllerOption) Apply(o *QueryOptions) { + o.Level = LevelController + o.Namespace = co.Namespace + o.WorkloadKind = co.Kind + o.OwnerName = co.OwnerName + o.PodName = co.PodsName + +} + type ContainerOption struct { ResourceFilter string NamespaceName string @@ -269,7 +286,7 @@ type ContainerOption struct { func (co ContainerOption) Apply(o *QueryOptions) { o.Level = LevelContainer 
o.ResourceFilter = co.ResourceFilter - o.NamespaceName = co.NamespaceName + o.Namespace = co.NamespaceName o.PodName = co.PodName o.ContainerName = co.ContainerName } @@ -284,7 +301,7 @@ type PVCOption struct { func (po PVCOption) Apply(o *QueryOptions) { o.Level = LevelPVC o.ResourceFilter = po.ResourceFilter - o.NamespaceName = po.NamespaceName + o.Namespace = po.NamespaceName o.StorageClassName = po.StorageClassName o.PersistentVolumeClaimName = po.PersistentVolumeClaimName @@ -304,7 +321,7 @@ type IngressOption struct { func (no IngressOption) Apply(o *QueryOptions) { o.Level = LevelIngress o.ResourceFilter = no.ResourceFilter - o.NamespaceName = no.NamespaceName + o.Namespace = no.NamespaceName o.Ingress = no.Ingress o.Job = no.Job o.PodName = no.Pod diff --git a/pkg/tracker/tracker.go b/pkg/tracker/tracker.go index 176d38cb..c24b07f9 100644 --- a/pkg/tracker/tracker.go +++ b/pkg/tracker/tracker.go @@ -19,26 +19,109 @@ import ( "github.com/prometheus/client_golang/api" v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" + "strconv" "strings" "sync" "time" ) -type prometheus struct { - client v1.API +type Prometheus struct { + prometheus Interface + client v1.API } // NewPrometheus 初始化Prometheus客户端 -func NewPrometheus(address string) (Interface, error) { +func NewPrometheus(address string) (Prometheus, error) { cfg := api.Config{ Address: address, } client, err := api.NewClient(cfg) - return prometheus{client: v1.NewAPI(client)}, err + return Prometheus{client: v1.NewAPI(client)}, err } -func (p prometheus) GetNamedMetrics(metrics []string, ts time.Time, o QueryOption) []Metric { +func ParseTime(timestamp string) (time.Time, error) { + // Parse time params + startInt, err := strconv.ParseInt(timestamp, 10, 64) + if err != nil { + return time.Now(), err + } + return time.Unix(startInt, 0), nil + +} + +func (p Prometheus) GetNamedMetricsByTime(metrics []string, start, end string, step time.Duration, o QueryOption) []Metric { + var res []Metric + var mtx sync.Mutex + var wg sync.WaitGroup + opts := NewQueryOptions() + o.Apply(opts) + + for _, metric := range metrics { + wg.Add(1) + go func(metric string) { + parsedResp := Metric{MetricName: metric} + startTimestamp, err := ParseTime(start) + if err != nil { + return + } + endTimestamp, err := ParseTime(end) + if err != nil { + return + } + timeRange := v1.Range{ + Start: startTimestamp, + End: endTimestamp, + Step: step, + } + value, _, err := p.client.QueryRange(context.Background(), makeExpr(metric, *opts), timeRange) + if err != nil { + parsedResp.Error = err.Error() + } else { + parsedResp.MetricData = parseQueryRangeResp(value, genMetricFilter(o)) + } + + mtx.Lock() + res = append(res, parsedResp) + mtx.Unlock() + + wg.Done() + }(metric) + } + + wg.Wait() + + return res +} + +func parseQueryRangeResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData { + res := MetricData{MetricType: MetricTypeMatrix} + + data, _ := value.(model.Matrix) + + for _, v := range data { + if metricFilter != nil && !metricFilter(v.Metric) { + continue + } + mv := MetricValue{ + Metadata: make(map[string]string), + } + + for k, v := range v.Metric { + mv.Metadata[string(k)] = string(v) + } + + for _, k := range v.Values { + mv.Series = append(mv.Series, Point{float64(k.Timestamp) / 1000, float64(k.Value)}) + } + + res.MetricValues = append(res.MetricValues, mv) + } + + return res +} + +func (p Prometheus) GetNamedMetrics(metrics []string, ts time.Time, o QueryOption) []Metric { var res 
[]Metric var mtx sync.Mutex var wg sync.WaitGroup diff --git a/pkg/tracker/tracker_test.go b/pkg/tracker/tracker_test.go index 495ac968..cd6be9cd 100644 --- a/pkg/tracker/tracker_test.go +++ b/pkg/tracker/tracker_test.go @@ -20,12 +20,18 @@ import ( ) func TestGetNamedMetrics(t *testing.T) { - client, _ := NewPrometheus("http://10.101.15.3:32585") - result := client.GetNamedMetrics([]string{"pod_cpu_resource_limits"}, time.Now(), PodOption{ + //client, _ := NewPrometheus("http://10.101.15.3:32585") + //result := client.GetNamedMetrics([]string{"pod_cpu_resource_limits"}, time.Now(), PodOption{ + // + // PodName: "prometheus-k8s-0", + // NamespaceName: "monitoring-system", + //}) + //println("zzz", result[0].MetricValues[0].Sample.Value()) - PodName: "prometheus-k8s-0", - NamespaceName: "monitoring-system", + client, _ := NewPrometheus("http://10.105.20.4:30766") + result := client.GetNamedMetricsByTime([]string{"pod_cpu_usage", "pod_memory_usage_wo_cache"}, "1700521446", "1700551446", 10*time.Minute, ControllerOption{ + PodsName: "notification-manager-deployment-78664576cb-vkptn|notification-manager-deployment-78664576cb-5m6mt", + Namespace: "kubesphere-monitoring-system", }) - println("zzz", result[0].MetricValues[0].Sample.Value()) - + println("zzz", result) }
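Usage sketch (not part of the commit above): a minimal example of how the range-query API introduced in pkg/tracker — NewPrometheus, GetNamedMetricsByTime, and ControllerOption — could be called from outside the package, mirroring tracker_test.go. The Prometheus address, namespace, and pod names are placeholders, and start/end are Unix-second timestamps passed as strings, which is what ParseTime expects.

// Hypothetical caller; address, namespace, and pod names are placeholders, not values from the patch.
package main

import (
	"fmt"
	"strconv"
	"time"

	"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker"
)

func main() {
	// NewPrometheus returns a tracker.Prometheus value and an error.
	client, err := tracker.NewPrometheus("http://prometheus.example.com:9090") // placeholder address
	if err != nil {
		fmt.Println("init prometheus client:", err)
		return
	}

	// Start/end are Unix seconds encoded as strings (see ParseTime); step is a time.Duration.
	end := time.Now().Unix()
	start := end - 3600 // last hour

	metrics := client.GetNamedMetricsByTime(
		[]string{"pod_cpu_usage", "pod_memory_usage_wo_cache"},
		strconv.FormatInt(start, 10),
		strconv.FormatInt(end, 10),
		10*time.Minute,
		tracker.ControllerOption{
			Namespace: "example-namespace",           // placeholder namespace
			PodsName:  "example-pod-0|example-pod-1", // placeholder pod-name regex
		},
	)

	// Each Metric carries either an Error string or a MetricData matrix of series.
	for _, m := range metrics {
		if m.Error != "" {
			fmt.Println(m.MetricName, "query error:", m.Error)
			continue
		}
		fmt.Println(m.MetricName, "series returned:", len(m.MetricData.MetricValues))
	}
}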