diff --git a/api/internal/logic/storelink/submitlinktasklogic.go b/api/internal/logic/storelink/submitlinktasklogic.go index 98f77f85..80dfab96 100644 --- a/api/internal/logic/storelink/submitlinktasklogic.go +++ b/api/internal/logic/storelink/submitlinktasklogic.go @@ -67,7 +67,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp envs = append(envs, env) } } - task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId) + task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "") if err != nil { return nil, err } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 68fc15f1..8b11faf5 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -17,11 +17,10 @@ package schedulers import ( "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" @@ -31,7 +30,7 @@ type AiScheduler struct { yamlString string task *response.TaskInfo *scheduler.Scheduler - option option.AiOption + option *option.AiOption } func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) { @@ -50,36 +49,24 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin } func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { - resources, err := as.findClustersWithResource() + resources, err := as.findClustersWithResources() if err != nil { return nil, err } + if len(resources) == 0 { + return nil, errors.New("no cluster has resources") + } + params := ¶m.Params{Resources: resources} if len(resources) < 2 /*|| as.task */ { - var pros []entity.Participant - for _, resource := range resources { - pros = append(pros, entity.Participant{ - Participant_id: resource.ParticipantId, - Name: resource.Name, - }) - } - strategy := strategy.NewReplicationStrategy(nil) + strategy := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params /*, Replicas: 1*/}) return strategy, nil } - task, providerList := as.genTaskAndProviders() - if err != nil { - return nil, nil - } - strategy := strategy.NewPricingStrategy(task, providerList...) + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params}) return strategy, nil } -func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) { - - return nil, nil -} - func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { if clusters == nil { return errors.New("clusters is nil") @@ -87,7 +74,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { executorMap := *as.AiExecutor for _, cluster := range clusters { - _, err := executorMap[cluster.Name].Execute(option.AiOption{}) + _, err := executorMap[cluster.Name].Execute(as.option) if err != nil { // TODO: database operation } @@ -97,7 +84,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return nil } -func (as *AiScheduler) findClustersWithResource() ([]*collector.ResourceSpecs, error) { +func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceSpecs, error) { var resourceSpecs []*collector.ResourceSpecs for _, resourceCollector := range *as.ResourceCollector { spec, err := resourceCollector.GetResourceSpecs() diff --git a/api/internal/scheduler/schedulers/cloudScheduler.go b/api/internal/scheduler/schedulers/cloudScheduler.go index 6d9fcbe1..6024be02 100644 --- a/api/internal/scheduler/schedulers/cloudScheduler.go +++ b/api/internal/scheduler/schedulers/cloudScheduler.go @@ -19,6 +19,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/database" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" @@ -38,12 +39,9 @@ func NewCloudScheduler() *CloudScheduler { } func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { - task, providerList, err := cs.genTaskAndProviders() - if err != nil { - return nil, nil - } + //获取所有计算中心 //调度算法 - strategy := strategy.NewPricingStrategy(task, providerList...) + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{}) return strategy, nil } diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go index 2d45383c..b1029d37 100644 --- a/api/internal/scheduler/schedulers/option/aiOption.go +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -1,17 +1,20 @@ package option type AiOption struct { - aiType string // shuguangAi/octopus - resourceType string // cpu/gpu/compute card - taskType string // pytorch/tensorflow + AiType string // shuguangAi/octopus + ResourceType string // cpu/gpu/compute card + TaskType string // pytorch/tensorflow - imageId string - specId string - datasetsId string - codeId string + ImageId string + SpecId string + DatasetsId string + CodeId string + ResourceId string - cmd string + Cmd string + Envs []string + Params []string - datasets string - code string + Datasets string + Code string } diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index 29ab4653..2ea57cec 100644 --- a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -19,7 +19,7 @@ var ( "Hanwuji": OCTOPUS, "Suiyan": OCTOPUS, "Sailingsi": OCTOPUS, - "modelarts-CloudBrain2": MODELARTS, + "Modelarts-CloudBrain2": MODELARTS, "ShuguangAi": SHUGUANGAI, } ) diff --git a/api/internal/scheduler/service/executor/aiExecutor.go b/api/internal/scheduler/service/executor/aiExecutor.go index a52ab062..abe91b0c 100644 --- a/api/internal/scheduler/service/executor/aiExecutor.go +++ b/api/internal/scheduler/service/executor/aiExecutor.go @@ -6,6 +6,6 @@ import ( ) type AiExecutor interface { - Execute(option option.AiOption) (interface{}, error) + Execute(option *option.AiOption) (interface{}, error) storeLink.Linkage } diff --git a/api/internal/scheduler/strategy/params/params.go b/api/internal/scheduler/strategy/param/params.go similarity index 69% rename from api/internal/scheduler/strategy/params/params.go rename to api/internal/scheduler/strategy/param/params.go index 44c29f13..78270fc0 100644 --- a/api/internal/scheduler/strategy/params/params.go +++ b/api/internal/scheduler/strategy/param/params.go @@ -1,9 +1,9 @@ -package params +package param import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" ) type Params struct { - resources []*collector.ResourceSpecs + Resources []*collector.ResourceSpecs } diff --git a/api/internal/scheduler/strategy/param/replication.go b/api/internal/scheduler/strategy/param/replication.go new file mode 100644 index 00000000..6699ce6b --- /dev/null +++ b/api/internal/scheduler/strategy/param/replication.go @@ -0,0 +1,23 @@ +package param + +import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + +type ReplicationParams struct { + Replicas int32 + *Params +} + +func (r *ReplicationParams) GetReplicas() int32 { + return r.Replicas +} + +func (r *ReplicationParams) GetParticipants() []*entity.Participant { + var participants []*entity.Participant + for _, resource := range r.Resources { + participants = append(participants, &entity.Participant{ + Participant_id: resource.ParticipantId, + Name: resource.Name, + }) + } + return participants +} diff --git a/api/internal/scheduler/strategy/param/resourcePricing.go b/api/internal/scheduler/strategy/param/resourcePricing.go new file mode 100644 index 00000000..570e4422 --- /dev/null +++ b/api/internal/scheduler/strategy/param/resourcePricing.go @@ -0,0 +1,32 @@ +package param + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" +) + +type ResourcePricingParams struct { + replicas int32 + task *providerPricing.Task + *Params +} + +func (r *ResourcePricingParams) GetReplicas() int32 { + return r.replicas +} + +func (r *ResourcePricingParams) GetTask() *providerPricing.Task { + return r.task +} + +func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider { + var providerList []*providerPricing.Provider + for _, resource := range r.Resources { + provider := providerPricing.NewProvider( + resource.ParticipantId, + resource.CpuAvail, + resource.MemAvail, + resource.DiskAvail, 0.0, 0.0, 0.0) + providerList = append(providerList, provider) + } + return providerList +} diff --git a/api/internal/scheduler/strategy/params/replicationParams.go b/api/internal/scheduler/strategy/params/replicationParams.go deleted file mode 100644 index 7adfaea0..00000000 --- a/api/internal/scheduler/strategy/params/replicationParams.go +++ /dev/null @@ -1,16 +0,0 @@ -package params - -import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" - -type ReplicationOption struct { - replicas int32 - participants []entity.Participant -} - -func (o *ReplicationOption) GetReplicas() int32 { - return o.replicas -} - -func (o *ReplicationOption) GetParticipants() []entity.Participant { - return o.participants -} diff --git a/api/internal/scheduler/strategy/params/resourcePricingParams.go b/api/internal/scheduler/strategy/params/resourcePricingParams.go deleted file mode 100644 index 10fe7bd6..00000000 --- a/api/internal/scheduler/strategy/params/resourcePricingParams.go +++ /dev/null @@ -1,26 +0,0 @@ -package params - -import ( - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" -) - -type ResourcePricingOption struct { - replicas int32 - task *providerPricing.Task - providers []*providerPricing.Provider - *Params -} - -func NewResourcePricingOption(params *Params) *ResourcePricingOption { - return &ResourcePricingOption{ - Params: params, - } -} - -func (r *ResourcePricingOption) GetReplicas() int32 { - return r.replicas -} - -func (r *ResourcePricingOption) GetProviders() []*providerPricing.Provider { - return r.providers -} diff --git a/api/internal/scheduler/strategy/replication.go b/api/internal/scheduler/strategy/replication.go index 3f9549b9..fad6fcde 100644 --- a/api/internal/scheduler/strategy/replication.go +++ b/api/internal/scheduler/strategy/replication.go @@ -3,15 +3,15 @@ package strategy import ( "github.com/pkg/errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/params" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" ) type ReplicationStrategy struct { replicas int32 - participants []entity.Participant + participants []*entity.Participant } -func NewReplicationStrategy(params *params.ReplicationOption) *ReplicationStrategy { +func NewReplicationStrategy(params *param.ReplicationParams) *ReplicationStrategy { return &ReplicationStrategy{replicas: params.GetReplicas(), participants: params.GetParticipants(), } diff --git a/api/internal/scheduler/strategy/resourcePricing.go b/api/internal/scheduler/strategy/resourcePricing.go index 2abf2af6..e1614164 100644 --- a/api/internal/scheduler/strategy/resourcePricing.go +++ b/api/internal/scheduler/strategy/resourcePricing.go @@ -17,6 +17,7 @@ package strategy import ( "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" ) type PricingStrategy struct { @@ -25,7 +26,9 @@ type PricingStrategy struct { StrategyList []*providerPricing.Strategy } -func NewPricingStrategy(task *providerPricing.Task, providers ...*providerPricing.Provider) *PricingStrategy { +func NewPricingStrategy(params *param.ResourcePricingParams) *PricingStrategy { + providers := params.GetProviders() + task := params.GetTask() var providerList []*providerPricing.Provider var res [][]int diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go index 767d71c4..6331d136 100644 --- a/api/internal/scheduler/strategy/test/strategy_test.go +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -3,7 +3,9 @@ package test import ( "fmt" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" "testing" ) @@ -13,10 +15,23 @@ func TestReplication(t *testing.T) { {Name: "test2", Participant_id: 2}, {Name: "test3", Participant_id: 3}, } + rsc := []*collector.ResourceSpecs{ + { + ParticipantId: 1, + Name: "test1", + }, + { + ParticipantId: 1, + Name: "test2"}, + { + ParticipantId: 1, + Name: "test3"}, + } tests := []struct { name string replica int32 ps []entity.Participant + res []*collector.ResourceSpecs }{ { name: "test1", @@ -32,7 +47,8 @@ func TestReplication(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - repl := strategy.NewReplicationStrategy(nil) + params := ¶m.Params{Resources: rsc} + repl := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params, Replicas: tt.replica}) schedule, err := repl.Schedule() if err != nil { return diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index 31489205..14a8a181 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -63,7 +63,7 @@ func (o *ModelArtsLink) QueryImageList() (interface{}, error) { return resp, nil } -func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { +func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { // modelArts提交任务 environments := make(map[string]string) parameters := make([]*modelarts.ParametersTrainJob, 0) @@ -153,6 +153,10 @@ func (o *ModelArtsLink) GetResourceSpecs() (*collector.ResourceSpecs, error) { return nil, nil } -func (o *ModelArtsLink) Execute(option option.AiOption) (interface{}, error) { - return nil, nil +func (o *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) { + task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType) + if err != nil { + return nil, err + } + return task, nil } diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index b40da2ee..cdc97ea9 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -107,7 +107,7 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) { return resp, nil } -func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { +func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { // octopus提交任务 // python参数 @@ -200,6 +200,10 @@ func (o *OctopusLink) GetResourceSpecs() (*collector.ResourceSpecs, error) { return nil, nil } -func (o *OctopusLink) Execute(option option.AiOption) (interface{}, error) { - return nil, nil +func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) { + task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType) + if err != nil { + return nil, err + } + return task, nil } diff --git a/api/internal/storeLink/shuguangHpc.go b/api/internal/storeLink/shuguangHpc.go index 7c80b456..f7f0af82 100644 --- a/api/internal/storeLink/shuguangHpc.go +++ b/api/internal/storeLink/shuguangHpc.go @@ -144,7 +144,7 @@ func (s ShuguangHpc) QueryImageList() (interface{}, error) { return nil, nil } -func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { +func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { // shuguangHpc提交任务 //判断是否resourceId匹配自定义资源Id diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index ad84315d..57fecfc6 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -22,6 +22,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" + "strconv" "strings" ) @@ -75,9 +76,7 @@ func (s *ShuguangAi) QueryImageList() (interface{}, error) { return resp, nil } -func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { - // shuguangAi提交任务 - +func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { //判断是否resourceId匹配自定义资源Id if resourceId != SHUGUANGAI_CUSTOM_RESOURCE_ID { return nil, errors.New("shuguangAi资源Id不存在") @@ -132,6 +131,18 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, param return resp, nil } +func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { + // shuguangAi提交任务 + if aiType == PYTORCH { + task, err := s.SubmitPytorchTask(imageId, cmd, envs, params, resourceId) + if err != nil { + return nil, err + } + return task, nil + } + return nil, errors.New("shuguangAi不支持的任务类型") +} + func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) { // shuguangAi获取任务 req := &hpcAC.GetPytorchTaskReq{ @@ -173,9 +184,35 @@ func (o *ShuguangAi) QuerySpecs() (interface{}, error) { } func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) { + userReq := &hpcAC.GetUserInfoReq{} + userinfo, err := o.svcCtx.ACRpc.GetUserInfo(o.ctx, userReq) + if err != nil { + return nil, err + } + limitReq := &hpcAC.QueueReq{} + _, err = o.svcCtx.ACRpc.QueryUserQuotasLimit(o.ctx, limitReq) + if err != nil { + return nil, err + } + diskReq := &hpcAC.ParaStorQuotaReq{} + _, err = o.svcCtx.ACRpc.ParaStorQuota(o.ctx, diskReq) + if err != nil { + return nil, err + } + + balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) + _ = &collector.ResourceSpecs{ + ParticipantId: o.participantId, + Name: o.platform, + Balance: balance, + } return nil, nil } -func (o *ShuguangAi) Execute(option option.AiOption) (interface{}, error) { - return nil, nil +func (o *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) { + task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType) + if err != nil { + return nil, err + } + return task, nil } diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index 18046da4..3a644d8e 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -31,7 +31,7 @@ type Linkage interface { UploadImage(path string) (interface{}, error) DeleteImage(imageId string) (interface{}, error) QueryImageList() (interface{}, error) - SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) + SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) QueryTask(taskId string) (interface{}, error) QuerySpecs() (interface{}, error) DeleteTask(taskId string) (interface{}, error)