modified executor implementations

Former-commit-id: 4cf3a7804d5668bb0e1c3bd09891225b310598b9
This commit is contained in:
tzwang 2024-02-01 16:29:55 +08:00
parent 3cc16100fa
commit c18e5a484c
9 changed files with 52 additions and 27 deletions

View File

@ -67,7 +67,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp
envs = append(envs, env) envs = append(envs, env)
} }
} }
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId) task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -30,7 +30,7 @@ type AiScheduler struct {
yamlString string yamlString string
task *response.TaskInfo task *response.TaskInfo
*scheduler.Scheduler *scheduler.Scheduler
option option.AiOption option *option.AiOption
} }
func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) { func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) {
@ -74,7 +74,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
executorMap := *as.AiExecutor executorMap := *as.AiExecutor
for _, cluster := range clusters { for _, cluster := range clusters {
_, err := executorMap[cluster.Name].Execute(option.AiOption{}) _, err := executorMap[cluster.Name].Execute(as.option)
if err != nil { if err != nil {
// TODO: database operation // TODO: database operation
} }

View File

@ -1,17 +1,20 @@
package option package option
type AiOption struct { type AiOption struct {
aiType string // shuguangAi/octopus AiType string // shuguangAi/octopus
resourceType string // cpu/gpu/compute card ResourceType string // cpu/gpu/compute card
taskType string // pytorch/tensorflow TaskType string // pytorch/tensorflow
imageId string ImageId string
specId string SpecId string
datasetsId string DatasetsId string
codeId string CodeId string
ResourceId string
cmd string Cmd string
Envs []string
Params []string
datasets string Datasets string
code string Code string
} }

View File

@ -6,6 +6,6 @@ import (
) )
type AiExecutor interface { type AiExecutor interface {
Execute(option option.AiOption) (interface{}, error) Execute(option *option.AiOption) (interface{}, error)
storeLink.Linkage storeLink.Linkage
} }

View File

@ -63,7 +63,7 @@ func (o *ModelArtsLink) QueryImageList() (interface{}, error) {
return resp, nil return resp, nil
} }
func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// modelArts提交任务 // modelArts提交任务
environments := make(map[string]string) environments := make(map[string]string)
parameters := make([]*modelarts.ParametersTrainJob, 0) parameters := make([]*modelarts.ParametersTrainJob, 0)
@ -153,6 +153,10 @@ func (o *ModelArtsLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil return nil, nil
} }
func (o *ModelArtsLink) Execute(option option.AiOption) (interface{}, error) { func (o *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) {
return nil, nil task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
if err != nil {
return nil, err
}
return task, nil
} }

View File

@ -107,7 +107,7 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) {
return resp, nil return resp, nil
} }
func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// octopus提交任务 // octopus提交任务
// python参数 // python参数
@ -200,6 +200,10 @@ func (o *OctopusLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil return nil, nil
} }
func (o *OctopusLink) Execute(option option.AiOption) (interface{}, error) { func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) {
return nil, nil task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
if err != nil {
return nil, err
}
return task, nil
} }

View File

@ -144,7 +144,7 @@ func (s ShuguangHpc) QueryImageList() (interface{}, error) {
return nil, nil return nil, nil
} }
func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// shuguangHpc提交任务 // shuguangHpc提交任务
//判断是否resourceId匹配自定义资源Id //判断是否resourceId匹配自定义资源Id

View File

@ -76,9 +76,7 @@ func (s *ShuguangAi) QueryImageList() (interface{}, error) {
return resp, nil return resp, nil
} }
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) { func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
// shuguangAi提交任务
//判断是否resourceId匹配自定义资源Id //判断是否resourceId匹配自定义资源Id
if resourceId != SHUGUANGAI_CUSTOM_RESOURCE_ID { if resourceId != SHUGUANGAI_CUSTOM_RESOURCE_ID {
return nil, errors.New("shuguangAi资源Id不存在") return nil, errors.New("shuguangAi资源Id不存在")
@ -133,6 +131,18 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, param
return resp, nil return resp, nil
} }
// SubmitTask submits a shuguangAi task, dispatching on aiType.
//
// Only the PYTORCH task type is handled here; it is forwarded to
// SubmitPytorchTask with imageId, cmd, envs, params and resourceId unchanged.
// Any other aiType yields a nil result and an "unsupported task type" error.
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
	// Guard clause: reject every task type other than PYTORCH up front.
	if aiType != PYTORCH {
		return nil, errors.New("shuguangAi不支持的任务类型")
	}

	result, submitErr := s.SubmitPytorchTask(imageId, cmd, envs, params, resourceId)
	if submitErr != nil {
		// Drop any partial result so callers only ever see (nil, err) on failure.
		return nil, submitErr
	}
	return result, nil
}
func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) { func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) {
// shuguangAi获取任务 // shuguangAi获取任务
req := &hpcAC.GetPytorchTaskReq{ req := &hpcAC.GetPytorchTaskReq{
@ -199,6 +209,10 @@ func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil return nil, nil
} }
func (o *ShuguangAi) Execute(option option.AiOption) (interface{}, error) { func (o *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) {
return nil, nil task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
if err != nil {
return nil, err
}
return task, nil
} }

View File

@ -31,7 +31,7 @@ type Linkage interface {
UploadImage(path string) (interface{}, error) UploadImage(path string) (interface{}, error)
DeleteImage(imageId string) (interface{}, error) DeleteImage(imageId string) (interface{}, error)
QueryImageList() (interface{}, error) QueryImageList() (interface{}, error)
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error)
QueryTask(taskId string) (interface{}, error) QueryTask(taskId string) (interface{}, error)
QuerySpecs() (interface{}, error) QuerySpecs() (interface{}, error)
DeleteTask(taskId string) (interface{}, error) DeleteTask(taskId string) (interface{}, error)