From c39c628670276502b8b513be117743782f9d290c Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 22 Mar 2024 17:16:19 +0800 Subject: [PATCH] modified staticWeight strategy and separate it from algorithm Former-commit-id: 77630a215c8ad8fd3711f2e941f74e5c75ff29f2 --- .../weightDistributing/weightDistributing.go | 68 +++++++++++++++++ api/internal/scheduler/entity/entity.go | 6 -- .../scheduler/schedulers/aiScheduler.go | 20 +++-- .../scheduler/strategy/staticWeight.go | 76 ++++++------------- .../scheduler/strategy/test/strategy_test.go | 10 +-- 5 files changed, 109 insertions(+), 71 deletions(-) create mode 100644 api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go diff --git a/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go b/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go new file mode 100644 index 00000000..0124fa8b --- /dev/null +++ b/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go @@ -0,0 +1,68 @@ +package weightDistributing + +import ( + "math" +) + +type Weight struct { + Id int64 + Weight int32 + Name string + Replica int32 +} + +func DistributeReplicas(weights []*Weight, replicas int32) { + var weightSum int32 + weightSum = 0 + for _, w := range weights { + weightSum += w.Weight + } + + weightRatio := make([]float64, len(weights)) + for i, w := range weights { + weightRatio[i] = float64(w.Weight) / float64(weightSum) + } + + var rest = replicas + + for i := 0; i < len(weights); i++ { + + var n = math.Round(float64(replicas) * weightRatio[i]) + rest -= int32(n) + + weights[i].Replica = int32(n) + } + + for { + if rest == 0 { + break + } + + maxIdx := 0 + minIdx := 0 + + if rest > 0 { + for i, ratio := range weightRatio { + if ratio > weightRatio[maxIdx] { + maxIdx = i + } + } + } else { + for i, ratio := range weightRatio { + if ratio < weightRatio[minIdx] { + minIdx = i + } + } + } + + if rest > 0 { + weights[maxIdx].Replica++ + weightRatio[maxIdx]-- + rest-- + } else { + weights[minIdx].Replica-- + weightRatio[minIdx]++ + rest++ + } + } +} diff --git a/api/internal/scheduler/entity/entity.go b/api/internal/scheduler/entity/entity.go index 7f5f6951..33e48dba 100644 --- a/api/internal/scheduler/entity/entity.go +++ b/api/internal/scheduler/entity/entity.go @@ -11,9 +11,3 @@ type Participant struct { Name string Participant_id int64 } - -type WeightP struct { - Participant_id int64 - Weight int32 - Name string -} diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 8a3a29cd..3ac52d10 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -82,6 +82,10 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { case strategy.DYNAMIC_RESOURCES: strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) return strategy, nil + case strategy.STATIC_WEIGHT: + //todo resources should match cluster StaticWeightMap + strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, 1) + return strategy, nil } return nil, errors.New("no strategy has been chosen") @@ -92,14 +96,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return errors.New("clusters is nil") } - //executorMap := *as.AiExecutor - //for _, cluster := range clusters { - // _, err := executorMap[cluster.Name].Execute(as.option) - // if err != nil { - // // TODO: database operation - // } - // // TODO: database operation - //} + executorMap := *as.AiExecutor + for _, cluster := range clusters { + _, err := executorMap[cluster.Name].Execute(as.option) + if err != nil { + // TODO: database operation + } + // TODO: database operation + } return nil } diff --git a/api/internal/scheduler/strategy/staticWeight.go b/api/internal/scheduler/strategy/staticWeight.go index 76230ca3..2172bec3 100644 --- a/api/internal/scheduler/strategy/staticWeight.go +++ b/api/internal/scheduler/strategy/staticWeight.go @@ -2,73 +2,45 @@ package strategy import ( "errors" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/weightDistributing" ) type StaticWeightStrategy struct { - // TODO: add fields - - //每个 - num int32 - weights []entity.WeightP + staticWeightMap map[string]int32 + replicas int32 } -func NewStaticWeightStrategy(weights []entity.WeightP, replicas int32) *StaticWeightStrategy { - return &StaticWeightStrategy{weights: weights, - num: replicas, +func NewStaticWeightStrategy(staticWeightMap map[string]int32, replicas int32) *StaticWeightStrategy { + return &StaticWeightStrategy{staticWeightMap: staticWeightMap, + replicas: replicas, } } -func (ps *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { - // TODO: implement the scheduling logic return nil, nil +func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { - if ps.num < 1 { - return nil, errors.New("numbers must be greater than 0") + if s.replicas < 1 { + return nil, errors.New("replicas must be greater than 0") } - if ps.weights == nil { + if len(s.staticWeightMap) == 0 || s.staticWeightMap == nil { return nil, errors.New("weight must be set") } - var weightSum int32 - weightSum = 0 - for _, w := range ps.weights { - weightSum += w.Weight - } - - weightRatio := make([]float64, len(ps.weights)) - for i, w := range ps.weights { - weightRatio[i] = float64(w.Weight) / float64(weightSum) - } - - var rest = ps.num - var results []*AssignedCluster - - for i := 0; i < len(ps.weights); i++ { - - var n = int(float64(ps.num) * weightRatio[i]) - rest -= int32(n) - - cluster := &AssignedCluster{ParticipantId: ps.weights[i].Participant_id, Name: ps.weights[i].Name, Replicas: int32(n)} - results = append(results, cluster) - } - - if rest != 0 { - if rest < 0 { // 如果差值小于0,需要增加某些元素的值 - for i := len(ps.weights) - 1; rest < 0 && i >= 0; i-- { - if results[i].Replicas < ps.weights[i].Weight { - results[i].Replicas++ - rest++ - } - } - } else { - for i := len(ps.weights) - 1; rest > 0 && i >= 0; i-- { - if results[i].Replicas < ps.weights[i].Weight { - results[i].Replicas-- - rest-- - } - } + weights := make([]*weightDistributing.Weight, 0) + for k, v := range s.staticWeightMap { + weight := &weightDistributing.Weight{ + Name: k, + Weight: v, } + weights = append(weights, weight) + } + + weightDistributing.DistributeReplicas(weights, s.replicas) + + var results []*AssignedCluster + for _, weight := range weights { + cluster := &AssignedCluster{ParticipantId: weight.Id, Name: weight.Name, Replicas: weight.Replica} + results = append(results, cluster) } return results, nil diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go index d2cb9fb1..eb0f59ad 100644 --- a/api/internal/scheduler/strategy/test/strategy_test.go +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -63,15 +63,15 @@ func TestReplication(t *testing.T) { } func TestStaticWeight(t *testing.T) { - parts := []entity.WeightP{ - {Name: "p1", Participant_id: 1, Weight: 3}, - {Name: "p2", Participant_id: 2, Weight: 5}, - {Name: "p3", Participant_id: 3, Weight: 2}, + parts := map[string]int32{ + "test1": 6, + "test2": 5, + "test3": 2, } tests := []struct { name string replica int32 - ps []entity.WeightP + ps map[string]int32 }{ { name: "test1",