From 6c8dae6a780261b279e91e0e8fecd182843a75db Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 22 Jan 2024 23:15:01 +0800 Subject: [PATCH 1/5] modified ai scheduler struct Former-commit-id: 761d378c4d78dd20bc749e077c44a3b9ef772412 --- pkg/scheduler/collector/acCollector.go | 7 +++++-- pkg/scheduler/collector/collector.go | 3 ++- pkg/scheduler/scheduler.go | 15 +++++++++------ 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/pkg/scheduler/collector/acCollector.go b/pkg/scheduler/collector/acCollector.go index fdeee694..8107a4c4 100644 --- a/pkg/scheduler/collector/acCollector.go +++ b/pkg/scheduler/collector/acCollector.go @@ -1,13 +1,16 @@ package collector +import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" + //单条作业费=作业运行秒数×(CPU核心数*CPU单价+GPU卡数×GPU单价+DCU卡数×DCU单价)/3600 //CPU单价=队列CPU费率×计算中心CPU单价 //GPU单价=队列GPU费率×计算中心GPU单价 //DCU单价=队列DCU费率×计算中心DCU单价 type ShuguangAiCollector struct { + ACRpc hpcacclient.HpcAC } -func (a *ShuguangAiCollector) getResourceSpecs() { - +func (a *ShuguangAiCollector) getResourceSpecs() (*ResourceSpecs, error) { + return nil, nil } diff --git a/pkg/scheduler/collector/collector.go b/pkg/scheduler/collector/collector.go index 73bcffd0..06f31d24 100644 --- a/pkg/scheduler/collector/collector.go +++ b/pkg/scheduler/collector/collector.go @@ -1,7 +1,7 @@ package collector type ResourceCollector interface { - getResourceSpecs() ([]ResourceSpecs, error) + getResourceSpecs() (*ResourceSpecs, error) } type ResourceSpecs struct { @@ -10,6 +10,7 @@ type ResourceSpecs struct { DiskAvail float64 GpuAvail float64 CardAvail []Card + Balance float64 } type Card struct { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index e04952a9..b981f37a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -19,6 +19,7 @@ import ( "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" "sigs.k8s.io/yaml" @@ -26,12 +27,14 @@ import ( ) type scheduler struct { - task *response.TaskInfo - participantIds []int64 - scheduleService scheduleService - dbEngin *gorm.DB - result []string //pID:子任务yamlstring 键值对 - participantRpc participantservice.ParticipantService + task *response.TaskInfo + participantIds []int64 + scheduleService scheduleService + dbEngin *gorm.DB + result []string //pID:子任务yamlstring 键值对 + participantRpc participantservice.ParticipantService + resourceCollectors []collector.ResourceCollector + //storelink } func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*scheduler, error) { From 0c87a541c51d2b4468643cb45a4d465481ab7c89 Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 23 Jan 2024 11:42:10 +0800 Subject: [PATCH 2/5] scheduler refactor updated Former-commit-id: 95e2b32695d5f67ecdee979ccc2e483fc8847f40 --- api/internal/mqs/ScheduleAi.go | 31 ++++++------- pkg/scheduler/aiScheduler.go | 15 ++++--- pkg/scheduler/collector/collector.go | 13 +++--- pkg/scheduler/scheduler.go | 66 +++++++++++++++------------- 4 files changed, 65 insertions(+), 60 deletions(-) diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index ec5c942e..7498a513 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -17,7 +17,8 @@ package mqs import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" - scheduler2 "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" ) /* @@ -25,36 +26,32 @@ import ( Listening to the payment flow status change notification message queue */ type AiQueue struct { - ctx context.Context - svcCtx *svc.ServiceContext + ctx context.Context + svcCtx *svc.ServiceContext + scheduler *scheduler.Scheduler } func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { + acCollector := &collector.ShuguangAiCollector{ACRpc: svcCtx.ACRpc} + resourceCollectiors := []collector.ResourceCollector{acCollector} return &AiQueue{ - ctx: ctx, - svcCtx: svcCtx, + ctx: ctx, + svcCtx: svcCtx, + scheduler: scheduler.NewScheduler2(resourceCollectiors, nil), } } func (l *AiQueue) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - aiSchdl := scheduler2.NewAiScheduler(val) - schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin, nil) - if err != nil { - return err - } - schdl.MatchLabels() + aiSchdl := scheduler.NewAiScheduler(val, nil) + + //schdl.MatchLabels() // 调度算法 - err = schdl.AssignAndSchedule() + err := l.scheduler.AssignAndSchedule(aiSchdl) if err != nil { return err } - // 存储数据 - err = schdl.SaveToDb() - if err != nil { - return err - } return nil } diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/aiScheduler.go index 52708f30..8f63c39c 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/aiScheduler.go @@ -24,12 +24,14 @@ import ( ) type AiScheduler struct { - yamlString string - collector collector.ResourceCollector + yamlString string + resourceCollectors []collector.ResourceCollector + task *response.TaskInfo + //storelink } -func NewAiScheduler(val string) *AiScheduler { - return &AiScheduler{yamlString: val} +func NewAiScheduler(val string, resourceCollectors []collector.ResourceCollector) *AiScheduler { + return &AiScheduler{yamlString: val, resourceCollectors: resourceCollectors} } func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { @@ -44,9 +46,8 @@ func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource strin } func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { - //a, b := as.genTaskAndProviders() - - return nil, nil + strategy := strategies.NewReplicationStrategy(nil, 0) + return strategy, nil } func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) { diff --git a/pkg/scheduler/collector/collector.go b/pkg/scheduler/collector/collector.go index 06f31d24..5ce28493 100644 --- a/pkg/scheduler/collector/collector.go +++ b/pkg/scheduler/collector/collector.go @@ -5,12 +5,13 @@ type ResourceCollector interface { } type ResourceSpecs struct { - CpuAvail float64 - MemAvail float64 - DiskAvail float64 - GpuAvail float64 - CardAvail []Card - Balance float64 + ParticipantId int64 + CpuAvail float64 + MemAvail float64 + DiskAvail float64 + GpuAvail float64 + CardAvail []Card + Balance float64 } type Card struct { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index b981f37a..3aae57e9 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -20,13 +20,14 @@ import ( "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" "sigs.k8s.io/yaml" "strings" ) -type scheduler struct { +type Scheduler struct { task *response.TaskInfo participantIds []int64 scheduleService scheduleService @@ -34,19 +35,24 @@ type scheduler struct { result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService resourceCollectors []collector.ResourceCollector + storages []database.Storage //storelink } -func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*scheduler, error) { +func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { var task *response.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { return nil, errors.New("create scheduler failed : " + err.Error()) } - return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil + return &Scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func (s *scheduler) SpecifyClusters() { +func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage) *Scheduler { + return &Scheduler{resourceCollectors: resourceCollectors, storages: storages} +} + +func (s *Scheduler) SpecifyClusters() { // 如果已指定集群名,通过数据库查询后返回p端ip列表 if len(s.task.Clusters) != 0 { s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds) @@ -54,7 +60,7 @@ func (s *scheduler) SpecifyClusters() { } } -func (s *scheduler) SpecifyNsID() { +func (s *Scheduler) SpecifyNsID() { // 未指定集群名,只指定nsID if len(s.task.Clusters) == 0 { if len(s.task.NsID) != 0 { @@ -70,7 +76,7 @@ func (s *scheduler) SpecifyNsID() { } } -func (s *scheduler) MatchLabels() { +func (s *Scheduler) MatchLabels() { var ids []int64 count := 0 @@ -93,7 +99,7 @@ func (s *scheduler) MatchLabels() { } // TempAssign todo 屏蔽原调度算法 -func (s *scheduler) TempAssign() error { +func (s *Scheduler) TempAssign() error { //需要判断task中的资源类型,针对metadata中的多个kind做不同处理 //输入副本数和集群列表,最终结果输出为pID对应副本数量列表,针对多个kind需要做拆分和重新拼接组合 @@ -113,28 +119,28 @@ func (s *scheduler) TempAssign() error { return nil } -func (s *scheduler) AssignAndSchedule() error { - // 已指定 ParticipantId - if s.task.ParticipantId != 0 { - return nil - } - // 标签匹配以及后,未找到ParticipantIds - if len(s.participantIds) == 0 { - return errors.New("未找到匹配的ParticipantIds") - } +func (s *Scheduler) AssignAndSchedule(ss scheduleService) error { + //// 已指定 ParticipantId + //if s.task.ParticipantId != 0 { + // return nil + //} + //// 标签匹配以及后,未找到ParticipantIds + //if len(s.participantIds) == 0 { + // return errors.New("未找到匹配的ParticipantIds") + //} + // + //// 指定或者标签匹配的结果只有一个集群,给任务信息指定 + //if len(s.participantIds) == 1 { + // s.task.ParticipantId = s.participantIds[0] + // //replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) + // //result := make(map[int64]string) + // //result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64) + // //s.result = result + // + // return nil + //} - // 指定或者标签匹配的结果只有一个集群,给任务信息指定 - if len(s.participantIds) == 1 { - s.task.ParticipantId = s.participantIds[0] - //replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) - //result := make(map[int64]string) - //result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64) - //s.result = result - - return nil - } - - strategy, err := s.scheduleService.pickOptimalStrategy() + strategy, err := ss.pickOptimalStrategy() if err != nil { return err } @@ -150,7 +156,7 @@ func (s *scheduler) AssignAndSchedule() error { // return nil //} - err = s.scheduleService.assignTask(clusters) + err = ss.assignTask(clusters) if err != nil { return err } @@ -158,7 +164,7 @@ func (s *scheduler) AssignAndSchedule() error { return nil } -func (s *scheduler) SaveToDb() error { +func (s *Scheduler) SaveToDb() error { for _, participantId := range s.participantIds { From 73c2c4346897af56912645f073bd27c111772753 Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 23 Jan 2024 17:51:11 +0800 Subject: [PATCH 3/5] scheduler refactor updated Former-commit-id: 8d2fffb6f6b50350549876e18a36527231d12dc2 --- api/internal/mqs/ScheduleAi.go | 11 ++-- api/internal/mqs/ScheduleHpc.go | 20 ------- pkg/scheduler/aiScheduler.go | 58 ++++++++++++++++--- .../{ => proxy}/collector/acCollector.go | 3 +- .../{ => proxy}/collector/collector.go | 3 +- pkg/scheduler/proxy/executor/acExecutor.go | 32 ++++++++++ pkg/scheduler/proxy/executor/executor.go | 17 ++++++ pkg/scheduler/scheduler.go | 9 +-- 8 files changed, 114 insertions(+), 39 deletions(-) rename pkg/scheduler/{ => proxy}/collector/acCollector.go (84%) rename pkg/scheduler/{ => proxy}/collector/collector.go (83%) create mode 100644 pkg/scheduler/proxy/executor/acExecutor.go create mode 100644 pkg/scheduler/proxy/executor/executor.go diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index 7498a513..45ef84df 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -18,7 +18,8 @@ import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor" ) /* @@ -34,18 +35,18 @@ type AiQueue struct { func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { acCollector := &collector.ShuguangAiCollector{ACRpc: svcCtx.ACRpc} resourceCollectiors := []collector.ResourceCollector{acCollector} + executorMap := make(map[string]executor.Executor) + executorMap["ai"] = &executor.ShuguangAiExecutor{ACRpc: svcCtx.ACRpc} return &AiQueue{ ctx: ctx, svcCtx: svcCtx, - scheduler: scheduler.NewScheduler2(resourceCollectiors, nil), + scheduler: scheduler.NewScheduler2(resourceCollectiors, nil, executorMap), } } func (l *AiQueue) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - aiSchdl := scheduler.NewAiScheduler(val, nil) - - //schdl.MatchLabels() + aiSchdl, _ := scheduler.NewAiScheduler(val, l.scheduler) // 调度算法 err := l.scheduler.AssignAndSchedule(aiSchdl) diff --git a/api/internal/mqs/ScheduleHpc.go b/api/internal/mqs/ScheduleHpc.go index 1e188e92..f0b56aee 100644 --- a/api/internal/mqs/ScheduleHpc.go +++ b/api/internal/mqs/ScheduleHpc.go @@ -17,7 +17,6 @@ package mqs import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" - scheduler2 "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" ) /* @@ -37,24 +36,5 @@ func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq { } func (l *HpcMq) Consume(val string) error { - // 接受消息, 根据标签筛选过滤 - hpcSchdl := scheduler2.NewHpcScheduler(val) - schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin, nil) - if err != nil { - return err - } - schdl.MatchLabels() - - // 调度算法 - err = schdl.AssignAndSchedule() - if err != nil { - return err - } - - // 存储数据 - err = schdl.SaveToDb() - if err != nil { - return err - } return nil } diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/aiScheduler.go index 8f63c39c..d5771faf 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/aiScheduler.go @@ -15,23 +15,24 @@ package scheduler import ( + "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" ) type AiScheduler struct { - yamlString string - resourceCollectors []collector.ResourceCollector - task *response.TaskInfo - //storelink + yamlString string + task *response.TaskInfo + *Scheduler } -func NewAiScheduler(val string, resourceCollectors []collector.ResourceCollector) *AiScheduler { - return &AiScheduler{yamlString: val, resourceCollectors: resourceCollectors} +func NewAiScheduler(val string, scheduler *Scheduler) (*AiScheduler, error) { + return &AiScheduler{yamlString: val, Scheduler: scheduler}, nil } func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { @@ -46,14 +47,55 @@ func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource strin } func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { - strategy := strategies.NewReplicationStrategy(nil, 0) + resources, err := as.findProvidersWithResource() + if err != nil { + return nil, err + } + + if len(resources) < 2 /*|| as.task */ { + var pros []entity.Participant + for _, resource := range resources { + pros = append(pros, entity.Participant{ + Participant_id: resource.ParticipantId, + Name: resource.Name, + }) + } + strategy := strategies.NewReplicationStrategy(nil, 0) + return strategy, nil + } + + task, providerList := as.genTaskAndProviders() + if err != nil { + return nil, nil + } + strategy := strategies.NewPricingStrategy(task, providerList...) return strategy, nil } func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) { + return nil, nil } func (as *AiScheduler) assignTask(clusters []*strategies.AssignedCluster) error { + if clusters == nil { + return errors.New("clusters is nil") + } + return nil } + +func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) { + var resourceSpecs []*collector.ResourceSpecs + for _, resourceCollector := range as.resourceCollectors { + spec, err := resourceCollector.GetResourceSpecs() + if err != nil { + continue + } + resourceSpecs = append(resourceSpecs, spec) + } + if len(resourceSpecs) == 0 { + return nil, errors.New("no resource found") + } + return resourceSpecs, nil +} diff --git a/pkg/scheduler/collector/acCollector.go b/pkg/scheduler/proxy/collector/acCollector.go similarity index 84% rename from pkg/scheduler/collector/acCollector.go rename to pkg/scheduler/proxy/collector/acCollector.go index 8107a4c4..587639cc 100644 --- a/pkg/scheduler/collector/acCollector.go +++ b/pkg/scheduler/proxy/collector/acCollector.go @@ -8,9 +8,10 @@ import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" //DCU单价=队列DCU费率×计算中心DCU单价 type ShuguangAiCollector struct { + Name string ACRpc hpcacclient.HpcAC } -func (a *ShuguangAiCollector) getResourceSpecs() (*ResourceSpecs, error) { +func (a *ShuguangAiCollector) GetResourceSpecs() (*ResourceSpecs, error) { return nil, nil } diff --git a/pkg/scheduler/collector/collector.go b/pkg/scheduler/proxy/collector/collector.go similarity index 83% rename from pkg/scheduler/collector/collector.go rename to pkg/scheduler/proxy/collector/collector.go index 5ce28493..0f1d5720 100644 --- a/pkg/scheduler/collector/collector.go +++ b/pkg/scheduler/proxy/collector/collector.go @@ -1,11 +1,12 @@ package collector type ResourceCollector interface { - getResourceSpecs() (*ResourceSpecs, error) + GetResourceSpecs() (*ResourceSpecs, error) } type ResourceSpecs struct { ParticipantId int64 + Name string CpuAvail float64 MemAvail float64 DiskAvail float64 diff --git a/pkg/scheduler/proxy/executor/acExecutor.go b/pkg/scheduler/proxy/executor/acExecutor.go new file mode 100644 index 00000000..cd7b6916 --- /dev/null +++ b/pkg/scheduler/proxy/executor/acExecutor.go @@ -0,0 +1,32 @@ +package executor + +import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" + +type ShuguangAiExecutor struct { + Name string + ACRpc hpcacclient.HpcAC +} + +func (s ShuguangAiExecutor) QueryImageList() ([]Image, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (Task, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) QueryTask(taskId string) (Task, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) QuerySpecs() (Spec, error) { + //TODO implement me + panic("implement me") +} + +func NewShuguangAiExecutor(name string, acRpc hpcacclient.HpcAC) *ShuguangAiExecutor { + return &ShuguangAiExecutor{Name: name, ACRpc: acRpc} +} diff --git a/pkg/scheduler/proxy/executor/executor.go b/pkg/scheduler/proxy/executor/executor.go new file mode 100644 index 00000000..61055927 --- /dev/null +++ b/pkg/scheduler/proxy/executor/executor.go @@ -0,0 +1,17 @@ +package executor + +type Executor interface { + QueryImageList() ([]Image, error) + SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (Task, error) + QueryTask(taskId string) (Task, error) + QuerySpecs() (Spec, error) +} + +type Image struct { +} + +type Task struct { +} + +type Spec struct { +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3aae57e9..3efebaba 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -19,8 +19,9 @@ import ( "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" "sigs.k8s.io/yaml" @@ -36,7 +37,7 @@ type Scheduler struct { participantRpc participantservice.ParticipantService resourceCollectors []collector.ResourceCollector storages []database.Storage - //storelink + aiExecutor map[string]executor.Executor } func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { @@ -48,8 +49,8 @@ func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, return &Scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage) *Scheduler { - return &Scheduler{resourceCollectors: resourceCollectors, storages: storages} +func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage, aiExecutor map[string]executor.Executor) *Scheduler { + return &Scheduler{resourceCollectors: resourceCollectors, storages: storages, aiExecutor: aiExecutor} } func (s *Scheduler) SpecifyClusters() { From b9e79dc671f13752fa561245e972f2e540da56e9 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 24 Jan 2024 11:16:59 +0800 Subject: [PATCH 4/5] scheduler refactor modified Former-commit-id: 1e70763e7f5008da3ad10efcebb0d5338d8665ca --- api/internal/mqs/ScheduleAi.go | 7 +++-- api/internal/mqs/ScheduleCloud.go | 3 +- pkg/scheduler/{ => common}/common.go | 16 +++++----- pkg/scheduler/scheduler.go | 29 ++++++++++--------- pkg/scheduler/{ => schedulers}/aiScheduler.go | 23 ++++++++------- .../{ => schedulers}/cloudScheduler.go | 12 ++++---- .../{ => schedulers}/hpcScheduler.go | 10 +++---- pkg/scheduler/schedulers/vmScheduler.go | 24 +++++++++++++++ .../collector/acCollector.go | 0 .../{proxy => service}/collector/collector.go | 0 .../{proxy => service}/executor/acExecutor.go | 0 .../{proxy => service}/executor/executor.go | 0 .../{strategies => strategy}/replication.go | 2 +- .../resourcePricing.go | 2 +- .../{strategies => strategy}/staticWeight.go | 2 +- .../{strategies => strategy}/strategy.go | 2 +- 16 files changed, 80 insertions(+), 52 deletions(-) rename pkg/scheduler/{ => common}/common.go (76%) rename pkg/scheduler/{ => schedulers}/aiScheduler.go (76%) rename pkg/scheduler/{ => schedulers}/cloudScheduler.go (90%) rename pkg/scheduler/{ => schedulers}/hpcScheduler.go (82%) create mode 100644 pkg/scheduler/schedulers/vmScheduler.go rename pkg/scheduler/{proxy => service}/collector/acCollector.go (100%) rename pkg/scheduler/{proxy => service}/collector/collector.go (100%) rename pkg/scheduler/{proxy => service}/executor/acExecutor.go (100%) rename pkg/scheduler/{proxy => service}/executor/executor.go (100%) rename pkg/scheduler/{strategies => strategy}/replication.go (97%) rename pkg/scheduler/{strategies => strategy}/resourcePricing.go (99%) rename pkg/scheduler/{strategies => strategy}/staticWeight.go (91%) rename pkg/scheduler/{strategies => strategy}/strategy.go (90%) diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index 45ef84df..311bee00 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -18,8 +18,9 @@ import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" ) /* @@ -46,7 +47,7 @@ func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { func (l *AiQueue) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - aiSchdl, _ := scheduler.NewAiScheduler(val, l.scheduler) + aiSchdl, _ := schedulers.NewAiScheduler(val, l.scheduler) // 调度算法 err := l.scheduler.AssignAndSchedule(aiSchdl) diff --git a/api/internal/mqs/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go index c40a5e9b..cb7c9d7b 100644 --- a/api/internal/mqs/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -18,6 +18,7 @@ import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers" ) /* @@ -37,7 +38,7 @@ func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq { func (l *CloudMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - cloudScheduler := scheduler.NewCloudScheduler() + cloudScheduler := schedulers.NewCloudScheduler() schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) if err != nil { return err diff --git a/pkg/scheduler/common.go b/pkg/scheduler/common/common.go similarity index 76% rename from pkg/scheduler/common.go rename to pkg/scheduler/common/common.go index 7b2cab8e..fb71cc8c 100644 --- a/pkg/scheduler/common.go +++ b/pkg/scheduler/common/common.go @@ -12,23 +12,23 @@ */ -package scheduler +package common import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy" "math/rand" "time" ) -type scheduleService interface { - getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) - pickOptimalStrategy() (strategies.Strategy, error) - assignTask(clusters []*strategies.AssignedCluster) error +type SubSchedule interface { + GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) + PickOptimalStrategy() (strategy.Strategy, error) + AssignTask(clusters []*strategy.AssignedCluster) error } // 求交集 -func intersect(slice1, slice2 []int64) []int64 { +func Intersect(slice1, slice2 []int64) []int64 { m := make(map[int64]int) nn := make([]int64, 0) for _, v := range slice1 { @@ -44,7 +44,7 @@ func intersect(slice1, slice2 []int64) []int64 { return nn } -func micsSlice(origin []int64, count int) []int64 { +func MicsSlice(origin []int64, count int) []int64 { tmpOrigin := make([]int64, len(origin)) copy(tmpOrigin, origin) //一定要seed diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3efebaba..1407568e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -19,9 +19,10 @@ import ( "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/common" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" "sigs.k8s.io/yaml" @@ -31,26 +32,26 @@ import ( type Scheduler struct { task *response.TaskInfo participantIds []int64 - scheduleService scheduleService + subSchedule common.SubSchedule dbEngin *gorm.DB result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService - resourceCollectors []collector.ResourceCollector - storages []database.Storage - aiExecutor map[string]executor.Executor + ResourceCollectors []collector.ResourceCollector + Storages []database.Storage + AiExecutor map[string]executor.Executor } -func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { +func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { var task *response.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { return nil, errors.New("create scheduler failed : " + err.Error()) } - return &Scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil + return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil } func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage, aiExecutor map[string]executor.Executor) *Scheduler { - return &Scheduler{resourceCollectors: resourceCollectors, storages: storages, aiExecutor: aiExecutor} + return &Scheduler{ResourceCollectors: resourceCollectors, Storages: storages, AiExecutor: aiExecutor} } func (s *Scheduler) SpecifyClusters() { @@ -90,7 +91,7 @@ func (s *Scheduler) MatchLabels() { if count == 0 { ids = participantIds } - ids = intersect(ids, participantIds) + ids = common.Intersect(ids, participantIds) count++ } s.participantIds = ids @@ -120,7 +121,7 @@ func (s *Scheduler) TempAssign() error { return nil } -func (s *Scheduler) AssignAndSchedule(ss scheduleService) error { +func (s *Scheduler) AssignAndSchedule(ss common.SubSchedule) error { //// 已指定 ParticipantId //if s.task.ParticipantId != 0 { // return nil @@ -141,7 +142,7 @@ func (s *Scheduler) AssignAndSchedule(ss scheduleService) error { // return nil //} - strategy, err := ss.pickOptimalStrategy() + strategy, err := ss.PickOptimalStrategy() if err != nil { return err } @@ -157,7 +158,7 @@ func (s *Scheduler) AssignAndSchedule(ss scheduleService) error { // return nil //} - err = ss.assignTask(clusters) + err = ss.AssignTask(clusters) if err != nil { return err } @@ -170,7 +171,7 @@ func (s *Scheduler) SaveToDb() error { for _, participantId := range s.participantIds { for _, resource := range s.task.Metadata { - structForDb, err := s.scheduleService.getNewStructForDb(s.task, resource, participantId) + structForDb, err := s.subSchedule.GetNewStructForDb(s.task, resource, participantId) if err != nil { return err } diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/schedulers/aiScheduler.go similarity index 76% rename from pkg/scheduler/aiScheduler.go rename to pkg/scheduler/schedulers/aiScheduler.go index d5771faf..feef4e93 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/schedulers/aiScheduler.go @@ -12,30 +12,31 @@ */ -package scheduler +package schedulers import ( "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" ) type AiScheduler struct { yamlString string task *response.TaskInfo - *Scheduler + *scheduler.Scheduler } -func NewAiScheduler(val string, scheduler *Scheduler) (*AiScheduler, error) { +func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) { return &AiScheduler{yamlString: val, Scheduler: scheduler}, nil } -func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { +func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { ai := models.Ai{ ParticipantId: participantId, TaskId: task.TaskId, @@ -46,7 +47,7 @@ func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource strin return ai, nil } -func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { +func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { resources, err := as.findProvidersWithResource() if err != nil { return nil, err @@ -60,7 +61,7 @@ func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { Name: resource.Name, }) } - strategy := strategies.NewReplicationStrategy(nil, 0) + strategy := strategy.NewReplicationStrategy(nil, 0) return strategy, nil } @@ -68,7 +69,7 @@ func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { if err != nil { return nil, nil } - strategy := strategies.NewPricingStrategy(task, providerList...) + strategy := strategy.NewPricingStrategy(task, providerList...) return strategy, nil } @@ -77,7 +78,7 @@ func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*provider return nil, nil } -func (as *AiScheduler) assignTask(clusters []*strategies.AssignedCluster) error { +func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { if clusters == nil { return errors.New("clusters is nil") } @@ -87,7 +88,7 @@ func (as *AiScheduler) assignTask(clusters []*strategies.AssignedCluster) error func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) { var resourceSpecs []*collector.ResourceSpecs - for _, resourceCollector := range as.resourceCollectors { + for _, resourceCollector := range as.ResourceCollectors { spec, err := resourceCollector.GetResourceSpecs() if err != nil { continue diff --git a/pkg/scheduler/cloudScheduler.go b/pkg/scheduler/schedulers/cloudScheduler.go similarity index 90% rename from pkg/scheduler/cloudScheduler.go rename to pkg/scheduler/schedulers/cloudScheduler.go index 95121182..89d46795 100644 --- a/pkg/scheduler/cloudScheduler.go +++ b/pkg/scheduler/schedulers/cloudScheduler.go @@ -12,7 +12,7 @@ */ -package scheduler +package schedulers import ( "bytes" @@ -20,7 +20,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "io" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -37,17 +37,17 @@ func NewCloudScheduler() *CloudScheduler { return &CloudScheduler{} } -func (cs *CloudScheduler) pickOptimalStrategy() (strategies.Strategy, error) { +func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { task, providerList, err := cs.genTaskAndProviders() if err != nil { return nil, nil } //调度算法 - strategy := strategies.NewPricingStrategy(task, providerList...) + strategy := strategy.NewPricingStrategy(task, providerList...) return strategy, nil } -func (cs *CloudScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { +func (cs *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID) cloud.Id = utils.GenSnowflakeID() cloud.NsID = task.NsID @@ -117,6 +117,6 @@ func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*provi return nil, providerList, nil } -func (cs *CloudScheduler) assignTask(clusters []*strategies.AssignedCluster) error { +func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return nil } diff --git a/pkg/scheduler/hpcScheduler.go b/pkg/scheduler/schedulers/hpcScheduler.go similarity index 82% rename from pkg/scheduler/hpcScheduler.go rename to pkg/scheduler/schedulers/hpcScheduler.go index af6416e6..92d49d84 100644 --- a/pkg/scheduler/hpcScheduler.go +++ b/pkg/scheduler/schedulers/hpcScheduler.go @@ -12,14 +12,14 @@ */ -package scheduler +package schedulers import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" ) @@ -31,7 +31,7 @@ func NewHpcScheduler(val string) *HpcScheduler { return &HpcScheduler{yamlString: val} } -func (h *HpcScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { +func (h *HpcScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { hpc := models.Hpc{} utils.Convert(task.Metadata, &hpc) hpc.Id = utils.GenSnowflakeID() @@ -42,7 +42,7 @@ func (h *HpcScheduler) getNewStructForDb(task *response.TaskInfo, resource strin return hpc, nil } -func (h *HpcScheduler) pickOptimalStrategy() (strategies.Strategy, error) { +func (h *HpcScheduler) PickOptimalStrategy() (strategy.Strategy, error) { return nil, nil } @@ -50,6 +50,6 @@ func (h *HpcScheduler) genTaskAndProviders(task *response.TaskInfo) (*providerPr return nil, nil } -func (h *HpcScheduler) assignTask(clusters []*strategies.AssignedCluster) error { +func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return nil } diff --git a/pkg/scheduler/schedulers/vmScheduler.go b/pkg/scheduler/schedulers/vmScheduler.go new file mode 100644 index 00000000..ad4b7de0 --- /dev/null +++ b/pkg/scheduler/schedulers/vmScheduler.go @@ -0,0 +1,24 @@ +package schedulers + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy" +) + +type VmScheduler struct { +} + +func (v VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { + //TODO implement me + panic("implement me") +} + +func (v VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) { + //TODO implement me + panic("implement me") +} + +func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { + //TODO implement me + panic("implement me") +} diff --git a/pkg/scheduler/proxy/collector/acCollector.go b/pkg/scheduler/service/collector/acCollector.go similarity index 100% rename from pkg/scheduler/proxy/collector/acCollector.go rename to pkg/scheduler/service/collector/acCollector.go diff --git a/pkg/scheduler/proxy/collector/collector.go b/pkg/scheduler/service/collector/collector.go similarity index 100% rename from pkg/scheduler/proxy/collector/collector.go rename to pkg/scheduler/service/collector/collector.go diff --git a/pkg/scheduler/proxy/executor/acExecutor.go b/pkg/scheduler/service/executor/acExecutor.go similarity index 100% rename from pkg/scheduler/proxy/executor/acExecutor.go rename to pkg/scheduler/service/executor/acExecutor.go diff --git a/pkg/scheduler/proxy/executor/executor.go b/pkg/scheduler/service/executor/executor.go similarity index 100% rename from pkg/scheduler/proxy/executor/executor.go rename to pkg/scheduler/service/executor/executor.go diff --git a/pkg/scheduler/strategies/replication.go b/pkg/scheduler/strategy/replication.go similarity index 97% rename from pkg/scheduler/strategies/replication.go rename to pkg/scheduler/strategy/replication.go index 2a699c5a..08d7e29f 100644 --- a/pkg/scheduler/strategies/replication.go +++ b/pkg/scheduler/strategy/replication.go @@ -1,4 +1,4 @@ -package strategies +package strategy import ( "github.com/pkg/errors" diff --git a/pkg/scheduler/strategies/resourcePricing.go b/pkg/scheduler/strategy/resourcePricing.go similarity index 99% rename from pkg/scheduler/strategies/resourcePricing.go rename to pkg/scheduler/strategy/resourcePricing.go index 5c620fa3..f909f62f 100644 --- a/pkg/scheduler/strategies/resourcePricing.go +++ b/pkg/scheduler/strategy/resourcePricing.go @@ -12,7 +12,7 @@ */ -package strategies +package strategy import ( "errors" diff --git a/pkg/scheduler/strategies/staticWeight.go b/pkg/scheduler/strategy/staticWeight.go similarity index 91% rename from pkg/scheduler/strategies/staticWeight.go rename to pkg/scheduler/strategy/staticWeight.go index 8e08d219..3aa5d769 100644 --- a/pkg/scheduler/strategies/staticWeight.go +++ b/pkg/scheduler/strategy/staticWeight.go @@ -1,4 +1,4 @@ -package strategies +package strategy type StaticWeightStrategy struct { // TODO: add fields diff --git a/pkg/scheduler/strategies/strategy.go b/pkg/scheduler/strategy/strategy.go similarity index 90% rename from pkg/scheduler/strategies/strategy.go rename to pkg/scheduler/strategy/strategy.go index e265acdd..1502dc21 100644 --- a/pkg/scheduler/strategies/strategy.go +++ b/pkg/scheduler/strategy/strategy.go @@ -1,4 +1,4 @@ -package strategies +package strategy type Strategy interface { Schedule() ([]*AssignedCluster, error) From 8882790591817ee675dca816f1eac53d70150de0 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 24 Jan 2024 17:59:33 +0800 Subject: [PATCH 5/5] scheduler refactor modified Former-commit-id: dbf14eb6b8c151587c8f48319a3d3a7cedbedb06 --- api/internal/mqs/ScheduleAi.go | 10 ++-- pkg/scheduler/scheduler.go | 22 ++++----- pkg/scheduler/schedulers/aiScheduler.go | 2 +- pkg/scheduler/service/aiService.go | 49 +++++++++++++++++++ .../service/collector/acCollector.go | 17 ------- pkg/scheduler/service/executor/acExecutor.go | 32 ------------ pkg/scheduler/service/impl/modelarts.go | 44 +++++++++++++++++ pkg/scheduler/service/impl/octopus.go | 42 ++++++++++++++++ pkg/scheduler/service/impl/shuguangAi.go | 45 +++++++++++++++++ pkg/scheduler/strategy/dynamicResources.go | 4 ++ 10 files changed, 199 insertions(+), 68 deletions(-) create mode 100644 pkg/scheduler/service/aiService.go delete mode 100644 pkg/scheduler/service/collector/acCollector.go delete mode 100644 pkg/scheduler/service/executor/acExecutor.go create mode 100644 pkg/scheduler/service/impl/modelarts.go create mode 100644 pkg/scheduler/service/impl/octopus.go create mode 100644 pkg/scheduler/service/impl/shuguangAi.go create mode 100644 pkg/scheduler/strategy/dynamicResources.go diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index 311bee00..6cc7dc2f 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -19,8 +19,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service" ) /* @@ -34,14 +33,11 @@ type AiQueue struct { } func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { - acCollector := &collector.ShuguangAiCollector{ACRpc: svcCtx.ACRpc} - resourceCollectiors := []collector.ResourceCollector{acCollector} - executorMap := make(map[string]executor.Executor) - executorMap["ai"] = &executor.ShuguangAiExecutor{ACRpc: svcCtx.ACRpc} + aiExecutorMap, aiCollectorMap := service.InitAiClusterMap(svcCtx.ACRpc, svcCtx.ModelArtsRpc, svcCtx.ModelArtsImgRpc, svcCtx.OctopusRpc) return &AiQueue{ ctx: ctx, svcCtx: svcCtx, - scheduler: scheduler.NewScheduler2(resourceCollectiors, nil, executorMap), + scheduler: scheduler.NewScheduler2(aiCollectorMap, nil, aiExecutorMap), } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 1407568e..49d3a150 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -30,15 +30,15 @@ import ( ) type Scheduler struct { - task *response.TaskInfo - participantIds []int64 - subSchedule common.SubSchedule - dbEngin *gorm.DB - result []string //pID:子任务yamlstring 键值对 - participantRpc participantservice.ParticipantService - ResourceCollectors []collector.ResourceCollector - Storages []database.Storage - AiExecutor map[string]executor.Executor + task *response.TaskInfo + participantIds []int64 + subSchedule common.SubSchedule + dbEngin *gorm.DB + result []string //pID:子任务yamlstring 键值对 + participantRpc participantservice.ParticipantService + ResourceCollector *map[string]collector.ResourceCollector + Storages database.Storage + AiExecutor *map[string]executor.Executor } func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { @@ -50,8 +50,8 @@ func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage, aiExecutor map[string]executor.Executor) *Scheduler { - return &Scheduler{ResourceCollectors: resourceCollectors, Storages: storages, AiExecutor: aiExecutor} +func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.Executor) *Scheduler { + return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor} } func (s *Scheduler) SpecifyClusters() { diff --git a/pkg/scheduler/schedulers/aiScheduler.go b/pkg/scheduler/schedulers/aiScheduler.go index feef4e93..d127b266 100644 --- a/pkg/scheduler/schedulers/aiScheduler.go +++ b/pkg/scheduler/schedulers/aiScheduler.go @@ -88,7 +88,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) { var resourceSpecs []*collector.ResourceSpecs - for _, resourceCollector := range as.ResourceCollectors { + for _, resourceCollector := range *as.ResourceCollector { spec, err := resourceCollector.GetResourceSpecs() if err != nil { continue diff --git a/pkg/scheduler/service/aiService.go b/pkg/scheduler/service/aiService.go new file mode 100644 index 00000000..9fa919dd --- /dev/null +++ b/pkg/scheduler/service/aiService.go @@ -0,0 +1,49 @@ +package service + +import ( + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/impl" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" +) + +const ( + OCTOPUS = "Octopus" + MODELARTS = "Modelarts" + SHUGUANGAI = "ShuguangAi" +) + +var ( + AiTypeMap = map[string]string{ + "Hanwuji": OCTOPUS, + "Suiyan": OCTOPUS, + "Sailingsi": OCTOPUS, + "modelarts-CloudBrain2": MODELARTS, + "ShuguangAi": SHUGUANGAI, + } +) + +func InitAiClusterMap(ACRpc hpcacclient.HpcAC, ModelArtsRpc modelartsservice.ModelArtsService, ModelArtsImgRpc imagesservice.ImagesService, OctopusRpc octopusclient.Octopus) (*map[string]executor.Executor, *map[string]collector.ResourceCollector) { + executorMap := make(map[string]executor.Executor) + collectorMap := make(map[string]collector.ResourceCollector) + for k, v := range AiTypeMap { + switch v { + case OCTOPUS: + octopus := impl.NewOctopusExecutor(OctopusRpc, k) + collectorMap[k] = octopus + executorMap[k] = octopus + case MODELARTS: + modelarts := impl.NewModelartsExecutor(ModelArtsRpc, ModelArtsImgRpc, k) + collectorMap[k] = modelarts + executorMap[k] = modelarts + case SHUGUANGAI: + sgai := impl.NewShuguangAiExecutor(ACRpc, k) + collectorMap[k] = sgai + executorMap[k] = sgai + } + } + return &executorMap, &collectorMap +} diff --git a/pkg/scheduler/service/collector/acCollector.go b/pkg/scheduler/service/collector/acCollector.go deleted file mode 100644 index 587639cc..00000000 --- a/pkg/scheduler/service/collector/acCollector.go +++ /dev/null @@ -1,17 +0,0 @@ -package collector - -import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" - -//单条作业费=作业运行秒数×(CPU核心数*CPU单价+GPU卡数×GPU单价+DCU卡数×DCU单价)/3600 -//CPU单价=队列CPU费率×计算中心CPU单价 -//GPU单价=队列GPU费率×计算中心GPU单价 -//DCU单价=队列DCU费率×计算中心DCU单价 - -type ShuguangAiCollector struct { - Name string - ACRpc hpcacclient.HpcAC -} - -func (a *ShuguangAiCollector) GetResourceSpecs() (*ResourceSpecs, error) { - return nil, nil -} diff --git a/pkg/scheduler/service/executor/acExecutor.go b/pkg/scheduler/service/executor/acExecutor.go deleted file mode 100644 index cd7b6916..00000000 --- a/pkg/scheduler/service/executor/acExecutor.go +++ /dev/null @@ -1,32 +0,0 @@ -package executor - -import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" - -type ShuguangAiExecutor struct { - Name string - ACRpc hpcacclient.HpcAC -} - -func (s ShuguangAiExecutor) QueryImageList() ([]Image, error) { - //TODO implement me - panic("implement me") -} - -func (s ShuguangAiExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (Task, error) { - //TODO implement me - panic("implement me") -} - -func (s ShuguangAiExecutor) QueryTask(taskId string) (Task, error) { - //TODO implement me - panic("implement me") -} - -func (s ShuguangAiExecutor) QuerySpecs() (Spec, error) { - //TODO implement me - panic("implement me") -} - -func NewShuguangAiExecutor(name string, acRpc hpcacclient.HpcAC) *ShuguangAiExecutor { - return &ShuguangAiExecutor{Name: name, ACRpc: acRpc} -} diff --git a/pkg/scheduler/service/impl/modelarts.go b/pkg/scheduler/service/impl/modelarts.go new file mode 100644 index 00000000..76f702b7 --- /dev/null +++ b/pkg/scheduler/service/impl/modelarts.go @@ -0,0 +1,44 @@ +package impl + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice" +) + +type ModelArtsExecutor struct { + Name string + pageIndex int32 + pageSize int32 + ModelArtsRpc modelartsservice.ModelArtsService + ModelArtsImgRpc imagesservice.ImagesService +} + +func NewModelartsExecutor(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string) *ModelArtsExecutor { + return &ModelArtsExecutor{Name: name, ModelArtsRpc: modelArtsRpc, ModelArtsImgRpc: modelArtsImgRpc, pageIndex: 1, pageSize: 100} +} + +func (m ModelArtsExecutor) QueryImageList() ([]executor.Image, error) { + //TODO implement me + panic("implement me") +} + +func (m ModelArtsExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (m ModelArtsExecutor) QueryTask(taskId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (m ModelArtsExecutor) QuerySpecs() (executor.Spec, error) { + //TODO implement me + panic("implement me") +} + +func (a *ModelArtsExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) { + return nil, nil +} diff --git a/pkg/scheduler/service/impl/octopus.go b/pkg/scheduler/service/impl/octopus.go new file mode 100644 index 00000000..a4b6c5b8 --- /dev/null +++ b/pkg/scheduler/service/impl/octopus.go @@ -0,0 +1,42 @@ +package impl + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" +) + +type OctopusExecutor struct { + Name string + pageIndex int32 + pageSize int32 + OctopusRpc octopusclient.Octopus +} + +func NewOctopusExecutor(OctopusRpc octopusclient.Octopus, name string) *OctopusExecutor { + return &OctopusExecutor{OctopusRpc: OctopusRpc, Name: name, pageIndex: 1, pageSize: 100} +} + +func (o OctopusExecutor) QueryImageList() ([]executor.Image, error) { + //TODO implement me + panic("implement me") +} + +func (o OctopusExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (o OctopusExecutor) QueryTask(taskId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (o OctopusExecutor) QuerySpecs() (executor.Spec, error) { + //TODO implement me + panic("implement me") +} + +func (a *OctopusExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) { + return nil, nil +} diff --git a/pkg/scheduler/service/impl/shuguangAi.go b/pkg/scheduler/service/impl/shuguangAi.go new file mode 100644 index 00000000..049455c6 --- /dev/null +++ b/pkg/scheduler/service/impl/shuguangAi.go @@ -0,0 +1,45 @@ +package impl + +import ( + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" +) + +//单条作业费=作业运行秒数×(CPU核心数*CPU单价+GPU卡数×GPU单价+DCU卡数×DCU单价)/3600 +//CPU单价=队列CPU费率×计算中心CPU单价 +//GPU单价=队列GPU费率×计算中心GPU单价 +//DCU单价=队列DCU费率×计算中心DCU单价 + +type ShuguangAiExecutor struct { + Name string + ACRpc hpcacclient.HpcAC +} + +func NewShuguangAiExecutor(acRpc hpcacclient.HpcAC, name string) *ShuguangAiExecutor { + return &ShuguangAiExecutor{Name: name, ACRpc: acRpc} +} + +func (s ShuguangAiExecutor) QueryImageList() ([]executor.Image, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) QueryTask(taskId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) QuerySpecs() (executor.Spec, error) { + //TODO implement me + panic("implement me") +} + +func (a *ShuguangAiExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) { + return nil, nil +} diff --git a/pkg/scheduler/strategy/dynamicResources.go b/pkg/scheduler/strategy/dynamicResources.go new file mode 100644 index 00000000..579333f2 --- /dev/null +++ b/pkg/scheduler/strategy/dynamicResources.go @@ -0,0 +1,4 @@ +package strategy + +type DynamicResourcesStrategy struct { +}