NewRandomStrategy
Former-commit-id: 6d0df33c37d7e5a58d46a41d4e3ce78bf2860a62
This commit is contained in:
commit
af7b932962
|
@ -45,6 +45,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
|
||||||
Params: req.AiOption.Params,
|
Params: req.AiOption.Params,
|
||||||
Envs: req.AiOption.Envs,
|
Envs: req.AiOption.Envs,
|
||||||
Cmd: req.AiOption.Cmd,
|
Cmd: req.AiOption.Cmd,
|
||||||
|
ClusterIds: req.AiOption.AiClusterIds,
|
||||||
}
|
}
|
||||||
aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
|
aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -26,7 +26,6 @@ import (
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
|
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||||
|
@ -91,7 +90,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
|
||||||
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
|
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
resources, err := as.findClustersWithResources()
|
/* resources, err := as.findClustersWithResources()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -107,26 +106,29 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
|
||||||
return &strategy.SingleAssignment{Cluster: &cluster}, nil
|
return &strategy.SingleAssignment{Cluster: &cluster}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
params := ¶m.Params{Resources: resources}
|
params := ¶m.Params{Resources: resources}*/
|
||||||
|
|
||||||
switch as.option.StrategyName {
|
switch as.option.StrategyName {
|
||||||
case strategy.REPLICATION:
|
case strategy.REPLICATION:
|
||||||
var clusterIds []string
|
var clusterIds []string
|
||||||
for _, resource := range resources {
|
/* for _, resource := range resources {
|
||||||
clusterIds = append(clusterIds, resource.ClusterId)
|
clusterIds = append(clusterIds, resource.ClusterId)
|
||||||
}
|
}*/
|
||||||
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
|
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
|
||||||
return strategy, nil
|
return strategy, nil
|
||||||
case strategy.RESOURCES_PRICING:
|
/* case strategy.RESOURCES_PRICING:
|
||||||
strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1})
|
strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1})
|
||||||
return strategy, nil
|
return strategy, nil
|
||||||
case strategy.DYNAMIC_RESOURCES:
|
case strategy.DYNAMIC_RESOURCES:
|
||||||
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
|
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
|
||||||
return strategy, nil
|
return strategy, nil*/
|
||||||
case strategy.STATIC_WEIGHT:
|
case strategy.STATIC_WEIGHT:
|
||||||
//todo resources should match cluster StaticWeightMap
|
//todo resources should match cluster StaticWeightMap
|
||||||
strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
|
strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica)
|
||||||
return strategy, nil
|
return strategy, nil
|
||||||
|
case strategy.RANDOM:
|
||||||
|
strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica)
|
||||||
|
return strategy, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, errors.New("no strategy has been chosen")
|
return nil, errors.New("no strategy has been chosen")
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
package strategy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"math/rand"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RandomStrategy struct {
|
||||||
|
clusterIds []string
|
||||||
|
replicas int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRandomStrategy(clusterIds []string, replicas int32) *RandomStrategy {
|
||||||
|
return &RandomStrategy{clusterIds: clusterIds,
|
||||||
|
replicas: replicas,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *RandomStrategy) Schedule() ([]*AssignedCluster, error) {
|
||||||
|
|
||||||
|
if s.replicas < 1 {
|
||||||
|
return nil, errors.New("replicas must be greater than 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(s.clusterIds) < 1 {
|
||||||
|
return nil, errors.New("cluster must be greater than 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(s.clusterIds) == 0 || s.clusterIds == nil {
|
||||||
|
return nil, errors.New("weight must be set")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建一个切片来保存每个部分的数量
|
||||||
|
parts := make([]int32, len(s.clusterIds))
|
||||||
|
|
||||||
|
// 首先将每个部分都分配至少一个副本
|
||||||
|
for i := range parts {
|
||||||
|
parts[i] = 1
|
||||||
|
s.replicas--
|
||||||
|
}
|
||||||
|
|
||||||
|
// 剩余要分配的副本数
|
||||||
|
remaining := s.replicas
|
||||||
|
|
||||||
|
// 随机分配剩余的副本
|
||||||
|
for remaining > 0 {
|
||||||
|
// 随机选择一个部分(索引从0到numParts-1)
|
||||||
|
partIndex := rand.Intn(len(s.clusterIds))
|
||||||
|
|
||||||
|
// 如果该部分加上一个副本后不会超过总数,则分配一个副本
|
||||||
|
if parts[partIndex]+1 <= s.replicas {
|
||||||
|
parts[partIndex]++
|
||||||
|
remaining--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var results []*AssignedCluster
|
||||||
|
if len(s.clusterIds) == len(parts) {
|
||||||
|
for i, key := range s.clusterIds {
|
||||||
|
cluster := &AssignedCluster{ClusterId: key, Replicas: parts[i]}
|
||||||
|
results = append(results, cluster)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return results, nil
|
||||||
|
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ const (
|
||||||
RESOURCES_PRICING = "resourcesPricing"
|
RESOURCES_PRICING = "resourcesPricing"
|
||||||
STATIC_WEIGHT = "staticWeight"
|
STATIC_WEIGHT = "staticWeight"
|
||||||
DYNAMIC_RESOURCES = "dynamicResources"
|
DYNAMIC_RESOURCES = "dynamicResources"
|
||||||
|
RANDOM = "random"
|
||||||
DATA_LOCALITY = "dataLocality" //感知数据位置,数据调度和计算调度协同,近数据调度
|
DATA_LOCALITY = "dataLocality" //感知数据位置,数据调度和计算调度协同,近数据调度
|
||||||
ENERGY_CONSUMPTION = "energyConsumption" //根据各集群总体能耗水平调度作业,优先选择能耗低的集群调度作业
|
ENERGY_CONSUMPTION = "energyConsumption" //根据各集群总体能耗水平调度作业,优先选择能耗低的集群调度作业
|
||||||
)
|
)
|
||||||
|
|
|
@ -5,7 +5,9 @@ import (
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestReplication(t *testing.T) {
|
func TestReplication(t *testing.T) {
|
||||||
|
@ -106,3 +108,144 @@ func TestStaticWeight(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRandom(t *testing.T) {
|
||||||
|
// 使用当前时间作为随机数种子,确保每次程序运行产生的随机数序列都不同
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
/*randomNum := randInt(1, 100)
|
||||||
|
fmt.Println("Random number:", randomNum)*/
|
||||||
|
total := 5 // 假设总数是5
|
||||||
|
first, second := splitIntoTwoRandomParts(total)
|
||||||
|
fmt.Printf("第一部分的数量: %d, 第二部分的数量: %d\n", first, second)
|
||||||
|
}
|
||||||
|
|
||||||
|
// randInt 生成一个指定范围内的随机整数,包括min但不包括max
|
||||||
|
func randInt(min, max int) int {
|
||||||
|
return min + rand.Intn(max-min)
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitIntoTwoRandomParts(total int) (int, int) {
|
||||||
|
if total < 2 {
|
||||||
|
// 如果总数小于2,则无法分成两部分
|
||||||
|
return 0, 0
|
||||||
|
}
|
||||||
|
// 生成一个随机数作为第一部分的数量(范围在[1, total-1]之间)
|
||||||
|
firstPart := rand.Intn(total-1) + 1
|
||||||
|
// 第二部分的数量就是总数减去第一部分的数量
|
||||||
|
secondPart := total - firstPart
|
||||||
|
return firstPart, secondPart
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitIntoRandomParts(total int) (int, int) {
|
||||||
|
if total < 2 {
|
||||||
|
// 如果总数小于2,则无法分成两部分
|
||||||
|
return 0, 0
|
||||||
|
}
|
||||||
|
// 生成一个随机数作为第一部分的数量(范围在[1, total-1]之间)
|
||||||
|
firstPart := rand.Intn(total-1) + 1
|
||||||
|
// 第二部分的数量就是总数减去第一部分的数量
|
||||||
|
secondPart := total - firstPart
|
||||||
|
return firstPart, secondPart
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRandoms(t *testing.T) {
|
||||||
|
// 使用当前时间作为随机数种子,确保每次程序运行产生的随机数序列都不同
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
/*randomNum := randInt(1, 100)
|
||||||
|
fmt.Println("Random number:", randomNum)*/
|
||||||
|
total := 10 // 假设总数是5
|
||||||
|
parts := splitRandomParts(total)
|
||||||
|
fmt.Println("分配结果:", parts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// splitIntoRandomParts 将总数total随机分成多个部分,并返回这些部分的切片
|
||||||
|
func splitRandomParts(total int) []int {
|
||||||
|
if total < 2 {
|
||||||
|
// 如果总数小于2,则无法分成多个部分
|
||||||
|
return []int{total}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建一个切片来保存每个部分的数量
|
||||||
|
var parts []int
|
||||||
|
|
||||||
|
// 剩余要分配的副本数
|
||||||
|
remaining := total
|
||||||
|
|
||||||
|
// 随机决定要分成的部分数量(至少2个部分)
|
||||||
|
numParts := rand.Intn(total-1) + 2
|
||||||
|
|
||||||
|
// 确保每个部分至少获得1个副本
|
||||||
|
for i := 0; i < numParts-1; i++ {
|
||||||
|
// 生成一个随机数(1到剩余副本数之间)
|
||||||
|
// 为了避免最后一个部分太小,我们可能需要调整随机数范围
|
||||||
|
minPartSize := 1
|
||||||
|
if remaining <= numParts-i {
|
||||||
|
// 如果剩余副本数不足以让每个部分都至少获得1个,则调整最小部分大小
|
||||||
|
minPartSize = remaining / (numParts - i)
|
||||||
|
if remaining%(numParts-i) > 0 {
|
||||||
|
minPartSize++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// 生成一个大于等于minPartSize且小于等于remaining的随机数
|
||||||
|
partSize := minPartSize + rand.Intn(remaining-minPartSize+1)
|
||||||
|
parts = append(parts, partSize)
|
||||||
|
remaining -= partSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// 最后一个部分的数量就是剩余的副本数
|
||||||
|
parts = append(parts, remaining)
|
||||||
|
|
||||||
|
return parts
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNumRandom(t *testing.T) {
|
||||||
|
total := 10 // 假设副本数是10
|
||||||
|
numParts := 2 // 假设要分成5个集群
|
||||||
|
|
||||||
|
parts, err := splitIntoParts(total, numParts)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("Error:", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fmt.Println("分配结果:", parts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// splitIntoParts 将总数total随机分成numParts个部分,并返回这些部分的切片
|
||||||
|
func splitIntoParts(total int, numParts int) ([]int, error) {
|
||||||
|
if total < 1 || numParts < 1 {
|
||||||
|
// 总数或部分数量不能小于1
|
||||||
|
return nil, fmt.Errorf("total and numParts must be greater than 0")
|
||||||
|
}
|
||||||
|
if numParts > total {
|
||||||
|
// 部分数量不能大于总数
|
||||||
|
return nil, fmt.Errorf("numParts cannot be greater than total")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建一个切片来保存每个部分的数量
|
||||||
|
parts := make([]int, numParts)
|
||||||
|
|
||||||
|
// 首先将每个部分都分配至少一个副本
|
||||||
|
for i := range parts {
|
||||||
|
parts[i] = 1
|
||||||
|
total--
|
||||||
|
}
|
||||||
|
|
||||||
|
// 剩余要分配的副本数
|
||||||
|
remaining := total
|
||||||
|
|
||||||
|
// 随机分配剩余的副本
|
||||||
|
for remaining > 0 {
|
||||||
|
// 随机选择一个部分(索引从0到numParts-1)
|
||||||
|
partIndex := rand.Intn(numParts)
|
||||||
|
|
||||||
|
// 如果该部分加上一个副本后不会超过总数,则分配一个副本
|
||||||
|
if parts[partIndex]+1 <= total {
|
||||||
|
parts[partIndex]++
|
||||||
|
remaining--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return parts, nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue