Merge pull request 'updated Aischeduler implementations' (#35) from tzwang/pcm-coordinator:master into master
Former-commit-id: 4a48e5c1e4559f84bf1bc604c394869755ba56d9
This commit is contained in:
commit
9e0b96d647
|
@ -2,6 +2,7 @@ package option
|
|||
|
||||
type AiOption struct {
|
||||
AiClusterId string // shuguangAi /octopus ClusterId
|
||||
TaskName string
|
||||
ResourceType string // cpu/gpu/compute card
|
||||
TaskType string // pytorch/tensorflow/mindspore
|
||||
DatasetsName string // mnist/imageNet/iris
|
||||
|
@ -10,6 +11,7 @@ type AiOption struct {
|
|||
Tops float64
|
||||
ComputeCard string
|
||||
CodeType string
|
||||
AlgorithmName string
|
||||
|
||||
ImageId string
|
||||
SpecId string
|
||||
|
@ -22,7 +24,8 @@ type AiOption struct {
|
|||
Envs []string
|
||||
Params []string
|
||||
|
||||
Datasets string
|
||||
Code string
|
||||
Model interface{}
|
||||
Datasets string
|
||||
AlgorithmCode string
|
||||
Image string
|
||||
Model interface{}
|
||||
}
|
||||
|
|
|
@ -351,91 +351,7 @@ func (o *OctopusLink) generateResourceId(option *option.AiOption) error {
|
|||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpecsResp, computeCard string) error {
|
||||
if option.Tops == 0 {
|
||||
for _, spec := range specs.TrainResourceSpecs {
|
||||
if spec.Price == 1 {
|
||||
ns := strings.Split(spec.Name, COMMA)
|
||||
cardSpecs := strings.Split(ns[0], STAR)
|
||||
if cardSpecs[1] == computeCard {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cardNum := math.Ceil(option.Tops / float64(BASE_TOPS))
|
||||
for _, spec := range specs.TrainResourceSpecs {
|
||||
if option.Tops < BASE_TOPS {
|
||||
if spec.Price == 1 {
|
||||
ns := strings.Split(spec.Name, COMMA)
|
||||
cardSpecs := strings.Split(ns[0], STAR)
|
||||
if cardSpecs[1] == computeCard {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
ns := strings.Split(spec.Name, COMMA)
|
||||
if len(ns) != 4 {
|
||||
continue
|
||||
}
|
||||
cardSpecs := strings.Split(ns[0], STAR)
|
||||
if cardSpecs[1] != computeCard {
|
||||
continue
|
||||
}
|
||||
s, err := strconv.ParseFloat(cardSpecs[0], 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch computeCard {
|
||||
case GCU:
|
||||
if cardNum == s { // 1, 4, 8
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
if 1 < cardNum && cardNum <= 4 && s == 4 {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
if 4 < cardNum && s == 8 {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
|
||||
case MLU: // 1, 2, 4
|
||||
if cardNum/2 == s {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
if 1 < cardNum/2 && cardNum/2 <= 2 && s == 2 {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
if 2 < cardNum/2 && s == 4 {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return errors.New("set ResourceId error")
|
||||
return errors.New("failed to get ResourceId")
|
||||
}
|
||||
|
||||
func (o *OctopusLink) generateDatasetsId(option *option.AiOption) error {
|
||||
|
@ -552,3 +468,87 @@ func (o *OctopusLink) generateParams(option *option.AiOption) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpecsResp, computeCard string) error {
|
||||
if option.Tops == 0 {
|
||||
for _, spec := range specs.TrainResourceSpecs {
|
||||
if spec.Price == 1 {
|
||||
ns := strings.Split(spec.Name, COMMA)
|
||||
cardSpecs := strings.Split(ns[0], STAR)
|
||||
if cardSpecs[1] == computeCard {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cardNum := math.Ceil(option.Tops / float64(BASE_TOPS))
|
||||
for _, spec := range specs.TrainResourceSpecs {
|
||||
if option.Tops < BASE_TOPS {
|
||||
if spec.Price == 1 {
|
||||
ns := strings.Split(spec.Name, COMMA)
|
||||
cardSpecs := strings.Split(ns[0], STAR)
|
||||
if cardSpecs[1] == computeCard {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
ns := strings.Split(spec.Name, COMMA)
|
||||
if len(ns) != 4 {
|
||||
continue
|
||||
}
|
||||
cardSpecs := strings.Split(ns[0], STAR)
|
||||
if cardSpecs[1] != computeCard {
|
||||
continue
|
||||
}
|
||||
s, err := strconv.ParseFloat(cardSpecs[0], 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch computeCard {
|
||||
case GCU:
|
||||
if cardNum == s { // 1, 4, 8
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
if 1 < cardNum && cardNum <= 4 && s == 4 {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
if 4 < cardNum && s == 8 {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
|
||||
case MLU: // 1, 2, 4
|
||||
if cardNum/2 == s {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
if 1 < cardNum/2 && cardNum/2 <= 2 && s == 2 {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
if 2 < cardNum/2 && s == 4 {
|
||||
option.ResourceId = spec.Id
|
||||
option.ComputeCard = computeCard
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return errors.New("set ResourceId error")
|
||||
}
|
||||
|
|
|
@ -28,21 +28,20 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
RAM_SIZE_1G = 1024 // 1G
|
||||
WORKER_NUMBER = 1
|
||||
WORKER_CPU_NUMBER = 5
|
||||
WORKER_GPU_NUMBER = 1
|
||||
SHUGUANGAI_CUSTOM_RESOURCE_ID = "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi"
|
||||
SHUGUANGAI_CUSTOM_RESOURCE_NAME = "1*DCU, CPU:5, 内存:10GB"
|
||||
DCU = "dcu"
|
||||
PYTORCH = "Pytorch"
|
||||
TASK_PYTORCH_PREFIX = "PytorchTask"
|
||||
TENSORFLOW = "Tensorflow"
|
||||
RESOURCE_GROUP = "wzhdtest"
|
||||
WorkPath = "/work/home/acgnnmfbwo/pcmv1/"
|
||||
TimeoutLimit = "10:00:00"
|
||||
PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py"
|
||||
DATASETS_DIR = "/work/home/acgnnmfbwo/pcmv1/dataset"
|
||||
RAM_SIZE_1G = 1024 // 1G
|
||||
WORKER_NUMBER = 1
|
||||
DCU = "dcu"
|
||||
DCU_TOPS = 24.5
|
||||
PYTORCH = "Pytorch"
|
||||
TASK_PYTORCH_PREFIX = "PytorchTask"
|
||||
TENSORFLOW = "Tensorflow"
|
||||
RESOURCE_GROUP = "wzhdtest"
|
||||
WorkPath = "/work/home/acgnnmfbwo/pcmv1/"
|
||||
TimeoutLimit = "10:00:00"
|
||||
PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py"
|
||||
DATASETS_DIR = "/work/home/acgnnmfbwo/pcmv1/dataset"
|
||||
ALGORITHM_DIR = "/work/home/acgnnmfbwo/pcmv1/algorithm"
|
||||
TRAIN_FILE = "train.py"
|
||||
)
|
||||
|
||||
var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{
|
||||
|
@ -120,7 +119,7 @@ func (s *ShuguangAi) QueryImageList() (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
|
||||
func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) {
|
||||
//判断是否resourceId匹配自定义资源Id
|
||||
_, isMapContainsKey := RESOURCESPECSAI[resourceId]
|
||||
if !isMapContainsKey {
|
||||
|
@ -148,10 +147,15 @@ func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string
|
|||
env += s[0] + "=" + s[1] + SPACE
|
||||
}
|
||||
|
||||
//set paths
|
||||
paths := strings.Split(algorithmId, DASH)
|
||||
workPath := ALGORITHM_DIR + FORWARD_SLASH + paths[0] + FORWARD_SLASH + paths[1] + DASH + paths[2]
|
||||
codePath := workPath + FORWARD_SLASH + TRAIN_FILE
|
||||
|
||||
req := &hpcAC.SubmitPytorchTaskReq{
|
||||
Params: &hpcAC.SubmitPytorchTaskParams{
|
||||
TaskName: TASK_PYTORCH_PREFIX + UNDERSCORE + utils.RandomString(10),
|
||||
WorkPath: WorkPath,
|
||||
WorkPath: workPath,
|
||||
IsDistributed: false,
|
||||
IsHvd: false,
|
||||
Env: env,
|
||||
|
@ -161,7 +165,7 @@ func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string
|
|||
WorkerNumber: WORKER_NUMBER,
|
||||
ResourceGroup: RESOURCE_GROUP,
|
||||
TimeoutLimit: TimeoutLimit,
|
||||
PythonCodePath: PythonCodePath,
|
||||
PythonCodePath: codePath,
|
||||
PythonArg: pythonArg,
|
||||
},
|
||||
}
|
||||
|
@ -183,7 +187,7 @@ func updateSGAIRequestByResourceId(resourceId string, req *hpcAC.SubmitPytorchTa
|
|||
req.Params.WorkerRamSize = spec.RAM
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
|
||||
func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) {
|
||||
//req := &hpcAC.SubmitTensorflowTaskReq{
|
||||
// Params: &hpcAC.SubmitTensorflowTaskParams{
|
||||
//
|
||||
|
@ -193,16 +197,21 @@ func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []str
|
|||
}
|
||||
|
||||
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
// set algorithmId temporarily
|
||||
if algorithmId == "" {
|
||||
algorithmId = "pytorch-mnist-fully_connected_network"
|
||||
}
|
||||
|
||||
// shuguangAi提交任务
|
||||
switch aiType {
|
||||
case PYTORCH_TASK:
|
||||
task, err := s.SubmitPytorchTask(imageId, cmd, envs, params, resourceId)
|
||||
task, err := s.SubmitPytorchTask(imageId, cmd, envs, params, resourceId, datasetsId, algorithmId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return task, nil
|
||||
case TENSORFLOW_TASK:
|
||||
task, err := s.SubmitTensorflowTask(imageId, cmd, envs, params, resourceId)
|
||||
task, err := s.SubmitTensorflowTask(imageId, cmd, envs, params, resourceId, datasetsId, algorithmId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -317,6 +326,10 @@ func (s *ShuguangAi) GenerateSubmitParams(option *option.AiOption) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.generateAlgorithmId(option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.generateCmd(option)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -333,13 +346,77 @@ func (s *ShuguangAi) GenerateSubmitParams(option *option.AiOption) error {
|
|||
}
|
||||
|
||||
func (s *ShuguangAi) generateResourceId(option *option.AiOption) error {
|
||||
if option.ResourceType == "" {
|
||||
return errors.New("ResourceType not set")
|
||||
}
|
||||
|
||||
return nil
|
||||
if option.ResourceType == CPU {
|
||||
option.ResourceId = "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi"
|
||||
}
|
||||
|
||||
if option.ResourceType == CARD {
|
||||
if option.Tops == 0 {
|
||||
option.ResourceId = "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi"
|
||||
}
|
||||
|
||||
if option.Tops > DCU_TOPS {
|
||||
option.ResourceId = "jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2"
|
||||
}
|
||||
|
||||
//Todo add more dcu specs
|
||||
}
|
||||
|
||||
return errors.New("failed to get ResourceId")
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) generateImageId(option *option.AiOption) error {
|
||||
if option.TaskType == "" {
|
||||
return errors.New("TaskType not set")
|
||||
}
|
||||
taskType := strings.Title(option.TaskType)
|
||||
req := &hpcAC.GetImageListAiReq{
|
||||
AcceleratorType: DCU,
|
||||
TaskType: taskType,
|
||||
}
|
||||
resp, err := s.svcCtx.ACRpc.GetImageListAi(s.ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.Code != "0" {
|
||||
return errors.New("failed to get imageId")
|
||||
}
|
||||
|
||||
return nil
|
||||
if option.ResourceType == CPU {
|
||||
|
||||
}
|
||||
|
||||
return errors.New("failed to get ImageId")
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error {
|
||||
if option.DatasetsName == "" {
|
||||
return errors.New("DatasetsName not set")
|
||||
}
|
||||
req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0}
|
||||
list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if list.Code != "0" {
|
||||
return errors.New(list.Msg)
|
||||
}
|
||||
|
||||
var algorithmId string
|
||||
for _, file := range list.Data.FileList {
|
||||
ns := strings.Split(file.Name, DASH)
|
||||
if ns[0] == option.DatasetsName {
|
||||
algorithmId = option.TaskType + DASH + file.Name
|
||||
option.AlgorithmId = algorithmId
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return errors.New("failed to get AlgorithmId")
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) generateCmd(option *option.AiOption) error {
|
||||
|
|
|
@ -44,6 +44,8 @@ const (
|
|||
SPACE = " "
|
||||
UNDERSCORE = "_"
|
||||
EQUAL = "="
|
||||
DASH = "-"
|
||||
FORWARD_SLASH = "/"
|
||||
COMMA = ","
|
||||
STAR = "*"
|
||||
TYPE_OCTOPUS = "1"
|
||||
|
@ -526,27 +528,6 @@ func ConvertType(in interface{}, out interface{}, participant *models.StorelinkC
|
|||
}
|
||||
return nil, nil
|
||||
|
||||
case *hpcAC.GetResourceSpecResp:
|
||||
inresp := (interface{})(in).(*hpcAC.GetResourceSpecResp)
|
||||
switch (interface{})(out).(type) {
|
||||
case *types.GetResourceSpecsResp:
|
||||
resp := (interface{})(out).(*types.GetResourceSpecsResp)
|
||||
if inresp.Code != "0" {
|
||||
resp.Success = false
|
||||
resp.ResourceSpecs = nil
|
||||
} else {
|
||||
var spec types.ResourceSpecSl
|
||||
resp.Success = true
|
||||
spec.ParticipantName = participant.Name
|
||||
spec.ParticipantId = participant.Id
|
||||
spec.SpecName = SHUGUANGAI_CUSTOM_RESOURCE_NAME
|
||||
spec.SpecId = SHUGUANGAI_CUSTOM_RESOURCE_ID
|
||||
resp.ResourceSpecs = append(resp.ResourceSpecs, &spec)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
return nil, nil
|
||||
|
||||
case *modelarts.TrainingJobFlavorsResp:
|
||||
inresp := (interface{})(in).(*modelarts.TrainingJobFlavorsResp)
|
||||
switch (interface{})(out).(type) {
|
||||
|
@ -862,23 +843,6 @@ func ConvertTypeOld[T any](in *T, participant *models.StorelinkCenter) (interfac
|
|||
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
case *hpcAC.GetResourceSpecResp:
|
||||
var resp types.GetResourceSpecsResp
|
||||
inresp := (interface{})(in).(*hpcAC.GetResourceSpecResp)
|
||||
|
||||
if inresp.Code != "0" {
|
||||
resp.Success = false
|
||||
resp.ResourceSpecs = nil
|
||||
} else {
|
||||
var spec types.ResourceSpecSl
|
||||
resp.Success = true
|
||||
spec.ParticipantName = participant.Name
|
||||
spec.ParticipantId = participant.Id
|
||||
spec.SpecName = SHUGUANGAI_CUSTOM_RESOURCE_NAME
|
||||
spec.SpecId = SHUGUANGAI_CUSTOM_RESOURCE_ID
|
||||
resp.ResourceSpecs = append(resp.ResourceSpecs, &spec)
|
||||
}
|
||||
return resp, nil
|
||||
case *modelarts.TrainingJobFlavorsResp:
|
||||
var resp types.GetResourceSpecsResp
|
||||
|
|
Loading…
Reference in New Issue