From a49f365a410aafaf20e94b8f731971420c05a9ee Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 28 Aug 2023 17:39:03 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E7=BB=93=E6=9E=84=E4=BF=AE?= =?UTF-8?q?=E6=94=B96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: f8bd112cafe5adf9aea373ac9f884a02ca44d58e --- api/internal/mqs/kq/ScheduleAi.go | 6 +-- api/internal/mqs/kq/ScheduleCloud.go | 6 +-- api/internal/mqs/kq/ScheduleHpc.go | 6 +-- api/internal/pkg/scheduler/cloudScheduler.go | 6 +++ api/internal/pkg/scheduler/commonScheduler.go | 7 +++ api/internal/pkg/scheduler/scheduler.go | 47 ++++++++++++++----- 6 files changed, 57 insertions(+), 21 deletions(-) diff --git a/api/internal/mqs/kq/ScheduleAi.go b/api/internal/mqs/kq/ScheduleAi.go index 76aae054..67584084 100644 --- a/api/internal/mqs/kq/ScheduleAi.go +++ b/api/internal/mqs/kq/ScheduleAi.go @@ -25,11 +25,11 @@ func NewScheduleAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleA func (l *ScheduleAiMq) Consume(_, val string) error { // 接受消息, 根据标签筛选过滤 aiSchdl := scheduler.NewAiScheduler(val) - schdl, err := scheduler.NewScheduler(aiSchdl, val) + schdl, err := scheduler.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin) if err != nil { return err } - schdl.MatchLabels(l.svcCtx.DbEngin) + schdl.MatchLabels() // 调度算法 err = schdl.AssignAndSchedule() @@ -38,7 +38,7 @@ func (l *ScheduleAiMq) Consume(_, val string) error { } // 存储数据 - err = schdl.SaveToDb(l.svcCtx.DbEngin) + err = schdl.SaveToDb() if err != nil { return err } diff --git a/api/internal/mqs/kq/ScheduleCloud.go b/api/internal/mqs/kq/ScheduleCloud.go index a7e1f2e6..9f24d5e6 100644 --- a/api/internal/mqs/kq/ScheduleCloud.go +++ b/api/internal/mqs/kq/ScheduleCloud.go @@ -25,11 +25,11 @@ func NewScheduleCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *Schedu func (l *ScheduleCloudMq) Consume(_, val string) error { // 接受消息, 根据标签筛选过滤 cloudScheduler := scheduler.NewCloudScheduler() - schdl, err := scheduler.NewScheduler(cloudScheduler, val) + schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin) if err != nil { return err } - schdl.MatchLabels(l.svcCtx.DbEngin) + schdl.MatchLabels() // 调度算法 err = schdl.AssignAndSchedule() @@ -38,7 +38,7 @@ func (l *ScheduleCloudMq) Consume(_, val string) error { } // 存储数据 - err = schdl.SaveToDb(l.svcCtx.DbEngin) + err = schdl.SaveToDb() if err != nil { return err } diff --git a/api/internal/mqs/kq/ScheduleHpc.go b/api/internal/mqs/kq/ScheduleHpc.go index 0d663c1f..9660e0e5 100644 --- a/api/internal/mqs/kq/ScheduleHpc.go +++ b/api/internal/mqs/kq/ScheduleHpc.go @@ -25,11 +25,11 @@ func NewScheduleHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *Schedule func (l *ScheduleHpcMq) Consume(_, val string) error { // 接受消息, 根据标签筛选过滤 hpcSchdl := scheduler.NewHpcScheduler(val) - schdl, err := scheduler.NewScheduler(hpcSchdl, val) + schdl, err := scheduler.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin) if err != nil { return err } - schdl.MatchLabels(l.svcCtx.DbEngin) + schdl.MatchLabels() // 调度算法 err = schdl.AssignAndSchedule() @@ -38,7 +38,7 @@ func (l *ScheduleHpcMq) Consume(_, val string) error { } // 存储数据 - err = schdl.SaveToDb(l.svcCtx.DbEngin) + err = schdl.SaveToDb() if err != nil { return err } diff --git a/api/internal/pkg/scheduler/cloudScheduler.go b/api/internal/pkg/scheduler/cloudScheduler.go index 3ad8190a..6c01b64c 100644 --- a/api/internal/pkg/scheduler/cloudScheduler.go +++ b/api/internal/pkg/scheduler/cloudScheduler.go @@ -22,6 +22,12 @@ func NewCloudScheduler() *cloudScheduler { } func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { + //参数为空返回 nil + if len(providers) == 0 || task == nil { + return nil, nil + } + + //调度算法 strategy := algo.NewK8sStrategy(task, providers...) taskResult, err := algo.ScheduleWithFullCollaboration(strategy, strategy.ProviderList) if err != nil { diff --git a/api/internal/pkg/scheduler/commonScheduler.go b/api/internal/pkg/scheduler/commonScheduler.go index 6956b4f5..f78f209e 100644 --- a/api/internal/pkg/scheduler/commonScheduler.go +++ b/api/internal/pkg/scheduler/commonScheduler.go @@ -12,6 +12,13 @@ type scheduleService interface { pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) } +type providerParams struct { + Disk_avail float64 + Mem_avail float64 + Cpu_avail float64 + Participant_id int64 +} + // 求交集 func intersect(slice1, slice2 []int64) []int64 { m := make(map[int64]int) diff --git a/api/internal/pkg/scheduler/scheduler.go b/api/internal/pkg/scheduler/scheduler.go index 3ad7f568..35613c78 100644 --- a/api/internal/pkg/scheduler/scheduler.go +++ b/api/internal/pkg/scheduler/scheduler.go @@ -13,18 +13,19 @@ type scheduler struct { task *types.TaskInfo participantIds []int64 scheduleService scheduleService + dbEngin *gorm.DB } -func NewScheduler(scheduleService scheduleService, val string) (*scheduler, error) { +func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB) (*scheduler, error) { var task *types.TaskInfo err := json.Unmarshal([]byte(val), &task) if err != nil { return nil, errors.New("create scheduler failed : " + err.Error()) } - return &scheduler{task: task, scheduleService: scheduleService}, nil + return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin}, nil } -func (s *scheduler) MatchLabels(dbEngin *gorm.DB) { +func (s *scheduler) MatchLabels() { // 已指定 ParticipantId if s.task.ParticipantId != 0 { return @@ -34,7 +35,7 @@ func (s *scheduler) MatchLabels(dbEngin *gorm.DB) { count := 0 for key := range s.task.MatchLabels { var participantIds []int64 - dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds) + s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds) if count == 0 { ids = participantIds } @@ -66,10 +67,25 @@ func (s *scheduler) AssignAndSchedule() error { return nil } + //调度结果 ParticipantId + for i, e := range strategy.ResourcePerTask { + + if len(e) != 0 { + for _, ej := range e { + if ej == 1 { + s.task.ParticipantId = providerList[i].Pid + break + } + } + } else { + continue + } + } + return nil } -func (s *scheduler) SaveToDb(dbEngin *gorm.DB) error { +func (s *scheduler) SaveToDb() error { if s.task.ParticipantId == 0 { return errors.New("participantId 为空") } @@ -77,7 +93,7 @@ func (s *scheduler) SaveToDb(dbEngin *gorm.DB) error { if err != nil { return err } - tx := dbEngin.Create(structForDb) + tx := s.dbEngin.Create(structForDb) if tx.Error != nil { logx.Error(tx.Error) return tx.Error @@ -86,10 +102,17 @@ func (s *scheduler) SaveToDb(dbEngin *gorm.DB) error { } func (s *scheduler) genTaskAndProviders() (*algo.Task, []*algo.Provider) { - //var providerList []*algo.Provider - //for _, id := range s.participantIds { - // provider := algo.NewProvider(id, 100, 200, 200, 0.0, 0.0, 0.0) - // - //} - return nil, nil + proParams := []providerParams{} + sqlstr := "SELECT SUM(a.disk_avail) as disk_avail,SUM(a.mem_avail) as mem_avail,SUM(a.cpu_total * a.cpu_usable) as cpu_avail,participant_id from (SELECT * from sc_node_avail_info where created_time in (SELECT MAX(created_time) as time from sc_node_avail_info where deleted_flag = 0 GROUP BY participant_id,node_name)) a GROUP BY a.participant_id" + s.dbEngin.Raw(sqlstr).Scan(&proParams) + + var providerList []*algo.Provider + for _, p := range proParams { + provider := algo.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0) + providerList = append(providerList, provider) + } + + t := algo.NewTask(0, 1, 2, 75120000, 301214500, 1200, 2, 6, 2000) + + return t, providerList }