From 41474ecaff971a10259559f7a492e6104fc90790 Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 1 Feb 2024 17:27:18 +0800 Subject: [PATCH 1/3] modified storage implementations Former-commit-id: 06cbd0eae64d3b1bf3d8e1bf1d4296a98a4d4898 --- api/internal/scheduler/database/aiStorage.go | 15 +++++++++++++-- .../scheduler/schedulers/option/option.go | 5 +++++ 2 files changed, 18 insertions(+), 2 deletions(-) create mode 100644 api/internal/scheduler/schedulers/option/option.go diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index f8f424a7..f7f7ead0 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -1,8 +1,19 @@ package database +import ( + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" + "gorm.io/gorm" +) + type AiStorage struct { + DbEngin *gorm.DB } -func (s *AiStorage) getParticipants() { - +func (s *AiStorage) GetParticipants() { + var resp types.ClusterListResp + tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.Data) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + } } diff --git a/api/internal/scheduler/schedulers/option/option.go b/api/internal/scheduler/schedulers/option/option.go new file mode 100644 index 00000000..e7c5c218 --- /dev/null +++ b/api/internal/scheduler/schedulers/option/option.go @@ -0,0 +1,5 @@ +package option + +type Option struct { + Name string +} From e2ca3b5be4a32363925ba0adc76c1635153740a2 Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 2 Feb 2024 11:28:19 +0800 Subject: [PATCH 2/3] modified shuguangAi implementations Former-commit-id: ec3fe090a5fc92df53e058aa6602923dca56e089 --- api/internal/scheduler/schedulers/option/aiOption.go | 4 +++- api/internal/scheduler/strategy/replication.go | 2 +- api/internal/storeLink/shuguangai.go | 7 ++++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go index b1029d37..cd0d36fb 100644 --- a/api/internal/scheduler/schedulers/option/aiOption.go +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -1,9 +1,11 @@ package option type AiOption struct { - AiType string // shuguangAi/octopus + //AiType string // shuguangAi/octopus ResourceType string // cpu/gpu/compute card TaskType string // pytorch/tensorflow + DatasetsType string + CodeType string ImageId string SpecId string diff --git a/api/internal/scheduler/strategy/replication.go b/api/internal/scheduler/strategy/replication.go index fad6fcde..f9b04b1f 100644 --- a/api/internal/scheduler/strategy/replication.go +++ b/api/internal/scheduler/strategy/replication.go @@ -1,7 +1,7 @@ package strategy import ( - "github.com/pkg/errors" + "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" ) diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 57fecfc6..ad430d55 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -210,9 +210,14 @@ func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) { } func (o *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) { - task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType) + o.generateSubmitParams(option) + task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType) if err != nil { return nil, err } return task, nil } + +func (o *ShuguangAi) generateSubmitParams(option *option.AiOption) { + +} From 757f30c92b9e420d713fcd8d8ae1e2b4348657d3 Mon Sep 17 00:00:00 2001 From: tzwang Date: Sun, 4 Feb 2024 23:05:21 +0800 Subject: [PATCH 3/3] added schedule.api and modified ai scheduler implementations Former-commit-id: 9c9d0562d2e688d19f022b90d968543456c0fb47 --- api/desc/pcm.api | 23 ++++ api/desc/schedule/pcm-schedule.api | 44 +++++++ .../logic/storelink/submitlinktasklogic.go | 2 +- api/internal/mqs/ScheduleAi.go | 5 +- api/internal/scheduler/common/common.go | 28 +++++ api/internal/scheduler/scheduler.go | 4 +- .../scheduler/schedulers/aiScheduler.go | 30 +++-- .../scheduler/schedulers/option/aiOption.go | 19 +-- api/internal/scheduler/service/aiService.go | 20 ++-- .../scheduler/service/collector/collector.go | 12 +- .../scheduler/service/executor/aiExecutor.go | 2 - .../scheduler/strategy/param/params.go | 2 +- .../strategy/param/resourcePricing.go | 4 +- .../scheduler/strategy/singleAssignment.go | 11 ++ api/internal/scheduler/strategy/strategy.go | 15 +++ api/internal/storeLink/modelarts.go | 102 ++++++++++++---- api/internal/storeLink/octopus.go | 79 +++++++++++- api/internal/storeLink/shuguangai.go | 112 +++++++++++++++--- api/internal/storeLink/storeLink.go | 48 ++++++++ go.mod | 2 +- 20 files changed, 484 insertions(+), 80 deletions(-) create mode 100644 api/desc/schedule/pcm-schedule.api create mode 100644 api/internal/scheduler/strategy/singleAssignment.go diff --git a/api/desc/pcm.api b/api/desc/pcm.api index fd5e65b2..1fa4e231 100644 --- a/api/desc/pcm.api +++ b/api/desc/pcm.api @@ -8,6 +8,7 @@ import ( "vm/pcm-vm.api" "cloud/pcm-cloud.api" "storelink/pcm-storelink.api" + "schedule/pcm-schedule.api" ) info( @@ -617,4 +618,26 @@ service pcm { @handler GetClusterHandler get /adapter/cluster/get (ClusterDelReq) returns (ClusterResp) +} + +@server( + prefix: pcm/v1 + group : schedule +) + +service pcm { + @handler ScheduleGetAiResourceTypesHandler + get /schedule/ai/getResourceTypes returns (AiResourceTypesResp) + + @handler ScheduleGetAiTaskTypesHandler + get /schedule/ai/getTaskTypes returns (AiTaskTypesResp) + + @handler ScheduleGetDatasetsHandler + get /schedule/ai/getDatasets returns (AiDatasetsResp) + + @handler ScheduleGetStrategyHandler + get /schedule/ai/getStrategies returns (AiStrategyResp) + + @handler ScheduleSubmitHandler + post /schedule/submit (ScheduleResp) returns (ScheduleResp) } \ No newline at end of file diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api new file mode 100644 index 00000000..64e2a77d --- /dev/null +++ b/api/desc/schedule/pcm-schedule.api @@ -0,0 +1,44 @@ +syntax = "v1" + +info( + title: "schedule" + desc: "调度服务" + author: "tzwang" + email: "tzwang@qq.com" +) + +type ( + ScheduleReq { + AiOption *AiOption `json:"aiOption,optional"` + } + + ScheduleResp { + Success bool `json:"success"` + TaskId string `json:"taskId"` + ClusterId string `json:"clusterId"` + ErrorMsg string `json:"errorMsg"` + } + + AiOption { + ResourceType string `json:"resourceType"` + TaskType string `json:"taskType"` + Datasets string `json:"datasets"` + Strategy string `json:"strategy"` + } + + AiResourceTypesResp { + ResourceTypes []string `json:"resourceTypes"` + } + + AiTaskTypesResp { + TaskTypes []string `json:"taskTypes"` + } + + AiDatasetsResp { + Datasets []string `json:"datasets"` + } + + AiStrategyResp { + Strategies []string `json:"strategies"` + } +) \ No newline at end of file diff --git a/api/internal/logic/storelink/submitlinktasklogic.go b/api/internal/logic/storelink/submitlinktasklogic.go index 80dfab96..45710841 100644 --- a/api/internal/logic/storelink/submitlinktasklogic.go +++ b/api/internal/logic/storelink/submitlinktasklogic.go @@ -67,7 +67,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp envs = append(envs, env) } } - task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "") + task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "pytorch") if err != nil { return nil, err } diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index 17fd5520..065dfe78 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -37,13 +37,12 @@ func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { return &AiQueue{ ctx: ctx, svcCtx: svcCtx, - scheduler: scheduler.NewScheduler2(aiCollectorMap, nil, aiExecutorMap), + scheduler: scheduler.NewSchdlr(aiCollectorMap, nil, aiExecutorMap), } } func (l *AiQueue) Consume(val string) error { - // 接受消息, 根据标签筛选过滤 - aiSchdl, _ := schedulers.NewAiScheduler(val, l.scheduler) + aiSchdl, _ := schedulers.NewAiScheduler(val, l.scheduler, nil) // 调度算法 err := l.scheduler.AssignAndSchedule(aiSchdl) diff --git a/api/internal/scheduler/common/common.go b/api/internal/scheduler/common/common.go index 3da3609c..c4337730 100644 --- a/api/internal/scheduler/common/common.go +++ b/api/internal/scheduler/common/common.go @@ -44,6 +44,34 @@ func Intersect(slice1, slice2 []int64) []int64 { return nn } +func IntersectString(slice1, slice2 []string) []string { + k := make(map[string]int) + for _, num := range slice1 { + k[num]++ + } + var output []string + for _, num := range slice2 { + if k[num] > 0 { + output = append(output, num) + k[num]-- + } + } + return output +} + +func RemoveDuplicates(slc []string) []string { + keys := make(map[string]bool) + list := []string{} + + for _, entry := range slc { + if _, value := keys[entry]; !value { + keys[entry] = true + list = append(list, entry) + } + } + return list +} + func MicsSlice(origin []int64, count int) []int64 { tmpOrigin := make([]int64, len(origin)) copy(tmpOrigin, origin) diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index e791c9a0..431d93a9 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -37,7 +37,7 @@ type Scheduler struct { dbEngin *gorm.DB result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService - ResourceCollector *map[string]collector.ResourceCollector + ResourceCollector *map[string]collector.AiCollector Storages database.Storage AiExecutor *map[string]executor.AiExecutor mu sync.RWMutex @@ -52,7 +52,7 @@ func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler { +func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler { return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor} } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 8b11faf5..3077b67b 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -33,8 +33,8 @@ type AiScheduler struct { option *option.AiOption } -func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) { - return &AiScheduler{yamlString: val, Scheduler: scheduler}, nil +func NewAiScheduler(val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) { + return &AiScheduler{yamlString: val, Scheduler: scheduler, option: option}, nil } func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { @@ -49,6 +49,11 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin } func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { + if as.option.AiClusterId != "" { + // TODO database operation Find + return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil + } + resources, err := as.findClustersWithResources() if err != nil { return nil, err @@ -58,13 +63,20 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { } params := ¶m.Params{Resources: resources} - if len(resources) < 2 /*|| as.task */ { - strategy := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params /*, Replicas: 1*/}) + if len(resources) == 1 { + return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil + } + + switch as.option.StrategyName { + case strategy.REPLICATION: + strategy := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params, Replicas: 1}) + return strategy, nil + case strategy.RESOURCE_PRICING: + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil } - strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params}) - return strategy, nil + return nil, errors.New("no strategy has been chosen") } func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { @@ -84,10 +96,10 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return nil } -func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceSpecs, error) { - var resourceSpecs []*collector.ResourceSpecs +func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { + var resourceSpecs []*collector.ResourceStats for _, resourceCollector := range *as.ResourceCollector { - spec, err := resourceCollector.GetResourceSpecs() + spec, err := resourceCollector.GetResourceStats() if err != nil { continue } diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go index cd0d36fb..9512b3e9 100644 --- a/api/internal/scheduler/schedulers/option/aiOption.go +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -1,15 +1,17 @@ package option type AiOption struct { - //AiType string // shuguangAi/octopus - ResourceType string // cpu/gpu/compute card - TaskType string // pytorch/tensorflow - DatasetsType string - CodeType string + AiClusterId string // shuguangAi /octopus ClusterId + ResourceType string // cpu/gpu/compute card + TaskType string // pytorch/tensorflow/mindspore + DatasetsName string // mnist/imageNet/iris + StrategyName string + ClusterToStaticWeight map[string]int32 + CodeType string - ImageId string - SpecId string - DatasetsId string + ImageId string + SpecId string + //DatasetsId string CodeId string ResourceId string @@ -19,4 +21,5 @@ type AiOption struct { Datasets string Code string + Model interface{} } diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index 2ea57cec..01e41685 100644 --- a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -9,24 +9,24 @@ import ( ) const ( - OCTOPUS = "Octopus" - MODELARTS = "Modelarts" - SHUGUANGAI = "ShuguangAi" + OCTOPUS = "octopus" + MODELARTS = "modelarts" + SHUGUANGAI = "shuguangAi" ) var ( AiTypeMap = map[string]string{ - "Hanwuji": OCTOPUS, - "Suiyan": OCTOPUS, - "Sailingsi": OCTOPUS, - "Modelarts-CloudBrain2": MODELARTS, - "ShuguangAi": SHUGUANGAI, + "hanwuji": OCTOPUS, + "suiyan": OCTOPUS, + "sailingsi": OCTOPUS, + "modelarts-CloudBrain2": MODELARTS, + "shuguangAi": SHUGUANGAI, } ) -func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.ResourceCollector) { +func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.AiCollector) { executorMap := make(map[string]executor.AiExecutor) - collectorMap := make(map[string]collector.ResourceCollector) + collectorMap := make(map[string]collector.AiCollector) for k, v := range AiTypeMap { switch v { case OCTOPUS: diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index 0f1d5720..31a144a6 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -1,10 +1,11 @@ package collector -type ResourceCollector interface { - GetResourceSpecs() (*ResourceSpecs, error) +type AiCollector interface { + GetResourceStats() (*ResourceStats, error) + GetDatasetsSpecs() ([]*DatasetsSpecs, error) } -type ResourceSpecs struct { +type ResourceStats struct { ParticipantId int64 Name string CpuAvail float64 @@ -20,3 +21,8 @@ type Card struct { Name string TOpsAtFp16 float64 } + +type DatasetsSpecs struct { + Name string + Size string +} diff --git a/api/internal/scheduler/service/executor/aiExecutor.go b/api/internal/scheduler/service/executor/aiExecutor.go index abe91b0c..d1c2090c 100644 --- a/api/internal/scheduler/service/executor/aiExecutor.go +++ b/api/internal/scheduler/service/executor/aiExecutor.go @@ -2,10 +2,8 @@ package executor import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink" ) type AiExecutor interface { Execute(option *option.AiOption) (interface{}, error) - storeLink.Linkage } diff --git a/api/internal/scheduler/strategy/param/params.go b/api/internal/scheduler/strategy/param/params.go index 78270fc0..90ae3b74 100644 --- a/api/internal/scheduler/strategy/param/params.go +++ b/api/internal/scheduler/strategy/param/params.go @@ -5,5 +5,5 @@ import ( ) type Params struct { - Resources []*collector.ResourceSpecs + Resources []*collector.ResourceStats } diff --git a/api/internal/scheduler/strategy/param/resourcePricing.go b/api/internal/scheduler/strategy/param/resourcePricing.go index 570e4422..3f89fd73 100644 --- a/api/internal/scheduler/strategy/param/resourcePricing.go +++ b/api/internal/scheduler/strategy/param/resourcePricing.go @@ -5,13 +5,13 @@ import ( ) type ResourcePricingParams struct { - replicas int32 + Replicas int32 task *providerPricing.Task *Params } func (r *ResourcePricingParams) GetReplicas() int32 { - return r.replicas + return r.Replicas } func (r *ResourcePricingParams) GetTask() *providerPricing.Task { diff --git a/api/internal/scheduler/strategy/singleAssignment.go b/api/internal/scheduler/strategy/singleAssignment.go new file mode 100644 index 00000000..15d79331 --- /dev/null +++ b/api/internal/scheduler/strategy/singleAssignment.go @@ -0,0 +1,11 @@ +package strategy + +type SingleAssignment struct { + Cluster *AssignedCluster +} + +func (s *SingleAssignment) Schedule() ([]*AssignedCluster, error) { + var results []*AssignedCluster + results = append(results, s.Cluster) + return results, nil +} diff --git a/api/internal/scheduler/strategy/strategy.go b/api/internal/scheduler/strategy/strategy.go index af23fbf2..ef9123cc 100644 --- a/api/internal/scheduler/strategy/strategy.go +++ b/api/internal/scheduler/strategy/strategy.go @@ -1,5 +1,16 @@ package strategy +const ( + REPLICATION = "replication" + RESOURCE_PRICING = "resourcePricing" + STATIC_WEIGHT = "staticWeight" + DYNAMIC_WEIGHT = "dynamicWeight" +) + +var ( + strategyNames = []string{REPLICATION, RESOURCE_PRICING} +) + type Strategy interface { Schedule() ([]*AssignedCluster, error) } @@ -9,3 +20,7 @@ type AssignedCluster struct { Name string Replicas int32 } + +func GetStrategyNames() []string { + return strategyNames +} diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index 14a8a181..c3b4cda1 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -38,24 +38,24 @@ func NewModelArtsLink(ctx context.Context, svcCtx *svc.ServiceContext, name stri return &ModelArtsLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100} } -func (o *ModelArtsLink) UploadImage(path string) (interface{}, error) { +func (m *ModelArtsLink) UploadImage(path string) (interface{}, error) { //TODO modelArts上传镜像 return nil, nil } -func (o *ModelArtsLink) DeleteImage(imageId string) (interface{}, error) { +func (m *ModelArtsLink) DeleteImage(imageId string) (interface{}, error) { // TODO modelArts删除镜像 return nil, nil } -func (o *ModelArtsLink) QueryImageList() (interface{}, error) { +func (m *ModelArtsLink) QueryImageList() (interface{}, error) { // modelArts获取镜像列表 req := &modelarts.ListRepoReq{ Offset: "0", - Limit: strconv.Itoa(int(o.pageSize)), - Platform: o.platform, + Limit: strconv.Itoa(int(m.pageSize)), + Platform: m.platform, } - resp, err := o.svcCtx.ModelArtsImgRpc.ListReposDetails(o.ctx, req) + resp, err := m.svcCtx.ModelArtsImgRpc.ListReposDetails(m.ctx, req) if err != nil { return nil, err } @@ -63,7 +63,7 @@ func (o *ModelArtsLink) QueryImageList() (interface{}, error) { return resp, nil } -func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { +func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { // modelArts提交任务 environments := make(map[string]string) parameters := make([]*modelarts.ParametersTrainJob, 0) @@ -98,9 +98,9 @@ func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, pa NodeCount: 1, }, }, - Platform: o.platform, + Platform: m.platform, } - resp, err := o.svcCtx.ModelArtsRpc.CreateTrainingJob(o.ctx, req) + resp, err := m.svcCtx.ModelArtsRpc.CreateTrainingJob(m.ctx, req) if err != nil { return nil, err } @@ -108,13 +108,13 @@ func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, pa return resp, nil } -func (o *ModelArtsLink) QueryTask(taskId string) (interface{}, error) { +func (m *ModelArtsLink) QueryTask(taskId string) (interface{}, error) { // 获取任务 req := &modelarts.DetailTrainingJobsReq{ TrainingJobId: taskId, - Platform: o.platform, + Platform: m.platform, } - resp, err := o.svcCtx.ModelArtsRpc.GetTrainingJobs(o.ctx, req) + resp, err := m.svcCtx.ModelArtsRpc.GetTrainingJobs(m.ctx, req) if err != nil { return nil, err } @@ -122,13 +122,13 @@ func (o *ModelArtsLink) QueryTask(taskId string) (interface{}, error) { return resp, nil } -func (o *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) { +func (m *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) { // 删除任务 req := &modelarts.DeleteTrainingJobReq{ TrainingJobId: taskId, - Platform: o.platform, + Platform: m.platform, } - resp, err := o.svcCtx.ModelArtsRpc.DeleteTrainingJob(o.ctx, req) + resp, err := m.svcCtx.ModelArtsRpc.DeleteTrainingJob(m.ctx, req) if err != nil { return nil, err } @@ -136,12 +136,12 @@ func (o *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) { return resp, nil } -func (o *ModelArtsLink) QuerySpecs() (interface{}, error) { +func (m *ModelArtsLink) QuerySpecs() (interface{}, error) { // octopus查询资源规格 req := &modelarts.TrainingJobFlavorsReq{ - Platform: o.platform, + Platform: m.platform, } - resp, err := o.svcCtx.ModelArtsRpc.GetTrainingJobFlavors(o.ctx, req) + resp, err := m.svcCtx.ModelArtsRpc.GetTrainingJobFlavors(m.ctx, req) if err != nil { return nil, err } @@ -149,14 +149,74 @@ func (o *ModelArtsLink) QuerySpecs() (interface{}, error) { return resp, nil } -func (o *ModelArtsLink) GetResourceSpecs() (*collector.ResourceSpecs, error) { +func (m *ModelArtsLink) GetResourceStats() (*collector.ResourceStats, error) { return nil, nil } -func (o *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) { - task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType) +func (m *ModelArtsLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { + return nil, nil +} + +func (m *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) { + err := m.GenerateSubmitParams(option) + if err != nil { + return nil, err + } + task, err := m.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType) if err != nil { return nil, err } return task, nil } + +func (m *ModelArtsLink) GenerateSubmitParams(option *option.AiOption) error { + err := m.generateResourceId(option) + if err != nil { + return err + } + err = m.generateImageId(option) + if err != nil { + return err + } + err = m.generateCmd(option) + if err != nil { + return err + } + err = m.generateEnv(option) + if err != nil { + return err + } + err = m.generateParams(option) + if err != nil { + return err + } + return nil +} + +func (m *ModelArtsLink) generateResourceId(option *option.AiOption) error { + _, err := m.QuerySpecs() + if err != nil { + return err + } + return nil +} + +func (m *ModelArtsLink) generateImageId(option *option.AiOption) error { + + return nil +} + +func (m *ModelArtsLink) generateCmd(option *option.AiOption) error { + + return nil +} + +func (m *ModelArtsLink) generateEnv(option *option.AiOption) error { + + return nil +} + +func (m *ModelArtsLink) generateParams(option *option.AiOption) error { + + return nil +} diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index cdc97ea9..9448d897 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -16,6 +16,7 @@ package storeLink import ( "context" + "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" @@ -196,14 +197,88 @@ func (o *OctopusLink) QuerySpecs() (interface{}, error) { return resp, nil } -func (o *OctopusLink) GetResourceSpecs() (*collector.ResourceSpecs, error) { +func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) { return nil, nil } +func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { + req := &octopus.GetMyDatasetListReq{ + Platform: o.platform, + PageIndex: o.pageIndex, + PageSize: o.pageSize, + } + resp, err := o.svcCtx.OctopusRpc.GetMyDatasetList(o.ctx, req) + if err != nil { + return nil, err + } + if !resp.Success { + return nil, errors.New(resp.Error.Message) + } + specs := []*collector.DatasetsSpecs{} + for _, dataset := range resp.Payload.Datasets { + spec := &collector.DatasetsSpecs{Name: dataset.Name} + specs = append(specs, spec) + } + return specs, nil +} + func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) { - task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType) + err := o.GenerateSubmitParams(option) + if err != nil { + return nil, err + } + task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType) if err != nil { return nil, err } return task, nil } + +func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error { + err := o.generateResourceId(option) + if err != nil { + return err + } + err = o.generateImageId(option) + if err != nil { + return err + } + err = o.generateCmd(option) + if err != nil { + return err + } + err = o.generateEnv(option) + if err != nil { + return err + } + err = o.generateParams(option) + if err != nil { + return err + } + return nil +} + +func (o *OctopusLink) generateResourceId(option *option.AiOption) error { + + return nil +} + +func (o *OctopusLink) generateImageId(option *option.AiOption) error { + + return nil +} + +func (o *OctopusLink) generateCmd(option *option.AiOption) error { + + return nil +} + +func (o *OctopusLink) generateEnv(option *option.AiOption) error { + + return nil +} + +func (o *OctopusLink) generateParams(option *option.AiOption) error { + + return nil +} diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index ad430d55..eef573bf 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -48,6 +48,7 @@ const ( WorkPath = "/work/home/acgnnmfbwo/111111/py/" TimeoutLimit = "10:00:00" PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py" + DATASETS_DIR = "/work/home/acgnnmfbwo/pcmv1/dataset" ) func NewShuguangAi(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ShuguangAi { @@ -131,14 +132,30 @@ func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string return resp, nil } +func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { + //req := &hpcAC.SubmitTensorflowTaskReq{ + // Params: &hpcAC.SubmitTensorflowTaskParams{ + // + // } + //} + return nil, nil +} + func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) { // shuguangAi提交任务 - if aiType == PYTORCH { + switch aiType { + case PYTORCH_TASK: task, err := s.SubmitPytorchTask(imageId, cmd, envs, params, resourceId) if err != nil { return nil, err } return task, nil + case TENSORFLOW_TASK: + task, err := s.SubmitTensorflowTask(imageId, cmd, envs, params, resourceId) + if err != nil { + return nil, err + } + return task, nil } return nil, errors.New("shuguangAi不支持的任务类型") } @@ -169,13 +186,13 @@ func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) { return resp, nil } -func (o *ShuguangAi) QuerySpecs() (interface{}, error) { +func (s *ShuguangAi) QuerySpecs() (interface{}, error) { // ShuguangAi查询资源规格 req := &hpcAC.GetResourceSpecReq{ AcceleratorType: DCU, ResourceGroup: RESOURCE_GROUP, } - specs, err := o.svcCtx.ACRpc.GetResourceSpec(o.ctx, req) + specs, err := s.svcCtx.ACRpc.GetResourceSpec(s.ctx, req) if err != nil { return nil, err } @@ -183,41 +200,106 @@ func (o *ShuguangAi) QuerySpecs() (interface{}, error) { return specs, nil } -func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) { +func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { userReq := &hpcAC.GetUserInfoReq{} - userinfo, err := o.svcCtx.ACRpc.GetUserInfo(o.ctx, userReq) + userinfo, err := s.svcCtx.ACRpc.GetUserInfo(s.ctx, userReq) if err != nil { return nil, err } limitReq := &hpcAC.QueueReq{} - _, err = o.svcCtx.ACRpc.QueryUserQuotasLimit(o.ctx, limitReq) + _, err = s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) if err != nil { return nil, err } diskReq := &hpcAC.ParaStorQuotaReq{} - _, err = o.svcCtx.ACRpc.ParaStorQuota(o.ctx, diskReq) + _, err = s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) if err != nil { return nil, err } balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) - _ = &collector.ResourceSpecs{ - ParticipantId: o.participantId, - Name: o.platform, + _ = &collector.ResourceStats{ + ParticipantId: s.participantId, + Name: s.platform, Balance: balance, } return nil, nil } -func (o *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) { - o.generateSubmitParams(option) - task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType) +func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { + req := &hpcAC.GetFileListReq{Limit: 100, Path: DATASETS_DIR, Start: 0} + list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req) + if err != nil { + return nil, err + } + if list.Code != "0" { + return nil, errors.New(list.Msg) + } + specs := []*collector.DatasetsSpecs{} + for _, file := range list.Data.FileList { + spec := &collector.DatasetsSpecs{Name: file.Name, Size: strconv.FormatInt(file.Size, 10)} + specs = append(specs, spec) + } + return specs, nil +} + +func (s *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) { + err := s.GenerateSubmitParams(option) + if err != nil { + return nil, err + } + task, err := s.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType) if err != nil { return nil, err } return task, nil } -func (o *ShuguangAi) generateSubmitParams(option *option.AiOption) { - +func (s *ShuguangAi) GenerateSubmitParams(option *option.AiOption) error { + err := s.generateResourceId(option) + if err != nil { + return err + } + err = s.generateImageId(option) + if err != nil { + return err + } + err = s.generateCmd(option) + if err != nil { + return err + } + err = s.generateEnv(option) + if err != nil { + return err + } + err = s.generateParams(option) + if err != nil { + return err + } + return nil +} + +func (s *ShuguangAi) generateResourceId(option *option.AiOption) error { + + return nil +} + +func (s *ShuguangAi) generateImageId(option *option.AiOption) error { + + return nil +} + +func (s *ShuguangAi) generateCmd(option *option.AiOption) error { + + return nil +} + +func (s *ShuguangAi) generateEnv(option *option.AiOption) error { + + return nil +} + +func (s *ShuguangAi) generateParams(option *option.AiOption) error { + + return nil } diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index 3a644d8e..80714797 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -18,6 +18,8 @@ import ( "context" "github.com/pkg/errors" "gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/common" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" @@ -51,6 +53,11 @@ const ( MODELARTS = "Modelarts" SHUGUANGAI = "ShuguangAi" SHUGUANGHPC = "ShuguangHpc" + CPU = "cpu" + GPU = "gpu" + CARD = "computeCard" + PYTORCH_TASK = "pytorch" + TENSORFLOW_TASK = "tensorflow" ) var ( @@ -65,6 +72,9 @@ var ( "3": SHUGUANGAI, "4": SHUGUANGHPC, } + resourceTypes = []string{CPU, GPU, CARD} + taskTypes = []string{PYTORCH_TASK, TENSORFLOW_TASK} + ERROR_RESP_EMPTY = errors.New("resp empty error") ERROR_CONVERT_EMPTY = errors.New("convert empty error") ) @@ -104,6 +114,44 @@ func GetParticipantById(partId int64, dbEngin *gorm.DB) *models.StorelinkCenter return &participant } +func GetResourceTypes() []string { + return resourceTypes +} + +func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string, error) { + var names []string + //errCount := 0 + colMap := *collectorMap + for _, col := range colMap { + var ns []string + specs, err := col.GetDatasetsSpecs() + if err != nil { + return nil, errors.New("failed to acquire datasets list") + } + for _, spec := range specs { + ns = append(ns, spec.Name) + } + if len(ns) == 0 { + continue + } + if len(names) == 0 { + names = ns + continue + } + + names = common.IntersectString(names, ns) + } + //if (len(*collectorMap) - errCount) < 2 { + // + //} + names = common.RemoveDuplicates(names) + return names, nil +} + +func GetTaskTypes() []string { + return taskTypes +} + func ConvertType(in interface{}, out interface{}, participant *models.StorelinkCenter) (interface{}, error) { switch (interface{})(in).(type) { diff --git a/go.mod b/go.mod index d043b053..9726e06a 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/rs/zerolog v1.28.0 github.com/shopspring/decimal v1.3.1 github.com/zeromicro/go-zero v1.6.0 - gitlink.org.cn/jcce-pcm/pcm-ac v0.0.0-20231207111119-cdecc6b118c8 + gitlink.org.cn/jcce-pcm/pcm-ac v0.0.0-20240201033409-2d4e27a90c39 gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230904090036-24fc730ec87d gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes v0.0.0-20231214084401-de9ac5db7246 gitlink.org.cn/jcce-pcm/pcm-participant-modelarts v0.0.0-20231101085149-724c7c4cc090