From f2cf784a7dbc8ea42460dc1c623b91f8a608caf9 Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 14 Mar 2024 17:19:08 +0800 Subject: [PATCH 1/7] modified shuguangai and octopus task submit options for empty algorithmname Former-commit-id: a5e9379e51b049114753623f610c61b2afe9a312 --- api/internal/storeLink/octopus.go | 31 ++++++++++++++++++--- api/internal/storeLink/shuguangai.go | 40 ++++++++++++++++++++++------ 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 7dc97026..4dc7f162 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -349,6 +349,7 @@ func (o *OctopusLink) generateResourceId(option *option.AiOption) error { if err != nil { return err } + return nil } return errors.New("failed to get ResourceId") @@ -433,7 +434,14 @@ func (o *OctopusLink) generateImageId(option *option.AiOption) error { func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error { // temporarily set algorithm to cnn - option.AlgorithmName = "cnn" + if option.AlgorithmName == "" { + switch option.DatasetsName { + case "cifar10": + option.AlgorithmName = "cnn" + case "mnist": + option.AlgorithmName = "fcn" + } + } req := &octopus.GetMyAlgorithmListReq{ Platform: o.platform, @@ -457,14 +465,26 @@ func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error { if ns[1] != option.AlgorithmName { continue } - if ns[2] != option.ResourceType { - continue + switch option.ResourceType { + case CPU: + if ns[2] != CPU { + continue + } + case CARD: + if ns[2] != strings.ToLower(option.ComputeCard) { + continue + } } + option.AlgorithmId = algorithm.AlgorithmId return nil } } + if option.AlgorithmId == "" { + return errors.New("Algorithm does not exist") + } + return errors.New("failed to get AlgorithmId") } @@ -487,7 +507,10 @@ func (o *OctopusLink) generateEnv(option *option.AiOption) error { } func (o *OctopusLink) generateParams(option *option.AiOption) error { - + if len(option.Params) == 0 { + epoch := "epoch" + COMMA + "1" + option.Params = append(option.Params, epoch) + } return nil } diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 1bd86443..0e2e5dd6 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -197,9 +197,9 @@ func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []str } func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { - // set algorithmId temporarily + // set algorithmId temporarily for storelink submit if algorithmId == "" { - algorithmId = "pytorch-mnist-fully_connected_network" + algorithmId = "pytorch-mnist-fcn" } // shuguangAi提交任务 @@ -413,6 +413,7 @@ func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error { if option.DatasetsName == "" { return errors.New("DatasetsName not set") } + req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0} list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req) if err != nil { @@ -426,13 +427,34 @@ func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error { for _, file := range list.Data.FileList { ns := strings.Split(file.Name, DASH) if ns[0] == option.DatasetsName { - algorithmId = option.TaskType + DASH + file.Name - option.AlgorithmId = algorithmId - option.AlgorithmName = ns[1] - return nil + algoName := ns[1] + if option.AlgorithmName == "" { + switch option.DatasetsName { + case "cifar10": + algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "cnn" + option.AlgorithmId = algorithmId + option.AlgorithmName = algoName + return nil + case "mnist": + algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "fcn" + option.AlgorithmId = algorithmId + option.AlgorithmName = algoName + return nil + } + } else { + if algoName == option.AlgorithmName { + algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + algoName + option.AlgorithmId = algorithmId + return nil + } + } } } + if algorithmId == "" { + return errors.New("Algorithm does not exist") + } + return errors.New("failed to get AlgorithmId") } @@ -451,8 +473,10 @@ func (s *ShuguangAi) generateParams(option *option.AiOption) error { return errors.New("ResourceType not set") } - //epoch := "epoch" + COMMA + "1" - //option.Params = append(option.Params, epoch) + if len(option.Params) == 0 { + epoch := "epoch" + COMMA + "1" + option.Params = append(option.Params, epoch) + } switch option.ResourceType { case CPU: From fd71bfba724d34346ea6bdb2d366fe8546197197 Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 15 Mar 2024 17:34:49 +0800 Subject: [PATCH 2/7] modified ai option and dynamicResource strategy Former-commit-id: 98ae246782e73cca61ee5946d1c85f03ddba4304 --- .../scheduler/schedulers/aiScheduler.go | 3 +++ .../scheduler/schedulers/option/aiOption.go | 4 ++++ .../scheduler/schedulers/option/option.go | 10 +++++++-- .../scheduler/strategy/dynamicResources.go | 22 +++++++++++++++++++ 4 files changed, 37 insertions(+), 2 deletions(-) diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index b88fa481..923a290c 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -74,6 +74,9 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { case strategy.RESOURCES_PRICING: strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil + case strategy.DYNAMIC_RESOURCES: + strategy := strategy.NewDynamicResourcesStrategy(resources, as.option) + return strategy, nil } return nil, errors.New("no strategy has been chosen") diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go index 9024d907..b30a372c 100644 --- a/api/internal/scheduler/schedulers/option/aiOption.go +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -29,3 +29,7 @@ type AiOption struct { Image string Model interface{} } + +func (a AiOption) GetOptionType() string { + return AI +} diff --git a/api/internal/scheduler/schedulers/option/option.go b/api/internal/scheduler/schedulers/option/option.go index e7c5c218..f7c9eef1 100644 --- a/api/internal/scheduler/schedulers/option/option.go +++ b/api/internal/scheduler/schedulers/option/option.go @@ -1,5 +1,11 @@ package option -type Option struct { - Name string +const ( + AI = "ai" + CLOUD = "cloud" + HPC = "hpc" +) + +type Option interface { + GetOptionType() string } diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index 4bb4a83e..7ad0e9d9 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -1,8 +1,30 @@ package strategy +import ( + "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" +) + type DynamicResourcesStrategy struct { + replicas int32 + resources []*collector.ResourceStats + opt option.Option +} + +func NewDynamicResourcesStrategy(resources []*collector.ResourceStats, opt option.Option) *DynamicResourcesStrategy { + return &DynamicResourcesStrategy{resources: resources, opt: opt} } func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { + if ps.replicas < 1 { + return nil, errors.New("replicas must be greater than 0") + } + + switch ps.opt.GetOptionType() { + case option.AI: + _ = (interface{})(ps.opt).(*option.AiOption) + + } return nil, nil } From 93df06ada021f3d19538e5f7be69747cf4e7593b Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Tue, 19 Mar 2024 09:21:49 +0800 Subject: [PATCH 3/7] fix:Statistics on the number of new virtual machine networks and mirror networks Former-commit-id: 22e384a381516868d9d85cf3c94f965d63dc5a02 --- api/desc/core/pcm-core.api | 20 +++- api/desc/pcm.api | 8 ++ api/desc/vm/pcm-vm.api | 33 ++++-- api/etc/pcm.yaml | 6 +- api/internal/handler/routes.go | 10 ++ api/internal/handler/vm/getimagenumhandler.go | 28 +++++ .../handler/vm/getnetworknumhandler.go | 28 +++++ api/internal/logic/core/commitvmtasklogic.go | 28 ++++- api/internal/logic/vm/getimagenumlogic.go | 49 +++++++++ api/internal/logic/vm/getnetworknumlogic.go | 49 +++++++++ api/internal/logic/vm/listnetworkslogic.go | 2 +- api/internal/mqs/ScheduleVm.go | 51 +++++++++ .../scheduler/schedulers/vmScheduler.go | 56 +++++++++- api/internal/types/types.go | 53 +++++++-- pkg/models/vmmodel.go | 24 +++++ pkg/models/vmmodel_gen.go | 101 ++++++++++++++++++ 16 files changed, 517 insertions(+), 29 deletions(-) create mode 100644 api/internal/handler/vm/getimagenumhandler.go create mode 100644 api/internal/handler/vm/getnetworknumhandler.go create mode 100644 api/internal/logic/vm/getimagenumlogic.go create mode 100644 api/internal/logic/vm/getnetworknumlogic.go create mode 100644 api/internal/mqs/ScheduleVm.go create mode 100644 pkg/models/vmmodel.go create mode 100644 pkg/models/vmmodel_gen.go diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index 2e6d0435..6db76a7e 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -64,6 +64,8 @@ type commitTaskReq { Replicas int64 `json:"replicas,optional"` MatchLabels map[string]string `json:"matchLabels,optional"` YamlList []string `json:"yamlList"` + ClusterName string `json:"clusterName"` + } type ( @@ -87,7 +89,11 @@ type ( type ( commitVmTaskReq { - server ServerCommit `json:"server,optional"` + Name string `json:"name"` + NsID string `json:"nsID"` + Replicas int64 `json:"replicas,optional"` + MatchLabels map[string]string `json:"matchLabels,optional"` + server []ServerCommit `json:"server,optional"` platform string `json:"platform,optional"` } ServerCommit { @@ -114,7 +120,19 @@ type ( uuid string `json:"uuid,optional"` } commitVmTaskResp { + Id string `json:"id" copier:"Id"` + Links []VmLinks `json:"links" copier:"Links"` + OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"` + SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"` + AdminPass string `json:"adminPass" copier:"AdminPass"` + } + VmLinks { + Href string `json:"href " copier:"Href"` + Rel string `json:"rel" copier:"Rel"` + } + VmSecurity_groups_server { + Name string `json:"name" copier:"Name"` } ) diff --git a/api/desc/pcm.api b/api/desc/pcm.api index a5fdc00f..0a5a3c06 100644 --- a/api/desc/pcm.api +++ b/api/desc/pcm.api @@ -354,6 +354,14 @@ service pcm { @handler GetVolumeLimitsHandler get /vm/getVolumeLimits (GetVolumeLimitsReq) returns (GetVolumeLimitsResp) + @doc "查询网络数量" + @handler GetNetworkNumHandler + get /vm/getNetworkNum (ListNetworksReq) returns (NetworkNum) + + @doc "查询镜像列表" + @handler getImageNumHandler + get /vm/getImageNum (ListImagesReq) returns (ImageNum) + @doc "查询虚拟机列表" @handler ListServerHandler get /vm/listServer (ListServersReq) returns (ListServersResp) diff --git a/api/desc/vm/pcm-vm.api b/api/desc/vm/pcm-vm.api index 724d0d45..a5ad6621 100644 --- a/api/desc/vm/pcm-vm.api +++ b/api/desc/vm/pcm-vm.api @@ -343,13 +343,6 @@ type ( CreNetwork { Uuid string `json:"uuid" copier:"Uuid"` } - ServerResp { - Id string `json:"id" copier:"Id"` - Links []Links `json:"links" copier:"Links"` - OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"` - SecurityGroups []Security_groups_server `json:"security_groups" copier:"SecurityGroups"` - AdminPass string `json:"adminPass" copier:"AdminPass"` - } Security_groups_server { Name string `json:"name" copier:"Name"` } @@ -360,6 +353,13 @@ type ( DestinationType string `json:"destination_type" copier:"DestinationType"` DeleteOnTermination bool `json:"delete_on_termination" copier:"DeleteOnTermination"` } + ServerResp { + Id string `json:"id" copier:"Id"` + Links []Links `json:"links" copier:"Links"` + OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"` + SecurityGroups []Security_groups_server `json:"security_groups" copier:"SecurityGroups"` + AdminPass string `json:"adminPass" copier:"AdminPass"` + } ) type( @@ -678,6 +678,25 @@ type ( /******************find images end*************************/ /******************find Networks end*************************/ +type ( + + NetworkNum { + NetworkNum int32 `json:"networkNum"` + Code int32 `json:"code,omitempty"` + Msg string `json:"msg,omitempty"` + ErrorMsg string `json:"errorMsg,omitempty"` + } +) + +type ( + ImageNum { + ImageNum int32 `json:"imageNum"` + Code int32 `json:"code,omitempty"` + Msg string `json:"msg,omitempty"` + ErrorMsg string `json:"errorMsg,omitempty"` + } +) + type ( ListNetworksReq { Platform string `json:"platform,optional"` diff --git a/api/etc/pcm.yaml b/api/etc/pcm.yaml index 32e7b269..108e9c56 100644 --- a/api/etc/pcm.yaml +++ b/api/etc/pcm.yaml @@ -67,9 +67,9 @@ OctopusRpcConf: Timeout: 20000 OpenstackRpcConf: - target: nacos://10.206.0.12:8848/pcm.openstack.rpc?timeout=30s&namespaceid=test&groupname=DEFAULT_GROUP&appName=pcm.core.api - # Endpoints: - # - 127.0.0.1:8888 + # target: nacos://10.206.0.12:8848/pcm.openstack.rpc?timeout=30s&namespaceid=test&groupname=DEFAULT_GROUP&appName=pcm.core.api + Endpoints: + - 127.0.0.1:2010 NonBlock: true Timeout: 20000 diff --git a/api/internal/handler/routes.go b/api/internal/handler/routes.go index 7ccdb138..070b7cc3 100644 --- a/api/internal/handler/routes.go +++ b/api/internal/handler/routes.go @@ -415,6 +415,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/vm/getVolumeLimits", Handler: vm.GetVolumeLimitsHandler(serverCtx), }, + { + Method: http.MethodGet, + Path: "/vm/getNetworkNum", + Handler: vm.GetNetworkNumHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/vm/getImageNum", + Handler: vm.GetImageNumHandler(serverCtx), + }, { Method: http.MethodGet, Path: "/vm/listServer", diff --git a/api/internal/handler/vm/getimagenumhandler.go b/api/internal/handler/vm/getimagenumhandler.go new file mode 100644 index 00000000..ecaf4ea1 --- /dev/null +++ b/api/internal/handler/vm/getimagenumhandler.go @@ -0,0 +1,28 @@ +package vm + +import ( + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/vm" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" +) + +func GetImageNumHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.ListImagesReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := vm.NewGetImageNumLogic(r.Context(), svcCtx) + resp, err := l.GetImageNum(&req) + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/handler/vm/getnetworknumhandler.go b/api/internal/handler/vm/getnetworknumhandler.go new file mode 100644 index 00000000..d1115cdf --- /dev/null +++ b/api/internal/handler/vm/getnetworknumhandler.go @@ -0,0 +1,28 @@ +package vm + +import ( + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/vm" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" +) + +func GetNetworkNumHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.ListNetworksReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := vm.NewGetNetworkNumLogic(r.Context(), svcCtx) + resp, err := l.GetNetworkNum(&req) + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/logic/core/commitvmtasklogic.go b/api/internal/logic/core/commitvmtasklogic.go index 782b0af3..5d803bf2 100644 --- a/api/internal/logic/core/commitvmtasklogic.go +++ b/api/internal/logic/core/commitvmtasklogic.go @@ -2,9 +2,13 @@ package core import ( "context" - + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/mqs" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "time" "github.com/zeromicro/go-zero/core/logx" ) @@ -25,6 +29,26 @@ func NewCommitVmTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *types.CommitVmTaskResp, err error) { // todo: add your logic here and delete this line - + //Building the main task structure + taskModel := models.Task{ + Status: constants.Saved, + Name: req.Name, + CommitTime: time.Now(), + NsID: req.NsID, + } + // Save task data to database + tx := l.svcCtx.DbEngin.Create(&taskModel) + if tx.Error != nil { + return nil, tx.Error + } + /* hpc := models.Hpc{} + tool.Convert(req, &hpc)*/ + mqInfo := response.TaskInfo{ + TaskId: taskModel.Id, + TaskType: "vm", + MatchLabels: req.MatchLabels, + NsID: req.NsID, + } + mqs.InsQueue.Beta.Add(&mqInfo) return } diff --git a/api/internal/logic/vm/getimagenumlogic.go b/api/internal/logic/vm/getimagenumlogic.go new file mode 100644 index 00000000..eac511af --- /dev/null +++ b/api/internal/logic/vm/getimagenumlogic.go @@ -0,0 +1,49 @@ +package vm + +import ( + "context" + "fmt" + "github.com/jinzhu/copier" + "github.com/pkg/errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gitlink.org.cn/JointCloud/pcm-openstack/openstack" + + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetImageNumLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetImageNumLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetImageNumLogic { + return &GetImageNumLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetImageNumLogic) GetImageNum(req *types.ListImagesReq) (resp *types.ImageNum, err error) { + // todo: add your logic here and delete this line + resp = &types.ImageNum{} + ListImagesReq := &openstack.ListImagesReq{} + err = copier.CopyWithOption(ListImagesReq, req, copier.Option{Converters: utils.Converters}) + ListImagesResp, err := l.svcCtx.OpenstackRpc.ListImages(l.ctx, ListImagesReq) + if err != nil { + return nil, errors.Wrapf(xerr.NewErrMsg("Failed to get Networks list"), "Failed to get db Servers list err : %v ,req:%+v", err, req) + } + var count int = len(ListImagesResp.Images) + resp.ImageNum = int32(count) + fmt.Println(count) + if err != nil { + return nil, result.NewDefaultError(err.Error()) + } + return resp, err +} diff --git a/api/internal/logic/vm/getnetworknumlogic.go b/api/internal/logic/vm/getnetworknumlogic.go new file mode 100644 index 00000000..65084eba --- /dev/null +++ b/api/internal/logic/vm/getnetworknumlogic.go @@ -0,0 +1,49 @@ +package vm + +import ( + "context" + "fmt" + "github.com/jinzhu/copier" + "github.com/pkg/errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gitlink.org.cn/JointCloud/pcm-openstack/openstack" + + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetNetworkNumLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetNetworkNumLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetNetworkNumLogic { + return &GetNetworkNumLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetNetworkNumLogic) GetNetworkNum(req *types.ListNetworksReq) (resp *types.NetworkNum, err error) { + // todo: add your logic here and delete this line + resp = &types.NetworkNum{} + ListNetworksReq := &openstack.ListNetworksReq{} + err = copier.CopyWithOption(ListNetworksReq, req, copier.Option{Converters: utils.Converters}) + ListNetworksResp, err := l.svcCtx.OpenstackRpc.ListNetworks(l.ctx, ListNetworksReq) + if err != nil { + return nil, errors.Wrapf(xerr.NewErrMsg("Failed to get Networks list"), "Failed to get db Servers list err : %v ,req:%+v", err, req) + } + var count int = len(ListNetworksResp.Networks) + resp.NetworkNum = int32(count) + fmt.Println(count) + if err != nil { + return nil, result.NewDefaultError(err.Error()) + } + return resp, err +} diff --git a/api/internal/logic/vm/listnetworkslogic.go b/api/internal/logic/vm/listnetworkslogic.go index 19b8af42..2433324d 100644 --- a/api/internal/logic/vm/listnetworkslogic.go +++ b/api/internal/logic/vm/listnetworkslogic.go @@ -49,7 +49,7 @@ func (l *ListNetworksLogic) ListNetworks(req *types.ListNetworksReq) (resp *type err = copier.CopyWithOption(ListNetworksReq, req, copier.Option{Converters: utils.Converters}) ListNetworksResp, err := l.svcCtx.OpenstackRpc.ListNetworks(l.ctx, ListNetworksReq) if err != nil { - return nil, errors.Wrapf(xerr.NewErrMsg("Failed to get Servers list"), "Failed to get db Servers list err : %v ,req:%+v", err, req) + return nil, errors.Wrapf(xerr.NewErrMsg("Failed to get Networks list"), "Failed to get db Servers list err : %v ,req:%+v", err, req) } marshal, err := json.Marshal(&ListNetworksResp) if err != nil { diff --git a/api/internal/mqs/ScheduleVm.go b/api/internal/mqs/ScheduleVm.go new file mode 100644 index 00000000..9cf4c203 --- /dev/null +++ b/api/internal/mqs/ScheduleVm.go @@ -0,0 +1,51 @@ +package mqs + +import ( + "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +/* +* + */ +type VmMq struct { + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewVmMq(ctx context.Context, svcCtx *svc.ServiceContext) *VmMq { + return &VmMq{ + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *VmMq) Consume(val string) error { + // 接受消息, 根据标签筛选过滤 + vmScheduler := schedulers.NewVmScheduler() + schdl, err := scheduler.NewScheduler(vmScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) + if err != nil { + return err + } + + //检测是否指定了集群列表 + schdl.SpecifyClusters() + + //检测是否指定了nsID + schdl.SpecifyNsID() + + //通过标签匹配筛选出集群范围 + schdl.MatchLabels() + + //todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度 + schdl.TempAssign() + + // 存储数据 + err = schdl.SaveToDb() + if err != nil { + return err + } + return nil +} diff --git a/api/internal/scheduler/schedulers/vmScheduler.go b/api/internal/scheduler/schedulers/vmScheduler.go index 54b249e4..d2a3bb91 100644 --- a/api/internal/scheduler/schedulers/vmScheduler.go +++ b/api/internal/scheduler/schedulers/vmScheduler.go @@ -1,21 +1,67 @@ package schedulers import ( + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" ) type VmScheduler struct { + storage database.Storage } -func (v VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { - //TODO implement me - panic("implement me") +func NewVmScheduler() *VmScheduler { + return &VmScheduler{} } -func (v VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) { +func (vm *VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) { + //获取所有计算中心 + //调度算法 + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{}) + return strategy, nil +} + +func (v *VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { //TODO implement me - panic("implement me") + vm := models.Vm{} + utils.Convert(task.Metadata, &vm) + vm.Id = utils.GenSnowflakeID() + vm.TaskId = vm.TaskId + vm.Status = constants.Saved + vm.ParticipantId = participantId + return vm, nil + //vm.YamlString =v.yamlString + /* vm. = utils.GenSnowflakeID() + vm.NsID = task.NsID + vm.ParticipantId = participantId*/ +} + +/* + func (vm *VmScheduler) UnMarshalVmStruct(yamlString string, taskId int64, nsID string) models.vm { + var vm models.Vm + vm := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) + } +*/ +func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider, error) { + proParams, err := vm.storage.GetProviderParams() + if err != nil { + return nil, nil, nil + } + var providerList []*providerPricing.Provider + for _, p := range proParams { + provider := providerPricing.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0) + providerList = append(providerList, provider) + } + + //replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) + //t := algorithm.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000) + + return nil, providerList, nil } func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 7d4c971c..4adf15a3 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -56,6 +56,7 @@ type CommitTaskReq struct { Replicas int64 `json:"replicas,optional"` MatchLabels map[string]string `json:"matchLabels,optional"` YamlList []string `json:"yamlList"` + ClusterName string `json:"clusterName"` } type ScheduleTaskByYamlReq struct { @@ -77,8 +78,12 @@ type TaskYaml struct { } type CommitVmTaskReq struct { - Server ServerCommit `json:"server,optional"` - Platform string `json:"platform,optional"` + Name string `json:"name"` + NsID string `json:"nsID"` + Replicas int64 `json:"replicas,optional"` + MatchLabels map[string]string `json:"matchLabels,optional"` + Server ServerCommit `json:"server,optional"` + Platform string `json:"platform,optional"` } type ServerCommit struct { @@ -108,6 +113,20 @@ type Block_device_mapping_v2Commit struct { } type CommitVmTaskResp struct { + Id string `json:"id" copier:"Id"` + Links []VmLinks `json:"links" copier:"Links"` + OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"` + SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"` + AdminPass string `json:"adminPass" copier:"AdminPass"` +} + +type VmLinks struct { + Href string `json:"href " copier:"Href"` + Rel string `json:"rel" copier:"Rel"` +} + +type VmSecurity_groups_server struct { + Name string `json:"name" copier:"Name"` } type ScheduleTaskByYamlResp struct { @@ -2795,14 +2814,6 @@ type CreNetwork struct { Uuid string `json:"uuid" copier:"Uuid"` } -type ServerResp struct { - Id string `json:"id" copier:"Id"` - Links []Links `json:"links" copier:"Links"` - OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"` - SecurityGroups []Security_groups_server `json:"security_groups" copier:"SecurityGroups"` - AdminPass string `json:"adminPass" copier:"AdminPass"` -} - type Security_groups_server struct { Name string `json:"name" copier:"Name"` } @@ -2815,6 +2826,14 @@ type Block_device_mapping_v2 struct { DeleteOnTermination bool `json:"delete_on_termination" copier:"DeleteOnTermination"` } +type ServerResp struct { + Id string `json:"id" copier:"Id"` + Links []Links `json:"links" copier:"Links"` + OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"` + SecurityGroups []Security_groups_server `json:"security_groups" copier:"SecurityGroups"` + AdminPass string `json:"adminPass" copier:"AdminPass"` +} + type RebuildServerReq struct { ServerId string `json:"server_id" copier:"ServerId"` Platform string `json:"platform,optional"` @@ -3122,6 +3141,20 @@ type DeleteImageResp struct { ErrorMsg string `json:"errorMsg,omitempty"` } +type NetworkNum struct { + NetworkNum int32 `json:"networkNum"` + Code int32 `json:"code,omitempty"` + Msg string `json:"msg,omitempty"` + ErrorMsg string `json:"errorMsg,omitempty"` +} + +type ImageNum struct { + ImageNum int32 `json:"imageNum"` + Code int32 `json:"code,omitempty"` + Msg string `json:"msg,omitempty"` + ErrorMsg string `json:"errorMsg,omitempty"` +} + type ListNetworksReq struct { Platform string `json:"platform,optional"` } diff --git a/pkg/models/vmmodel.go b/pkg/models/vmmodel.go new file mode 100644 index 00000000..0649c427 --- /dev/null +++ b/pkg/models/vmmodel.go @@ -0,0 +1,24 @@ +package models + +import "github.com/zeromicro/go-zero/core/stores/sqlx" + +var _ VmModel = (*customVmModel)(nil) + +type ( + // VmModel is an interface to be customized, add more methods here, + // and implement the added methods in customVmModel. + VmModel interface { + vmModel + } + + customVmModel struct { + *defaultVmModel + } +) + +// NewVmModel returns a model for the database table. +func NewVmModel(conn sqlx.SqlConn) VmModel { + return &customVmModel{ + defaultVmModel: newVmModel(conn), + } +} diff --git a/pkg/models/vmmodel_gen.go b/pkg/models/vmmodel_gen.go new file mode 100644 index 00000000..3fc2beeb --- /dev/null +++ b/pkg/models/vmmodel_gen.go @@ -0,0 +1,101 @@ +// Code generated by goctl. DO NOT EDIT. + +package models + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/zeromicro/go-zero/core/stores/builder" + "github.com/zeromicro/go-zero/core/stores/sqlc" + "github.com/zeromicro/go-zero/core/stores/sqlx" + "github.com/zeromicro/go-zero/core/stringx" +) + +var ( + vmFieldNames = builder.RawFieldNames(&Vm{}) + vmRows = strings.Join(vmFieldNames, ",") + vmRowsExpectAutoSet = strings.Join(stringx.Remove(vmFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",") + vmRowsWithPlaceHolder = strings.Join(stringx.Remove(vmFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?" +) + +type ( + vmModel interface { + Insert(ctx context.Context, data *Vm) (sql.Result, error) + FindOne(ctx context.Context, id int64) (*Vm, error) + Update(ctx context.Context, data *Vm) error + Delete(ctx context.Context, id int64) error + } + + defaultVmModel struct { + conn sqlx.SqlConn + table string + } + + Vm struct { + Id int64 `db:"id"` // id + TaskId int64 `db:"task_id"` // 任务id + ParticipantId int64 `db:"participant_id"` // p端id + ApiVersion sql.NullString `db:"api_version"` // api版本 + Name sql.NullString `db:"name"` // 名字 + Namespace sql.NullString `db:"namespace"` // 命名空间 + Kind sql.NullString `db:"kind"` // 种类 + CreatedBy sql.NullInt64 `db:"created_by"` // 创建人 + CreatedTime sql.NullTime `db:"created_time"` // 创建时间 + UpdateBy sql.NullInt64 `db:"update_by"` // 修改人 + UpdateTime sql.NullTime `db:"update_time"` // 修改时间 + Status string `db:"status"` // 状态 + } +) + +func newVmModel(conn sqlx.SqlConn) *defaultVmModel { + return &defaultVmModel{ + conn: conn, + table: "`vm`", + } +} + +func (m *defaultVmModel) withSession(session sqlx.Session) *defaultVmModel { + return &defaultVmModel{ + conn: sqlx.NewSqlConnFromSession(session), + table: "`vm`", + } +} + +func (m *defaultVmModel) Delete(ctx context.Context, id int64) error { + query := fmt.Sprintf("delete from %s where `id` = ?", m.table) + _, err := m.conn.ExecCtx(ctx, query, id) + return err +} + +func (m *defaultVmModel) FindOne(ctx context.Context, id int64) (*Vm, error) { + query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", vmRows, m.table) + var resp Vm + err := m.conn.QueryRowCtx(ctx, &resp, query, id) + switch err { + case nil: + return &resp, nil + case sqlc.ErrNotFound: + return nil, ErrNotFound + default: + return nil, err + } +} + +func (m *defaultVmModel) Insert(ctx context.Context, data *Vm) (sql.Result, error) { + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, vmRowsExpectAutoSet) + ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.ParticipantId, data.ApiVersion, data.Name, data.Namespace, data.Kind, data.CreatedBy, data.CreatedTime, data.UpdateBy, data.Status) + return ret, err +} + +func (m *defaultVmModel) Update(ctx context.Context, data *Vm) error { + query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, vmRowsWithPlaceHolder) + _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.ParticipantId, data.ApiVersion, data.Name, data.Namespace, data.Kind, data.CreatedBy, data.CreatedTime, data.UpdateBy, data.Status, data.Id) + return err +} + +func (m *defaultVmModel) tableName() string { + return m.table +} From d3ff595ea197d5c638f1b808a3475e554649de91 Mon Sep 17 00:00:00 2001 From: zhouqunjie Date: Tue, 19 Mar 2024 09:47:55 +0800 Subject: [PATCH 4/7] add pull and push task api Former-commit-id: 40540a6dd07fa468ceeb2e6f3e3790032b05bed4 --- api/client/task.go | 12 +- api/client/task_impl.go | 160 +++++------------- .../handler/core/pulltaskinfohandler.go | 28 +++ .../handler/core/pushtaskinfohandler.go | 28 +++ api/internal/handler/routes.go | 10 ++ api/internal/logic/core/pulltaskinfologic.go | 94 ++++++++++ api/internal/logic/core/pushtaskinfologic.go | 105 ++++++++++++ 7 files changed, 309 insertions(+), 128 deletions(-) create mode 100644 api/internal/handler/core/pulltaskinfohandler.go create mode 100644 api/internal/handler/core/pushtaskinfohandler.go create mode 100644 api/internal/logic/core/pulltaskinfologic.go create mode 100644 api/internal/logic/core/pushtaskinfologic.go diff --git a/api/client/task.go b/api/client/task.go index 557f2052..086249f7 100644 --- a/api/client/task.go +++ b/api/client/task.go @@ -6,6 +6,12 @@ type TaskOptions struct { pushResourceInfoReq PushResourceInfoReq } +type Task interface { + PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) + PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) + PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) error +} + type PullTaskInfoReq struct { AdapterId int64 `json:"adapterId"` } @@ -33,9 +39,3 @@ type PushTaskInfoResp struct { type PushResourceInfoReq struct { AdapterId int64 `json:"adapterId"` } - -type Task interface { - PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) - PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) - PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) -} diff --git a/api/client/task_impl.go b/api/client/task_impl.go index 8b141cea..a339c298 100644 --- a/api/client/task_impl.go +++ b/api/client/task_impl.go @@ -1,13 +1,10 @@ package client import ( - "github.com/jinzhu/copier" - "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "gorm.io/gorm" + "io/ioutil" + "k8s.io/apimachinery/pkg/util/json" "log" + "net/http" "strings" "sync" ) @@ -29,128 +26,47 @@ func newTask(client *client, options *TaskOptions) (*task, error) { return task, nil } -func (t task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) { - result := PullTaskInfoResp{} - // 查询p端类型 - var kind int32 - t.client.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", pullTaskInfoReq.AdapterId).Scan(&kind) - // 查询云智超中的数据列表 - switch kind { - case 2: - var hpcModelList []models.TaskHpc - findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &hpcModelList) - utils.Convert(hpcModelList, &result.HpcInfoList) - if len(result.HpcInfoList) > 0 { - for i, hpcInfo := range hpcModelList { - err := copier.CopyWithOption(result.HpcInfoList[i], hpcInfo, copier.Option{Converters: utils.Converters}) - if err != nil { - return nil, err - } - var clusterType string - t.client.DbEngin.Raw("SELECT label FROM `t_cluster` where id = ? ", hpcInfo.ClusterId).Scan(&clusterType) +func (t *task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) { - result.HpcInfoList[i].ClusterType = clusterType - } - } - case 0: - var cloudModelList []models.Cloud - findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &cloudModelList) - utils.Convert(cloudModelList, &result.CloudInfoList) - case 1: - var aiModelList []models.Ai - findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &aiModelList) - utils.Convert(aiModelList, &result.AiInfoList) - } - return &result, nil + url := t.client.url + "/pcm/v1/core/pullTaskInfo" + method := "GET" + infoReq := PullTaskInfoReq{AdapterId: pullTaskInfoReq.AdapterId} + jsonStr, _ := json.Marshal(infoReq) + payload := strings.NewReader(string(jsonStr)) + + client := &http.Client{} + req, _ := http.NewRequest(method, url, payload) + req.Header.Add("Content-Type", "application/json") + res, _ := client.Do(req) + defer res.Body.Close() + + body, _ := ioutil.ReadAll(res.Body) + var resp PullTaskInfoResp + json.Unmarshal(body, &resp) + return &resp, nil } -func (t task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) { - // 查询p端类型 - var kind int32 - t.client.DbEngin.Raw("select type as kind from t_adapter where id = ?", pushTaskInfoReq.AdapterId).Scan(&kind) - switch kind { - case 0: - for _, cloudInfo := range pushTaskInfoReq.CloudInfoList { - t.client.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?", - cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, pushTaskInfoReq.AdapterId, cloudInfo.Id) - syncTask(t.client.DbEngin, cloudInfo.TaskId) - } - case 2: - for _, hpcInfo := range pushTaskInfoReq.HpcInfoList { - t.client.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?", - hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, pushTaskInfoReq.AdapterId, hpcInfo.TaskId, hpcInfo.Name) - syncTask(t.client.DbEngin, hpcInfo.TaskId) - } - case 1: - for _, aiInfo := range pushTaskInfoReq.AiInfoList { - t.client.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", - aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, pushTaskInfoReq.AdapterId, aiInfo.TaskId, aiInfo.Name) - syncTask(t.client.DbEngin, aiInfo.TaskId) - } - } +func (t *task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) { - return &PushTaskInfoResp{}, nil + url := t.client.url + "/pcm/v1/core/pushTaskInfo" + method := "POST" + infoReq := PullTaskInfoReq{AdapterId: pushTaskInfoReq.AdapterId} + jsonStr, _ := json.Marshal(infoReq) + payload := strings.NewReader(string(jsonStr)) + + client := &http.Client{} + req, _ := http.NewRequest(method, url, payload) + req.Header.Add("Content-Type", "application/json") + res, _ := client.Do(req) + defer res.Body.Close() + + body, _ := ioutil.ReadAll(res.Body) + var resp PushTaskInfoResp + json.Unmarshal(body, &resp) + return &resp, nil } -func (t task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) { +func (t *task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) error { //TODO implement me panic("implement me") } - -func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error { - tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data) - if tx.Error != nil { - return tx.Error - } - return nil -} - -func syncTask(gorm *gorm.DB, taskId int64) { - - var allStatus string - tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus) - if tx.Error != nil { - logx.Error(tx.Error) - } - // 子状态统一则修改主任务状态 - statusArray := strings.Split(allStatus, ",") - if len(removeRepeatedElement(statusArray)) == 1 { - updateTask(gorm, taskId, statusArray[0]) - - } - // 子任务包含失败状态 主任务则失败 - if strings.Contains(allStatus, constants.Failed) { - updateTask(gorm, taskId, constants.Failed) - - } - if strings.Contains(allStatus, constants.Running) { - updateTask(gorm, taskId, constants.Running) - } - -} - -func updateTask(gorm *gorm.DB, taskId int64, status string) { - var task models.Task - gorm.Where("id = ? ", taskId).Find(&task) - if task.Status != status { - task.Status = status - gorm.Updates(&task) - } -} - -func removeRepeatedElement(arr []string) (newArr []string) { - newArr = make([]string, 0) - for i := 0; i < len(arr); i++ { - repeat := false - for j := i + 1; j < len(arr); j++ { - if arr[i] == arr[j] { - repeat = true - break - } - } - if !repeat { - newArr = append(newArr, arr[i]) - } - } - return -} diff --git a/api/internal/handler/core/pulltaskinfohandler.go b/api/internal/handler/core/pulltaskinfohandler.go new file mode 100644 index 00000000..e1777968 --- /dev/null +++ b/api/internal/handler/core/pulltaskinfohandler.go @@ -0,0 +1,28 @@ +package core + +import ( + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +func PullTaskInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req clientCore.PullTaskInfoReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := core.NewPullTaskInfoLogic(r.Context(), svcCtx) + resp, err := l.PullTaskInfo(&req) + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/handler/core/pushtaskinfohandler.go b/api/internal/handler/core/pushtaskinfohandler.go new file mode 100644 index 00000000..fa7f5e91 --- /dev/null +++ b/api/internal/handler/core/pushtaskinfohandler.go @@ -0,0 +1,28 @@ +package core + +import ( + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +func PushTaskInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req clientCore.PushTaskInfoReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := core.NewPushTaskInfoLogic(r.Context(), svcCtx) + resp, err := l.PushTaskInfo(&req) + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/handler/routes.go b/api/internal/handler/routes.go index 799d3e73..f01f8ea0 100644 --- a/api/internal/handler/routes.go +++ b/api/internal/handler/routes.go @@ -134,6 +134,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/core/metrics", Handler: core.MetricsHandler(serverCtx), }, + { + Method: http.MethodGet, + Path: "/core/pullTaskInfo", + Handler: core.PullTaskInfoHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/core/pushTaskInfo", + Handler: core.PushTaskInfoHandler(serverCtx), + }, }, rest.WithPrefix("/pcm/v1"), ) diff --git a/api/internal/logic/core/pulltaskinfologic.go b/api/internal/logic/core/pulltaskinfologic.go new file mode 100644 index 00000000..438475d4 --- /dev/null +++ b/api/internal/logic/core/pulltaskinfologic.go @@ -0,0 +1,94 @@ +package core + +import ( + "context" + "github.com/jinzhu/copier" + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gorm.io/gorm" + + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +type PullTaskInfoLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewPullTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PullTaskInfoLogic { + return &PullTaskInfoLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clientCore.PullTaskInfoResp, error) { + //opt := clientPCM.Options{ + // Url: "http://localhost:8999", + // DataSource: "root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local", + //} + //coreCli, _ := clientPCM.NewClient(opt) + //taskOpt := clientPCM.TaskOptions{} + //coreTask, _ := coreCli.Task(taskOpt) + //adapterId := 1706858330967773111 + //// 查询core端分发下来的任务列表 + //pullReq := types.PullTaskInfoReq{ + // AdapterId: int64(adapterId), + //} + //hpcList, _ := coreTask.PullTaskInfo(pullReq) + //println(hpcList) + // 查询p端类型 + resp := clientCore.PullTaskInfoResp{} + + var kind int32 + l.svcCtx.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", req.AdapterId).Scan(&kind) + // 查询云智超中的数据列表 + switch kind { + case 2: + var hpcModelList []models.TaskHpc + err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &hpcModelList) + if err != nil { + return nil, err + } + utils.Convert(hpcModelList, &resp.HpcInfoList) + if len(resp.HpcInfoList) > 0 { + for i, hpcInfo := range hpcModelList { + err := copier.CopyWithOption(resp.HpcInfoList[i], hpcInfo, copier.Option{Converters: utils.Converters}) + if err != nil { + return nil, err + } + var clusterType string + l.svcCtx.DbEngin.Raw("SELECT label FROM `t_cluster` where id = ? ", hpcInfo.ClusterId).Scan(&clusterType) + + resp.HpcInfoList[i].ClusterType = clusterType + } + } + case 0: + var cloudModelList []models.Cloud + err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList) + if err != nil { + return nil, err + } + utils.Convert(cloudModelList, &resp.CloudInfoList) + case 1: + var aiModelList []models.Ai + err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &aiModelList) + if err != nil { + return nil, err + } + utils.Convert(aiModelList, &resp.AiInfoList) + } + return &resp, nil +} + +func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error { + tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data) + if tx.Error != nil { + return tx.Error + } + return nil +} diff --git a/api/internal/logic/core/pushtaskinfologic.go b/api/internal/logic/core/pushtaskinfologic.go new file mode 100644 index 00000000..6a882aee --- /dev/null +++ b/api/internal/logic/core/pushtaskinfologic.go @@ -0,0 +1,105 @@ +package core + +import ( + "context" + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gorm.io/gorm" + "strings" + + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" +) + +type PushTaskInfoLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewPushTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PushTaskInfoLogic { + return &PushTaskInfoLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clientCore.PushTaskInfoResp, error) { + resp := clientCore.PushTaskInfoResp{} + var kind int32 + l.svcCtx.DbEngin.Raw("select type as kind from t_adapter where id = ?", req.AdapterId).Scan(&kind) + switch kind { + case 0: + for _, cloudInfo := range req.CloudInfoList { + l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?", + cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, req.AdapterId, cloudInfo.Id) + syncTask(l.svcCtx.DbEngin, cloudInfo.TaskId) + } + case 2: + for _, hpcInfo := range req.HpcInfoList { + l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?", + hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, req.AdapterId, hpcInfo.TaskId, hpcInfo.Name) + syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId) + } + case 1: + for _, aiInfo := range req.AiInfoList { + l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", + aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, req.AdapterId, aiInfo.TaskId, aiInfo.Name) + syncTask(l.svcCtx.DbEngin, aiInfo.TaskId) + } + } + + return &resp, nil +} + +func syncTask(gorm *gorm.DB, taskId int64) { + + var allStatus string + tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus) + if tx.Error != nil { + logx.Error(tx.Error) + } + // 子状态统一则修改主任务状态 + statusArray := strings.Split(allStatus, ",") + if len(removeRepeatedElement(statusArray)) == 1 { + updateTask(gorm, taskId, statusArray[0]) + + } + // 子任务包含失败状态 主任务则失败 + if strings.Contains(allStatus, constants.Failed) { + updateTask(gorm, taskId, constants.Failed) + + } + if strings.Contains(allStatus, constants.Running) { + updateTask(gorm, taskId, constants.Running) + } + +} + +func updateTask(gorm *gorm.DB, taskId int64, status string) { + var task models.Task + gorm.Where("id = ? ", taskId).Find(&task) + if task.Status != status { + task.Status = status + gorm.Updates(&task) + } +} + +func removeRepeatedElement(arr []string) (newArr []string) { + newArr = make([]string, 0) + for i := 0; i < len(arr); i++ { + repeat := false + for j := i + 1; j < len(arr); j++ { + if arr[i] == arr[j] { + repeat = true + break + } + } + if !repeat { + newArr = append(newArr, arr[i]) + } + } + return +} From c807d544cac67a4a193e835ef2dbccc9d7b639a9 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 20 Mar 2024 17:35:41 +0800 Subject: [PATCH 5/7] modified ai platform getResourcestats methods and dynamicResources strategy Former-commit-id: ee8836b10b362b2d74596f5dcc5f3ec8a81d78ce --- api/internal/scheduler/common/common.go | 14 +++-- api/internal/scheduler/scheduler.go | 13 +++-- .../scheduler/schedulers/aiScheduler.go | 2 +- .../scheduler/schedulers/option/aiOption.go | 1 + .../scheduler/service/collector/collector.go | 12 +++-- .../scheduler/strategy/dynamicResources.go | 48 +++++++++++++++-- .../strategy/param/resourcePricing.go | 2 +- api/internal/storeLink/octopus.go | 52 ++++++++++++++++--- api/internal/storeLink/shuguangai.go | 44 +++++++++++----- 9 files changed, 148 insertions(+), 40 deletions(-) diff --git a/api/internal/scheduler/common/common.go b/api/internal/scheduler/common/common.go index dd4bf100..ce2ee5e7 100644 --- a/api/internal/scheduler/common/common.go +++ b/api/internal/scheduler/common/common.go @@ -15,18 +15,11 @@ package common import ( - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" + "math" "math/rand" "time" ) -type SubSchedule interface { - GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) - PickOptimalStrategy() (strategy.Strategy, error) - AssignTask(clusters []*strategy.AssignedCluster) error -} - // 求交集 func Intersect(slice1, slice2 []int64) []int64 { m := make(map[int64]int) @@ -90,3 +83,8 @@ func MicsSlice(origin []int64, count int) []int64 { } return result } + +func RoundFloat(val float64, precision uint) float64 { + ratio := math.Pow(10, float64(precision)) + return math.Round(val*ratio) / ratio +} diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index 72eed166..8c3265db 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -22,6 +22,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" @@ -33,7 +34,7 @@ import ( type Scheduler struct { task *response.TaskInfo participantIds []int64 - subSchedule common.SubSchedule + subSchedule SubSchedule dbEngin *gorm.DB result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService @@ -43,7 +44,13 @@ type Scheduler struct { mu sync.RWMutex } -func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { +type SubSchedule interface { + GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) + PickOptimalStrategy() (strategy.Strategy, error) + AssignTask(clusters []*strategy.AssignedCluster) error +} + +func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { var task *response.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { @@ -123,7 +130,7 @@ func (s *Scheduler) TempAssign() error { return nil } -func (s *Scheduler) AssignAndSchedule(ss common.SubSchedule) error { +func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error { //// 已指定 ParticipantId //if s.task.ParticipantId != 0 { // return nil diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 923a290c..2d5518fa 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -75,7 +75,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil case strategy.DYNAMIC_RESOURCES: - strategy := strategy.NewDynamicResourcesStrategy(resources, as.option) + strategy := strategy.NewDynamicResourcesStrategy(resources, as.option, 1) return strategy, nil } diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go index b30a372c..735a8610 100644 --- a/api/internal/scheduler/schedulers/option/aiOption.go +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -4,6 +4,7 @@ type AiOption struct { AiClusterId string // shuguangAi /octopus ClusterId TaskName string ResourceType string // cpu/gpu/compute card + CpuCoreNum int64 TaskType string // pytorch/tensorflow/mindspore DatasetsName string // mnist/imageNet/iris StrategyName string diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index e0bfe24c..c6e68851 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -8,20 +8,22 @@ type AiCollector interface { type ResourceStats struct { ParticipantId int64 Name string - CpuAvail float64 + CpuCoreAvail int64 MemAvail float64 DiskAvail float64 - GpuAvail float64 - CardToHours map[Card]float64 - CpuToHours map[int]float64 + GpuAvail int64 + CardsAvail []*Card + CpuCoreHours float64 Balance float64 } type Card struct { + Platform string Type string Name string TOpsAtFp16 float64 - Price int32 + CardHours float64 + Num int32 } type DatasetsSpecs struct { diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index 7ad0e9d9..ea528311 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -2,6 +2,7 @@ package strategy import ( "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" ) @@ -12,8 +13,8 @@ type DynamicResourcesStrategy struct { opt option.Option } -func NewDynamicResourcesStrategy(resources []*collector.ResourceStats, opt option.Option) *DynamicResourcesStrategy { - return &DynamicResourcesStrategy{resources: resources, opt: opt} +func NewDynamicResourcesStrategy(resources []*collector.ResourceStats, opt option.Option, replica int32) *DynamicResourcesStrategy { + return &DynamicResourcesStrategy{resources: resources, opt: opt, replicas: replica} } func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { @@ -23,8 +24,47 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { switch ps.opt.GetOptionType() { case option.AI: - _ = (interface{})(ps.opt).(*option.AiOption) + opt := (interface{})(ps.opt).(*option.AiOption) + var maxCardHoursAvailable float64 + var maxCpuCoreHoursAvailable float64 + var assignedCluster *AssignedCluster + var results []*AssignedCluster + for _, res := range ps.resources { + if opt.ResourceType == "" { + if res.CpuCoreHours <= 0 { + cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas} + results = append(results, cluster) + return results, nil + } + + if res.CpuCoreHours > maxCpuCoreHoursAvailable { + maxCpuCoreHoursAvailable = res.CpuCoreHours + assignedCluster.Name = res.Name + assignedCluster.ParticipantId = res.ParticipantId + assignedCluster.Replicas = ps.replicas + } + } + + if opt.ResourceType == "" { + var maxCurrentCardHours float64 + for _, card := range res.CardsAvail { + cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3) + if cardHours > maxCurrentCardHours { + maxCurrentCardHours = cardHours + } + } + if maxCurrentCardHours > maxCardHoursAvailable { + maxCardHoursAvailable = maxCurrentCardHours + assignedCluster.Name = res.Name + assignedCluster.ParticipantId = res.ParticipantId + assignedCluster.Replicas = ps.replicas + } + } + } + results = append(results, assignedCluster) + return results, nil } - return nil, nil + + return nil, errors.New("failed to apply DynamicResourcesStrategy") } diff --git a/api/internal/scheduler/strategy/param/resourcePricing.go b/api/internal/scheduler/strategy/param/resourcePricing.go index 89c056c1..6fc05819 100644 --- a/api/internal/scheduler/strategy/param/resourcePricing.go +++ b/api/internal/scheduler/strategy/param/resourcePricing.go @@ -23,7 +23,7 @@ func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider { for _, resource := range r.Resources { provider := providerPricing.NewProvider( resource.ParticipantId, - resource.CpuAvail, + float64(resource.CpuCoreAvail), resource.MemAvail, resource.DiskAvail, 0.0, 0.0, 0.0) providerList = append(providerList, provider) diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 4dc7f162..5342d8ce 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -60,6 +60,10 @@ var ( MLU: CAMBRICON, GCU: ENFLAME, } + cardTopsMap = map[string]float64{ + MLU: CAMBRICONMLU290, + GCU: EnflameT20, + } ) func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink { @@ -245,13 +249,49 @@ func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) { return nil, errors.New(balanceResp.Error.Message) } - //resourceStat := collector.ResourceStats{} - // - //for _, spec := range specResp.TrainResourceSpecs { - // - //} + var cards []*collector.Card + balance := float64(balanceResp.Payload.BillingUser.Amount) + var cpuHours float64 + for _, spec := range specResp.TrainResourceSpecs { + if spec.Price == 0 { + ns := strings.Split(spec.Name, COMMA) + if len(ns) == 2 { + nss := strings.Split(ns[0], COLON) + if nss[0] == CPU { + cpuHours = -1 + } + } + } - return nil, nil + if spec.Price == 1 { + ns := strings.Split(spec.Name, COMMA) + cardSpecs := strings.Split(ns[0], STAR) + + cardTops, isMapContainsKey := cardTopsMap[cardSpecs[1]] + if !isMapContainsKey { + continue + } + + card := &collector.Card{ + Platform: OCTOPUS, + Type: CARD, + Name: cardSpecs[1], + TOpsAtFp16: cardTops, + CardHours: balance / spec.Price, + } + cards = append(cards, card) + } + } + + resourceStats := &collector.ResourceStats{ + ParticipantId: o.participantId, + Name: o.platform, + Balance: balance, + CardsAvail: cards, + CpuCoreHours: cpuHours, + } + + return resourceStats, nil } func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 0e2e5dd6..63a43cf9 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -17,6 +17,7 @@ package storeLink import ( "context" "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" @@ -42,6 +43,8 @@ const ( DATASETS_DIR = "/work/home/acgnnmfbwo/pcmv1/dataset" ALGORITHM_DIR = "/work/home/acgnnmfbwo/pcmv1/algorithm" TRAIN_FILE = "train.py" + CPUCOREPRICEPERHOUR = 0.09 + DCUPRICEPERHOUR = 2.0 ) var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{ @@ -268,24 +271,41 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { if err != nil { return nil, err } - limitReq := &hpcAC.QueueReq{} - _, err = s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) - if err != nil { - return nil, err - } - diskReq := &hpcAC.ParaStorQuotaReq{} - _, err = s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) - if err != nil { - return nil, err - } + //limitReq := &hpcAC.QueueReq{} + //limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) + //if err != nil { + // return nil, err + //} + + //diskReq := &hpcAC.ParaStorQuotaReq{} + //diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) + //if err != nil { + // return nil, err + //} + + var cards []*collector.Card balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) - _ = &collector.ResourceStats{ + cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3) + cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3) + + dcu := &collector.Card{ + Platform: SHUGUANGAI, + Type: CARD, + Name: DCU, + TOpsAtFp16: DCU_TOPS, + CardHours: cardHours, + } + cards = append(cards, dcu) + resourceStats := &collector.ResourceStats{ ParticipantId: s.participantId, Name: s.platform, Balance: balance, + CardsAvail: cards, + CpuCoreHours: cpuHours, } - return nil, nil + + return resourceStats, nil } func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { From 021092b6b46cd512c7c11bd46259198d133313cc Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Thu, 21 Mar 2024 09:11:12 +0800 Subject: [PATCH 6/7] Providing metrics information to Prometheus Former-commit-id: fcca6c8438877872d6c6e65bde522039f1a8668d --- api/desc/core/pcm-core.api | 318 +++++++++--------- api/internal/config/config.go | 1 + api/internal/cron/participant.go | 2 +- api/internal/handler/routes.go | 10 - .../logic/cloud/controllermetricslogic.go | 6 +- .../logic/core/centerresourceslogic.go | 62 ++-- .../logic/core/syncclusterloadlogic.go | 15 +- api/internal/svc/servicecontext.go | 14 +- api/internal/types/types.go | 19 +- pkg/tracker/interface.go | 6 +- pkg/tracker/promql.go | 63 ++-- pkg/tracker/queryoptions.go | 25 +- pkg/tracker/tracker.go | 59 +++- 13 files changed, 322 insertions(+), 278 deletions(-) diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index 8104cae7..cab53a20 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -24,9 +24,9 @@ type ( } CenterIndex { name string `json:"name"` - cpu float32 `json:"cpu"` - memory float32 `json:"memory"` - storage float32 `json:"storage"` + cpu string `json:"cpu"` + memory string `json:"memory"` + storage string `json:"storage"` centerType string `json:"centerType"` } ) @@ -35,9 +35,16 @@ type ( clusterLoadRecords []ClusterLoadRecord `json:"clusterLoadRecords"` } ClusterLoadRecord { + AdapterId int64 `json:"adapterId"` ClusterName string `json:"clusterName"` + CpuAvail float64 `json:"cpuAvail"` + CpuTotal float64 `json:"cpuTotal"` CpuUsage float64 `json:"cpuUsage"` + MemoryAvail float64 `json:"memoryAvail"` MemoryUsage float64 `json:"memoryUsage"` + MemoryTotal float64 `json:"memoryTotal"` + DiskAvail float64 `json:"diskAvail"` + DiskTotal float64 `json:"diskTotal"` DiskUsage float64 `json:"diskUsage"` } ) @@ -79,7 +86,6 @@ type commitTaskReq { MatchLabels map[string]string `json:"matchLabels,optional"` YamlList []string `json:"yamlList"` ClusterName string `json:"clusterName"` - } type ( @@ -536,7 +542,7 @@ type ( type ( AdapterQueryReq { - Id string `form:"id,optional" db:"id"` + Id string `form:"id,optional" db:"id"` Name string `form:"name,optional"` Type string `form:"type,optional"` Nickname string `form:"nickname,optional"` @@ -545,7 +551,7 @@ type ( PageInfo } AdapterReq { - Id string `json:"id,optional" db:"id"` + Id string `json:"id,optional" db:"id"` Name string `json:"name,optional"` Type string `json:"type,optional"` Nickname string `json:"nickname,optional"` @@ -553,7 +559,7 @@ type ( Server string `json:"server,optional"` } AdapterCreateReq { - Id string `json:"id,optional" db:"id"` + Id string `json:"id,optional" db:"id"` Name string `json:"name"` Type string `json:"type"` Nickname string `json:"nickname"` @@ -561,7 +567,7 @@ type ( Server string `json:"server"` } AdapterDelReq { - Id string `form:"id,optional" db:"id"` + Id string `form:"id,optional" db:"id"` } AdapterInfo { Id string `json:"id,omitempty" db:"id"` @@ -595,79 +601,79 @@ type ( type ( ClusterReq { - Id string `form:"id,optional"` - AdapterId string `form:"adapterId,optional"` - Name string `form:"name,optional"` - Nickname string `form:"nickname,optional"` - Description string `form:"description,optional"` - Server string `form:"server,optional"` - MonitorServer string `form:"monitorServer,optional"` - Username string `form:"username,optional"` - Password string `form:"password,optional"` - Token string `form:"token,optional"` - Ak string `form:"ak,optional"` - Sk string `form:"sk,optional"` - Region string `form:"region,optional"` - ProjectId string `form:"projectId,optional"` - Version string `form:"version,optional"` - Label string `form:"label,optional"` - OwnerId string `form:"ownerId,omitempty,optional"` - AuthType string `form:"authType,optional"` - Type string `form:"type,optional"` - producerDict string `form:"producerDict,optional"` - regionDict string `form:"regionDict,optional"` + Id string `form:"id,optional"` + AdapterId string `form:"adapterId,optional"` + Name string `form:"name,optional"` + Nickname string `form:"nickname,optional"` + Description string `form:"description,optional"` + Server string `form:"server,optional"` + MonitorServer string `form:"monitorServer,optional"` + Username string `form:"username,optional"` + Password string `form:"password,optional"` + Token string `form:"token,optional"` + Ak string `form:"ak,optional"` + Sk string `form:"sk,optional"` + Region string `form:"region,optional"` + ProjectId string `form:"projectId,optional"` + Version string `form:"version,optional"` + Label string `form:"label,optional"` + OwnerId string `form:"ownerId,omitempty,optional"` + AuthType string `form:"authType,optional"` + Type string `form:"type,optional"` + producerDict string `form:"producerDict,optional"` + regionDict string `form:"regionDict,optional"` PageInfo } ClusterCreateReq { - Id string `json:"id,optional"` - AdapterId string `json:"adapterId,optional"` - Name string `json:"name,optional"` - Nickname string `json:"nickname,optional"` - Description string `json:"description,optional"` - Server string `json:"server,optional"` - MonitorServer string `json:"monitorServer,optional"` - Username string `json:"username,optional"` - Password string `json:"password,optional"` - Token string `json:"token,optional"` - Ak string `json:"ak,optional"` - Sk string `json:"sk,optional"` - Region string `json:"region,optional"` - ProjectId string `json:"projectId,optional"` - Version string `json:"version,optional"` - Label string `json:"label,optional"` - OwnerId string `json:"ownerId,omitempty,optional"` - AuthType string `json:"authType,optional"` - producerDict string `json:"producerDict,optional"` - regionDict string `json:"regionDict,optional"` + Id string `json:"id,optional"` + AdapterId string `json:"adapterId,optional"` + Name string `json:"name,optional"` + Nickname string `json:"nickname,optional"` + Description string `json:"description,optional"` + Server string `json:"server,optional"` + MonitorServer string `json:"monitorServer,optional"` + Username string `json:"username,optional"` + Password string `json:"password,optional"` + Token string `json:"token,optional"` + Ak string `json:"ak,optional"` + Sk string `json:"sk,optional"` + Region string `json:"region,optional"` + ProjectId string `json:"projectId,optional"` + Version string `json:"version,optional"` + Label string `json:"label,optional"` + OwnerId string `json:"ownerId,omitempty,optional"` + AuthType string `json:"authType,optional"` + producerDict string `json:"producerDict,optional"` + regionDict string `json:"regionDict,optional"` } ClusterInfo { - Id string `json:"id,omitempty" db:"id"` - AdapterId string `json:"adapterId,omitempty" db:"adapter_id"` - Name string `json:"name,omitempty" db:"name"` - Nickname string `json:"nickname,omitempty" db:"nickname"` - Description string `json:"description,omitempty" db:"description"` - Server string `json:"server,omitempty" db:"server"` - MonitorServer string `json:"monitorServer,omitempty" db:"monitor_server"` - Username string `json:"username,omitempty" db:"username"` - Password string `json:"password,omitempty" db:"password"` - Token string `json:"token,omitempty" db:"token"` - Ak string `json:"ak,omitempty" db:"ak"` - Sk string `json:"sk,omitempty" db:"sk"` - Region string `json:"region,omitempty" db:"region"` - ProjectId string `json:"projectId,omitempty" db:"project_id"` - Version string `json:"version,omitempty" db:"version"` - Label string `json:"label,omitempty" db:"label"` - OwnerId string `json:"ownerId,omitempty" db:"owner_id"` - AuthType string `json:"authType,omitempty" db:"auth_type"` - producerDict string `json:"producerDict,omitempty" db:"producer_dict"` - regionDict string `json:"regionDict,omitempty" db:"region_dict"` - CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` + Id string `json:"id,omitempty" db:"id"` + AdapterId string `json:"adapterId,omitempty" db:"adapter_id"` + Name string `json:"name,omitempty" db:"name"` + Nickname string `json:"nickname,omitempty" db:"nickname"` + Description string `json:"description,omitempty" db:"description"` + Server string `json:"server,omitempty" db:"server"` + MonitorServer string `json:"monitorServer,omitempty" db:"monitor_server"` + Username string `json:"username,omitempty" db:"username"` + Password string `json:"password,omitempty" db:"password"` + Token string `json:"token,omitempty" db:"token"` + Ak string `json:"ak,omitempty" db:"ak"` + Sk string `json:"sk,omitempty" db:"sk"` + Region string `json:"region,omitempty" db:"region"` + ProjectId string `json:"projectId,omitempty" db:"project_id"` + Version string `json:"version,omitempty" db:"version"` + Label string `json:"label,omitempty" db:"label"` + OwnerId string `json:"ownerId,omitempty" db:"owner_id"` + AuthType string `json:"authType,omitempty" db:"auth_type"` + producerDict string `json:"producerDict,omitempty" db:"producer_dict"` + regionDict string `json:"regionDict,omitempty" db:"region_dict"` + CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` } ) type ClusterDelReq { - Id string `form:"id,optional"` + Id string `form:"id,optional"` } type ClusterResp { @@ -696,66 +702,66 @@ type ClusterRelationInfo { Version string `json:"version,omitempty" db:"version"` Server string `json:"server,omitempty" db:"server"` CreateTime string `json:"createTime,omitempty" db:"create_time" gorm:"autoCreateTime"` - CId string `json:"cId,omitempty" db:"id"` - CAdapterId string `json:"cAdapterId,omitempty" db:"adapter_id"` - CName string `json:"cName,omitempty" db:"name"` - CNickname string `json:"cNickname,omitempty" db:"nickname"` - CDescription string `json:"cDescription,omitempty" db:"description"` - CServer string `json:"cServer,omitempty" db:"server"` - CMonitorServer string `json:"cMonitorServer,omitempty" db:"monitor_server"` - CUsername string `json:"cUsername,omitempty" db:"username"` - CPassword string `json:"cPassword,omitempty" db:"password"` - CToken string `json:"cToken,omitempty" db:"token"` - CAk string `json:"cAk,omitempty" db:"ak"` - CSk string `json:"cSk,omitempty" db:"sk"` - CRegion string `json:"cRegion,omitempty" db:"region"` - CProjectId string `json:"cProjectId,omitempty" db:"project_id"` - CVersion string `json:"cVersion,omitempty" db:"version"` - CLabel string `json:"cLabel,omitempty" db:"label"` - COwnerId string `json:"cOwnerId,omitempty" db:"owner_id"` - CAuthType string `json:"cAuthType,omitempty" db:"auth_type"` - CCreateTime string `json:"cCreateTime,omitempty" db:"created_time" gorm:"autoCreateTime"` + CId string `json:"cId,omitempty" db:"id"` + CAdapterId string `json:"cAdapterId,omitempty" db:"adapter_id"` + CName string `json:"cName,omitempty" db:"name"` + CNickname string `json:"cNickname,omitempty" db:"nickname"` + CDescription string `json:"cDescription,omitempty" db:"description"` + CServer string `json:"cServer,omitempty" db:"server"` + CMonitorServer string `json:"cMonitorServer,omitempty" db:"monitor_server"` + CUsername string `json:"cUsername,omitempty" db:"username"` + CPassword string `json:"cPassword,omitempty" db:"password"` + CToken string `json:"cToken,omitempty" db:"token"` + CAk string `json:"cAk,omitempty" db:"ak"` + CSk string `json:"cSk,omitempty" db:"sk"` + CRegion string `json:"cRegion,omitempty" db:"region"` + CProjectId string `json:"cProjectId,omitempty" db:"project_id"` + CVersion string `json:"cVersion,omitempty" db:"version"` + CLabel string `json:"cLabel,omitempty" db:"label"` + COwnerId string `json:"cOwnerId,omitempty" db:"owner_id"` + CAuthType string `json:"cAuthType,omitempty" db:"auth_type"` + CCreateTime string `json:"cCreateTime,omitempty" db:"created_time" gorm:"autoCreateTime"` } type ( DictInfo { - Id string `json:"id,omitempty"` - DictName string `json:"dictName,omitempty"` - DictCode string `json:"dictCode,omitempty"` - Description string `json:"description,omitempty"` - Type string `json:"type,omitempty" db:"type"` - Status string `json:"status,omitempty" db:"status"` - CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` + Id string `json:"id,omitempty"` + DictName string `json:"dictName,omitempty"` + DictCode string `json:"dictCode,omitempty"` + Description string `json:"description,omitempty"` + Type string `json:"type,omitempty" db:"type"` + Status string `json:"status,omitempty" db:"status"` + CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` } DictReq { - Id string `form:"id,optional"` - DictName string `form:"dictName,optional"` - DictCode string `form:"dictCode,optional"` + Id string `form:"id,optional"` + DictName string `form:"dictName,optional"` + DictCode string `form:"dictCode,optional"` Description string `form:"description,optional"` - Type string `form:"type,optional"` - Status string `form:"status,optional"` + Type string `form:"type,optional"` + Status string `form:"status,optional"` PageInfo } DictEditReq { - Id string `json:"id,optional"` - DictName string `json:"dictName,optional"` - DictCode string `json:"dictCode,optional"` + Id string `json:"id,optional"` + DictName string `json:"dictName,optional"` + DictCode string `json:"dictCode,optional"` Description string `json:"description,optional"` - Type string `json:"type,optional"` - Status string `json:"status,optional"` + Type string `json:"type,optional"` + Status string `json:"status,optional"` } DictResp { - Id string `json:"id,omitempty"` - DictName string `json:"dictName,omitempty"` - DictCode string `json:"dictCode,omitempty"` - Description string `json:"description,omitempty"` - Type string `json:"type,omitempty"` - Status string `json:"status,omitempty"` - CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` - DictItemInfo []*DictItemInfo `json:"dictItemInfo,omitempty"` + Id string `json:"id,omitempty"` + DictName string `json:"dictName,omitempty"` + DictCode string `json:"dictCode,omitempty"` + Description string `json:"description,omitempty"` + Type string `json:"type,omitempty"` + Status string `json:"status,omitempty"` + CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` + DictItemInfo []*DictItemInfo `json:"dictItemInfo,omitempty"` } Dicts { @@ -764,55 +770,55 @@ type ( DictItemInfo { - Id string `json:"id,omitempty"` - DictId string `json:"dictId,omitempty"` - ItemText string `json:"itemText,omitempty"` - ItemValue string `json:"itemValue,omitempty"` + Id string `json:"id,omitempty"` + DictId string `json:"dictId,omitempty"` + ItemText string `json:"itemText,omitempty"` + ItemValue string `json:"itemValue,omitempty"` Description string `json:"description,omitempty"` - SortOrder string `json:"sortOrder,omitempty"` - Type string `json:"type,omitempty" db:"type"` - ParentId string `json:"parentId,omitempty"` - Status string `json:"status,omitempty" db:"status"` - CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` + SortOrder string `json:"sortOrder,omitempty"` + Type string `json:"type,omitempty" db:"type"` + ParentId string `json:"parentId,omitempty"` + Status string `json:"status,omitempty" db:"status"` + CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` } DictItemReq { - Id string `form:"id,optional"` - DictId string `form:"dictId,optional"` - ItemText string `form:"itemText,optional"` - ItemValue string `form:"itemValue,optional"` + Id string `form:"id,optional"` + DictId string `form:"dictId,optional"` + ItemText string `form:"itemText,optional"` + ItemValue string `form:"itemValue,optional"` Description string `form:"description,optional"` - SortOrder string `form:"sortOrder,optional"` - Type string `form:"type,optional"` - ParentId string `form:"parentId,optional"` - Status string `form:"status,optional"` + SortOrder string `form:"sortOrder,optional"` + Type string `form:"type,optional"` + ParentId string `form:"parentId,optional"` + Status string `form:"status,optional"` PageInfo } DictItemEditReq { - Id string `json:"id,optional"` - DictId string `json:"dictId,optional"` - ItemText string `json:"itemText,optional"` - ItemValue string `json:"itemValue,optional"` + Id string `json:"id,optional"` + DictId string `json:"dictId,optional"` + ItemText string `json:"itemText,optional"` + ItemValue string `json:"itemValue,optional"` Description string `json:"description,optional"` - SortOrder string `json:"sortOrder,optional"` - Type string `json:"type,optional"` - ParentId string `json:"parentId,optional"` - Status string `json:"status,optional"` + SortOrder string `json:"sortOrder,optional"` + Type string `json:"type,optional"` + ParentId string `json:"parentId,optional"` + Status string `json:"status,optional"` } DictItemResp { - Id string `json:"id,omitempty"` - DictId string `json:"dictId,omitempty"` - ItemText string `json:"itemText,omitempty"` - ItemValue string `json:"itemValue,omitempty"` + Id string `json:"id,omitempty"` + DictId string `json:"dictId,omitempty"` + ItemText string `json:"itemText,omitempty"` + ItemValue string `json:"itemValue,omitempty"` Description string `json:"description,omitempty"` - SortOrder string `json:"sortOrder,omitempty"` - Type string `json:"type,omitempty"` - ParentId string `json:"parentId,omitempty"` - Status string `json:"status,omitempty"` - CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` - DictInfo *DictInfo `json:"dictInfo,omitempty"` + SortOrder string `json:"sortOrder,omitempty"` + Type string `json:"type,omitempty"` + ParentId string `json:"parentId,omitempty"` + Status string `json:"status,omitempty"` + CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` + DictInfo *DictInfo `json:"dictInfo,omitempty"` } DictItems { @@ -840,14 +846,14 @@ type ( type ( PageInfo { - PageNum int `form:"pageNum"` - PageSize int `form:"pageSize"` + PageNum int `form:"pageNum"` + PageSize int `form:"pageSize"` } PageResult { - List interface{} `json:"list,omitempty"` - Total int64 `json:"total,omitempty"` - PageNum int `json:"pageNum,omitempty"` - PageSize int `json:"pageSize,omitempty"` + List interface{} `json:"list,omitempty"` + Total int64 `json:"total,omitempty"` + PageNum int `json:"pageNum,omitempty"` + PageSize int `json:"pageSize,omitempty"` } ) \ No newline at end of file diff --git a/api/internal/config/config.go b/api/internal/config/config.go index dee5d7ed..3debd22e 100644 --- a/api/internal/config/config.go +++ b/api/internal/config/config.go @@ -49,6 +49,7 @@ type Config struct { Password string } SnowflakeConf SnowflakeConf + PromUrl string } // SnowflakeConf 雪花算法机器id配置 diff --git a/api/internal/cron/participant.go b/api/internal/cron/participant.go index 9ee53989..7d085bc9 100644 --- a/api/internal/cron/participant.go +++ b/api/internal/cron/participant.go @@ -33,7 +33,7 @@ func SyncParticipantRpc(svc *svc.ServiceContext) { if err != nil { return } - svc.PromClient[participant.Id] = promClient + svc.MonitorClient[participant.Id] = promClient } } } diff --git a/api/internal/handler/routes.go b/api/internal/handler/routes.go index e8c4b9a9..6822d544 100644 --- a/api/internal/handler/routes.go +++ b/api/internal/handler/routes.go @@ -134,16 +134,6 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/core/metrics", Handler: core.MetricsHandler(serverCtx), }, - { - Method: http.MethodGet, - Path: "/core/pullTaskInfo", - Handler: core.PullTaskInfoHandler(serverCtx), - }, - { - Method: http.MethodGet, - Path: "/core/pushTaskInfo", - Handler: core.PushTaskInfoHandler(serverCtx), - }, }, rest.WithPrefix("/pcm/v1"), ) diff --git a/api/internal/logic/cloud/controllermetricslogic.go b/api/internal/logic/cloud/controllermetricslogic.go index f0ade821..50409bf6 100644 --- a/api/internal/logic/cloud/controllermetricslogic.go +++ b/api/internal/logic/cloud/controllermetricslogic.go @@ -27,14 +27,14 @@ func NewControllerMetricsLogic(ctx context.Context, svcCtx *svc.ServiceContext) func (l *ControllerMetricsLogic) ControllerMetrics(req *types.ControllerMetricsReq) (resp *types.ControllerMetricsResp, err error) { resp = &types.ControllerMetricsResp{} - if _, ok := l.svcCtx.PromClient[req.ParticipantId]; ok { + if _, ok := l.svcCtx.MonitorClient[req.ParticipantId]; ok { if len(req.Pod) != 0 { - resp.Data = l.svcCtx.PromClient[req.ParticipantId].GetNamedMetricsByTime(req.Metrics, req.Start, req.End, 60*time.Minute, tracker.PodOption{ + resp.Data = l.svcCtx.MonitorClient[req.ParticipantId].GetNamedMetricsByTime(req.Metrics, req.Start, req.End, 60*time.Minute, tracker.PodOption{ PodName: req.Pod, }) } else { - resp.Data = l.svcCtx.PromClient[req.ParticipantId].GetNamedMetricsByTime(req.Metrics, req.Start, req.End, 60*time.Minute, tracker.ControllerOption{ + resp.Data = l.svcCtx.MonitorClient[req.ParticipantId].GetNamedMetricsByTime(req.Metrics, req.Start, req.End, 60*time.Minute, tracker.ControllerOption{ WorkloadName: req.WorkloadName, }) } diff --git a/api/internal/logic/core/centerresourceslogic.go b/api/internal/logic/core/centerresourceslogic.go index be3f6da5..714eeac3 100644 --- a/api/internal/logic/core/centerresourceslogic.go +++ b/api/internal/logic/core/centerresourceslogic.go @@ -2,6 +2,8 @@ package core import ( "context" + "github.com/prometheus/common/model" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,31 +26,41 @@ func NewCenterResourcesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *C } func (l *CenterResourcesLogic) CenterResources() (resp *types.CenterResourcesResp, err error) { - // todo: add your logic here and delete this line - centerIndex1 := types.CenterIndex{ - Name: "阿里云", - Cpu: float32(12.33), - Memory: float32(64.55), - Storage: float32(33.90), - CenterType: "cloud", - } - centerIndex2 := types.CenterIndex{ - Name: "A超算中心", - Cpu: float32(42.36), - Memory: float32(66.55), - Storage: float32(23.231), - CenterType: "hpc", - } - centerIndex3 := types.CenterIndex{ - Name: "智算中心", - Cpu: float32(78.33), - Memory: float32(36.55), - Storage: float32(88.93), - CenterType: "ai", - } resp = &types.CenterResourcesResp{} - resp.CentersIndex = append(resp.CentersIndex, centerIndex1) - resp.CentersIndex = append(resp.CentersIndex, centerIndex2) - resp.CentersIndex = append(resp.CentersIndex, centerIndex3) + rawData, err := l.svcCtx.PromClient.GetRawData("resource_top3", tracker.ClusterOption{}) + if err != nil { + return nil, err + } + var centersIndex []*types.CenterIndex + data := rawData.(model.Vector) + for _, d := range data { + for _, v := range d.Metric { + centersIndex = append(centersIndex, &types.CenterIndex{Name: string(v)}) + } + } + for _, centerIndex := range centersIndex { + // Query the types of resource centers + //l.svcCtx.DbEngin.Raw().Scan(¢erIndex.CenterType) + cpuRawData, err := l.svcCtx.PromClient.GetRawData("cluster_cpu_usage", tracker.ClusterOption{ClusterName: centerIndex.Name}) + cpuData := cpuRawData.(model.Vector) + if err != nil { + return nil, err + } + centerIndex.Cpu = cpuData[0].Value.String() + memoryRawData, err := l.svcCtx.PromClient.GetRawData("cluster_memory_usage", tracker.ClusterOption{ClusterName: centerIndex.Name}) + if err != nil { + return nil, err + } + memoryData := memoryRawData.(model.Vector) + + centerIndex.Memory = memoryData[0].Value.String() + diskRawData, err := l.svcCtx.PromClient.GetRawData("cluster_disk_usage", tracker.ClusterOption{ClusterName: centerIndex.Name}) + if err != nil { + return nil, err + } + diskData := diskRawData.(model.Vector) + centerIndex.Storage = diskData[0].Value.String() + resp.CentersIndex = append(resp.CentersIndex, *centerIndex) + } return resp, nil } diff --git a/api/internal/logic/core/syncclusterloadlogic.go b/api/internal/logic/core/syncclusterloadlogic.go index 2c0f87e9..75f10c5e 100644 --- a/api/internal/logic/core/syncclusterloadlogic.go +++ b/api/internal/logic/core/syncclusterloadlogic.go @@ -5,6 +5,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" + "strconv" "github.com/zeromicro/go-zero/core/logx" ) @@ -26,9 +27,17 @@ func NewSyncClusterLoadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *S func (l *SyncClusterLoadLogic) SyncClusterLoad(req *types.SyncClusterLoadReq) error { if len(req.ClusterLoadRecords) != 0 { for _, record := range req.ClusterLoadRecords { - tracker.ClusterCpuGauge.WithLabelValues(record.ClusterName).Set(record.CpuUsage) - tracker.ClusterMemoryGauge.WithLabelValues(record.ClusterName).Set(record.MemoryUsage) - tracker.ClusterDiskGauge.WithLabelValues(record.ClusterName).Set(record.DiskUsage) + tracker.ClusterCpuUsageGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuUsage) + tracker.ClusterCpuAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuAvail) + tracker.ClusterCpuTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuTotal) + + tracker.ClusterMemoryUsageGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryUsage) + tracker.ClusterMemoryAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryAvail) + tracker.ClusterMemoryTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryTotal) + + tracker.ClusterDiskUsageGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskUsage) + tracker.ClusterDiskAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskAvail) + tracker.ClusterDiskTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskTotal) } } return nil diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index e165ceb6..e96cf601 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -59,8 +59,9 @@ type ServiceContext struct { Downloader *s3manager.Downloader Uploader *s3manager.Uploader K8sRpc kubernetesclient.Kubernetes - PromClient map[int64]tracker.Prometheus + MonitorClient map[int64]tracker.Prometheus ParticipantRpc participantservice.ParticipantService + PromClient tracker.Prometheus } func NewServiceContext(c config.Config) *ServiceContext { @@ -72,8 +73,14 @@ func NewServiceContext(c config.Config) *ServiceContext { DisableSSL: aws.Bool(false), //是否禁用https,这里表示不禁用,即使用HTTPS S3ForcePathStyle: aws.Bool(true), //使用路径样式而非虚拟主机样式,区别请参考:https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html }) + promClient, err := tracker.NewPrometheus(c.PromUrl) + if err != nil { + logx.Errorf("InitPrometheus err: %v", err) + panic("InitSnowflake err") + } + //添加snowflake支持 - err := utils.InitSnowflake(c.SnowflakeConf.MachineId) + err = utils.InitSnowflake(c.SnowflakeConf.MachineId) if err != nil { logx.Errorf("InitSnowflake err: %v", err) panic("InitSnowflake err") @@ -122,10 +129,11 @@ func NewServiceContext(c config.Config) *ServiceContext { OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), K8sRpc: kubernetesclient.NewKubernetes(zrpc.MustNewClient(c.K8sNativeConf)), - PromClient: make(map[int64]tracker.Prometheus), + MonitorClient: make(map[int64]tracker.Prometheus), ParticipantRpc: participantservice.NewParticipantService(zrpc.MustNewClient(c.PcmCoreRpcConf)), DockerClient: dockerClient, Downloader: downloader, Uploader: uploader, + PromClient: promClient, } } diff --git a/api/internal/types/types.go b/api/internal/types/types.go index da62e19a..b502caac 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -17,11 +17,11 @@ type CenterResourcesResp struct { } type CenterIndex struct { - Name string `json:"name"` - Cpu float32 `json:"cpu"` - Memory float32 `json:"memory"` - Storage float32 `json:"storage"` - CenterType string `json:"centerType"` + Name string `json:"name"` + Cpu string `json:"cpu"` + Memory string `json:"memory"` + Storage string `json:"storage"` + CenterType string `json:"centerType"` } type SyncClusterLoadReq struct { @@ -29,9 +29,16 @@ type SyncClusterLoadReq struct { } type ClusterLoadRecord struct { + AdapterId int64 `json:"adapterId"` ClusterName string `json:"clusterName"` + CpuAvail float64 `json:"cpuAvail"` + CpuTotal float64 `json:"cpuTotal"` CpuUsage float64 `json:"cpuUsage"` + MemoryAvail float64 `json:"memoryAvail"` MemoryUsage float64 `json:"memoryUsage"` + MemoryTotal float64 `json:"memoryTotal"` + DiskAvail float64 `json:"diskAvail"` + DiskTotal float64 `json:"diskTotal"` DiskUsage float64 `json:"diskUsage"` } @@ -93,7 +100,7 @@ type CommitVmTaskReq struct { NsID string `json:"nsID"` Replicas int64 `json:"replicas,optional"` MatchLabels map[string]string `json:"matchLabels,optional"` - Server ServerCommit `json:"server,optional"` + Server []ServerCommit `json:"server,optional"` Platform string `json:"platform,optional"` } diff --git a/pkg/tracker/interface.go b/pkg/tracker/interface.go index 66cdb44b..f7362c03 100644 --- a/pkg/tracker/interface.go +++ b/pkg/tracker/interface.go @@ -14,7 +14,10 @@ package tracker -import "time" +import ( + "github.com/prometheus/common/model" + "time" +) type Interface interface { //GetMetric(expr string, time time.Time) Metric @@ -27,4 +30,5 @@ type Interface interface { //// meter //GetNamedMeters(meters []string, time time.Time, opts []QueryOption) []Metric //GetNamedMetersOverTime(metrics []string, start, end time.Time, step time.Duration, opts []QueryOption) []Metric + GetRawData(expr string, o QueryOption) (model.Value, error) } diff --git a/pkg/tracker/promql.go b/pkg/tracker/promql.go index db9e5586..a3da8989 100644 --- a/pkg/tracker/promql.go +++ b/pkg/tracker/promql.go @@ -27,37 +27,15 @@ const ( var promQLTemplates = map[string]string{ - //namespace - "namespace_cpu_usage": `round(namespace:container_cpu_usage_seconds_total:sum_rate{namespace!="", $1}, 0.001)`, - "namespace_memory_usage": `namespace:container_memory_usage_bytes:sum{namespace!="", $1}`, - "namespace_memory_usage_wo_cache": `namespace:container_memory_usage_bytes_wo_cache:sum{namespace!="", $1}`, - "namespace_net_bytes_transmitted": `sum by (namespace) (irate(container_network_transmit_bytes_total{namespace!="", pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m]) * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(namespace) max by(namespace) (kube_namespace_labels{$1} * 0)`, - "namespace_net_bytes_received": `sum by (namespace) (irate(container_network_receive_bytes_total{namespace!="", pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m]) * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(namespace) max by(namespace) (kube_namespace_labels{$1} * 0)`, - "namespace_pod_count": `sum by (namespace) (kube_pod_status_phase{phase!~"Failed|Succeeded", namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(namespace) max by(namespace) (kube_namespace_labels{$1} * 0)`, - "namespace_pod_running_count": `sum by (namespace) (kube_pod_status_phase{phase="Running", namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(namespace) max by(namespace) (kube_namespace_labels{$1} * 0)`, - "namespace_pod_succeeded_count": `sum by (namespace) (kube_pod_status_phase{phase="Succeeded", namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(namespace) max by(namespace) (kube_namespace_labels{$1} * 0)`, - "namespace_pod_abnormal_count": `namespace:pod_abnormal:count{namespace!="", $1}`, - "namespace_pod_abnormal_ratio": `namespace:pod_abnormal:ratio{namespace!="", $1}`, - "namespace_memory_limit_hard": `min by (namespace) (kube_resourcequota{resourcequota!="quota", type="hard", namespace!="", resource="limits.memory"} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_cpu_limit_hard": `min by (namespace) (kube_resourcequota{resourcequota!="quota", type="hard", namespace!="", resource="limits.cpu"} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_pod_count_hard": `min by (namespace) (kube_resourcequota{resourcequota!="quota", type="hard", namespace!="", resource="count/pods"} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_cronjob_count": `sum by (namespace) (kube_cronjob_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_pvc_count": `sum by (namespace) (kube_persistentvolumeclaim_info{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_daemonset_count": `sum by (namespace) (kube_daemonset_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_deployment_count": `sum by (namespace) (kube_deployment_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_endpoint_count": `sum by (namespace) (kube_endpoint_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_hpa_count": `sum by (namespace) (kube_horizontalpodautoscaler_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_job_count": `sum by (namespace) (kube_job_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_statefulset_count": `sum by (namespace) (kube_statefulset_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_replicaset_count": `count by (namespace) (kube_replicaset_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_service_count": `sum by (namespace) (kube_service_info{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_secret_count": `sum by (namespace) (kube_secret_info{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_configmap_count": `sum by (namespace) (kube_configmap_info{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_ingresses_extensions_count": `sum by (namespace) (kube_ingress_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - "namespace_s2ibuilder_count": `sum by (namespace) (s2i_s2ibuilder_created{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`, - - "controller_cpu_usage_rate": `sum( node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{}* on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{$1}) by (workload)/sum( kube_pod_container_resource_limits{job="kube-state-metrics", resource="cpu"}* on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{ }) by (workload)`, - "controller_memory_usage_rate": `sum( container_memory_working_set_bytes{job="kubelet", metrics_path="/metrics/cadvisor", container!="", image!=""} * on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{$1}) by (workload)/sum( kube_pod_container_resource_limits{job="kube-state-metrics", resource="memory"}* on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{ }) by (workload)`, + "cluster_cpu_usage": "sum by (cluster_name)(cluster_cpu_usage{$1})", + "cluster_memory_usage": "sum by (cluster_name)(cluster_memory_usage{$1})", + "cluster_disk_usage": "sum by (cluster_name)(cluster_disk_usage{$1})", + "resource_top3": "topk(3,sum by (cluster_name)(cluster_cpu_usage +cluster_memory_usage +cluster_disk_usage)/3)", + "namespace_cpu_usage": `round(namespace:container_cpu_usage_seconds_total:sum_rate{namespace!="", $1}, 0.001)`, + "namespace_memory_usage": `namespace:container_memory_usage_bytes:sum{namespace!="", $1}`, + "namespace_memory_usage_wo_cache": `namespace:container_memory_usage_bytes_wo_cache:sum{namespace!="", $1}`, + "controller_cpu_usage_rate": `sum( node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{}* on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{$1}) by (workload)/sum( kube_pod_container_resource_limits{job="kube-state-metrics", resource="cpu"}* on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{ }) by (workload)`, + "controller_memory_usage_rate": `sum( container_memory_working_set_bytes{job="kubelet", metrics_path="/metrics/cadvisor", container!="", image!=""} * on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{$1}) by (workload)/sum( kube_pod_container_resource_limits{job="kube-state-metrics", resource="memory"}* on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{ }) by (workload)`, // pod "pod_cpu_usage": `round(sum by (namespace, pod) (irate(container_cpu_usage_seconds_total{job="kubelet", pod!="", image!=""}[5m])) * on (namespace, pod) group_left(owner_kind,owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}, 0.001)`, "pod_cpu_usage_rate": `sum(node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{ $1}) by (pod) / sum(kube_pod_container_resource_limits{ $1,unit="core"}) by (pod)`, @@ -81,7 +59,7 @@ func makeExpr(metric string, opts QueryOptions) string { tmpl := promQLTemplates[metric] switch opts.Level { case LevelCluster: - return tmpl + return makeClusterMetricExpr(tmpl, opts) case LevelNode: return makeNodeMetricExpr(tmpl, opts) case LevelWorkspace: @@ -105,6 +83,14 @@ func makeExpr(metric string, opts QueryOptions) string { } } +func makeClusterMetricExpr(tmpl string, o QueryOptions) string { + var clusterSelector string + if o.ClusterName != "" { + clusterSelector = fmt.Sprintf(`cluster_name="%s"`, o.ClusterName) + } + return strings.Replace(tmpl, "$1", clusterSelector, -1) + +} func makeNodeMetricExpr(tmpl string, o QueryOptions) string { var nodeSelector string if o.NodeName != "" { @@ -177,19 +163,12 @@ func makePVCMetricExpr(tmpl string, o QueryOptions) string { // GET /namespaces/{namespace}/persistentvolumeclaims/{persistentvolumeclaim} or // GET /namespaces/{namespace}/persistentvolumeclaims if o.Namespace != "" { - if o.PersistentVolumeClaimName != "" { - pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim="%s"`, o.Namespace, o.PersistentVolumeClaimName) - } else { - pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim=~"%s"`, o.Namespace, o.ResourceFilter) - } + + pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim=~"%s"`, o.Namespace, o.ResourceFilter) + return strings.Replace(tmpl, "$1", pvcSelector, -1) } - // For monitoring persistentvolumeclaims of the specific storageclass - // GET /storageclasses/{storageclass}/persistentvolumeclaims - if o.StorageClassName != "" { - pvcSelector = fmt.Sprintf(`storageclass="%s", persistentvolumeclaim=~"%s"`, o.StorageClassName, o.ResourceFilter) - } return strings.Replace(tmpl, "$1", pvcSelector, -1) } diff --git a/pkg/tracker/queryoptions.go b/pkg/tracker/queryoptions.go index bc0bc9ac..ac1c818a 100644 --- a/pkg/tracker/queryoptions.go +++ b/pkg/tracker/queryoptions.go @@ -68,6 +68,7 @@ type QueryOptions struct { NamespacedResourcesFilter string QueryType string ResourceFilter string + ClusterName string NodeName string WorkspaceName string Namespace string @@ -77,10 +78,6 @@ type QueryOptions struct { PodName string PodsName string ContainerName string - StorageClassName string - PersistentVolumeClaimName string - PVCFilter string - ApplicationName string ServiceName string Ingress string Job string @@ -92,10 +89,13 @@ func NewQueryOptions() *QueryOptions { return &QueryOptions{} } -type ClusterOption struct{} +type ClusterOption struct { + ClusterName string +} -func (_ ClusterOption) Apply(o *QueryOptions) { +func (c ClusterOption) Apply(o *QueryOptions) { o.Level = LevelCluster + o.ClusterName = c.ClusterName } type NodeOption struct { @@ -110,8 +110,6 @@ func (no NodeOption) Apply(o *QueryOptions) { o.Level = LevelNode o.ResourceFilter = no.ResourceFilter o.NodeName = no.NodeName - o.PVCFilter = no.PVCFilter - o.StorageClassName = no.StorageClassName o.QueryType = no.QueryType } @@ -126,8 +124,6 @@ func (wo WorkspaceOption) Apply(o *QueryOptions) { o.Level = LevelWorkspace o.ResourceFilter = wo.ResourceFilter o.WorkspaceName = wo.WorkspaceName - o.PVCFilter = wo.PVCFilter - o.StorageClassName = wo.StorageClassName } type NamespaceOption struct { @@ -143,8 +139,6 @@ func (no NamespaceOption) Apply(o *QueryOptions) { o.ResourceFilter = no.ResourceFilter o.WorkspaceName = no.WorkspaceName o.Namespace = no.NamespaceName - o.PVCFilter = no.PVCFilter - o.StorageClassName = no.StorageClassName } type ApplicationsOption struct { @@ -183,8 +177,6 @@ type ApplicationOption struct { func (ao ApplicationOption) Apply(o *QueryOptions) { o.Level = LevelApplication o.Namespace = ao.NamespaceName - o.ApplicationName = ao.Application - o.StorageClassName = ao.StorageClassName app_components := strings.Join(ao.ApplicationComponents[:], "|") @@ -303,11 +295,6 @@ func (po PVCOption) Apply(o *QueryOptions) { o.Level = LevelPVC o.ResourceFilter = po.ResourceFilter o.Namespace = po.NamespaceName - o.StorageClassName = po.StorageClassName - o.PersistentVolumeClaimName = po.PersistentVolumeClaimName - - // for meter - o.PVCFilter = po.PersistentVolumeClaimName } type IngressOption struct { diff --git a/pkg/tracker/tracker.go b/pkg/tracker/tracker.go index 298d7635..47b66a27 100644 --- a/pkg/tracker/tracker.go +++ b/pkg/tracker/tracker.go @@ -27,22 +27,53 @@ import ( ) var ( - ClusterCpuGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + ClusterCpuUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "cluster_cpu_usage", Help: "Cluster CPU Utilization Rate.", - }, []string{"cluster_name"}) - ClusterMemoryGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + }, []string{"cluster_name", "adapter_id"}) + ClusterCpuAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cluster_cpu_avail", + Help: "Cluster CPU Available.", + }, []string{"cluster_name", "adapter_id"}) + ClusterCpuTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cluster_cpu_total", + Help: "Cluster CPU Total.", + }, []string{"cluster_name", "adapter_id"}) + ClusterMemoryUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "cluster_memory_usage", Help: "Cluster Memory Utilization Rate.", - }, []string{"cluster_name"}) - ClusterDiskGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + }, []string{"cluster_name", "adapter_id"}) + ClusterMemoryAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cluster_memory_avail", + Help: "Cluster Memory Available.", + }, []string{"cluster_name", "adapter_id"}) + ClusterMemoryTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cluster_memory_total", + Help: "Cluster Memory Total.", + }, []string{"cluster_name", "adapter_id"}) + ClusterDiskUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "cluster_disk_usage", Help: "Cluster Disk Utilization Rate.", - }, []string{"cluster_name"}) + }, []string{"cluster_name", "adapter_id"}) + ClusterDiskAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cluster_disk_avail", + Help: "Cluster Disk Available.", + }, []string{"cluster_name", "adapter_id"}) + ClusterDiskTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cluster_disk_total", + Help: "Cluster Disk Total.", + }, []string{"cluster_name", "adapter_id"}) + metrics = []prometheus.Collector{ - ClusterCpuGauge, - ClusterMemoryGauge, - ClusterDiskGauge, + ClusterCpuUsageGauge, + ClusterCpuAvailGauge, + ClusterCpuTotalGauge, + ClusterMemoryUsageGauge, + ClusterMemoryAvailGauge, + ClusterMemoryTotalGauge, + ClusterDiskUsageGauge, + ClusterDiskAvailGauge, + ClusterDiskTotalGauge, } ) @@ -225,3 +256,13 @@ func genMetricFilter(o QueryOption) func(metric model.Metric) bool { return true } } + +func (p Prometheus) GetRawData(expr string, o QueryOption) (model.Value, error) { + opts := NewQueryOptions() + o.Apply(opts) + value, _, err := p.client.Query(context.Background(), makeExpr(expr, *opts), time.Now()) + if err != nil { + return nil, err + } + return value, nil +} From 7b6c0223d9337fbcce8056bbc21bf2df6207ac8b Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Thu, 21 Mar 2024 09:13:17 +0800 Subject: [PATCH 7/7] Providing metrics information to Prometheus Former-commit-id: c4ccd504af0ba70f287951a7fa1f3637d231e661 --- api/etc/pcm.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/etc/pcm.yaml b/api/etc/pcm.yaml index 11432c95..37a90f85 100644 --- a/api/etc/pcm.yaml +++ b/api/etc/pcm.yaml @@ -15,6 +15,8 @@ Cache: - Host: 10.206.0.12:6379 Pass: redisPW123 +PromUrl: http://47.92.39.128:30877 + # k8s rpc K8sNativeConf: # target: nacos://10.206.0.12:8848/pcm.kubenative.rpc?timeout=30s&namespaceid=test&groupname=DEFAULT_GROUP&appName=pcm.core.api