Merge pull request 'updated strategy module' (#65) from tzwang/pcm-coordinator:master into master

Former-commit-id: 20a3916d2c026bb042bc12e17739c06b4340e3ad
This commit is contained in:
tzwang 2024-03-22 17:21:00 +08:00
commit 6641c7452f
8 changed files with 144 additions and 82 deletions

View File

@ -0,0 +1,68 @@
package weightDistributing
import (
"math"
)
type Weight struct {
Id int64
Weight int32
Name string
Replica int32
}
func DistributeReplicas(weights []*Weight, replicas int32) {
var weightSum int32
weightSum = 0
for _, w := range weights {
weightSum += w.Weight
}
weightRatio := make([]float64, len(weights))
for i, w := range weights {
weightRatio[i] = float64(w.Weight) / float64(weightSum)
}
var rest = replicas
for i := 0; i < len(weights); i++ {
var n = math.Round(float64(replicas) * weightRatio[i])
rest -= int32(n)
weights[i].Replica = int32(n)
}
for {
if rest == 0 {
break
}
maxIdx := 0
minIdx := 0
if rest > 0 {
for i, ratio := range weightRatio {
if ratio > weightRatio[maxIdx] {
maxIdx = i
}
}
} else {
for i, ratio := range weightRatio {
if ratio < weightRatio[minIdx] {
minIdx = i
}
}
}
if rest > 0 {
weights[maxIdx].Replica++
weightRatio[maxIdx]--
rest--
} else {
weights[minIdx].Replica--
weightRatio[minIdx]++
rest++
}
}
}

View File

@ -11,9 +11,3 @@ type Participant struct {
Name string
Participant_id int64
}
type WeightP struct {
Participant_id int64
Weight int32
Name string
}

View File

@ -61,12 +61,17 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}
params := &param.Params{Resources: resources}
if len(resources) == 1 {
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil
var cluster strategy.AssignedCluster
cluster.ParticipantId = resources[0].ParticipantId
cluster.Name = resources[0].Name
cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}
params := &param.Params{Resources: resources}
switch as.option.StrategyName {
case strategy.REPLICATION:
strategy := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: 1})
@ -75,7 +80,11 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(resources, as.option, 1)
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
return strategy, nil
case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap
strategy := strategy.NewStaticWeightStrategy(as.option.ClusterToStaticWeight, 1)
return strategy, nil
}

View File

@ -9,8 +9,11 @@ type ResourceStats struct {
ParticipantId int64
Name string
CpuCoreAvail int64
CpuCoreTotal int64
MemAvail float64
MemTotal float64
DiskAvail float64
DiskTotal float64
GpuAvail int64
CardsAvail []*Card
CpuCoreHours float64
@ -23,7 +26,7 @@ type Card struct {
Name string
TOpsAtFp16 float64
CardHours float64
Num int32
CardNum int32
}
type DatasetsSpecs struct {

View File

@ -28,10 +28,10 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
var maxCardHoursAvailable float64
var maxCpuCoreHoursAvailable float64
var assignedCluster *AssignedCluster
var assignedCluster AssignedCluster
var results []*AssignedCluster
for _, res := range ps.resources {
if opt.ResourceType == "" {
if opt.ResourceType == "cpu" {
if res.CpuCoreHours <= 0 {
cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas}
results = append(results, cluster)
@ -46,7 +46,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
}
}
if opt.ResourceType == "" {
if opt.ResourceType == "computeCard" {
var maxCurrentCardHours float64
for _, card := range res.CardsAvail {
cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3)
@ -62,7 +62,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
}
}
}
results = append(results, assignedCluster)
results = append(results, &assignedCluster)
return results, nil
}

View File

@ -2,73 +2,45 @@ package strategy
import (
"errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/weightDistributing"
)
type StaticWeightStrategy struct {
// TODO: add fields
//每个
num int32
weights []entity.WeightP
staticWeightMap map[string]int32
replicas int32
}
func NewStaticWeightStrategy(weights []entity.WeightP, replicas int32) *StaticWeightStrategy {
return &StaticWeightStrategy{weights: weights,
num: replicas,
func NewStaticWeightStrategy(staticWeightMap map[string]int32, replicas int32) *StaticWeightStrategy {
return &StaticWeightStrategy{staticWeightMap: staticWeightMap,
replicas: replicas,
}
}
func (ps *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
// TODO: implement the scheduling logic return nil, nil
func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
if ps.num < 1 {
return nil, errors.New("numbers must be greater than 0")
if s.replicas < 1 {
return nil, errors.New("replicas must be greater than 0")
}
if ps.weights == nil {
if len(s.staticWeightMap) == 0 || s.staticWeightMap == nil {
return nil, errors.New("weight must be set")
}
var weightSum int32
weightSum = 0
for _, w := range ps.weights {
weightSum += w.Weight
}
weightRatio := make([]float64, len(ps.weights))
for i, w := range ps.weights {
weightRatio[i] = float64(w.Weight) / float64(weightSum)
}
var rest = ps.num
var results []*AssignedCluster
for i := 0; i < len(ps.weights); i++ {
var n = int(float64(ps.num) * weightRatio[i])
rest -= int32(n)
cluster := &AssignedCluster{ParticipantId: ps.weights[i].Participant_id, Name: ps.weights[i].Name, Replicas: int32(n)}
results = append(results, cluster)
}
if rest != 0 {
if rest < 0 { // 如果差值小于0需要增加某些元素的值
for i := len(ps.weights) - 1; rest < 0 && i >= 0; i-- {
if results[i].Replicas < ps.weights[i].Weight {
results[i].Replicas++
rest++
}
}
} else {
for i := len(ps.weights) - 1; rest > 0 && i >= 0; i-- {
if results[i].Replicas < ps.weights[i].Weight {
results[i].Replicas--
rest--
}
}
weights := make([]*weightDistributing.Weight, 0)
for k, v := range s.staticWeightMap {
weight := &weightDistributing.Weight{
Name: k,
Weight: v,
}
weights = append(weights, weight)
}
weightDistributing.DistributeReplicas(weights, s.replicas)
var results []*AssignedCluster
for _, weight := range weights {
cluster := &AssignedCluster{ParticipantId: weight.Id, Name: weight.Name, Replicas: weight.Replica}
results = append(results, cluster)
}
return results, nil

View File

@ -63,15 +63,15 @@ func TestReplication(t *testing.T) {
}
func TestStaticWeight(t *testing.T) {
parts := []entity.WeightP{
{Name: "p1", Participant_id: 1, Weight: 3},
{Name: "p2", Participant_id: 2, Weight: 5},
{Name: "p3", Participant_id: 3, Weight: 2},
parts := map[string]int32{
"test1": 6,
"test2": 5,
"test3": 2,
}
tests := []struct {
name string
replica int32
ps []entity.WeightP
ps map[string]int32
}{
{
name: "test1",

View File

@ -45,6 +45,7 @@ const (
TRAIN_FILE = "train.py"
CPUCOREPRICEPERHOUR = 0.09
DCUPRICEPERHOUR = 2.0
KB = 1024
)
var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{
@ -272,17 +273,25 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
return nil, err
}
//limitReq := &hpcAC.QueueReq{}
//limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
//if err != nil {
// return nil, err
//}
limitReq := &hpcAC.QueueReq{}
limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
if err != nil {
return nil, err
}
totalCpu := limitResp.Data.AccountMaxCpu
totalDcu := limitResp.Data.AccountMaxDcu
//diskReq := &hpcAC.ParaStorQuotaReq{}
//diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
//if err != nil {
// return nil, err
//}
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
if err != nil {
return nil, err
}
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB, 3)
generalInfo, err := s.svcCtx.ACRpc.GetGeneralInfo(s.ctx, nil)
memSize := common.RoundFloat(float64(generalInfo.MemoryInGib)*KB*KB, 3)
var cards []*collector.Card
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
@ -295,14 +304,21 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
Name: DCU,
TOpsAtFp16: DCU_TOPS,
CardHours: cardHours,
CardNum: int32(totalDcu),
}
cards = append(cards, dcu)
resourceStats := &collector.ResourceStats{
ParticipantId: s.participantId,
Name: s.platform,
Balance: balance,
CardsAvail: cards,
CpuCoreTotal: totalCpu,
CpuCoreAvail: 0,
DiskTotal: totalDisk,
DiskAvail: availDisk,
MemTotal: memSize,
MemAvail: 0,
CpuCoreHours: cpuHours,
CardsAvail: cards,
}
return resourceStats, nil