From 125b94cf9c42af614918fa987abbfebd4acc90f7 Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 30 Jan 2024 17:33:48 +0800 Subject: [PATCH] updated strategies of the scheduler and their params Former-commit-id: c301ea7fef461e20cbe801947fa9551cbcff0caf --- api/internal/scheduler/scheduler.go | 2 + .../scheduler/schedulers/aiScheduler.go | 2 +- api/internal/scheduler/service/aiService.go | 4 +- .../scheduler/strategy/option/option.go | 5 -- .../strategy/option/resourcePricingOption.go | 4 - .../scheduler/strategy/params/params.go | 9 ++ .../strategy/params/replicationParams.go | 16 ++++ .../strategy/params/resourcePricingParams.go | 26 ++++++ .../scheduler/strategy/replication.go | 7 +- .../scheduler/strategy/staticWeight.go | 6 ++ .../scheduler/strategy/test/strategy_test.go | 90 +++++++++++++++++++ 11 files changed, 156 insertions(+), 15 deletions(-) delete mode 100644 api/internal/scheduler/strategy/option/option.go delete mode 100644 api/internal/scheduler/strategy/option/resourcePricingOption.go create mode 100644 api/internal/scheduler/strategy/params/params.go create mode 100644 api/internal/scheduler/strategy/params/replicationParams.go create mode 100644 api/internal/scheduler/strategy/params/resourcePricingParams.go create mode 100644 api/internal/scheduler/strategy/test/strategy_test.go diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index 5788b8b3..e791c9a0 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -27,6 +27,7 @@ import ( "gorm.io/gorm" "sigs.k8s.io/yaml" "strings" + "sync" ) type Scheduler struct { @@ -39,6 +40,7 @@ type Scheduler struct { ResourceCollector *map[string]collector.ResourceCollector Storages database.Storage AiExecutor *map[string]executor.AiExecutor + mu sync.RWMutex } func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 300c6613..68fc15f1 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -63,7 +63,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { Name: resource.Name, }) } - strategy := strategy.NewReplicationStrategy(nil, 0) + strategy := strategy.NewReplicationStrategy(nil) return strategy, nil } diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index 3b46596b..29ab4653 100644 --- a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -24,8 +24,8 @@ var ( } ) -func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.Executor, *map[string]collector.ResourceCollector) { - executorMap := make(map[string]executor.Executor) +func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.ResourceCollector) { + executorMap := make(map[string]executor.AiExecutor) collectorMap := make(map[string]collector.ResourceCollector) for k, v := range AiTypeMap { switch v { diff --git a/api/internal/scheduler/strategy/option/option.go b/api/internal/scheduler/strategy/option/option.go deleted file mode 100644 index 9b6328f5..00000000 --- a/api/internal/scheduler/strategy/option/option.go +++ /dev/null @@ -1,5 +0,0 @@ -package option - -type Option interface { - GetOption() (interface{}, error) -} diff --git a/api/internal/scheduler/strategy/option/resourcePricingOption.go b/api/internal/scheduler/strategy/option/resourcePricingOption.go deleted file mode 100644 index dcf2920a..00000000 --- a/api/internal/scheduler/strategy/option/resourcePricingOption.go +++ /dev/null @@ -1,4 +0,0 @@ -package option - -type ResourcePricingOption struct { -} diff --git a/api/internal/scheduler/strategy/params/params.go b/api/internal/scheduler/strategy/params/params.go new file mode 100644 index 00000000..44c29f13 --- /dev/null +++ b/api/internal/scheduler/strategy/params/params.go @@ -0,0 +1,9 @@ +package params + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector" +) + +type Params struct { + resources []*collector.ResourceSpecs +} diff --git a/api/internal/scheduler/strategy/params/replicationParams.go b/api/internal/scheduler/strategy/params/replicationParams.go new file mode 100644 index 00000000..7adfaea0 --- /dev/null +++ b/api/internal/scheduler/strategy/params/replicationParams.go @@ -0,0 +1,16 @@ +package params + +import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + +type ReplicationOption struct { + replicas int32 + participants []entity.Participant +} + +func (o *ReplicationOption) GetReplicas() int32 { + return o.replicas +} + +func (o *ReplicationOption) GetParticipants() []entity.Participant { + return o.participants +} diff --git a/api/internal/scheduler/strategy/params/resourcePricingParams.go b/api/internal/scheduler/strategy/params/resourcePricingParams.go new file mode 100644 index 00000000..10fe7bd6 --- /dev/null +++ b/api/internal/scheduler/strategy/params/resourcePricingParams.go @@ -0,0 +1,26 @@ +package params + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" +) + +type ResourcePricingOption struct { + replicas int32 + task *providerPricing.Task + providers []*providerPricing.Provider + *Params +} + +func NewResourcePricingOption(params *Params) *ResourcePricingOption { + return &ResourcePricingOption{ + Params: params, + } +} + +func (r *ResourcePricingOption) GetReplicas() int32 { + return r.replicas +} + +func (r *ResourcePricingOption) GetProviders() []*providerPricing.Provider { + return r.providers +} diff --git a/api/internal/scheduler/strategy/replication.go b/api/internal/scheduler/strategy/replication.go index 88ecd6fb..3f9549b9 100644 --- a/api/internal/scheduler/strategy/replication.go +++ b/api/internal/scheduler/strategy/replication.go @@ -3,6 +3,7 @@ package strategy import ( "github.com/pkg/errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/params" ) type ReplicationStrategy struct { @@ -10,9 +11,9 @@ type ReplicationStrategy struct { participants []entity.Participant } -func NewReplicationStrategy(participants []entity.Participant, replicas int32) *ReplicationStrategy { - return &ReplicationStrategy{replicas: replicas, - participants: participants, +func NewReplicationStrategy(params *params.ReplicationOption) *ReplicationStrategy { + return &ReplicationStrategy{replicas: params.GetReplicas(), + participants: params.GetParticipants(), } } diff --git a/api/internal/scheduler/strategy/staticWeight.go b/api/internal/scheduler/strategy/staticWeight.go index b028baaa..5215771d 100644 --- a/api/internal/scheduler/strategy/staticWeight.go +++ b/api/internal/scheduler/strategy/staticWeight.go @@ -13,6 +13,12 @@ type StaticWeightStrategy struct { weights []entity.WeightP } +func NewStaticWeightStrategy(weights []entity.WeightP, replicas int32) *StaticWeightStrategy { + return &StaticWeightStrategy{weights: weights, + num: replicas, + } +} + func (ps *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { // TODO: implement the scheduling logic return nil, nil diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go new file mode 100644 index 00000000..767d71c4 --- /dev/null +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -0,0 +1,90 @@ +package test + +import ( + "fmt" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy" + "testing" +) + +func TestReplication(t *testing.T) { + parts := []entity.Participant{ + {Name: "test1", Participant_id: 1}, + {Name: "test2", Participant_id: 2}, + {Name: "test3", Participant_id: 3}, + } + tests := []struct { + name string + replica int32 + ps []entity.Participant + }{ + { + name: "test1", + replica: 1, + ps: parts, + }, + { + name: "test2", + replica: 2, + ps: parts, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + repl := strategy.NewReplicationStrategy(nil) + schedule, err := repl.Schedule() + if err != nil { + return + } + for _, cluster := range schedule { + fmt.Println(cluster) + } + + }) + } + +} + +func TestStaticWeight(t *testing.T) { + parts := []entity.WeightP{ + {Name: "p1", Participant_id: 1, Weight: 3}, + {Name: "p2", Participant_id: 2, Weight: 5}, + {Name: "p3", Participant_id: 3, Weight: 2}, + } + tests := []struct { + name string + replica int32 + ps []entity.WeightP + }{ + { + name: "test1", + replica: 1, + ps: parts, + }, + { + name: "test2", + replica: 5, + ps: parts, + }, + { + name: "test2", + replica: 6, + ps: parts, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + repl := strategy.NewStaticWeightStrategy(tt.ps, tt.replica) + schedule, err := repl.Schedule() + if err != nil { + return + } + for _, cluster := range schedule { + fmt.Println(cluster) + } + + }) + } +}