From 1cb0a39320daae86d2e8fbb73b13ca96f50533c0 Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 21 Mar 2024 18:02:45 +0800 Subject: [PATCH 1/2] modified shuguangai getResourceStats Former-commit-id: a108e00353ec13e300772ebbaa2a40a46e376377 --- .../scheduler/schedulers/aiScheduler.go | 27 +++++++------ .../scheduler/service/collector/collector.go | 5 ++- .../scheduler/strategy/dynamicResources.go | 8 ++-- api/internal/storeLink/shuguangai.go | 38 +++++++++++++------ 4 files changed, 51 insertions(+), 27 deletions(-) diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 2d5518fa..8a3a29cd 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -61,12 +61,17 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { if len(resources) == 0 { return nil, errors.New("no cluster has resources") } - params := ¶m.Params{Resources: resources} if len(resources) == 1 { - return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil + var cluster strategy.AssignedCluster + cluster.ParticipantId = resources[0].ParticipantId + cluster.Name = resources[0].Name + cluster.Replicas = 1 + return &strategy.SingleAssignment{Cluster: &cluster}, nil } + params := ¶m.Params{Resources: resources} + switch as.option.StrategyName { case strategy.REPLICATION: strategy := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params, Replicas: 1}) @@ -75,7 +80,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil case strategy.DYNAMIC_RESOURCES: - strategy := strategy.NewDynamicResourcesStrategy(resources, as.option, 1) + strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) return strategy, nil } @@ -87,14 +92,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return errors.New("clusters is nil") } - executorMap := *as.AiExecutor - for _, cluster := range clusters { - _, err := executorMap[cluster.Name].Execute(as.option) - if err != nil { - // TODO: database operation - } - // TODO: database operation - } + //executorMap := *as.AiExecutor + //for _, cluster := range clusters { + // _, err := executorMap[cluster.Name].Execute(as.option) + // if err != nil { + // // TODO: database operation + // } + // // TODO: database operation + //} return nil } diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index c6e68851..b4d66c68 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -9,8 +9,11 @@ type ResourceStats struct { ParticipantId int64 Name string CpuCoreAvail int64 + CpuCoreTotal int64 MemAvail float64 + MemTotal float64 DiskAvail float64 + DiskTotal float64 GpuAvail int64 CardsAvail []*Card CpuCoreHours float64 @@ -23,7 +26,7 @@ type Card struct { Name string TOpsAtFp16 float64 CardHours float64 - Num int32 + CardNum int32 } type DatasetsSpecs struct { diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index ea528311..c8d4052f 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -28,10 +28,10 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { var maxCardHoursAvailable float64 var maxCpuCoreHoursAvailable float64 - var assignedCluster *AssignedCluster + var assignedCluster AssignedCluster var results []*AssignedCluster for _, res := range ps.resources { - if opt.ResourceType == "" { + if opt.ResourceType == "cpu" { if res.CpuCoreHours <= 0 { cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas} results = append(results, cluster) @@ -46,7 +46,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { } } - if opt.ResourceType == "" { + if opt.ResourceType == "computeCard" { var maxCurrentCardHours float64 for _, card := range res.CardsAvail { cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3) @@ -62,7 +62,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { } } } - results = append(results, assignedCluster) + results = append(results, &assignedCluster) return results, nil } diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 63a43cf9..f44466fe 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -45,6 +45,7 @@ const ( TRAIN_FILE = "train.py" CPUCOREPRICEPERHOUR = 0.09 DCUPRICEPERHOUR = 2.0 + KB = 1024 ) var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{ @@ -272,17 +273,25 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { return nil, err } - //limitReq := &hpcAC.QueueReq{} - //limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) - //if err != nil { - // return nil, err - //} + limitReq := &hpcAC.QueueReq{} + limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) + if err != nil { + return nil, err + } + totalCpu := limitResp.Data.AccountMaxCpu + totalDcu := limitResp.Data.AccountMaxDcu - //diskReq := &hpcAC.ParaStorQuotaReq{} - //diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) - //if err != nil { - // return nil, err - //} + diskReq := &hpcAC.ParaStorQuotaReq{} + diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) + if err != nil { + return nil, err + } + + totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB, 3) + availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB, 3) + + generalInfo, err := s.svcCtx.ACRpc.GetGeneralInfo(s.ctx, nil) + memSize := common.RoundFloat(float64(generalInfo.MemoryInGib)*KB*KB, 3) var cards []*collector.Card balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) @@ -295,14 +304,21 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { Name: DCU, TOpsAtFp16: DCU_TOPS, CardHours: cardHours, + CardNum: int32(totalDcu), } cards = append(cards, dcu) resourceStats := &collector.ResourceStats{ ParticipantId: s.participantId, Name: s.platform, Balance: balance, - CardsAvail: cards, + CpuCoreTotal: totalCpu, + CpuCoreAvail: 0, + DiskTotal: totalDisk, + DiskAvail: availDisk, + MemTotal: memSize, + MemAvail: 0, CpuCoreHours: cpuHours, + CardsAvail: cards, } return resourceStats, nil From c39c628670276502b8b513be117743782f9d290c Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 22 Mar 2024 17:16:19 +0800 Subject: [PATCH 2/2] modified staticWeight strategy and separate it from algorithm Former-commit-id: 77630a215c8ad8fd3711f2e941f74e5c75ff29f2 --- .../weightDistributing/weightDistributing.go | 68 +++++++++++++++++ api/internal/scheduler/entity/entity.go | 6 -- .../scheduler/schedulers/aiScheduler.go | 20 +++-- .../scheduler/strategy/staticWeight.go | 76 ++++++------------- .../scheduler/strategy/test/strategy_test.go | 10 +-- 5 files changed, 109 insertions(+), 71 deletions(-) create mode 100644 api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go diff --git a/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go b/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go new file mode 100644 index 00000000..0124fa8b --- /dev/null +++ b/api/internal/scheduler/algorithm/weightDistributing/weightDistributing.go @@ -0,0 +1,68 @@ +package weightDistributing + +import ( + "math" +) + +type Weight struct { + Id int64 + Weight int32 + Name string + Replica int32 +} + +func DistributeReplicas(weights []*Weight, replicas int32) { + var weightSum int32 + weightSum = 0 + for _, w := range weights { + weightSum += w.Weight + } + + weightRatio := make([]float64, len(weights)) + for i, w := range weights { + weightRatio[i] = float64(w.Weight) / float64(weightSum) + } + + var rest = replicas + + for i := 0; i < len(weights); i++ { + + var n = math.Round(float64(replicas) * weightRatio[i]) + rest -= int32(n) + + weights[i].Replica = int32(n) + } + + for { + if rest == 0 { + break + } + + maxIdx := 0 + minIdx := 0 + + if rest > 0 { + for i, ratio := range weightRatio { + if ratio > weightRatio[maxIdx] { + maxIdx = i + } + } + } else { + for i, ratio := range weightRatio { + if ratio < weightRatio[minIdx] { + minIdx = i + } + } + } + + if rest > 0 { + weights[maxIdx].Replica++ + weightRatio[maxIdx]-- + rest-- + } else { + weights[minIdx].Replica-- + weightRatio[minIdx]++ + rest++ + } + } +} diff --git a/api/internal/scheduler/entity/entity.go b/api/internal/scheduler/entity/entity.go index 7f5f6951..33e48dba 100644 --- a/api/internal/scheduler/entity/entity.go +++ b/api/internal/scheduler/entity/entity.go @@ -11,9 +11,3 @@ type Participant struct { Name string Participant_id int64 } - -type WeightP struct { - Participant_id int64 - Weight int32 - Name string -} diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 8a3a29cd..3ac52d10 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -82,6 +82,10 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { case strategy.DYNAMIC_RESOURCES: strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) return strategy, nil + case strategy.STATIC_WEIGHT: + //todo resources should match cluster StaticWeightMap + strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, 1) + return strategy, nil } return nil, errors.New("no strategy has been chosen") @@ -92,14 +96,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return errors.New("clusters is nil") } - //executorMap := *as.AiExecutor - //for _, cluster := range clusters { - // _, err := executorMap[cluster.Name].Execute(as.option) - // if err != nil { - // // TODO: database operation - // } - // // TODO: database operation - //} + executorMap := *as.AiExecutor + for _, cluster := range clusters { + _, err := executorMap[cluster.Name].Execute(as.option) + if err != nil { + // TODO: database operation + } + // TODO: database operation + } return nil } diff --git a/api/internal/scheduler/strategy/staticWeight.go b/api/internal/scheduler/strategy/staticWeight.go index 76230ca3..2172bec3 100644 --- a/api/internal/scheduler/strategy/staticWeight.go +++ b/api/internal/scheduler/strategy/staticWeight.go @@ -2,73 +2,45 @@ package strategy import ( "errors" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/weightDistributing" ) type StaticWeightStrategy struct { - // TODO: add fields - - //每个 - num int32 - weights []entity.WeightP + staticWeightMap map[string]int32 + replicas int32 } -func NewStaticWeightStrategy(weights []entity.WeightP, replicas int32) *StaticWeightStrategy { - return &StaticWeightStrategy{weights: weights, - num: replicas, +func NewStaticWeightStrategy(staticWeightMap map[string]int32, replicas int32) *StaticWeightStrategy { + return &StaticWeightStrategy{staticWeightMap: staticWeightMap, + replicas: replicas, } } -func (ps *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { - // TODO: implement the scheduling logic return nil, nil +func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) { - if ps.num < 1 { - return nil, errors.New("numbers must be greater than 0") + if s.replicas < 1 { + return nil, errors.New("replicas must be greater than 0") } - if ps.weights == nil { + if len(s.staticWeightMap) == 0 || s.staticWeightMap == nil { return nil, errors.New("weight must be set") } - var weightSum int32 - weightSum = 0 - for _, w := range ps.weights { - weightSum += w.Weight - } - - weightRatio := make([]float64, len(ps.weights)) - for i, w := range ps.weights { - weightRatio[i] = float64(w.Weight) / float64(weightSum) - } - - var rest = ps.num - var results []*AssignedCluster - - for i := 0; i < len(ps.weights); i++ { - - var n = int(float64(ps.num) * weightRatio[i]) - rest -= int32(n) - - cluster := &AssignedCluster{ParticipantId: ps.weights[i].Participant_id, Name: ps.weights[i].Name, Replicas: int32(n)} - results = append(results, cluster) - } - - if rest != 0 { - if rest < 0 { // 如果差值小于0,需要增加某些元素的值 - for i := len(ps.weights) - 1; rest < 0 && i >= 0; i-- { - if results[i].Replicas < ps.weights[i].Weight { - results[i].Replicas++ - rest++ - } - } - } else { - for i := len(ps.weights) - 1; rest > 0 && i >= 0; i-- { - if results[i].Replicas < ps.weights[i].Weight { - results[i].Replicas-- - rest-- - } - } + weights := make([]*weightDistributing.Weight, 0) + for k, v := range s.staticWeightMap { + weight := &weightDistributing.Weight{ + Name: k, + Weight: v, } + weights = append(weights, weight) + } + + weightDistributing.DistributeReplicas(weights, s.replicas) + + var results []*AssignedCluster + for _, weight := range weights { + cluster := &AssignedCluster{ParticipantId: weight.Id, Name: weight.Name, Replicas: weight.Replica} + results = append(results, cluster) } return results, nil diff --git a/api/internal/scheduler/strategy/test/strategy_test.go b/api/internal/scheduler/strategy/test/strategy_test.go index d2cb9fb1..eb0f59ad 100644 --- a/api/internal/scheduler/strategy/test/strategy_test.go +++ b/api/internal/scheduler/strategy/test/strategy_test.go @@ -63,15 +63,15 @@ func TestReplication(t *testing.T) { } func TestStaticWeight(t *testing.T) { - parts := []entity.WeightP{ - {Name: "p1", Participant_id: 1, Weight: 3}, - {Name: "p2", Participant_id: 2, Weight: 5}, - {Name: "p3", Participant_id: 3, Weight: 2}, + parts := map[string]int32{ + "test1": 6, + "test2": 5, + "test3": 2, } tests := []struct { name string replica int32 - ps []entity.WeightP + ps map[string]int32 }{ { name: "test1",