From c807d544cac67a4a193e835ef2dbccc9d7b639a9 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 20 Mar 2024 17:35:41 +0800 Subject: [PATCH] modified ai platform getResourcestats methods and dynamicResources strategy Former-commit-id: ee8836b10b362b2d74596f5dcc5f3ec8a81d78ce --- api/internal/scheduler/common/common.go | 14 +++-- api/internal/scheduler/scheduler.go | 13 +++-- .../scheduler/schedulers/aiScheduler.go | 2 +- .../scheduler/schedulers/option/aiOption.go | 1 + .../scheduler/service/collector/collector.go | 12 +++-- .../scheduler/strategy/dynamicResources.go | 48 +++++++++++++++-- .../strategy/param/resourcePricing.go | 2 +- api/internal/storeLink/octopus.go | 52 ++++++++++++++++--- api/internal/storeLink/shuguangai.go | 44 +++++++++++----- 9 files changed, 148 insertions(+), 40 deletions(-) diff --git a/api/internal/scheduler/common/common.go b/api/internal/scheduler/common/common.go index dd4bf100..ce2ee5e7 100644 --- a/api/internal/scheduler/common/common.go +++ b/api/internal/scheduler/common/common.go @@ -15,18 +15,11 @@ package common import ( - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" + "math" "math/rand" "time" ) -type SubSchedule interface { - GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) - PickOptimalStrategy() (strategy.Strategy, error) - AssignTask(clusters []*strategy.AssignedCluster) error -} - // 求交集 func Intersect(slice1, slice2 []int64) []int64 { m := make(map[int64]int) @@ -90,3 +83,8 @@ func MicsSlice(origin []int64, count int) []int64 { } return result } + +func RoundFloat(val float64, precision uint) float64 { + ratio := math.Pow(10, float64(precision)) + return math.Round(val*ratio) / ratio +} diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index 72eed166..8c3265db 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -22,6 +22,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" @@ -33,7 +34,7 @@ import ( type Scheduler struct { task *response.TaskInfo participantIds []int64 - subSchedule common.SubSchedule + subSchedule SubSchedule dbEngin *gorm.DB result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService @@ -43,7 +44,13 @@ type Scheduler struct { mu sync.RWMutex } -func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { +type SubSchedule interface { + GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) + PickOptimalStrategy() (strategy.Strategy, error) + AssignTask(clusters []*strategy.AssignedCluster) error +} + +func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { var task *response.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { @@ -123,7 +130,7 @@ func (s *Scheduler) TempAssign() error { return nil } -func (s *Scheduler) AssignAndSchedule(ss common.SubSchedule) error { +func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error { //// 已指定 ParticipantId //if s.task.ParticipantId != 0 { // return nil diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 923a290c..2d5518fa 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -75,7 +75,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil case strategy.DYNAMIC_RESOURCES: - strategy := strategy.NewDynamicResourcesStrategy(resources, as.option) + strategy := strategy.NewDynamicResourcesStrategy(resources, as.option, 1) return strategy, nil } diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go index b30a372c..735a8610 100644 --- a/api/internal/scheduler/schedulers/option/aiOption.go +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -4,6 +4,7 @@ type AiOption struct { AiClusterId string // shuguangAi /octopus ClusterId TaskName string ResourceType string // cpu/gpu/compute card + CpuCoreNum int64 TaskType string // pytorch/tensorflow/mindspore DatasetsName string // mnist/imageNet/iris StrategyName string diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index e0bfe24c..c6e68851 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -8,20 +8,22 @@ type AiCollector interface { type ResourceStats struct { ParticipantId int64 Name string - CpuAvail float64 + CpuCoreAvail int64 MemAvail float64 DiskAvail float64 - GpuAvail float64 - CardToHours map[Card]float64 - CpuToHours map[int]float64 + GpuAvail int64 + CardsAvail []*Card + CpuCoreHours float64 Balance float64 } type Card struct { + Platform string Type string Name string TOpsAtFp16 float64 - Price int32 + CardHours float64 + Num int32 } type DatasetsSpecs struct { diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index 7ad0e9d9..ea528311 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -2,6 +2,7 @@ package strategy import ( "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" ) @@ -12,8 +13,8 @@ type DynamicResourcesStrategy struct { opt option.Option } -func NewDynamicResourcesStrategy(resources []*collector.ResourceStats, opt option.Option) *DynamicResourcesStrategy { - return &DynamicResourcesStrategy{resources: resources, opt: opt} +func NewDynamicResourcesStrategy(resources []*collector.ResourceStats, opt option.Option, replica int32) *DynamicResourcesStrategy { + return &DynamicResourcesStrategy{resources: resources, opt: opt, replicas: replica} } func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { @@ -23,8 +24,47 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { switch ps.opt.GetOptionType() { case option.AI: - _ = (interface{})(ps.opt).(*option.AiOption) + opt := (interface{})(ps.opt).(*option.AiOption) + var maxCardHoursAvailable float64 + var maxCpuCoreHoursAvailable float64 + var assignedCluster *AssignedCluster + var results []*AssignedCluster + for _, res := range ps.resources { + if opt.ResourceType == "" { + if res.CpuCoreHours <= 0 { + cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas} + results = append(results, cluster) + return results, nil + } + + if res.CpuCoreHours > maxCpuCoreHoursAvailable { + maxCpuCoreHoursAvailable = res.CpuCoreHours + assignedCluster.Name = res.Name + assignedCluster.ParticipantId = res.ParticipantId + assignedCluster.Replicas = ps.replicas + } + } + + if opt.ResourceType == "" { + var maxCurrentCardHours float64 + for _, card := range res.CardsAvail { + cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3) + if cardHours > maxCurrentCardHours { + maxCurrentCardHours = cardHours + } + } + if maxCurrentCardHours > maxCardHoursAvailable { + maxCardHoursAvailable = maxCurrentCardHours + assignedCluster.Name = res.Name + assignedCluster.ParticipantId = res.ParticipantId + assignedCluster.Replicas = ps.replicas + } + } + } + results = append(results, assignedCluster) + return results, nil } - return nil, nil + + return nil, errors.New("failed to apply DynamicResourcesStrategy") } diff --git a/api/internal/scheduler/strategy/param/resourcePricing.go b/api/internal/scheduler/strategy/param/resourcePricing.go index 89c056c1..6fc05819 100644 --- a/api/internal/scheduler/strategy/param/resourcePricing.go +++ b/api/internal/scheduler/strategy/param/resourcePricing.go @@ -23,7 +23,7 @@ func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider { for _, resource := range r.Resources { provider := providerPricing.NewProvider( resource.ParticipantId, - resource.CpuAvail, + float64(resource.CpuCoreAvail), resource.MemAvail, resource.DiskAvail, 0.0, 0.0, 0.0) providerList = append(providerList, provider) diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 4dc7f162..5342d8ce 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -60,6 +60,10 @@ var ( MLU: CAMBRICON, GCU: ENFLAME, } + cardTopsMap = map[string]float64{ + MLU: CAMBRICONMLU290, + GCU: EnflameT20, + } ) func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink { @@ -245,13 +249,49 @@ func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) { return nil, errors.New(balanceResp.Error.Message) } - //resourceStat := collector.ResourceStats{} - // - //for _, spec := range specResp.TrainResourceSpecs { - // - //} + var cards []*collector.Card + balance := float64(balanceResp.Payload.BillingUser.Amount) + var cpuHours float64 + for _, spec := range specResp.TrainResourceSpecs { + if spec.Price == 0 { + ns := strings.Split(spec.Name, COMMA) + if len(ns) == 2 { + nss := strings.Split(ns[0], COLON) + if nss[0] == CPU { + cpuHours = -1 + } + } + } - return nil, nil + if spec.Price == 1 { + ns := strings.Split(spec.Name, COMMA) + cardSpecs := strings.Split(ns[0], STAR) + + cardTops, isMapContainsKey := cardTopsMap[cardSpecs[1]] + if !isMapContainsKey { + continue + } + + card := &collector.Card{ + Platform: OCTOPUS, + Type: CARD, + Name: cardSpecs[1], + TOpsAtFp16: cardTops, + CardHours: balance / spec.Price, + } + cards = append(cards, card) + } + } + + resourceStats := &collector.ResourceStats{ + ParticipantId: o.participantId, + Name: o.platform, + Balance: balance, + CardsAvail: cards, + CpuCoreHours: cpuHours, + } + + return resourceStats, nil } func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 0e2e5dd6..63a43cf9 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -17,6 +17,7 @@ package storeLink import ( "context" "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" @@ -42,6 +43,8 @@ const ( DATASETS_DIR = "/work/home/acgnnmfbwo/pcmv1/dataset" ALGORITHM_DIR = "/work/home/acgnnmfbwo/pcmv1/algorithm" TRAIN_FILE = "train.py" + CPUCOREPRICEPERHOUR = 0.09 + DCUPRICEPERHOUR = 2.0 ) var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{ @@ -268,24 +271,41 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { if err != nil { return nil, err } - limitReq := &hpcAC.QueueReq{} - _, err = s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) - if err != nil { - return nil, err - } - diskReq := &hpcAC.ParaStorQuotaReq{} - _, err = s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) - if err != nil { - return nil, err - } + //limitReq := &hpcAC.QueueReq{} + //limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) + //if err != nil { + // return nil, err + //} + + //diskReq := &hpcAC.ParaStorQuotaReq{} + //diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) + //if err != nil { + // return nil, err + //} + + var cards []*collector.Card balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) - _ = &collector.ResourceStats{ + cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3) + cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3) + + dcu := &collector.Card{ + Platform: SHUGUANGAI, + Type: CARD, + Name: DCU, + TOpsAtFp16: DCU_TOPS, + CardHours: cardHours, + } + cards = append(cards, dcu) + resourceStats := &collector.ResourceStats{ ParticipantId: s.participantId, Name: s.platform, Balance: balance, + CardsAvail: cards, + CpuCoreHours: cpuHours, } - return nil, nil + + return resourceStats, nil } func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {