diff --git a/api/internal/scheduler/common/common.go b/api/internal/scheduler/common/common.go index dd4bf100..ce2ee5e7 100644 --- a/api/internal/scheduler/common/common.go +++ b/api/internal/scheduler/common/common.go @@ -15,18 +15,11 @@ package common import ( - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" + "math" "math/rand" "time" ) -type SubSchedule interface { - GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) - PickOptimalStrategy() (strategy.Strategy, error) - AssignTask(clusters []*strategy.AssignedCluster) error -} - // 求交集 func Intersect(slice1, slice2 []int64) []int64 { m := make(map[int64]int) @@ -90,3 +83,8 @@ func MicsSlice(origin []int64, count int) []int64 { } return result } + +func RoundFloat(val float64, precision uint) float64 { + ratio := math.Pow(10, float64(precision)) + return math.Round(val*ratio) / ratio +} diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index 72eed166..8c3265db 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -22,6 +22,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" @@ -33,7 +34,7 @@ import ( type Scheduler struct { task *response.TaskInfo participantIds []int64 - subSchedule common.SubSchedule + subSchedule SubSchedule dbEngin *gorm.DB result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService @@ -43,7 +44,13 @@ type Scheduler 
struct { mu sync.RWMutex } -func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { +type SubSchedule interface { + GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) + PickOptimalStrategy() (strategy.Strategy, error) + AssignTask(clusters []*strategy.AssignedCluster) error +} + +func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { var task *response.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { @@ -123,7 +130,7 @@ func (s *Scheduler) TempAssign() error { return nil } -func (s *Scheduler) AssignAndSchedule(ss common.SubSchedule) error { +func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error { //// 已指定 ParticipantId //if s.task.ParticipantId != 0 { // return nil diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index b88fa481..2d5518fa 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -74,6 +74,9 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { case strategy.RESOURCES_PRICING: strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil + case strategy.DYNAMIC_RESOURCES: + strategy := strategy.NewDynamicResourcesStrategy(resources, as.option, 1) + return strategy, nil } return nil, errors.New("no strategy has been chosen") diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go index 9024d907..735a8610 100644 --- a/api/internal/scheduler/schedulers/option/aiOption.go +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -4,6 +4,7 @@ type AiOption struct { AiClusterId string // shuguangAi /octopus ClusterId 
TaskName string ResourceType string // cpu/gpu/compute card + CpuCoreNum int64 TaskType string // pytorch/tensorflow/mindspore DatasetsName string // mnist/imageNet/iris StrategyName string @@ -29,3 +30,7 @@ type AiOption struct { Image string Model interface{} } + +func (a AiOption) GetOptionType() string { + return AI +} diff --git a/api/internal/scheduler/schedulers/option/option.go b/api/internal/scheduler/schedulers/option/option.go index e7c5c218..f7c9eef1 100644 --- a/api/internal/scheduler/schedulers/option/option.go +++ b/api/internal/scheduler/schedulers/option/option.go @@ -1,5 +1,11 @@ package option -type Option struct { - Name string +const ( + AI = "ai" + CLOUD = "cloud" + HPC = "hpc" +) + +type Option interface { + GetOptionType() string } diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index e0bfe24c..c6e68851 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -8,20 +8,22 @@ type AiCollector interface { type ResourceStats struct { ParticipantId int64 Name string - CpuAvail float64 + CpuCoreAvail int64 MemAvail float64 DiskAvail float64 - GpuAvail float64 - CardToHours map[Card]float64 - CpuToHours map[int]float64 + GpuAvail int64 + CardsAvail []*Card + CpuCoreHours float64 Balance float64 } type Card struct { + Platform string Type string Name string TOpsAtFp16 float64 - Price int32 + CardHours float64 + Num int32 } type DatasetsSpecs struct { diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index 4bb4a83e..ea528311 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -1,8 +1,70 @@ package strategy +import ( + "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" + 
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" +) + type DynamicResourcesStrategy struct { + replicas int32 + resources []*collector.ResourceStats + opt option.Option +} + +func NewDynamicResourcesStrategy(resources []*collector.ResourceStats, opt option.Option, replica int32) *DynamicResourcesStrategy { + return &DynamicResourcesStrategy{resources: resources, opt: opt, replicas: replica} } func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { - return nil, nil + if ps.replicas < 1 { + return nil, errors.New("replicas must be greater than 0") + } + + switch ps.opt.GetOptionType() { + case option.AI: + opt := (interface{})(ps.opt).(*option.AiOption) // NOTE(review): unchecked assertion, panics unless opt holds a *option.AiOption + + var maxCardHoursAvailable float64 + var maxCpuCoreHoursAvailable float64 + assignedCluster := &AssignedCluster{} // allocated up front: the loop below writes its fields, which would panic on a nil pointer + var results []*AssignedCluster + for _, res := range ps.resources { + if opt.ResourceType == "" { + if res.CpuCoreHours <= 0 { + cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas} + results = append(results, cluster) + return results, nil + } + + if res.CpuCoreHours > maxCpuCoreHoursAvailable { + maxCpuCoreHoursAvailable = res.CpuCoreHours + assignedCluster.Name = res.Name + assignedCluster.ParticipantId = res.ParticipantId + assignedCluster.Replicas = ps.replicas + } + } + + if opt.ResourceType == "" { // NOTE(review): same condition as the CpuCoreHours branch above; confirm which ResourceType should select the card branch + var maxCurrentCardHours float64 + for _, card := range res.CardsAvail { + cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3) + if cardHours > maxCurrentCardHours { + maxCurrentCardHours = cardHours + } + } + if maxCurrentCardHours > maxCardHoursAvailable { + maxCardHoursAvailable = maxCurrentCardHours + assignedCluster.Name = res.Name + assignedCluster.ParticipantId = res.ParticipantId + assignedCluster.Replicas = ps.replicas + } + } + } + results = append(results, assignedCluster) + return 
results, nil + } + + return nil, errors.New("failed to apply DynamicResourcesStrategy") } diff --git a/api/internal/scheduler/strategy/param/resourcePricing.go b/api/internal/scheduler/strategy/param/resourcePricing.go index 89c056c1..6fc05819 100644 --- a/api/internal/scheduler/strategy/param/resourcePricing.go +++ b/api/internal/scheduler/strategy/param/resourcePricing.go @@ -23,7 +23,7 @@ func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider { for _, resource := range r.Resources { provider := providerPricing.NewProvider( resource.ParticipantId, - resource.CpuAvail, + float64(resource.CpuCoreAvail), resource.MemAvail, resource.DiskAvail, 0.0, 0.0, 0.0) providerList = append(providerList, provider) diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 7dc97026..5342d8ce 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -60,6 +60,10 @@ var ( MLU: CAMBRICON, GCU: ENFLAME, } + cardTopsMap = map[string]float64{ + MLU: CAMBRICONMLU290, + GCU: EnflameT20, + } ) func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink { @@ -245,13 +249,49 @@ func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) { return nil, errors.New(balanceResp.Error.Message) } - //resourceStat := collector.ResourceStats{} - // - //for _, spec := range specResp.TrainResourceSpecs { - // - //} + var cards []*collector.Card + balance := float64(balanceResp.Payload.BillingUser.Amount) + var cpuHours float64 + for _, spec := range specResp.TrainResourceSpecs { + if spec.Price == 0 { + ns := strings.Split(spec.Name, COMMA) + if len(ns) == 2 { + nss := strings.Split(ns[0], COLON) + if nss[0] == CPU { + cpuHours = -1 + } + } + } - return nil, nil + if spec.Price == 1 { + ns := strings.Split(spec.Name, COMMA) + cardSpecs := strings.Split(ns[0], STAR) + + cardTops, isMapContainsKey := cardTopsMap[cardSpecs[1]] + if !isMapContainsKey { + continue + 
} + + card := &collector.Card{ + Platform: OCTOPUS, + Type: CARD, + Name: cardSpecs[1], + TOpsAtFp16: cardTops, + CardHours: balance / spec.Price, + } + cards = append(cards, card) + } + } + + resourceStats := &collector.ResourceStats{ + ParticipantId: o.participantId, + Name: o.platform, + Balance: balance, + CardsAvail: cards, + CpuCoreHours: cpuHours, + } + + return resourceStats, nil } func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { @@ -349,6 +389,7 @@ func (o *OctopusLink) generateResourceId(option *option.AiOption) error { if err != nil { return err } + return nil } return errors.New("failed to get ResourceId") @@ -433,7 +474,14 @@ func (o *OctopusLink) generateImageId(option *option.AiOption) error { func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error { // temporarily set algorithm to cnn - option.AlgorithmName = "cnn" + if option.AlgorithmName == "" { + switch option.DatasetsName { + case "cifar10": + option.AlgorithmName = "cnn" + case "mnist": + option.AlgorithmName = "fcn" + } + } req := &octopus.GetMyAlgorithmListReq{ Platform: o.platform, @@ -457,14 +505,26 @@ func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error { if ns[1] != option.AlgorithmName { continue } - if ns[2] != option.ResourceType { - continue + switch option.ResourceType { + case CPU: + if ns[2] != CPU { + continue + } + case CARD: + if ns[2] != strings.ToLower(option.ComputeCard) { + continue + } } + option.AlgorithmId = algorithm.AlgorithmId return nil } } + if option.AlgorithmId == "" { + return errors.New("Algorithm does not exist") + } + return errors.New("failed to get AlgorithmId") } @@ -487,7 +547,10 @@ func (o *OctopusLink) generateEnv(option *option.AiOption) error { } func (o *OctopusLink) generateParams(option *option.AiOption) error { - + if len(option.Params) == 0 { + epoch := "epoch" + COMMA + "1" + option.Params = append(option.Params, epoch) + } return nil } diff --git 
a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 1bd86443..63a43cf9 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -17,6 +17,7 @@ package storeLink import ( "context" "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" @@ -42,6 +43,8 @@ const ( DATASETS_DIR = "/work/home/acgnnmfbwo/pcmv1/dataset" ALGORITHM_DIR = "/work/home/acgnnmfbwo/pcmv1/algorithm" TRAIN_FILE = "train.py" + CPUCOREPRICEPERHOUR = 0.09 + DCUPRICEPERHOUR = 2.0 ) var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{ @@ -197,9 +200,9 @@ func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []str } func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { - // set algorithmId temporarily + // set algorithmId temporarily for storelink submit if algorithmId == "" { - algorithmId = "pytorch-mnist-fully_connected_network" + algorithmId = "pytorch-mnist-fcn" } // shuguangAi提交任务 @@ -268,24 +271,41 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { if err != nil { return nil, err } - limitReq := &hpcAC.QueueReq{} - _, err = s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) - if err != nil { - return nil, err - } - diskReq := &hpcAC.ParaStorQuotaReq{} - _, err = s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) - if err != nil { - return nil, err - } + //limitReq := &hpcAC.QueueReq{} + //limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) + //if err != nil { + // return nil, err + //} + + //diskReq := &hpcAC.ParaStorQuotaReq{} + //diskResp, err := 
s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) + //if err != nil { + // return nil, err + //} + + var cards []*collector.Card balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) - _ = &collector.ResourceStats{ + cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3) + cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3) + + dcu := &collector.Card{ + Platform: SHUGUANGAI, + Type: CARD, + Name: DCU, + TOpsAtFp16: DCU_TOPS, + CardHours: cardHours, + } + cards = append(cards, dcu) + resourceStats := &collector.ResourceStats{ ParticipantId: s.participantId, Name: s.platform, Balance: balance, + CardsAvail: cards, + CpuCoreHours: cpuHours, } - return nil, nil + + return resourceStats, nil } func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { @@ -413,6 +433,7 @@ func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error { if option.DatasetsName == "" { return errors.New("DatasetsName not set") } + req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0} list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req) if err != nil { @@ -426,13 +447,34 @@ func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error { for _, file := range list.Data.FileList { ns := strings.Split(file.Name, DASH) if ns[0] == option.DatasetsName { - algorithmId = option.TaskType + DASH + file.Name - option.AlgorithmId = algorithmId - option.AlgorithmName = ns[1] - return nil + algoName := ns[1] + if option.AlgorithmName == "" { + switch option.DatasetsName { + case "cifar10": + algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "cnn" + option.AlgorithmId = algorithmId + option.AlgorithmName = algoName + return nil + case "mnist": + algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "fcn" + option.AlgorithmId = algorithmId + option.AlgorithmName = algoName + return nil + } + } else { + if algoName == option.AlgorithmName { + algorithmId = 
option.TaskType + DASH + option.DatasetsName + DASH + algoName + option.AlgorithmId = algorithmId + return nil + } + } } } + if algorithmId == "" { + return errors.New("Algorithm does not exist") + } + return errors.New("failed to get AlgorithmId") } @@ -451,8 +493,10 @@ func (s *ShuguangAi) generateParams(option *option.AiOption) error { return errors.New("ResourceType not set") } - //epoch := "epoch" + COMMA + "1" - //option.Params = append(option.Params, epoch) + if len(option.Params) == 0 { + epoch := "epoch" + COMMA + "1" + option.Params = append(option.Params, epoch) + } switch option.ResourceType { case CPU: