From b04a3f1255a418b9d812959c2d5a424a1f53fd0d Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 31 Jan 2024 11:24:00 +0800 Subject: [PATCH] updated aiScheduler Former-commit-id: 9b8d271bb13c6cfb7273ec76084ce8c123fbe3be --- .../scheduler/schedulers/aiScheduler.go | 31 ++++++------------ .../scheduler/schedulers/cloudScheduler.go | 8 ++--- .../strategy/{params => param}/params.go | 4 +-- .../scheduler/strategy/param/replication.go | 23 +++++++++++++ .../strategy/param/resourcePricing.go | 32 +++++++++++++++++++ .../strategy/params/replicationParams.go | 16 ---------- .../strategy/params/resourcePricingParams.go | 26 --------------- .../scheduler/strategy/replication.go | 6 ++-- .../scheduler/strategy/resourcePricing.go | 5 ++- .../scheduler/strategy/test/strategy_test.go | 18 ++++++++++- 10 files changed, 93 insertions(+), 76 deletions(-) rename api/internal/scheduler/strategy/{params => param}/params.go (69%) create mode 100644 api/internal/scheduler/strategy/param/replication.go create mode 100644 api/internal/scheduler/strategy/param/resourcePricing.go delete mode 100644 api/internal/scheduler/strategy/params/replicationParams.go delete mode 100644 api/internal/scheduler/strategy/params/resourcePricingParams.go diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 68fc15f1..4310bd46 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -17,11 +17,10 @@ package schedulers import ( "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" @@ -50,36 +49,24 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin } func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { - resources, err := as.findClustersWithResource() + resources, err := as.findClustersWithResources() if err != nil { return nil, err } + if len(resources) == 0 { + return nil, errors.New("no cluster has resources") + } + params := ¶m.Params{Resources: resources} if len(resources) < 2 /*|| as.task */ { - var pros []entity.Participant - for _, resource := range resources { - pros = append(pros, entity.Participant{ - Participant_id: resource.ParticipantId, - Name: resource.Name, - }) - } - strategy := strategy.NewReplicationStrategy(nil) + strategy := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params /*, Replicas: 1*/}) return strategy, nil } - task, providerList := as.genTaskAndProviders() - if err != nil { - return nil, nil - } - strategy := strategy.NewPricingStrategy(task, providerList...) + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params}) return strategy, nil } -func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) { - - return nil, nil -} - func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { if clusters == nil { return errors.New("clusters is nil") @@ -97,7 +84,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return nil } -func (as *AiScheduler) findClustersWithResource() ([]*collector.ResourceSpecs, error) { +func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceSpecs, error) { var resourceSpecs []*collector.ResourceSpecs for _, resourceCollector := range *as.ResourceCollector { spec, err := resourceCollector.GetResourceSpecs() diff --git a/api/internal/scheduler/schedulers/cloudScheduler.go b/api/internal/scheduler/schedulers/cloudScheduler.go index 6d9fcbe1..6024be02 100644 --- a/api/internal/scheduler/schedulers/cloudScheduler.go +++ b/api/internal/scheduler/schedulers/cloudScheduler.go @@ -19,6 +19,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/database" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" @@ -38,12 +39,9 @@ func NewCloudScheduler() *CloudScheduler { } func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { - task, providerList, err := cs.genTaskAndProviders() - if err != nil { - return nil, nil - } + //获取所有计算中心 //调度算法 - strategy := strategy.NewPricingStrategy(task, providerList...) + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{}) return strategy, nil } diff --git a/api/internal/scheduler/strategy/params/params.go b/api/internal/scheduler/strategy/param/params.go similarity index 69% rename from api/internal/scheduler/strategy/params/params.go rename to api/internal/scheduler/strategy/param/params.go index 44c29f13..78270fc0 100644 --- a/api/internal/scheduler/strategy/params/params.go +++ b/api/internal/scheduler/strategy/param/params.go @@ -1,9 +1,9 @@ -package params +package param import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" ) type Params struct { - resources []*collector.ResourceSpecs + Resources []*collector.ResourceSpecs } diff --git a/api/internal/scheduler/strategy/param/replication.go b/api/internal/scheduler/strategy/param/replication.go new file mode 100644 index 00000000..6699ce6b --- /dev/null +++ b/api/internal/scheduler/strategy/param/replication.go @@ -0,0 +1,23 @@ +package param + +import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + +type ReplicationParams struct { + Replicas int32 + *Params +} + +func (r *ReplicationParams) GetReplicas() int32 { + return r.Replicas +} + +func (r *ReplicationParams) GetParticipants() []*entity.Participant { + var participants []*entity.Participant + for _, resource := range r.Resources { + participants = append(participants, &entity.Participant{ + Participant_id: resource.ParticipantId, + Name: resource.Name, + }) + } + return participants +} diff --git a/api/internal/scheduler/strategy/param/resourcePricing.go b/api/internal/scheduler/strategy/param/resourcePricing.go new file mode 100644 index 00000000..570e4422 --- /dev/null +++ b/api/internal/scheduler/strategy/param/resourcePricing.go @@ -0,0 +1,32 @@ +package param + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" +) + +type ResourcePricingParams struct { + replicas int32 + task *providerPricing.Task + *Params +} + +func (r *ResourcePricingParams) GetReplicas() int32 { + return r.replicas +} + +func (r *ResourcePricingParams) GetTask() *providerPricing.Task { + return r.task +} + +func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider { + var providerList []*providerPricing.Provider + for _, resource := range r.Resources { + provider := providerPricing.NewProvider( + resource.ParticipantId, + resource.CpuAvail, + resource.MemAvail, + resource.DiskAvail, 0.0, 0.0, 0.0) + providerList = append(providerList, provider) + } + return providerList +} diff --git a/api/internal/scheduler/strategy/params/replicationParams.go b/api/internal/scheduler/strategy/params/replicationParams.go deleted file mode 100644 index 7adfaea0..00000000 --- a/api/internal/scheduler/strategy/params/replicationParams.go +++ /dev/null @@ -1,16 +0,0 @@ -package params - -import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" - -type ReplicationOption struct { - replicas int32 - participants []entity.Participant -} - -func (o *ReplicationOption) GetReplicas() int32 { - return o.replicas -} - -func (o *ReplicationOption) GetParticipants() []entity.Participant { - return o.participants -} diff --git a/api/internal/scheduler/strategy/params/resourcePricingParams.go b/api/internal/scheduler/strategy/params/resourcePricingParams.go deleted file mode 100644 index 10fe7bd6..00000000 --- a/api/internal/scheduler/strategy/params/resourcePricingParams.go +++ /dev/null @@ -1,26 +0,0 @@ -package params - -import ( - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" -) - -type ResourcePricingOption struct { - replicas int32 - task *providerPricing.Task - providers []*providerPricing.Provider - *Params -} - -func NewResourcePricingOption(params *Params) *ResourcePricingOption { - return &ResourcePricingOption{ - Params: params, - } -} - -func (r *ResourcePricingOption) GetReplicas() int32 { - return r.replicas -} - -func (r *ResourcePricingOption) GetProviders() []*providerPricing.Provider { - return r.providers -} diff --git a/api/internal/scheduler/strategy/replication.go b/api/internal/scheduler/strategy/replication.go index 3f9549b9..fad6fcde 100644 --- a/api/internal/scheduler/strategy/replication.go +++ b/api/internal/scheduler/strategy/replication.go @@ -3,15 +3,15 @@ package strategy import ( "github.com/pkg/errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/params" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" ) type ReplicationStrategy struct { replicas int32 - participants []entity.Participant + participants []*entity.Participant } -func NewReplicationStrategy(params *params.ReplicationOption) *ReplicationStrategy { +func NewReplicationStrategy(params *param.ReplicationParams) *ReplicationStrategy { return &ReplicationStrategy{replicas: params.GetReplicas(), participants: params.GetParticipants(), } diff --git a/api/internal/scheduler/strategy/resourcePricing.go b/api/internal/scheduler/strategy/resourcePricing.go index 2abf2af6..e1614164 100644 --- a/api/internal/scheduler/strategy/resourcePricing.go +++ b/api/internal/scheduler/strategy/resourcePricing.go @@ -17,6 +17,7 @@ package strategy import ( "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" ) type PricingStrategy struct { @@ -25,7 +26,9 @@ type PricingStrategy struct { StrategyList []*providerPricing.Strategy } -func NewPricingStrategy(task *providerPricing.Task, providers ...*providerPricing.Provider) *PricingStrategy { +func NewPricingStrategy(params *param.ResourcePricingParams) *PricingStrategy { + providers := params.GetProviders() + task := params.GetTask() var providerList []*providerPricing.Provider var res [][]int diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go index 767d71c4..53f24cb9 100644 --- a/api/internal/scheduler/strategy/test/strategy_test.go +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -3,7 +3,9 @@ package test import ( "fmt" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param" "testing" ) @@ -13,10 +15,23 @@ func TestReplication(t *testing.T) { {Name: "test2", Participant_id: 2}, {Name: "test3", Participant_id: 3}, } + res := []*collector.ResourceSpecs{ + { + ParticipantId: 1, + Name: "test1", + }, + { + ParticipantId: 1, + Name: "test2"}, + { + ParticipantId: 1, + Name: "test3"}, + } tests := []struct { name string replica int32 ps []entity.Participant + res []*collector.ResourceSpecs }{ { name: "test1", @@ -32,7 +47,8 @@ func TestReplication(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - repl := strategy.NewReplicationStrategy(nil) + params := ¶m.Params{Resources: res} + repl := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params, Replicas: tt.replica}) schedule, err := repl.Schedule() if err != nil { return