updated strategies of the scheduler and their params

Former-commit-id: c301ea7fef461e20cbe801947fa9551cbcff0caf
This commit is contained in:
tzwang 2024-01-30 17:33:48 +08:00
parent 7d91e83a4a
commit 125b94cf9c
11 changed files with 156 additions and 15 deletions

View File

@ -27,6 +27,7 @@ import (
"gorm.io/gorm"
"sigs.k8s.io/yaml"
"strings"
"sync"
)
type Scheduler struct {
@ -39,6 +40,7 @@ type Scheduler struct {
ResourceCollector *map[string]collector.ResourceCollector
Storages database.Storage
AiExecutor *map[string]executor.AiExecutor
mu sync.RWMutex
}
func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {

View File

@ -63,7 +63,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
Name: resource.Name,
})
}
strategy := strategy.NewReplicationStrategy(nil, 0)
strategy := strategy.NewReplicationStrategy(nil)
return strategy, nil
}

View File

@ -24,8 +24,8 @@ var (
}
)
func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.Executor, *map[string]collector.ResourceCollector) {
executorMap := make(map[string]executor.Executor)
func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.ResourceCollector) {
executorMap := make(map[string]executor.AiExecutor)
collectorMap := make(map[string]collector.ResourceCollector)
for k, v := range AiTypeMap {
switch v {

View File

@ -1,5 +0,0 @@
package option
type Option interface {
GetOption() (interface{}, error)
}

View File

@ -1,4 +0,0 @@
package option
type ResourcePricingOption struct {
}

View File

@ -0,0 +1,9 @@
package params
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
)
type Params struct {
resources []*collector.ResourceSpecs
}

View File

@ -0,0 +1,16 @@
package params
import "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
type ReplicationOption struct {
replicas int32
participants []entity.Participant
}
func (o *ReplicationOption) GetReplicas() int32 {
return o.replicas
}
func (o *ReplicationOption) GetParticipants() []entity.Participant {
return o.participants
}

View File

@ -0,0 +1,26 @@
package params
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
)
type ResourcePricingOption struct {
replicas int32
task *providerPricing.Task
providers []*providerPricing.Provider
*Params
}
func NewResourcePricingOption(params *Params) *ResourcePricingOption {
return &ResourcePricingOption{
Params: params,
}
}
func (r *ResourcePricingOption) GetReplicas() int32 {
return r.replicas
}
func (r *ResourcePricingOption) GetProviders() []*providerPricing.Provider {
return r.providers
}

View File

@ -3,6 +3,7 @@ package strategy
import (
"github.com/pkg/errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/params"
)
type ReplicationStrategy struct {
@ -10,9 +11,9 @@ type ReplicationStrategy struct {
participants []entity.Participant
}
func NewReplicationStrategy(participants []entity.Participant, replicas int32) *ReplicationStrategy {
return &ReplicationStrategy{replicas: replicas,
participants: participants,
func NewReplicationStrategy(params *params.ReplicationOption) *ReplicationStrategy {
return &ReplicationStrategy{replicas: params.GetReplicas(),
participants: params.GetParticipants(),
}
}

View File

@ -13,6 +13,12 @@ type StaticWeightStrategy struct {
weights []entity.WeightP
}
func NewStaticWeightStrategy(weights []entity.WeightP, replicas int32) *StaticWeightStrategy {
return &StaticWeightStrategy{weights: weights,
num: replicas,
}
}
func (ps *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
// TODO: implement the scheduling logic return nil, nil

View File

@ -0,0 +1,90 @@
package test
import (
"fmt"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy"
"testing"
)
func TestReplication(t *testing.T) {
parts := []entity.Participant{
{Name: "test1", Participant_id: 1},
{Name: "test2", Participant_id: 2},
{Name: "test3", Participant_id: 3},
}
tests := []struct {
name string
replica int32
ps []entity.Participant
}{
{
name: "test1",
replica: 1,
ps: parts,
},
{
name: "test2",
replica: 2,
ps: parts,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
repl := strategy.NewReplicationStrategy(nil)
schedule, err := repl.Schedule()
if err != nil {
return
}
for _, cluster := range schedule {
fmt.Println(cluster)
}
})
}
}
func TestStaticWeight(t *testing.T) {
parts := []entity.WeightP{
{Name: "p1", Participant_id: 1, Weight: 3},
{Name: "p2", Participant_id: 2, Weight: 5},
{Name: "p3", Participant_id: 3, Weight: 2},
}
tests := []struct {
name string
replica int32
ps []entity.WeightP
}{
{
name: "test1",
replica: 1,
ps: parts,
},
{
name: "test2",
replica: 5,
ps: parts,
},
{
name: "test2",
replica: 6,
ps: parts,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
repl := strategy.NewStaticWeightStrategy(tt.ps, tt.replica)
schedule, err := repl.Schedule()
if err != nil {
return
}
for _, cluster := range schedule {
fmt.Println(cluster)
}
})
}
}