Merge pull request 'added dynamic resources strategy for the ai scheduler' (#58) from tzwang/pcm-coordinator:master into master

Former-commit-id: 3a8e24cd0647d6ff347f20cc55e3c1a4aa5b691f
This commit is contained in:
tzwang 2024-03-20 17:41:03 +08:00
commit 918d95ad72
10 changed files with 240 additions and 50 deletions

View File

@ -15,18 +15,11 @@
package common package common
import ( import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "math"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"math/rand" "math/rand"
"time" "time"
) )
type SubSchedule interface {
GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
PickOptimalStrategy() (strategy.Strategy, error)
AssignTask(clusters []*strategy.AssignedCluster) error
}
// 求交集 // 求交集
func Intersect(slice1, slice2 []int64) []int64 { func Intersect(slice1, slice2 []int64) []int64 {
m := make(map[int64]int) m := make(map[int64]int)
@ -90,3 +83,8 @@ func MicsSlice(origin []int64, count int) []int64 {
} }
return result return result
} }
func RoundFloat(val float64, precision uint) float64 {
ratio := math.Pow(10, float64(precision))
return math.Round(val*ratio) / ratio
}

View File

@ -22,6 +22,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice" "gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice"
"gorm.io/gorm" "gorm.io/gorm"
@ -33,7 +34,7 @@ import (
type Scheduler struct { type Scheduler struct {
task *response.TaskInfo task *response.TaskInfo
participantIds []int64 participantIds []int64
subSchedule common.SubSchedule subSchedule SubSchedule
dbEngin *gorm.DB dbEngin *gorm.DB
result []string //pID:子任务yamlstring 键值对 result []string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService participantRpc participantservice.ParticipantService
@ -43,7 +44,13 @@ type Scheduler struct {
mu sync.RWMutex mu sync.RWMutex
} }
func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { type SubSchedule interface {
GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
PickOptimalStrategy() (strategy.Strategy, error)
AssignTask(clusters []*strategy.AssignedCluster) error
}
func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
var task *response.TaskInfo var task *response.TaskInfo
err := json.Unmarshal([]byte(val), &task) err := json.Unmarshal([]byte(val), &task)
if err != nil { if err != nil {
@ -123,7 +130,7 @@ func (s *Scheduler) TempAssign() error {
return nil return nil
} }
func (s *Scheduler) AssignAndSchedule(ss common.SubSchedule) error { func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error {
//// 已指定 ParticipantId //// 已指定 ParticipantId
//if s.task.ParticipantId != 0 { //if s.task.ParticipantId != 0 {
// return nil // return nil

View File

@ -74,6 +74,9 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
case strategy.RESOURCES_PRICING: case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1}) strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(resources, as.option, 1)
return strategy, nil
} }
return nil, errors.New("no strategy has been chosen") return nil, errors.New("no strategy has been chosen")

View File

@ -4,6 +4,7 @@ type AiOption struct {
AiClusterId string // shuguangAi /octopus ClusterId AiClusterId string // shuguangAi /octopus ClusterId
TaskName string TaskName string
ResourceType string // cpu/gpu/compute card ResourceType string // cpu/gpu/compute card
CpuCoreNum int64
TaskType string // pytorch/tensorflow/mindspore TaskType string // pytorch/tensorflow/mindspore
DatasetsName string // mnist/imageNet/iris DatasetsName string // mnist/imageNet/iris
StrategyName string StrategyName string
@ -29,3 +30,7 @@ type AiOption struct {
Image string Image string
Model interface{} Model interface{}
} }
func (a AiOption) GetOptionType() string {
return AI
}

View File

@ -1,5 +1,11 @@
package option package option
type Option struct { const (
Name string AI = "ai"
CLOUD = "cloud"
HPC = "hpc"
)
type Option interface {
GetOptionType() string
} }

View File

@ -8,20 +8,22 @@ type AiCollector interface {
type ResourceStats struct { type ResourceStats struct {
ParticipantId int64 ParticipantId int64
Name string Name string
CpuAvail float64 CpuCoreAvail int64
MemAvail float64 MemAvail float64
DiskAvail float64 DiskAvail float64
GpuAvail float64 GpuAvail int64
CardToHours map[Card]float64 CardsAvail []*Card
CpuToHours map[int]float64 CpuCoreHours float64
Balance float64 Balance float64
} }
type Card struct { type Card struct {
Platform string
Type string Type string
Name string Name string
TOpsAtFp16 float64 TOpsAtFp16 float64
Price int32 CardHours float64
Num int32
} }
type DatasetsSpecs struct { type DatasetsSpecs struct {

View File

@ -1,8 +1,70 @@
package strategy package strategy
import (
"errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
)
type DynamicResourcesStrategy struct { type DynamicResourcesStrategy struct {
replicas int32
resources []*collector.ResourceStats
opt option.Option
}
func NewDynamicResourcesStrategy(resources []*collector.ResourceStats, opt option.Option, replica int32) *DynamicResourcesStrategy {
return &DynamicResourcesStrategy{resources: resources, opt: opt, replicas: replica}
} }
func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) { func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
return nil, nil if ps.replicas < 1 {
return nil, errors.New("replicas must be greater than 0")
}
switch ps.opt.GetOptionType() {
case option.AI:
opt := (interface{})(ps.opt).(*option.AiOption)
var maxCardHoursAvailable float64
var maxCpuCoreHoursAvailable float64
var assignedCluster *AssignedCluster
var results []*AssignedCluster
for _, res := range ps.resources {
if opt.ResourceType == "" {
if res.CpuCoreHours <= 0 {
cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas}
results = append(results, cluster)
return results, nil
}
if res.CpuCoreHours > maxCpuCoreHoursAvailable {
maxCpuCoreHoursAvailable = res.CpuCoreHours
assignedCluster.Name = res.Name
assignedCluster.ParticipantId = res.ParticipantId
assignedCluster.Replicas = ps.replicas
}
}
if opt.ResourceType == "" {
var maxCurrentCardHours float64
for _, card := range res.CardsAvail {
cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3)
if cardHours > maxCurrentCardHours {
maxCurrentCardHours = cardHours
}
}
if maxCurrentCardHours > maxCardHoursAvailable {
maxCardHoursAvailable = maxCurrentCardHours
assignedCluster.Name = res.Name
assignedCluster.ParticipantId = res.ParticipantId
assignedCluster.Replicas = ps.replicas
}
}
}
results = append(results, assignedCluster)
return results, nil
}
return nil, errors.New("failed to apply DynamicResourcesStrategy")
} }

View File

@ -23,7 +23,7 @@ func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider {
for _, resource := range r.Resources { for _, resource := range r.Resources {
provider := providerPricing.NewProvider( provider := providerPricing.NewProvider(
resource.ParticipantId, resource.ParticipantId,
resource.CpuAvail, float64(resource.CpuCoreAvail),
resource.MemAvail, resource.MemAvail,
resource.DiskAvail, 0.0, 0.0, 0.0) resource.DiskAvail, 0.0, 0.0, 0.0)
providerList = append(providerList, provider) providerList = append(providerList, provider)

View File

@ -60,6 +60,10 @@ var (
MLU: CAMBRICON, MLU: CAMBRICON,
GCU: ENFLAME, GCU: ENFLAME,
} }
cardTopsMap = map[string]float64{
MLU: CAMBRICONMLU290,
GCU: EnflameT20,
}
) )
func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink { func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink {
@ -245,13 +249,49 @@ func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) {
return nil, errors.New(balanceResp.Error.Message) return nil, errors.New(balanceResp.Error.Message)
} }
//resourceStat := collector.ResourceStats{} var cards []*collector.Card
// balance := float64(balanceResp.Payload.BillingUser.Amount)
//for _, spec := range specResp.TrainResourceSpecs { var cpuHours float64
// for _, spec := range specResp.TrainResourceSpecs {
//} if spec.Price == 0 {
ns := strings.Split(spec.Name, COMMA)
if len(ns) == 2 {
nss := strings.Split(ns[0], COLON)
if nss[0] == CPU {
cpuHours = -1
}
}
}
return nil, nil if spec.Price == 1 {
ns := strings.Split(spec.Name, COMMA)
cardSpecs := strings.Split(ns[0], STAR)
cardTops, isMapContainsKey := cardTopsMap[cardSpecs[1]]
if !isMapContainsKey {
continue
}
card := &collector.Card{
Platform: OCTOPUS,
Type: CARD,
Name: cardSpecs[1],
TOpsAtFp16: cardTops,
CardHours: balance / spec.Price,
}
cards = append(cards, card)
}
}
resourceStats := &collector.ResourceStats{
ParticipantId: o.participantId,
Name: o.platform,
Balance: balance,
CardsAvail: cards,
CpuCoreHours: cpuHours,
}
return resourceStats, nil
} }
func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
@ -349,6 +389,7 @@ func (o *OctopusLink) generateResourceId(option *option.AiOption) error {
if err != nil { if err != nil {
return err return err
} }
return nil
} }
return errors.New("failed to get ResourceId") return errors.New("failed to get ResourceId")
@ -433,7 +474,14 @@ func (o *OctopusLink) generateImageId(option *option.AiOption) error {
func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error { func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error {
// temporarily set algorithm to cnn // temporarily set algorithm to cnn
option.AlgorithmName = "cnn" if option.AlgorithmName == "" {
switch option.DatasetsName {
case "cifar10":
option.AlgorithmName = "cnn"
case "mnist":
option.AlgorithmName = "fcn"
}
}
req := &octopus.GetMyAlgorithmListReq{ req := &octopus.GetMyAlgorithmListReq{
Platform: o.platform, Platform: o.platform,
@ -457,14 +505,26 @@ func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error {
if ns[1] != option.AlgorithmName { if ns[1] != option.AlgorithmName {
continue continue
} }
if ns[2] != option.ResourceType { switch option.ResourceType {
continue case CPU:
if ns[2] != CPU {
continue
}
case CARD:
if ns[2] != strings.ToLower(option.ComputeCard) {
continue
}
} }
option.AlgorithmId = algorithm.AlgorithmId option.AlgorithmId = algorithm.AlgorithmId
return nil return nil
} }
} }
if option.AlgorithmId == "" {
return errors.New("Algorithm does not exist")
}
return errors.New("failed to get AlgorithmId") return errors.New("failed to get AlgorithmId")
} }
@ -487,7 +547,10 @@ func (o *OctopusLink) generateEnv(option *option.AiOption) error {
} }
func (o *OctopusLink) generateParams(option *option.AiOption) error { func (o *OctopusLink) generateParams(option *option.AiOption) error {
if len(option.Params) == 0 {
epoch := "epoch" + COMMA + "1"
option.Params = append(option.Params, epoch)
}
return nil return nil
} }

View File

@ -17,6 +17,7 @@ package storeLink
import ( import (
"context" "context"
"errors" "errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
@ -42,6 +43,8 @@ const (
DATASETS_DIR = "/work/home/acgnnmfbwo/pcmv1/dataset" DATASETS_DIR = "/work/home/acgnnmfbwo/pcmv1/dataset"
ALGORITHM_DIR = "/work/home/acgnnmfbwo/pcmv1/algorithm" ALGORITHM_DIR = "/work/home/acgnnmfbwo/pcmv1/algorithm"
TRAIN_FILE = "train.py" TRAIN_FILE = "train.py"
CPUCOREPRICEPERHOUR = 0.09
DCUPRICEPERHOUR = 2.0
) )
var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{ var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{
@ -197,9 +200,9 @@ func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []str
} }
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
// set algorithmId temporarily // set algorithmId temporarily for storelink submit
if algorithmId == "" { if algorithmId == "" {
algorithmId = "pytorch-mnist-fully_connected_network" algorithmId = "pytorch-mnist-fcn"
} }
// shuguangAi提交任务 // shuguangAi提交任务
@ -268,24 +271,41 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
limitReq := &hpcAC.QueueReq{}
_, err = s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
if err != nil {
return nil, err
}
diskReq := &hpcAC.ParaStorQuotaReq{}
_, err = s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
if err != nil {
return nil, err
}
//limitReq := &hpcAC.QueueReq{}
//limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
//if err != nil {
// return nil, err
//}
//diskReq := &hpcAC.ParaStorQuotaReq{}
//diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
//if err != nil {
// return nil, err
//}
var cards []*collector.Card
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
_ = &collector.ResourceStats{ cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)
dcu := &collector.Card{
Platform: SHUGUANGAI,
Type: CARD,
Name: DCU,
TOpsAtFp16: DCU_TOPS,
CardHours: cardHours,
}
cards = append(cards, dcu)
resourceStats := &collector.ResourceStats{
ParticipantId: s.participantId, ParticipantId: s.participantId,
Name: s.platform, Name: s.platform,
Balance: balance, Balance: balance,
CardsAvail: cards,
CpuCoreHours: cpuHours,
} }
return nil, nil
return resourceStats, nil
} }
func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
@ -413,6 +433,7 @@ func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error {
if option.DatasetsName == "" { if option.DatasetsName == "" {
return errors.New("DatasetsName not set") return errors.New("DatasetsName not set")
} }
req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0} req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0}
list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req) list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req)
if err != nil { if err != nil {
@ -426,13 +447,34 @@ func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error {
for _, file := range list.Data.FileList { for _, file := range list.Data.FileList {
ns := strings.Split(file.Name, DASH) ns := strings.Split(file.Name, DASH)
if ns[0] == option.DatasetsName { if ns[0] == option.DatasetsName {
algorithmId = option.TaskType + DASH + file.Name algoName := ns[1]
option.AlgorithmId = algorithmId if option.AlgorithmName == "" {
option.AlgorithmName = ns[1] switch option.DatasetsName {
return nil case "cifar10":
algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "cnn"
option.AlgorithmId = algorithmId
option.AlgorithmName = algoName
return nil
case "mnist":
algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "fcn"
option.AlgorithmId = algorithmId
option.AlgorithmName = algoName
return nil
}
} else {
if algoName == option.AlgorithmName {
algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + algoName
option.AlgorithmId = algorithmId
return nil
}
}
} }
} }
if algorithmId == "" {
return errors.New("Algorithm does not exist")
}
return errors.New("failed to get AlgorithmId") return errors.New("failed to get AlgorithmId")
} }
@ -451,8 +493,10 @@ func (s *ShuguangAi) generateParams(option *option.AiOption) error {
return errors.New("ResourceType not set") return errors.New("ResourceType not set")
} }
//epoch := "epoch" + COMMA + "1" if len(option.Params) == 0 {
//option.Params = append(option.Params, epoch) epoch := "epoch" + COMMA + "1"
option.Params = append(option.Params, epoch)
}
switch option.ResourceType { switch option.ResourceType {
case CPU: case CPU: