diff --git a/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go b/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go new file mode 100644 index 00000000..0124fa8b --- /dev/null +++ b/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go @@ -0,0 +1,68 @@ +package weightDistributing + +import ( + "math" +) + +type Weight struct { + Id int64 + Weight int32 + Name string + Replica int32 +} + +func DistributeReplicas(weights []*Weight, replicas int32) { + var weightSum int32 + weightSum = 0 + for _, w := range weights { + weightSum += w.Weight + } + + weightRatio := make([]float64, len(weights)) + for i, w := range weights { + weightRatio[i] = float64(w.Weight) / float64(weightSum) + } + + var rest = replicas + + for i := 0; i < len(weights); i++ { + + var n = math.Round(float64(replicas) * weightRatio[i]) + rest -= int32(n) + + weights[i].Replica = int32(n) + } + + for { + if rest == 0 { + break + } + + maxIdx := 0 + minIdx := 0 + + if rest > 0 { + for i, ratio := range weightRatio { + if ratio > weightRatio[maxIdx] { + maxIdx = i + } + } + } else { + for i, ratio := range weightRatio { + if ratio < weightRatio[minIdx] { + minIdx = i + } + } + } + + if rest > 0 { + weights[maxIdx].Replica++ + weightRatio[maxIdx]-- + rest-- + } else { + weights[minIdx].Replica-- + weightRatio[minIdx]++ + rest++ + } + } +} diff --git a/api/internal/scheduler/entity/entity.go b/api/internal/scheduler/entity/entity.go index 7f5f6951..33e48dba 100644 --- a/api/internal/scheduler/entity/entity.go +++ b/api/internal/scheduler/entity/entity.go @@ -11,9 +11,3 @@ type Participant struct { Name string Participant_id int64 } - -type WeightP struct { - Participant_id int64 - Weight int32 - Name string -} diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 2d5518fa..3ac52d10 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -61,12 +61,17 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { if len(resources) == 0 { return nil, errors.New("no cluster has resources") } - params := ¶m.Params{Resources: resources} if len(resources) == 1 { - return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil + var cluster strategy.AssignedCluster + cluster.ParticipantId = resources[0].ParticipantId + cluster.Name = resources[0].Name + cluster.Replicas = 1 + return &strategy.SingleAssignment{Cluster: &cluster}, nil } + params := ¶m.Params{Resources: resources} + switch as.option.StrategyName { case strategy.REPLICATION: strategy := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params, Replicas: 1}) @@ -75,7 +80,11 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil case strategy.DYNAMIC_RESOURCES: - strategy := strategy.NewDynamicResourcesStrategy(resources, as.option, 1) + strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) + return strategy, nil + case strategy.STATIC_WEIGHT: + //todo resources should match cluster StaticWeightMap + strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, 1) return strategy, nil } diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index c6e68851..b4d66c68 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -9,8 +9,11 @@ type ResourceStats struct { ParticipantId int64 Name string CpuCoreAvail int64 + CpuCoreTotal int64 MemAvail float64 + MemTotal float64 DiskAvail float64 + DiskTotal float64 GpuAvail int64 CardsAvail []*Card CpuCoreHours float64 @@ -23,7 +26,7 @@ type Card struct { Name string TOpsAtFp16 float64 CardHours float64 - Num int32 + CardNum int32 } type DatasetsSpecs struct { diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index ea528311..c8d4052f 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -28,10 +28,10 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { var maxCardHoursAvailable float64 var maxCpuCoreHoursAvailable float64 - var assignedCluster *AssignedCluster + var assignedCluster AssignedCluster var results []*AssignedCluster for _, res := range ps.resources { - if opt.ResourceType == "" { + if opt.ResourceType == "cpu" { if res.CpuCoreHours <= 0 { cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas} results = append(results, cluster) @@ -46,7 +46,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { } } - if opt.ResourceType == "" { + if opt.ResourceType == "computeCard" { var maxCurrentCardHours float64 for _, card := range res.CardsAvail { cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3) @@ -62,7 +62,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { } } } - results = append(results, assignedCluster) + results = append(results, &assignedCluster) return results, nil } diff --git a/api/internal/scheduler/strategy/staticWeight.go b/api/internal/scheduler/strategy/staticWeight.go index 76230ca3..2172bec3 100644 --- a/api/internal/scheduler/strategy/staticWeight.go +++ b/api/internal/scheduler/strategy/staticWeight.go @@ -2,73 +2,45 @@ package strategy import ( "errors" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/weightDistributing" ) type StaticWeightStrategy struct { - // TODO: add fields - - //每个 - num int32 - weights []entity.WeightP + staticWeightMap map[string]int32 + replicas int32 } -func NewStaticWeightStrategy(weights []entity.WeightP, replicas int32) *StaticWeightStrategy { - return &StaticWeightStrategy{weights: weights, - num: replicas, +func NewStaticWeightStrategy(staticWeightMap map[string]int32, replicas int32) *StaticWeightStrategy { + return &StaticWeightStrategy{staticWeightMap: staticWeightMap, + replicas: replicas, } } -func (ps *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { - // TODO: implement the scheduling logic return nil, nil +func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { - if ps.num < 1 { - return nil, errors.New("numbers must be greater than 0") + if s.replicas < 1 { + return nil, errors.New("replicas must be greater than 0") } - if ps.weights == nil { + if len(s.staticWeightMap) == 0 || s.staticWeightMap == nil { return nil, errors.New("weight must be set") } - var weightSum int32 - weightSum = 0 - for _, w := range ps.weights { - weightSum += w.Weight - } - - weightRatio := make([]float64, len(ps.weights)) - for i, w := range ps.weights { - weightRatio[i] = float64(w.Weight) / float64(weightSum) - } - - var rest = ps.num - var results []*AssignedCluster - - for i := 0; i < len(ps.weights); i++ { - - var n = int(float64(ps.num) * weightRatio[i]) - rest -= int32(n) - - cluster := &AssignedCluster{ParticipantId: ps.weights[i].Participant_id, Name: ps.weights[i].Name, Replicas: int32(n)} - results = append(results, cluster) - } - - if rest != 0 { - if rest < 0 { // 如果差值小于0,需要增加某些元素的值 - for i := len(ps.weights) - 1; rest < 0 && i >= 0; i-- { - if results[i].Replicas < ps.weights[i].Weight { - results[i].Replicas++ - rest++ - } - } - } else { - for i := len(ps.weights) - 1; rest > 0 && i >= 0; i-- { - if results[i].Replicas < ps.weights[i].Weight { - results[i].Replicas-- - rest-- - } - } + weights := make([]*weightDistributing.Weight, 0) + for k, v := range s.staticWeightMap { + weight := &weightDistributing.Weight{ + Name: k, + Weight: v, } + weights = append(weights, weight) + } + + weightDistributing.DistributeReplicas(weights, s.replicas) + + var results []*AssignedCluster + for _, weight := range weights { + cluster := &AssignedCluster{ParticipantId: weight.Id, Name: weight.Name, Replicas: weight.Replica} + results = append(results, cluster) } return results, nil diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go index d2cb9fb1..eb0f59ad 100644 --- a/api/internal/scheduler/strategy/test/strategy_test.go +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -63,15 +63,15 @@ func TestReplication(t *testing.T) { } func TestStaticWeight(t *testing.T) { - parts := []entity.WeightP{ - {Name: "p1", Participant_id: 1, Weight: 3}, - {Name: "p2", Participant_id: 2, Weight: 5}, - {Name: "p3", Participant_id: 3, Weight: 2}, + parts := map[string]int32{ + "test1": 6, + "test2": 5, + "test3": 2, } tests := []struct { name string replica int32 - ps []entity.WeightP + ps map[string]int32 }{ { name: "test1", diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 63a43cf9..f44466fe 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -45,6 +45,7 @@ const ( TRAIN_FILE = "train.py" CPUCOREPRICEPERHOUR = 0.09 DCUPRICEPERHOUR = 2.0 + KB = 1024 ) var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{ @@ -272,17 +273,25 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { return nil, err } - //limitReq := &hpcAC.QueueReq{} - //limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) - //if err != nil { - // return nil, err - //} + limitReq := &hpcAC.QueueReq{} + limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) + if err != nil { + return nil, err + } + totalCpu := limitResp.Data.AccountMaxCpu + totalDcu := limitResp.Data.AccountMaxDcu - //diskReq := &hpcAC.ParaStorQuotaReq{} - //diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) - //if err != nil { - // return nil, err - //} + diskReq := &hpcAC.ParaStorQuotaReq{} + diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) + if err != nil { + return nil, err + } + + totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB, 3) + availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB, 3) + + generalInfo, err := s.svcCtx.ACRpc.GetGeneralInfo(s.ctx, nil) + memSize := common.RoundFloat(float64(generalInfo.MemoryInGib)*KB*KB, 3) var cards []*collector.Card balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) @@ -295,14 +304,21 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { Name: DCU, TOpsAtFp16: DCU_TOPS, CardHours: cardHours, + CardNum: int32(totalDcu), } cards = append(cards, dcu) resourceStats := &collector.ResourceStats{ ParticipantId: s.participantId, Name: s.platform, Balance: balance, - CardsAvail: cards, + CpuCoreTotal: totalCpu, + CpuCoreAvail: 0, + DiskTotal: totalDisk, + DiskAvail: availDisk, + MemTotal: memSize, + MemAvail: 0, CpuCoreHours: cpuHours, + CardsAvail: cards, } return resourceStats, nil