diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 98f5f916..548a06eb 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -44,6 +44,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type Params: req.AiOption.Params, Envs: req.AiOption.Envs, Cmd: req.AiOption.Cmd, + ClusterIds: req.AiOption.AiClusterIds, } aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt) if err != nil { diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 5c8daacc..d0a5e485 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -25,7 +25,6 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -71,44 +70,45 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil } - resources, err := as.findClustersWithResources() + /* resources, err := as.findClustersWithResources() - if err != nil { - return nil, err - } - if len(resources) == 0 { - return nil, errors.New("no cluster has resources") - } + if err != nil { + return nil, err + } + if len(resources) == 0 { + return nil, errors.New("no cluster has resources") + } - if len(resources) == 1 { - var cluster strategy.AssignedCluster - cluster.ClusterId = resources[0].ClusterId - cluster.Replicas = 1 - return &strategy.SingleAssignment{Cluster: &cluster}, nil - } + if len(resources) == 1 { + var cluster strategy.AssignedCluster + cluster.ClusterId = resources[0].ClusterId + cluster.Replicas = 1 + return &strategy.SingleAssignment{Cluster: &cluster}, nil + } - params := ¶m.Params{Resources: resources} + params := ¶m.Params{Resources: resources}*/ switch as.option.StrategyName { case strategy.REPLICATION: var clusterIds []string - for _, resource := range resources { + /* for _, resource := range resources { clusterIds = append(clusterIds, resource.ClusterId) - } + }*/ strategy := strategy.NewReplicationStrategy(clusterIds, 1) return strategy, nil - case strategy.RESOURCES_PRICING: - strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) - return strategy, nil - case strategy.DYNAMIC_RESOURCES: - strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) - return strategy, nil + /* case strategy.RESOURCES_PRICING: + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) + return strategy, nil + case strategy.DYNAMIC_RESOURCES: + strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) + return strategy, nil*/ case strategy.STATIC_WEIGHT: //todo resources should match cluster StaticWeightMap strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, as.option.Replica) return strategy, nil case strategy.RANDOM: - + strategy := strategy.NewRandomStrategy(as.option.ClusterIds, as.option.Replica) + return strategy, nil } return nil, errors.New("no strategy has been chosen") diff --git a/api/internal/scheduler/strategy/random.go b/api/internal/scheduler/strategy/random.go index b030bafd..315c0b3f 100644 --- a/api/internal/scheduler/strategy/random.go +++ b/api/internal/scheduler/strategy/random.go @@ -1,10 +1,66 @@ package strategy -/*type RandomStrategy struct { +import ( + "github.com/pkg/errors" + "math/rand" +) + +type RandomStrategy struct { clusterIds []string replicas int32 } -func NewRandomStrategy(clusterIds []string, replicas int32) *NewRandomStrategy { - return &RandomStrategy{clusterIds: clusterIds, replicas: replicas} -}*/ +func NewRandomStrategy(clusterIds []string, replicas int32) *RandomStrategy { + return &RandomStrategy{clusterIds: clusterIds, + replicas: replicas, + } +} + +func (s *RandomStrategy) Schedule() ([]*AssignedCluster, error) { + + if s.replicas < 1 { + return nil, errors.New("replicas must be greater than 0") + } + + if len(s.clusterIds) < 1 { + return nil, errors.New("cluster must be greater than 0") + } + + if len(s.clusterIds) == 0 || s.clusterIds == nil { + return nil, errors.New("weight must be set") + } + + // 创建一个切片来保存每个部分的数量 + parts := make([]int32, len(s.clusterIds)) + + // 首先将每个部分都分配至少一个副本 + for i := range parts { + parts[i] = 1 + s.replicas-- + } + + // 剩余要分配的副本数 + remaining := s.replicas + + // 随机分配剩余的副本 + for remaining > 0 { + // 随机选择一个部分(索引从0到numParts-1) + partIndex := rand.Intn(len(s.clusterIds)) + + // 如果该部分加上一个副本后不会超过总数,则分配一个副本 + if parts[partIndex]+1 <= s.replicas { + parts[partIndex]++ + remaining-- + } + } + + var results []*AssignedCluster + if len(s.clusterIds) == len(parts) { + for i, key := range s.clusterIds { + cluster := &AssignedCluster{ClusterId: key, Replicas: parts[i]} + results = append(results, cluster) + } + } + return results, nil + +} diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go index 8802ffa2..de36b7be 100644 --- a/api/internal/scheduler/strategy/test/strategy_test.go +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -202,7 +202,7 @@ func splitRandomParts(total int) []int { func TestNumRandom(t *testing.T) { total := 10 // 假设副本数是10 - numParts := 3 // 假设要分成5个集群 + numParts := 2 // 假设要分成5个集群 parts, err := splitIntoParts(total, numParts) if err != nil {