modified shuguangai getResourceStats

Former-commit-id: a108e00353ec13e300772ebbaa2a40a46e376377
This commit is contained in:
tzwang 2024-03-21 18:02:45 +08:00
parent e40a9bb2d0
commit 1cb0a39320
4 changed files with 51 additions and 27 deletions

View File

@ -61,12 +61,17 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}
params := &param.Params{Resources: resources}
if len(resources) == 1 {
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil
var cluster strategy.AssignedCluster
cluster.ParticipantId = resources[0].ParticipantId
cluster.Name = resources[0].Name
cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}
params := &param.Params{Resources: resources}
switch as.option.StrategyName {
case strategy.REPLICATION:
strategy := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: 1})
@ -75,7 +80,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(resources, as.option, 1)
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
return strategy, nil
}
@ -87,14 +92,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return errors.New("clusters is nil")
}
executorMap := *as.AiExecutor
for _, cluster := range clusters {
_, err := executorMap[cluster.Name].Execute(as.option)
if err != nil {
// TODO: database operation
}
// TODO: database operation
}
//executorMap := *as.AiExecutor
//for _, cluster := range clusters {
// _, err := executorMap[cluster.Name].Execute(as.option)
// if err != nil {
// // TODO: database operation
// }
// // TODO: database operation
//}
return nil
}

View File

@ -9,8 +9,11 @@ type ResourceStats struct {
ParticipantId int64
Name string
CpuCoreAvail int64
CpuCoreTotal int64
MemAvail float64
MemTotal float64
DiskAvail float64
DiskTotal float64
GpuAvail int64
CardsAvail []*Card
CpuCoreHours float64
@ -23,7 +26,7 @@ type Card struct {
Name string
TOpsAtFp16 float64
CardHours float64
Num int32
CardNum int32
}
type DatasetsSpecs struct {

View File

@ -28,10 +28,10 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
var maxCardHoursAvailable float64
var maxCpuCoreHoursAvailable float64
var assignedCluster *AssignedCluster
var assignedCluster AssignedCluster
var results []*AssignedCluster
for _, res := range ps.resources {
if opt.ResourceType == "" {
if opt.ResourceType == "cpu" {
if res.CpuCoreHours <= 0 {
cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas}
results = append(results, cluster)
@ -46,7 +46,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
}
}
if opt.ResourceType == "" {
if opt.ResourceType == "computeCard" {
var maxCurrentCardHours float64
for _, card := range res.CardsAvail {
cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3)
@ -62,7 +62,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
}
}
}
results = append(results, assignedCluster)
results = append(results, &assignedCluster)
return results, nil
}

View File

@ -45,6 +45,7 @@ const (
TRAIN_FILE = "train.py"
CPUCOREPRICEPERHOUR = 0.09
DCUPRICEPERHOUR = 2.0
KB = 1024
)
var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{
@ -272,17 +273,25 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
return nil, err
}
//limitReq := &hpcAC.QueueReq{}
//limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
//if err != nil {
// return nil, err
//}
limitReq := &hpcAC.QueueReq{}
limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
if err != nil {
return nil, err
}
totalCpu := limitResp.Data.AccountMaxCpu
totalDcu := limitResp.Data.AccountMaxDcu
//diskReq := &hpcAC.ParaStorQuotaReq{}
//diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
//if err != nil {
// return nil, err
//}
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
if err != nil {
return nil, err
}
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB, 3)
generalInfo, err := s.svcCtx.ACRpc.GetGeneralInfo(s.ctx, nil)
memSize := common.RoundFloat(float64(generalInfo.MemoryInGib)*KB*KB, 3)
var cards []*collector.Card
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
@ -295,14 +304,21 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
Name: DCU,
TOpsAtFp16: DCU_TOPS,
CardHours: cardHours,
CardNum: int32(totalDcu),
}
cards = append(cards, dcu)
resourceStats := &collector.ResourceStats{
ParticipantId: s.participantId,
Name: s.platform,
Balance: balance,
CardsAvail: cards,
CpuCoreTotal: totalCpu,
CpuCoreAvail: 0,
DiskTotal: totalDisk,
DiskAvail: availDisk,
MemTotal: memSize,
MemAvail: 0,
CpuCoreHours: cpuHours,
CardsAvail: cards,
}
return resourceStats, nil