diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index ec5c942e..7498a513 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -17,7 +17,8 @@ package mqs import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" - scheduler2 "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" ) /* @@ -25,36 +26,32 @@ import ( Listening to the payment flow status change notification message queue */ type AiQueue struct { - ctx context.Context - svcCtx *svc.ServiceContext + ctx context.Context + svcCtx *svc.ServiceContext + scheduler *scheduler.Scheduler } func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { + acCollector := &collector.ShuguangAiCollector{ACRpc: svcCtx.ACRpc} + resourceCollectiors := []collector.ResourceCollector{acCollector} return &AiQueue{ - ctx: ctx, - svcCtx: svcCtx, + ctx: ctx, + svcCtx: svcCtx, + scheduler: scheduler.NewScheduler2(resourceCollectiors, nil), } } func (l *AiQueue) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - aiSchdl := scheduler2.NewAiScheduler(val) - schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin, nil) - if err != nil { - return err - } - schdl.MatchLabels() + aiSchdl := scheduler.NewAiScheduler(val, nil) + + //schdl.MatchLabels() // 调度算法 - err = schdl.AssignAndSchedule() + err := l.scheduler.AssignAndSchedule(aiSchdl) if err != nil { return err } - // 存储数据 - err = schdl.SaveToDb() - if err != nil { - return err - } return nil } diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/aiScheduler.go index 52708f30..8f63c39c 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/aiScheduler.go @@ -24,12 +24,14 @@ import ( ) type AiScheduler struct { - yamlString string - collector collector.ResourceCollector + yamlString string + resourceCollectors []collector.ResourceCollector + task *response.TaskInfo + //storelink } -func NewAiScheduler(val string) *AiScheduler { - return &AiScheduler{yamlString: val} +func NewAiScheduler(val string, resourceCollectors []collector.ResourceCollector) *AiScheduler { + return &AiScheduler{yamlString: val, resourceCollectors: resourceCollectors} } func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { @@ -44,9 +46,8 @@ func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource strin } func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { - //a, b := as.genTaskAndProviders() - - return nil, nil + strategy := strategies.NewReplicationStrategy(nil, 0) + return strategy, nil } func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) { diff --git a/pkg/scheduler/collector/collector.go b/pkg/scheduler/collector/collector.go index 06f31d24..5ce28493 100644 --- a/pkg/scheduler/collector/collector.go +++ b/pkg/scheduler/collector/collector.go @@ -5,12 +5,13 @@ type ResourceCollector interface { } type ResourceSpecs struct { - CpuAvail float64 - MemAvail float64 - DiskAvail float64 - GpuAvail float64 - CardAvail []Card - Balance float64 + ParticipantId int64 + CpuAvail float64 + MemAvail float64 + DiskAvail float64 + GpuAvail float64 + CardAvail []Card + Balance float64 } type Card struct { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index b981f37a..3aae57e9 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -20,13 +20,14 @@ import ( "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" "sigs.k8s.io/yaml" "strings" ) -type scheduler struct { +type Scheduler struct { task *response.TaskInfo participantIds []int64 scheduleService scheduleService @@ -34,19 +35,24 @@ type scheduler struct { result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService resourceCollectors []collector.ResourceCollector + storages []database.Storage //storelink } -func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*scheduler, error) { +func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { var task *response.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { return nil, errors.New("create scheduler failed : " + err.Error()) } - return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil + return &Scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func (s *scheduler) SpecifyClusters() { +func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage) *Scheduler { + return &Scheduler{resourceCollectors: resourceCollectors, storages: storages} +} + +func (s *Scheduler) SpecifyClusters() { // 如果已指定集群名,通过数据库查询后返回p端ip列表 if len(s.task.Clusters) != 0 { s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds) @@ -54,7 +60,7 @@ func (s *scheduler) SpecifyClusters() { } } -func (s *scheduler) SpecifyNsID() { +func (s *Scheduler) SpecifyNsID() { // 未指定集群名,只指定nsID if len(s.task.Clusters) == 0 { if len(s.task.NsID) != 0 { @@ -70,7 +76,7 @@ func (s *scheduler) SpecifyNsID() { } } -func (s *scheduler) MatchLabels() { +func (s *Scheduler) MatchLabels() { var ids []int64 count := 0 @@ -93,7 +99,7 @@ func (s *scheduler) MatchLabels() { } // TempAssign todo 屏蔽原调度算法 -func (s *scheduler) TempAssign() error { +func (s *Scheduler) TempAssign() error { //需要判断task中的资源类型,针对metadata中的多个kind做不同处理 //输入副本数和集群列表,最终结果输出为pID对应副本数量列表,针对多个kind需要做拆分和重新拼接组合 @@ -113,28 +119,28 @@ func (s *scheduler) TempAssign() error { return nil } -func (s *scheduler) AssignAndSchedule() error { - // 已指定 ParticipantId - if s.task.ParticipantId != 0 { - return nil - } - // 标签匹配以及后,未找到ParticipantIds - if len(s.participantIds) == 0 { - return errors.New("未找到匹配的ParticipantIds") - } +func (s *Scheduler) AssignAndSchedule(ss scheduleService) error { + //// 已指定 ParticipantId + //if s.task.ParticipantId != 0 { + // return nil + //} + //// 标签匹配以及后,未找到ParticipantIds + //if len(s.participantIds) == 0 { + // return errors.New("未找到匹配的ParticipantIds") + //} + // + //// 指定或者标签匹配的结果只有一个集群,给任务信息指定 + //if len(s.participantIds) == 1 { + // s.task.ParticipantId = s.participantIds[0] + // //replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) + // //result := make(map[int64]string) + // //result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64) + // //s.result = result + // + // return nil + //} - // 指定或者标签匹配的结果只有一个集群,给任务信息指定 - if len(s.participantIds) == 1 { - s.task.ParticipantId = s.participantIds[0] - //replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) - //result := make(map[int64]string) - //result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64) - //s.result = result - - return nil - } - - strategy, err := s.scheduleService.pickOptimalStrategy() + strategy, err := ss.pickOptimalStrategy() if err != nil { return err } @@ -150,7 +156,7 @@ func (s *scheduler) AssignAndSchedule() error { // return nil //} - err = s.scheduleService.assignTask(clusters) + err = ss.assignTask(clusters) if err != nil { return err } @@ -158,7 +164,7 @@ func (s *scheduler) AssignAndSchedule() error { return nil } -func (s *scheduler) SaveToDb() error { +func (s *Scheduler) SaveToDb() error { for _, participantId := range s.participantIds {