From 08138fe02e2c1b2fe41bb2282212e6dabd88a900 Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 26 Mar 2024 16:34:46 +0800 Subject: [PATCH 1/2] added GetAlgorithms for ai scheduler Former-commit-id: 02fabcc30a84ad7b9afbde1a693a34ffacf47d4b --- api/desc/pcm.api | 3 ++ api/desc/schedule/pcm-schedule.api | 10 +++++ .../scheduler/service/collector/collector.go | 7 +++ api/internal/storeLink/modelarts.go | 4 ++ api/internal/storeLink/octopus.go | 23 ++++++++++ api/internal/storeLink/shuguangai.go | 20 +++++++++ api/internal/storeLink/storeLink.go | 43 +++++++++++++++++++ 7 files changed, 110 insertions(+) diff --git a/api/desc/pcm.api b/api/desc/pcm.api index dca9d708..1180e129 100644 --- a/api/desc/pcm.api +++ b/api/desc/pcm.api @@ -876,6 +876,9 @@ service pcm { @handler ScheduleGetStrategyHandler get /schedule/ai/getStrategies returns (AiStrategyResp) + @handler ScheduleGetAlgorithmsHandler + get /schedule/ai/getAlgorithms (AiAlgorithmsReq) returns (AiAlgorithmsResp) + @handler ScheduleSubmitHandler post /schedule/submit (ScheduleReq) returns (ScheduleResp) } diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api index 64e2a77d..d9946a33 100644 --- a/api/desc/schedule/pcm-schedule.api +++ b/api/desc/schedule/pcm-schedule.api @@ -41,4 +41,14 @@ type ( AiStrategyResp { Strategies []string `json:"strategies"` } + + AiAlgorithmsReq { + ResourceType string `json:"resourceType"` + TaskType string `json:"taskType"` + Dataset string `json:"dataset"` + } + + AiAlgorithmsResp { + Algorithms []string `json:"algorithms"` + } ) \ No newline at end of file diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index b4d66c68..8d67175c 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -3,6 +3,7 @@ package collector type AiCollector interface { GetResourceStats() (*ResourceStats, error) GetDatasetsSpecs() ([]*DatasetsSpecs, error) + GetAlgorithms() ([]*Algorithm, error) } type ResourceStats struct { @@ -33,3 +34,9 @@ type DatasetsSpecs struct { Name string Size string } + +type Algorithm struct { + Name string + Platform string + TaskType string +} diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index b9bf8abe..64c3437b 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -157,6 +157,10 @@ func (m *ModelArtsLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { return nil, nil } +func (m *ModelArtsLink) GetAlgorithms() ([]*collector.Algorithm, error) { + return nil, nil +} + func (m *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) { err := m.GenerateSubmitParams(option) if err != nil { diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 5342d8ce..b3bd546a 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -315,6 +315,29 @@ func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { return specs, nil } +func (o *OctopusLink) GetAlgorithms() ([]*collector.Algorithm, error) { + var algorithms []*collector.Algorithm + + req := &octopus.GetMyAlgorithmListReq{ + Platform: o.platform, + PageIndex: o.pageIndex, + PageSize: o.pageSize, + } + resp, err := o.svcCtx.OctopusRpc.GetMyAlgorithmList(o.ctx, req) + if err != nil { + return nil, err + } + if !resp.Success { + return nil, errors.New("failed to get algorithms") + } + + for _, a := range resp.Payload.Algorithms { + algorithm := &collector.Algorithm{Name: a.AlgorithmName, Platform: OCTOPUS, TaskType: strings.ToLower(a.FrameworkName)} + algorithms = append(algorithms, algorithm) + } + return algorithms, nil +} + func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) { err := o.GenerateSubmitParams(option) if err != nil { diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index f44466fe..f9d8b918 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -341,6 +341,26 @@ func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { return specs, nil } +func (s *ShuguangAi) GetAlgorithms() ([]*collector.Algorithm, error) { + var algorithms []*collector.Algorithm + for _, t := range GetTaskTypes() { + taskType := t + req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0} + list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req) + if err != nil { + return nil, err + } + if list.Code != "0" { + return nil, errors.New(list.Msg) + } + for _, file := range list.Data.FileList { + algorithm := &collector.Algorithm{Name: file.Name, Platform: SHUGUANGAI, TaskType: taskType} + algorithms = append(algorithms, algorithm) + } + } + return algorithms, nil +} + func (s *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) { err := s.GenerateSubmitParams(option) if err != nil { diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index c77d3df2..e54c722c 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -27,6 +27,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts" "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus" "gorm.io/gorm" + "strings" ) type Linkage interface { @@ -152,6 +153,48 @@ func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string, return names, nil } +func GetAlgorithms(collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) { + var names []string + colMap := *collectorMap + for _, col := range colMap { + var ns []string + algorithms, err := col.GetAlgorithms() + if err != nil { + return nil, err + } + for _, algorithm := range algorithms { + if algorithm.TaskType != taskType { + continue + } + switch algorithm.Platform { + case OCTOPUS: + splitns := strings.Split(algorithm.Name, UNDERSCORE) + if dataset != splitns[0] || len(splitns) == 1 { + continue + } + ns = append(ns, splitns[1]) + case SHUGUANGAI: + splitns := strings.Split(algorithm.Name, DASH) + if dataset != splitns[0] || len(splitns) == 1 { + continue + } + ns = append(ns, splitns[1]) + } + } + if len(ns) == 0 { + continue + } + if len(names) == 0 { + names = ns + continue + } + + names = common.IntersectString(names, ns) + } + names = common.RemoveDuplicates(names) + return names, nil +} + func GetTaskTypes() []string { return taskTypes } From d56e4bd69f4a37ab5258788f606928adfede76ed Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 26 Mar 2024 17:36:04 +0800 Subject: [PATCH 2/2] updated shuguangAi GetResourceStats Former-commit-id: 2e69f551307dd1be671f0cc535b541760ff59aa6 --- api/internal/storeLink/shuguangai.go | 51 ++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index f9d8b918..654e12b4 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -267,12 +267,15 @@ func (s *ShuguangAi) QuerySpecs() (interface{}, error) { } func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { + //balance userReq := &hpcAC.GetUserInfoReq{} userinfo, err := s.svcCtx.ACRpc.GetUserInfo(s.ctx, userReq) if err != nil { return nil, err } + balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) + //resource limit limitReq := &hpcAC.QueueReq{} limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) if err != nil { @@ -281,20 +284,54 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { totalCpu := limitResp.Data.AccountMaxCpu totalDcu := limitResp.Data.AccountMaxDcu + //disk diskReq := &hpcAC.ParaStorQuotaReq{} diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) if err != nil { return nil, err } - totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB, 3) - availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB, 3) + totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3) + availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3) - generalInfo, err := s.svcCtx.ACRpc.GetGeneralInfo(s.ctx, nil) - memSize := common.RoundFloat(float64(generalInfo.MemoryInGib)*KB*KB, 3) + //memory + nodeResp, err := s.svcCtx.ACRpc.GetNodeResources(s.ctx, nil) + if err != nil { + return nil, err + } + memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES + //resources being occupied + memberJobResp, err := s.svcCtx.ACRpc.GetMemberJobs(s.ctx, nil) + if err != nil { + return nil, err + } + var CpuCoreAvail int64 + var MemAvail float64 + if len(memberJobResp.Data) != 0 { + CpuCoreAvail = totalCpu + MemAvail = memSize + } else { + var cpuCoreUsed int64 + var memUsed float64 + for _, datum := range memberJobResp.Data { + cpuCoreUsed += datum.CpuCore + } + memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core + if cpuCoreUsed > totalCpu { + CpuCoreAvail = 0 + } else { + CpuCoreAvail = totalCpu - cpuCoreUsed + } + if memUsed > memSize { + MemAvail = 0 + } else { + MemAvail = memSize - memUsed + } + } + + //usable hours var cards []*collector.Card - balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3) cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3) @@ -312,11 +349,11 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { Name: s.platform, Balance: balance, CpuCoreTotal: totalCpu, - CpuCoreAvail: 0, + CpuCoreAvail: CpuCoreAvail, DiskTotal: totalDisk, DiskAvail: availDisk, MemTotal: memSize, - MemAvail: 0, + MemAvail: MemAvail, CpuCoreHours: cpuHours, CardsAvail: cards, }