From d207b844c9b7b1b1eccda755788d83dea4d716f8 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 22 Nov 2023 18:12:15 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E7=BB=93=E6=9E=84=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 35dc202d29253cd4336dfa00e98ba79f1c310f7a --- pkg/scheduler/cloudScheduler.go | 14 ++++++--- pkg/scheduler/scheduler.go | 55 ++++++++++++++++++++++++++------- 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/pkg/scheduler/cloudScheduler.go b/pkg/scheduler/cloudScheduler.go index 7c77094b..09711982 100644 --- a/pkg/scheduler/cloudScheduler.go +++ b/pkg/scheduler/cloudScheduler.go @@ -17,7 +17,6 @@ package scheduler import ( "bytes" "encoding/json" - "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo" @@ -38,10 +37,15 @@ func NewCloudScheduler() *cloudScheduler { } func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) { - //参数为空返回 nil - if len(providers) == 0 || task == nil { - return nil, errors.New("算法获取参数为空") - } + ////参数为空,返回 nil + //if len(providers) == 0 || task == nil { + // return nil, errors.New("算法获取参数为空") + //} + // + ////仅有一个provider,返回nil + //if len(providers) == 1 { + // return nil, nil + //} //调度算法 strategy := algo.NewK8sStrategy(task, providers...) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index b6b3805b..52cb0a9e 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -83,6 +83,9 @@ func (s *scheduler) AssignAndSchedule() error { // ParticipantIds 返回唯一值 if len(s.participantIds) == 1 { + if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) { + return errors.Errorf("集群 %d 不可用", s.participantIds[0]) + } s.task.ParticipantId = s.participantIds[0] return nil } @@ -93,16 +96,21 @@ func (s *scheduler) AssignAndSchedule() error { return err } + //集群数量不满足,指定到标签匹配后第一个集群 + if len(providerList) < 2 { + if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) { + return errors.Errorf("集群 %d 不可用", s.participantIds[0]) + } + s.task.ParticipantId = s.participantIds[0] + return nil + } + + //调度算法 strategy, err := s.scheduleService.pickOptimalStrategy(task, providerList...) if err != nil { return err } - if strategy == nil { - s.task.ParticipantId = s.participantIds[0] - return nil - } - //调度结果 err = s.assignReplicasToResult(strategy, providerList) if err != nil { @@ -131,23 +139,38 @@ func (s *scheduler) SaveToDb() error { func (s *scheduler) obtainParamsforStrategy() (*algo.Task, []*algo.Provider, error) { task, providerList := s.scheduleService.genTaskAndProviders(s.task, s.dbEngin) - // 查询集群是否可用 - err := s.checkAvailableParticipants(&providerList) + // 过滤可用集群 + err := s.filterAvailableProviders(&providerList) if err != nil { return nil, nil, err } + //可用集群为0 + if len(providerList) == 0 { + return nil, nil, errors.New("未能获取可用集群") + } return task, providerList, nil } -func (s *scheduler) checkAvailableParticipants(providerList *[]*algo.Provider) error { +func (s *scheduler) checkIfParticipantAvailable(id ParticipantId) bool { + + workingIds, err := s.getAvailableParticipantIds() + if err != nil { + return false + } + + return contains(workingIds, int64(id)) +} + +func (s *scheduler) getAvailableParticipantIds() ([]int64, error) { + resp, err := s.participantRpc.ListParticipant(context.Background(), nil) if err != nil { - return err + return nil, err } if resp.Code != 200 { - return errors.New("集群列表查询失败") + return nil, errors.New("集群列表查询失败") } var workingIds []int64 @@ -158,9 +181,19 @@ func (s *scheduler) checkAvailableParticipants(providerList *[]*algo.Provider) e workingIds = append(workingIds, e.ParticipantId) } + return workingIds, nil +} + +func (s *scheduler) filterAvailableProviders(providerList *[]*algo.Provider) error { + + workingIds, err := s.getAvailableParticipantIds() + if err != nil { + return err + } + var tempList []*algo.Provider for _, provider := range *providerList { - if contains(workingIds, provider.Pid) { + if contains(workingIds, provider.Pid) && contains(s.participantIds, provider.Pid) { tempList = append(tempList, provider) } }