modified staticWeight strategy and separate it from algorithm

Former-commit-id: 77630a215c8ad8fd3711f2e941f74e5c75ff29f2
This commit is contained in:
tzwang 2024-03-22 17:16:19 +08:00
parent 1cb0a39320
commit c39c628670
5 changed files with 109 additions and 71 deletions

View File

@ -0,0 +1,68 @@
package weightDistributing
import (
"math"
)
type Weight struct {
Id int64
Weight int32
Name string
Replica int32
}
func DistributeReplicas(weights []*Weight, replicas int32) {
var weightSum int32
weightSum = 0
for _, w := range weights {
weightSum += w.Weight
}
weightRatio := make([]float64, len(weights))
for i, w := range weights {
weightRatio[i] = float64(w.Weight) / float64(weightSum)
}
var rest = replicas
for i := 0; i < len(weights); i++ {
var n = math.Round(float64(replicas) * weightRatio[i])
rest -= int32(n)
weights[i].Replica = int32(n)
}
for {
if rest == 0 {
break
}
maxIdx := 0
minIdx := 0
if rest > 0 {
for i, ratio := range weightRatio {
if ratio > weightRatio[maxIdx] {
maxIdx = i
}
}
} else {
for i, ratio := range weightRatio {
if ratio < weightRatio[minIdx] {
minIdx = i
}
}
}
if rest > 0 {
weights[maxIdx].Replica++
weightRatio[maxIdx]--
rest--
} else {
weights[minIdx].Replica--
weightRatio[minIdx]++
rest++
}
}
}

View File

@ -11,9 +11,3 @@ type Participant struct {
Name string Name string
Participant_id int64 Participant_id int64
} }
type WeightP struct {
Participant_id int64
Weight int32
Name string
}

View File

@ -82,6 +82,10 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
case strategy.DYNAMIC_RESOURCES: case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
return strategy, nil return strategy, nil
case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap
strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, 1)
return strategy, nil
} }
return nil, errors.New("no strategy has been chosen") return nil, errors.New("no strategy has been chosen")
@ -92,14 +96,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return errors.New("clusters is nil") return errors.New("clusters is nil")
} }
//executorMap := *as.AiExecutor executorMap := *as.AiExecutor
//for _, cluster := range clusters { for _, cluster := range clusters {
// _, err := executorMap[cluster.Name].Execute(as.option) _, err := executorMap[cluster.Name].Execute(as.option)
// if err != nil { if err != nil {
// // TODO: database operation // TODO: database operation
// } }
// // TODO: database operation // TODO: database operation
//} }
return nil return nil
} }

View File

@ -2,73 +2,45 @@ package strategy
import ( import (
"errors" "errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/weightDistributing"
) )
type StaticWeightStrategy struct { type StaticWeightStrategy struct {
// TODO: add fields staticWeightMap map[string]int32
replicas int32
//每个
num int32
weights []entity.WeightP
} }
func NewStaticWeightStrategy(weights []entity.WeightP, replicas int32) *StaticWeightStrategy { func NewStaticWeightStrategy(staticWeightMap map[string]int32, replicas int32) *StaticWeightStrategy {
return &StaticWeightStrategy{weights: weights, return &StaticWeightStrategy{staticWeightMap: staticWeightMap,
num: replicas, replicas: replicas,
} }
} }
func (ps *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
// TODO: implement the scheduling logic return nil, nil
if ps.num < 1 { if s.replicas < 1 {
return nil, errors.New("numbers must be greater than 0") return nil, errors.New("replicas must be greater than 0")
} }
if ps.weights == nil { if len(s.staticWeightMap) == 0 || s.staticWeightMap == nil {
return nil, errors.New("weight must be set") return nil, errors.New("weight must be set")
} }
var weightSum int32 weights := make([]*weightDistributing.Weight, 0)
weightSum = 0 for k, v := range s.staticWeightMap {
for _, w := range ps.weights { weight := &weightDistributing.Weight{
weightSum += w.Weight Name: k,
} Weight: v,
weightRatio := make([]float64, len(ps.weights))
for i, w := range ps.weights {
weightRatio[i] = float64(w.Weight) / float64(weightSum)
}
var rest = ps.num
var results []*AssignedCluster
for i := 0; i < len(ps.weights); i++ {
var n = int(float64(ps.num) * weightRatio[i])
rest -= int32(n)
cluster := &AssignedCluster{ParticipantId: ps.weights[i].Participant_id, Name: ps.weights[i].Name, Replicas: int32(n)}
results = append(results, cluster)
}
if rest != 0 {
if rest < 0 { // 如果差值小于0需要增加某些元素的值
for i := len(ps.weights) - 1; rest < 0 && i >= 0; i-- {
if results[i].Replicas < ps.weights[i].Weight {
results[i].Replicas++
rest++
}
}
} else {
for i := len(ps.weights) - 1; rest > 0 && i >= 0; i-- {
if results[i].Replicas < ps.weights[i].Weight {
results[i].Replicas--
rest--
}
}
} }
weights = append(weights, weight)
}
weightDistributing.DistributeReplicas(weights, s.replicas)
var results []*AssignedCluster
for _, weight := range weights {
cluster := &AssignedCluster{ParticipantId: weight.Id, Name: weight.Name, Replicas: weight.Replica}
results = append(results, cluster)
} }
return results, nil return results, nil

View File

@ -63,15 +63,15 @@ func TestReplication(t *testing.T) {
} }
func TestStaticWeight(t *testing.T) { func TestStaticWeight(t *testing.T) {
parts := []entity.WeightP{ parts := map[string]int32{
{Name: "p1", Participant_id: 1, Weight: 3}, "test1": 6,
{Name: "p2", Participant_id: 2, Weight: 5}, "test2": 5,
{Name: "p3", Participant_id: 3, Weight: 2}, "test3": 2,
} }
tests := []struct { tests := []struct {
name string name string
replica int32 replica int32
ps []entity.WeightP ps map[string]int32
}{ }{
{ {
name: "test1", name: "test1",