From 3e6a5cfff00d8119d7198beb9d066bc5759be765 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 17 Jan 2024 10:26:58 +0800 Subject: [PATCH] scheduler refactor updated Former-commit-id: b75e9f6cff47371cfef663819023193c42a2cb87 --- pkg/scheduler/aiScheduler.go | 36 +++++------ pkg/scheduler/cloudScheduler.go | 42 +++++++------ pkg/scheduler/collector/aiCollector.go | 8 +++ pkg/scheduler/collector/collector.go | 8 +++ pkg/scheduler/common.go | 14 +---- pkg/scheduler/database/aiStorage.go | 8 +++ pkg/scheduler/database/cloudStorage.go | 28 +++++++++ pkg/scheduler/database/storage.go | 5 ++ pkg/scheduler/hpcScheduler.go | 32 ++++------ pkg/scheduler/scheduler.go | 59 +++---------------- .../strategies/priceBasedStrategy.go | 42 +++++++++---- pkg/scheduler/strategies/strategy.go | 14 +++++ 12 files changed, 165 insertions(+), 131 deletions(-) create mode 100644 pkg/scheduler/collector/aiCollector.go create mode 100644 pkg/scheduler/collector/collector.go create mode 100644 pkg/scheduler/database/aiStorage.go create mode 100644 pkg/scheduler/database/cloudStorage.go create mode 100644 pkg/scheduler/database/storage.go create mode 100644 pkg/scheduler/strategies/strategy.go diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/aiScheduler.go index b33e3957..1f4f5442 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/aiScheduler.go @@ -18,19 +18,21 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" - "gorm.io/gorm" ) -type aiScheduler struct { +type AiScheduler struct { yamlString string + collector collector.ResourceCollector } -func NewAiScheduler(val string) *aiScheduler { - return &aiScheduler{yamlString: val} +func NewAiScheduler(val string) *AiScheduler { + return &AiScheduler{yamlString: val} } -func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { +func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { ai := models.Ai{ ParticipantId: participantId, TaskId: task.TaskId, @@ -41,22 +43,16 @@ func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, resource strin return ai, nil } -func (as *aiScheduler) pickOptimalStrategy(task *providerPricing.Task, providers ...*providerPricing.Provider) (*providerPricing.Strategy, error) { +func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { + //a, b := as.genTaskAndProviders() + return nil, nil } -func (as *aiScheduler) genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*providerPricing.Task, []*providerPricing.Provider) { - var proParams []providerParams - sqlstr := "SELECT SUM(a.disk_avail) as disk_avail,SUM(a.mem_avail) as mem_avail,SUM(a.cpu_total * a.cpu_usable) as cpu_avail,participant_id from (SELECT * from sc_node_avail_info where created_time in (SELECT MAX(created_time) as time from sc_node_avail_info where deleted_flag = 0 GROUP BY participant_id,node_name)) a GROUP BY a.participant_id" - dbEngin.Raw(sqlstr).Scan(&proParams) - - var providerList []*providerPricing.Provider - for _, p := range proParams { - provider := providerPricing.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0) - providerList = append(providerList, provider) - } - - t := providerPricing.NewTask(0, 1, 2, 75120000, 301214500, 1200, 2, 6, 2000) - - return t, providerList +func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) { + return nil, nil +} + +func (as *AiScheduler) assignTask() error { + return nil } diff --git a/pkg/scheduler/cloudScheduler.go b/pkg/scheduler/cloudScheduler.go index eac03fb9..7384d269 100644 --- a/pkg/scheduler/cloudScheduler.go +++ b/pkg/scheduler/cloudScheduler.go @@ -19,9 +19,9 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" - "gorm.io/gorm" "io" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -29,25 +29,25 @@ import ( kyaml "k8s.io/apimachinery/pkg/util/yaml" ) -type cloudScheduler struct { +type CloudScheduler struct { + storage database.Storage } -func NewCloudScheduler() *cloudScheduler { - return &cloudScheduler{} +func NewCloudScheduler() *CloudScheduler { + return &CloudScheduler{} } -func (cs *cloudScheduler) pickOptimalStrategy(task *providerPricing.Task, providers ...*providerPricing.Provider) (*providerPricing.Strategy, error) { - - //调度算法 - strategy := strategies.NewPricingStrategy(task, providers...) - taskResult, err := strategy.ScheduleWithFullCollaboration() +func (cs *CloudScheduler) pickOptimalStrategy() (strategies.Strategy, error) { + task, providerList, err := cs.genTaskAndProviders() if err != nil { - return nil, err + return nil, nil } - return taskResult.MaxscoreStrategy, nil + //调度算法 + strategy := strategies.NewPricingStrategy(task, providerList...) + return strategy, nil } -func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { +func (cs *CloudScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID) cloud.Id = utils.GenSnowflakeID() cloud.NsID = task.NsID @@ -56,7 +56,7 @@ func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, resource st return cloud, nil } -func (cs *cloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64, nsID string) models.Cloud { +func (cs *CloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64, nsID string) models.Cloud { var cloud models.Cloud d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) var err error @@ -100,11 +100,11 @@ func (cs *cloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64, ns return cloud } -func (cs *cloudScheduler) genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*providerPricing.Task, []*providerPricing.Provider) { - var proParams []providerParams - sqlstr := "SELECT SUM(a.disk_avail) as disk_avail,SUM(a.mem_avail) as mem_avail,SUM(a.cpu_total * a.cpu_usable) as cpu_avail,participant_id from (SELECT * from sc_node_avail_info where created_time in (SELECT MAX(created_time) as time from sc_node_avail_info where deleted_flag = 0 GROUP BY participant_id,node_name)) a GROUP BY a.participant_id" - dbEngin.Raw(sqlstr).Scan(&proParams) - +func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider, error) { + proParams, err := cs.storage.GetProviderParams() + if err != nil { + return nil, nil, nil + } var providerList []*providerPricing.Provider for _, p := range proParams { provider := providerPricing.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0) @@ -114,5 +114,9 @@ func (cs *cloudScheduler) genTaskAndProviders(task *response.TaskInfo, dbEngin * //replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) //t := algorithm.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000) - return nil, providerList + return nil, providerList, nil +} + +func (cs *CloudScheduler) assignTask() error { + return nil } diff --git a/pkg/scheduler/collector/aiCollector.go b/pkg/scheduler/collector/aiCollector.go new file mode 100644 index 00000000..e383cba3 --- /dev/null +++ b/pkg/scheduler/collector/aiCollector.go @@ -0,0 +1,8 @@ +package collector + +type AiCollector struct { +} + +func (a *AiCollector) getResourceSpecs() { + +} diff --git a/pkg/scheduler/collector/collector.go b/pkg/scheduler/collector/collector.go new file mode 100644 index 00000000..7e0739c5 --- /dev/null +++ b/pkg/scheduler/collector/collector.go @@ -0,0 +1,8 @@ +package collector + +type ResourceCollector interface { + getResourceSpecs() +} + +type ResourceSpecs struct { +} diff --git a/pkg/scheduler/common.go b/pkg/scheduler/common.go index 97327b8f..8d838c43 100644 --- a/pkg/scheduler/common.go +++ b/pkg/scheduler/common.go @@ -16,23 +16,15 @@ package scheduler import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" - "gorm.io/gorm" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" "math/rand" "time" ) type scheduleService interface { getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) - pickOptimalStrategy(task *providerPricing.Task, providers ...*providerPricing.Provider) (*providerPricing.Strategy, error) - genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*providerPricing.Task, []*providerPricing.Provider) -} - -type providerParams struct { - Disk_avail float64 - Mem_avail float64 - Cpu_avail float64 - Participant_id int64 + pickOptimalStrategy() (strategies.Strategy, error) + assignTask() error } // 求交集 diff --git a/pkg/scheduler/database/aiStorage.go b/pkg/scheduler/database/aiStorage.go new file mode 100644 index 00000000..f8f424a7 --- /dev/null +++ b/pkg/scheduler/database/aiStorage.go @@ -0,0 +1,8 @@ +package database + +type AiStorage struct { +} + +func (s *AiStorage) getParticipants() { + +} diff --git a/pkg/scheduler/database/cloudStorage.go b/pkg/scheduler/database/cloudStorage.go new file mode 100644 index 00000000..929ba159 --- /dev/null +++ b/pkg/scheduler/database/cloudStorage.go @@ -0,0 +1,28 @@ +package database + +import "gorm.io/gorm" + +type CloudStorage struct { + dbEngin *gorm.DB +} + +func NewCloudStorage(dbEngin *gorm.DB) *CloudStorage { + return &CloudStorage{dbEngin: dbEngin} +} + +func (c *CloudStorage) GetProviderParams() ([]providerParams, error) { + var proParams []providerParams + sqlstr := "SELECT SUM(a.disk_avail) as disk_avail,SUM(a.mem_avail) as mem_avail,SUM(a.cpu_total * a.cpu_usable) as cpu_avail,participant_id from (SELECT * from sc_node_avail_info where created_time in (SELECT MAX(created_time) as time from sc_node_avail_info where deleted_flag = 0 GROUP BY participant_id,node_name)) a GROUP BY a.participant_id" + c.dbEngin.Raw(sqlstr).Scan(&proParams) + if len(proParams) == 0 { + return nil, nil + } + return proParams, nil +} + +type providerParams struct { + Disk_avail float64 + Mem_avail float64 + Cpu_avail float64 + Participant_id int64 +} diff --git a/pkg/scheduler/database/storage.go b/pkg/scheduler/database/storage.go new file mode 100644 index 00000000..44887345 --- /dev/null +++ b/pkg/scheduler/database/storage.go @@ -0,0 +1,5 @@ +package database + +type Storage interface { + GetProviderParams() ([]providerParams, error) +} diff --git a/pkg/scheduler/hpcScheduler.go b/pkg/scheduler/hpcScheduler.go index c795a70b..e1b4018f 100644 --- a/pkg/scheduler/hpcScheduler.go +++ b/pkg/scheduler/hpcScheduler.go @@ -19,19 +19,19 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" - "gorm.io/gorm" ) -type hpcScheduler struct { +type HpcScheduler struct { yamlString string } -func NewHpcScheduler(val string) *hpcScheduler { - return &hpcScheduler{yamlString: val} +func NewHpcScheduler(val string) *HpcScheduler { + return &HpcScheduler{yamlString: val} } -func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { +func (h *HpcScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { hpc := models.Hpc{} utils.Convert(task.Metadata, &hpc) hpc.Id = utils.GenSnowflakeID() @@ -42,22 +42,14 @@ func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, resource strin return hpc, nil } -func (h *hpcScheduler) pickOptimalStrategy(task *providerPricing.Task, providers ...*providerPricing.Provider) (*providerPricing.Strategy, error) { +func (h *HpcScheduler) pickOptimalStrategy() (strategies.Strategy, error) { return nil, nil } -func (h *hpcScheduler) genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*providerPricing.Task, []*providerPricing.Provider) { - var proParams []providerParams - sqlstr := "SELECT SUM(a.disk_avail) as disk_avail,SUM(a.mem_avail) as mem_avail,SUM(a.cpu_total * a.cpu_usable) as cpu_avail,participant_id from (SELECT * from sc_node_avail_info where created_time in (SELECT MAX(created_time) as time from sc_node_avail_info where deleted_flag = 0 GROUP BY participant_id,node_name)) a GROUP BY a.participant_id" - dbEngin.Raw(sqlstr).Scan(&proParams) - - var providerList []*providerPricing.Provider - for _, p := range proParams { - provider := providerPricing.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0) - providerList = append(providerList, provider) - } - - t := providerPricing.NewTask(0, 1, 2, 75120000, 301214500, 1200, 2, 6, 2000) - - return t, providerList +func (h *HpcScheduler) genTaskAndProviders(task *response.TaskInfo) (*providerPricing.Task, []*providerPricing.Provider) { + return nil, nil +} + +func (h *HpcScheduler) assignTask() error { + return nil } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 6ac87129..5e0fe90c 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -19,7 +19,6 @@ import ( "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" "sigs.k8s.io/yaml" @@ -133,28 +132,16 @@ func (s *scheduler) AssignAndSchedule() error { } //生成算法所需参数 - task, providerList, err := s.obtainParamsForStrategy() - if err != nil { - return err - } + //task, providerList, err := s.obtainParamsForStrategy() + //if err != nil { + // return err + //} //集群数量不满足,指定到标签匹配后第一个集群 - if len(providerList) < 2 { - s.task.ParticipantId = s.participantIds[0] - return nil - } - - //调度算法 - strategy, err := s.scheduleService.pickOptimalStrategy(task, providerList...) - if err != nil { - return err - } - - //调度结果 - err = s.assignReplicasToResult(strategy, providerList) - if err != nil { - return err - } + //if len(providerList) < 2 { + // s.task.ParticipantId = s.participantIds[0] + // return nil + //} return nil } @@ -179,33 +166,3 @@ func (s *scheduler) SaveToDb() error { } return nil } - -func (s *scheduler) obtainParamsForStrategy() (*providerPricing.Task, []*providerPricing.Provider, error) { - task, providerList := s.scheduleService.genTaskAndProviders(s.task, s.dbEngin) - - if len(providerList) == 0 { - return nil, nil, errors.New("获取集群失败") - } - - return task, providerList, nil -} - -func (s *scheduler) assignReplicasToResult(strategy *providerPricing.Strategy, providerList []*providerPricing.Provider) error { - - if len(strategy.Tasksolution) == 0 { - return errors.New("调度失败, 未能获取调度结果") - } - - for i, e := range strategy.Tasksolution { - if e == 0 { - continue - } - s.result[providerList[i].Pid] = string(e) - } - - if len(s.result) == 0 { - return errors.New("可用集群为空") - } - - return nil -} diff --git a/pkg/scheduler/strategies/priceBasedStrategy.go b/pkg/scheduler/strategies/priceBasedStrategy.go index d8fa7ddf..5c620fa3 100644 --- a/pkg/scheduler/strategies/priceBasedStrategy.go +++ b/pkg/scheduler/strategies/priceBasedStrategy.go @@ -19,13 +19,13 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" ) -type pricingStrategy struct { +type PricingStrategy struct { ProviderList []*providerPricing.Provider Task *providerPricing.Task StrategyList []*providerPricing.Strategy } -func NewPricingStrategy(task *providerPricing.Task, providers ...*providerPricing.Provider) *pricingStrategy { +func NewPricingStrategy(task *providerPricing.Task, providers ...*providerPricing.Provider) *PricingStrategy { var providerList []*providerPricing.Provider var res [][]int @@ -56,10 +56,10 @@ func NewPricingStrategy(task *providerPricing.Task, providers ...*providerPricin strategyList = append(strategyList, strategy) } - return &pricingStrategy{ProviderList: providerList, Task: task, StrategyList: strategyList} + return &PricingStrategy{ProviderList: providerList, Task: task, StrategyList: strategyList} } -func (ps *pricingStrategy) computeMaxScore() (*providerPricing.Task, error) { +func (ps *PricingStrategy) computeMaxScore() (*providerPricing.Task, error) { maxStrategy := providerPricing.NewStrategy() var maxprofit float64 @@ -106,11 +106,7 @@ func (ps *pricingStrategy) computeMaxScore() (*providerPricing.Task, error) { return ps.Task, nil } -//type strategyService interface { -// computeMaxScore() (*providerPricing.Task, error) -//} - -func (ps *pricingStrategy) ScheduleWithFullCollaboration() (*providerPricing.Task, error) { +func (ps *PricingStrategy) produceMaxScoreStrategy() (*providerPricing.Strategy, error) { task, err := ps.computeMaxScore() if err != nil { return nil, err @@ -137,5 +133,31 @@ func (ps *pricingStrategy) ScheduleWithFullCollaboration() (*providerPricing.Tas task.ResourcePerTask = append(task.ResourcePerTask, resourcePerTaskPerProviders) } - return task, nil + return task.MaxscoreStrategy, nil +} + +func (ps *PricingStrategy) Schedule() ([]*AssignedCluster, error) { + strategy, err := ps.produceMaxScoreStrategy() + if err != nil { + return nil, err + } + + if len(strategy.Tasksolution) == 0 { + return nil, errors.New("调度失败, 未能获取调度结果") + } + + var results []*AssignedCluster + for i, e := range strategy.Tasksolution { + if e == 0 { + continue + } + cluster := &AssignedCluster{ParticipantId: ps.ProviderList[i].Pid, Replicas: int32(e)} + results = append(results, cluster) + } + + if len(results) == 0 { + return nil, errors.New("可用集群为空") + } + + return results, nil } diff --git a/pkg/scheduler/strategies/strategy.go b/pkg/scheduler/strategies/strategy.go new file mode 100644 index 00000000..e265acdd --- /dev/null +++ b/pkg/scheduler/strategies/strategy.go @@ -0,0 +1,14 @@ +package strategies + +type Strategy interface { + Schedule() ([]*AssignedCluster, error) +} + +type AssignedCluster struct { + ParticipantId int64 + Name string + Replicas int32 +} + +type Options struct { +}