From 1cb0a39320daae86d2e8fbb73b13ca96f50533c0 Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 21 Mar 2024 18:02:45 +0800 Subject: [PATCH] modified shuguangai getResourceStats Former-commit-id: a108e00353ec13e300772ebbaa2a40a46e376377 --- .../scheduler/schedulers/aiScheduler.go | 27 +++++++------ .../scheduler/service/collector/collector.go | 5 ++- .../scheduler/strategy/dynamicResources.go | 8 ++-- api/internal/storeLink/shuguangai.go | 38 +++++++++++++------ 4 files changed, 51 insertions(+), 27 deletions(-) diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 2d5518fa..8a3a29cd 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -61,12 +61,17 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { if len(resources) == 0 { return nil, errors.New("no cluster has resources") } - params := ¶m.Params{Resources: resources} if len(resources) == 1 { - return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil + var cluster strategy.AssignedCluster + cluster.ParticipantId = resources[0].ParticipantId + cluster.Name = resources[0].Name + cluster.Replicas = 1 + return &strategy.SingleAssignment{Cluster: &cluster}, nil } + params := ¶m.Params{Resources: resources} + switch as.option.StrategyName { case strategy.REPLICATION: strategy := strategy.NewReplicationStrategy(¶m.ReplicationParams{Params: params, Replicas: 1}) @@ -75,7 +80,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) return strategy, nil case strategy.DYNAMIC_RESOURCES: - strategy := strategy.NewDynamicResourcesStrategy(resources, as.option, 1) + strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) return strategy, nil } @@ -87,14 +92,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { return errors.New("clusters is nil") } - executorMap := *as.AiExecutor - for _, cluster := range clusters { - _, err := executorMap[cluster.Name].Execute(as.option) - if err != nil { - // TODO: database operation - } - // TODO: database operation - } + //executorMap := *as.AiExecutor + //for _, cluster := range clusters { + // _, err := executorMap[cluster.Name].Execute(as.option) + // if err != nil { + // // TODO: database operation + // } + // // TODO: database operation + //} return nil } diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index c6e68851..b4d66c68 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -9,8 +9,11 @@ type ResourceStats struct { ParticipantId int64 Name string CpuCoreAvail int64 + CpuCoreTotal int64 MemAvail float64 + MemTotal float64 DiskAvail float64 + DiskTotal float64 GpuAvail int64 CardsAvail []*Card CpuCoreHours float64 @@ -23,7 +26,7 @@ type Card struct { Name string TOpsAtFp16 float64 CardHours float64 - Num int32 + CardNum int32 } type DatasetsSpecs struct { diff --git a/api/internal/scheduler/strategy/dynamicResources.go b/api/internal/scheduler/strategy/dynamicResources.go index ea528311..c8d4052f 100644 --- a/api/internal/scheduler/strategy/dynamicResources.go +++ b/api/internal/scheduler/strategy/dynamicResources.go @@ -28,10 +28,10 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { var maxCardHoursAvailable float64 var maxCpuCoreHoursAvailable float64 - var assignedCluster *AssignedCluster + var assignedCluster AssignedCluster var results []*AssignedCluster for _, res := range ps.resources { - if opt.ResourceType == "" { + if opt.ResourceType == "cpu" { if res.CpuCoreHours <= 0 { cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas} results = append(results, cluster) @@ -46,7 +46,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { } } - if opt.ResourceType == "" { + if opt.ResourceType == "computeCard" { var maxCurrentCardHours float64 for _, card := range res.CardsAvail { cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3) @@ -62,7 +62,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { } } } - results = append(results, assignedCluster) + results = append(results, &assignedCluster) return results, nil } diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 63a43cf9..f44466fe 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -45,6 +45,7 @@ const ( TRAIN_FILE = "train.py" CPUCOREPRICEPERHOUR = 0.09 DCUPRICEPERHOUR = 2.0 + KB = 1024 ) var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{ @@ -272,17 +273,25 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { return nil, err } - //limitReq := &hpcAC.QueueReq{} - //limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) - //if err != nil { - // return nil, err - //} + limitReq := &hpcAC.QueueReq{} + limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) + if err != nil { + return nil, err + } + totalCpu := limitResp.Data.AccountMaxCpu + totalDcu := limitResp.Data.AccountMaxDcu - //diskReq := &hpcAC.ParaStorQuotaReq{} - //diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) - //if err != nil { - // return nil, err - //} + diskReq := &hpcAC.ParaStorQuotaReq{} + diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) + if err != nil { + return nil, err + } + + totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB, 3) + availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB, 3) + + generalInfo, err := s.svcCtx.ACRpc.GetGeneralInfo(s.ctx, nil) + memSize := common.RoundFloat(float64(generalInfo.MemoryInGib)*KB*KB, 3) var cards []*collector.Card balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) @@ -295,14 +304,21 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { Name: DCU, TOpsAtFp16: DCU_TOPS, CardHours: cardHours, + CardNum: int32(totalDcu), } cards = append(cards, dcu) resourceStats := &collector.ResourceStats{ ParticipantId: s.participantId, Name: s.platform, Balance: balance, - CardsAvail: cards, + CpuCoreTotal: totalCpu, + CpuCoreAvail: 0, + DiskTotal: totalDisk, + DiskAvail: availDisk, + MemTotal: memSize, + MemAvail: 0, CpuCoreHours: cpuHours, + CardsAvail: cards, } return resourceStats, nil