Merge pull request 'modified storelink module structure' (#14) from tzwang/pcm-coordinator:master into master

Former-commit-id: 2c539fe282f5a0c471398b5efd272d7363e05eeb
This commit is contained in:
tzwang 2024-01-26 17:26:50 +08:00
commit ea2c9412d3
19 changed files with 494 additions and 598 deletions

View File

@ -57,9 +57,18 @@ func (l *DeleteLinkImageLogic) DeleteLinkImage(req *types.DeleteLinkImageReq) (r
return nil, err
}
if img == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
imgResp := img.(types.DeleteLinkImageResp)
return &imgResp, nil
resp = &types.DeleteLinkImageResp{}
//转换成统一返回类型
imgResp, err := storeLink.ConvertType(img, resp, participant)
if err != nil {
return nil, err
}
if imgResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return imgResp.(*types.DeleteLinkImageResp), nil
}

View File

@ -57,9 +57,18 @@ func (l *DeleteLinkTaskLogic) DeleteLinkTask(req *types.DeleteLinkTaskReq) (resp
return nil, err
}
if task == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
taskResp := task.(types.DeleteLinkTaskResp)
return &taskResp, nil
resp = &types.DeleteLinkTaskResp{}
//转换成统一返回类型
taskResp, err := storeLink.ConvertType(task, resp, participant)
if err != nil {
return nil, err
}
if taskResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return taskResp.(*types.DeleteLinkTaskResp), nil
}

View File

@ -57,9 +57,18 @@ func (l *GetAISpecsLogic) GetAISpecs(req *types.GetResourceSpecsReq) (resp *type
return nil, err
}
if specs == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
specsResp := specs.(types.GetResourceSpecsResp)
return &specsResp, nil
resp = &types.GetResourceSpecsResp{}
//转换成统一返回类型
specsResp, err := storeLink.ConvertType(specs, resp, participant)
if err != nil {
return nil, err
}
if specsResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return specsResp.(*types.GetResourceSpecsResp), nil
}

View File

@ -57,9 +57,18 @@ func (l *GetLinkImageListLogic) GetLinkImageList(req *types.GetLinkImageListReq)
return nil, err
}
if list == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
listResp := list.(types.GetLinkImageListResp)
return &listResp, nil
resp = &types.GetLinkImageListResp{}
//转换成统一返回类型
listResp, err := storeLink.ConvertType(list, resp, participant)
if err != nil {
return nil, err
}
if listResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return listResp.(*types.GetLinkImageListResp), nil
}

View File

@ -58,9 +58,18 @@ func (l *GetLinkTaskLogic) GetLinkTask(req *types.GetLinkTaskReq) (resp *types.G
return nil, err
}
if task == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
taskResp := task.(types.GetLinkTaskResp)
return &taskResp, nil
resp = &types.GetLinkTaskResp{}
//转换成统一返回类型
taskResp, err := storeLink.ConvertType(task, resp, participant)
if err != nil {
return nil, err
}
if taskResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return taskResp.(*types.GetLinkTaskResp), nil
}

View File

@ -72,6 +72,15 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp
return nil, err
}
taskResp := task.(types.SubmitLinkTaskResp)
return &taskResp, nil
resp = &types.SubmitLinkTaskResp{}
//转换成统一返回类型
taskResp, err := storeLink.ConvertType(task, resp, participant)
if err != nil {
return nil, err
}
if taskResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return taskResp.(*types.SubmitLinkTaskResp), nil
}

View File

@ -58,9 +58,18 @@ func (l *UploadLinkImageLogic) UploadLinkImage(req *types.UploadLinkImageReq) (r
return nil, err
}
if img == nil {
return nil, nil
return nil, storeLink.ERROR_RESP_EMPTY
}
imgResp := img.(types.UploadLinkImageResp)
return &imgResp, nil
resp = &types.UploadLinkImageResp{}
//转换成统一返回类型
imgResp, err := storeLink.ConvertType(img, resp, participant)
if err != nil {
return nil, err
}
if imgResp == nil {
return nil, storeLink.ERROR_CONVERT_EMPTY
}
return imgResp.(*types.UploadLinkImageResp), nil
}

View File

@ -33,7 +33,7 @@ type AiQueue struct {
}
func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
aiExecutorMap, aiCollectorMap := service.InitAiClusterMap(svcCtx.ACRpc, svcCtx.ModelArtsRpc, svcCtx.ModelArtsImgRpc, svcCtx.OctopusRpc)
aiExecutorMap, aiCollectorMap := service.InitAiClusterMap(ctx, svcCtx)
return &AiQueue{
ctx: ctx,
svcCtx: svcCtx,

View File

@ -83,6 +83,8 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return errors.New("clusters is nil")
}
_ = *as.AiExecutor
return nil
}

View File

@ -1,13 +1,11 @@
package service
import (
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/impl"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
)
const (
@ -26,21 +24,21 @@ var (
}
)
func InitAiClusterMap(ACRpc hpcacclient.HpcAC, ModelArtsRpc modelartsservice.ModelArtsService, ModelArtsImgRpc imagesservice.ImagesService, OctopusRpc octopusclient.Octopus) (*map[string]executor.Executor, *map[string]collector.ResourceCollector) {
func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.Executor, *map[string]collector.ResourceCollector) {
executorMap := make(map[string]executor.Executor)
collectorMap := make(map[string]collector.ResourceCollector)
for k, v := range AiTypeMap {
switch v {
case OCTOPUS:
octopus := impl.NewOctopusExecutor(OctopusRpc, k)
octopus := storeLink.NewOctopusLink(ctx, svcCtx, k, 0)
collectorMap[k] = octopus
executorMap[k] = octopus
case MODELARTS:
modelarts := impl.NewModelartsExecutor(ModelArtsRpc, ModelArtsImgRpc, k)
modelarts := storeLink.NewModelArtsLink(ctx, svcCtx, k, 0)
collectorMap[k] = modelarts
executorMap[k] = modelarts
case SHUGUANGAI:
sgai := impl.NewShuguangAiExecutor(ACRpc, k)
sgai := storeLink.NewShuguangAi(ctx, svcCtx, k, 0)
collectorMap[k] = sgai
executorMap[k] = sgai
}

View File

@ -1,17 +1,8 @@
package executor
type Executor interface {
QueryImageList() ([]Image, error)
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (Task, error)
QueryTask(taskId string) (Task, error)
QuerySpecs() (Spec, error)
}
type Image struct {
}
type Task struct {
}
type Spec struct {
QueryImageList() (interface{}, error)
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error)
QueryTask(taskId string) (interface{}, error)
QuerySpecs() (interface{}, error)
}

View File

@ -1,44 +0,0 @@
package impl
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
)
type ModelArtsExecutor struct {
Name string
pageIndex int32
pageSize int32
ModelArtsRpc modelartsservice.ModelArtsService
ModelArtsImgRpc imagesservice.ImagesService
}
func NewModelartsExecutor(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string) *ModelArtsExecutor {
return &ModelArtsExecutor{Name: name, ModelArtsRpc: modelArtsRpc, ModelArtsImgRpc: modelArtsImgRpc, pageIndex: 1, pageSize: 100}
}
func (m ModelArtsExecutor) QueryImageList() ([]executor.Image, error) {
//TODO implement me
panic("implement me")
}
func (m ModelArtsExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (m ModelArtsExecutor) QueryTask(taskId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (m ModelArtsExecutor) QuerySpecs() (executor.Spec, error) {
//TODO implement me
panic("implement me")
}
func (a *ModelArtsExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}

View File

@ -1,42 +0,0 @@
package impl
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
)
type OctopusExecutor struct {
Name string
pageIndex int32
pageSize int32
OctopusRpc octopusclient.Octopus
}
func NewOctopusExecutor(OctopusRpc octopusclient.Octopus, name string) *OctopusExecutor {
return &OctopusExecutor{OctopusRpc: OctopusRpc, Name: name, pageIndex: 1, pageSize: 100}
}
func (o OctopusExecutor) QueryImageList() ([]executor.Image, error) {
//TODO implement me
panic("implement me")
}
func (o OctopusExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (o OctopusExecutor) QueryTask(taskId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (o OctopusExecutor) QuerySpecs() (executor.Spec, error) {
//TODO implement me
panic("implement me")
}
func (a *OctopusExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}

View File

@ -1,45 +0,0 @@
package impl
import (
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/executor"
)
//单条作业费=作业运行秒数×(CPU核心数*CPU单价+GPU卡数×GPU单价+DCU卡数×DCU单价)/3600
//CPU单价=队列CPU费率×计算中心CPU单价
//GPU单价=队列GPU费率×计算中心GPU单价
//DCU单价=队列DCU费率×计算中心DCU单价
type ShuguangAiExecutor struct {
Name string
ACRpc hpcacclient.HpcAC
}
func NewShuguangAiExecutor(acRpc hpcacclient.HpcAC, name string) *ShuguangAiExecutor {
return &ShuguangAiExecutor{Name: name, ACRpc: acRpc}
}
func (s ShuguangAiExecutor) QueryImageList() ([]executor.Image, error) {
//TODO implement me
panic("implement me")
}
func (s ShuguangAiExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (s ShuguangAiExecutor) QueryTask(taskId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (s ShuguangAiExecutor) QuerySpecs() (executor.Spec, error) {
//TODO implement me
panic("implement me")
}
func (a *ShuguangAiExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}

View File

@ -16,8 +16,8 @@ package storeLink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"strconv"
@ -28,13 +28,13 @@ type ModelArtsLink struct {
ctx context.Context
svcCtx *svc.ServiceContext
platform string
participantId int64
pageIndex int32
pageSize int32
participant *models.StorelinkCenter
}
func NewModelArtsLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *ModelArtsLink {
return &ModelArtsLink{ctx: ctx, svcCtx: svcCtx, participant: participant, platform: participant.Name, pageIndex: 1, pageSize: 100}
func NewModelArtsLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ModelArtsLink {
return &ModelArtsLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
}
func (o *ModelArtsLink) UploadImage(path string) (interface{}, error) {
@ -59,13 +59,7 @@ func (o *ModelArtsLink) QueryImageList() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
imgListResp, err := ConvertType[modelarts.ListReposDetailsResp](resp, nil)
if err != nil {
return nil, err
}
return imgListResp, nil
return resp, nil
}
func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
@ -110,13 +104,7 @@ func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, pa
return nil, err
}
//转换成统一返回类型
submitResp, err := ConvertType[modelarts.CreateTrainingJobResp](resp, nil)
if err != nil {
return nil, err
}
return submitResp, nil
return resp, nil
}
func (o *ModelArtsLink) QueryTask(taskId string) (interface{}, error) {
@ -130,13 +118,7 @@ func (o *ModelArtsLink) QueryTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
taskResp, err := ConvertType[modelarts.JobResponse](resp, o.participant)
if err != nil {
return nil, err
}
return taskResp, nil
return resp, nil
}
func (o *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) {
@ -150,13 +132,7 @@ func (o *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
deleteResp, err := ConvertType[modelarts.DeleteTrainingJobResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
return resp, nil
}
func (o *ModelArtsLink) QuerySpecs() (interface{}, error) {
@ -169,11 +145,9 @@ func (o *ModelArtsLink) QuerySpecs() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
specsResp, err := ConvertType[modelarts.TrainingJobFlavorsResp](resp, o.participant)
if err != nil {
return nil, err
}
return specsResp, nil
return resp, nil
}
func (o *ModelArtsLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}

View File

@ -16,8 +16,8 @@ package storeLink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
"strings"
@ -28,7 +28,8 @@ type OctopusLink struct {
svcCtx *svc.ServiceContext
pageIndex int32
pageSize int32
participant *models.StorelinkCenter
platform string
participantId int64
}
const (
@ -38,14 +39,14 @@ const (
RESOURCE_POOL = "common-pool"
)
func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *OctopusLink {
return &OctopusLink{ctx: ctx, svcCtx: svcCtx, participant: participant, pageIndex: 1, pageSize: 100}
func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink {
return &OctopusLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
}
func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
// octopus创建镜像
createReq := &octopus.CreateImageReq{
Platform: o.participant.Name,
Platform: o.platform,
CreateImage: &octopus.CreateImage{
SourceType: 1,
ImageName: IMG_NAME_PREFIX + utils.RandomString(7),
@ -59,7 +60,7 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
// octopus上传镜像
uploadReq := &octopus.UploadImageReq{
Platform: o.participant.Name,
Platform: o.platform,
ImageId: createResp.Payload.ImageId,
Params: &octopus.UploadImageParam{
Domain: "",
@ -73,19 +74,13 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
// Todo 实际上传
//转换成统一返回类型
resp, err := ConvertType[octopus.UploadImageResp](uploadResp, nil)
if err != nil {
return nil, err
}
return resp, nil
return uploadResp, nil
}
func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) {
// octopus删除镜像
req := &octopus.DeleteImageReq{
Platform: o.participant.Name,
Platform: o.platform,
ImageId: imageId,
}
resp, err := o.svcCtx.OctopusRpc.DeleteImage(o.ctx, req)
@ -93,19 +88,13 @@ func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
deleteResp, err := ConvertType[octopus.DeleteImageResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
return resp, nil
}
func (o *OctopusLink) QueryImageList() (interface{}, error) {
// octopus获取镜像列表
req := &octopus.GetUserImageListReq{
Platform: o.participant.Name,
Platform: o.platform,
PageIndex: o.pageIndex,
PageSize: o.pageSize,
}
@ -114,13 +103,7 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
imgListResp, err := ConvertType[octopus.GetUserImageListResp](resp, nil)
if err != nil {
return nil, err
}
return imgListResp, nil
return resp, nil
}
func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
@ -144,7 +127,7 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, para
}
req := &octopus.CreateTrainJobReq{
Platform: o.participant.Name,
Platform: o.platform,
Params: &octopus.CreateTrainJobParam{
ImageId: imageId,
Name: TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10),
@ -167,19 +150,13 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, para
return nil, err
}
//转换成统一返回类型
submitResp, err := ConvertType[octopus.CreateTrainJobResp](resp, nil)
if err != nil {
return nil, err
}
return submitResp, nil
return resp, nil
}
func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) {
// octopus获取任务
req := &octopus.GetTrainJobReq{
Platform: o.participant.Name,
Platform: o.platform,
Id: taskId,
}
resp, err := o.svcCtx.OctopusRpc.GetTrainJob(o.ctx, req)
@ -187,19 +164,13 @@ func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
taskResp, err := ConvertType[octopus.GetTrainJobResp](resp, nil)
if err != nil {
return nil, err
}
return taskResp, nil
return resp, nil
}
func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) {
// octopus删除任务
req := &octopus.DeleteTrainJobReq{
Platform: o.participant.Name,
Platform: o.platform,
JobIds: []string{taskId},
}
resp, err := o.svcCtx.OctopusRpc.DeleteTrainJob(o.ctx, req)
@ -207,19 +178,13 @@ func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
deleteResp, err := ConvertType[octopus.DeleteTrainJobResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
return resp, nil
}
func (o *OctopusLink) QuerySpecs() (interface{}, error) {
// octopus查询资源规格
req := &octopus.GetResourceSpecsReq{
Platform: o.participant.Name,
Platform: o.platform,
ResourcePool: "common-pool",
}
resp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req)
@ -227,11 +192,9 @@ func (o *OctopusLink) QuerySpecs() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
specsResp, err := ConvertType[octopus.GetResourceSpecsResp](resp, o.participant)
if err != nil {
return nil, err
}
return specsResp, nil
return resp, nil
}
func (o *OctopusLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}

View File

@ -7,7 +7,6 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"strconv"
"strings"
@ -16,7 +15,8 @@ import (
type ShuguangHpc struct {
ctx context.Context
svcCtx *svc.ServiceContext
participant *models.StorelinkCenter
platform string
participantId int64
}
const (
@ -128,8 +128,8 @@ type ResourceSpec struct {
GAP_NDCU string
}
func NewShuguangHpc(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *ShuguangHpc {
return &ShuguangHpc{ctx: ctx, svcCtx: svcCtx, participant: participant}
func NewShuguangHpc(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ShuguangHpc {
return &ShuguangHpc{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id}
}
func (s ShuguangHpc) UploadImage(path string) (interface{}, error) {
@ -199,13 +199,7 @@ func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, param
return nil, err
}
//转换成统一返回类型
submitResp, err := ConvertType[hpcAC.SubmitJobResp](resp, nil)
if err != nil {
return nil, err
}
return submitResp, nil
return resp, nil
}
@ -221,13 +215,8 @@ func (s ShuguangHpc) QueryTask(taskId string) (interface{}, error) {
//实时作业检查是否成功
if respC.Data != nil && respC.Data.JobEndTime != "" {
taskRespC, err := ConvertType[hpcAC.GetJobDetailResp](respC, nil)
if err != nil {
return nil, err
}
return taskRespC, nil
}
return respC, nil
} else {
//历史作业
reqH := &hpcAC.HistoryJobDetailReq{
JobId: taskId,
@ -239,12 +228,8 @@ func (s ShuguangHpc) QueryTask(taskId string) (interface{}, error) {
return nil, err
}
taskRespH, err := ConvertType[hpcAC.HistoryJobDetailResp](respH, nil)
if err != nil {
return nil, err
return respH, nil
}
return taskRespH, nil
}
func (s ShuguangHpc) QuerySpecs() (interface{}, error) {
@ -254,8 +239,8 @@ func (s ShuguangHpc) QuerySpecs() (interface{}, error) {
var respec types.ResourceSpecSl
respec.SpecId = k
respec.SpecName = v
respec.ParticipantId = s.participant.Id
respec.ParticipantName = s.participant.Name
respec.ParticipantId = s.participantId
respec.ParticipantName = s.platform
resp.ResourceSpecs = append(resp.ResourceSpecs, &respec)
}
@ -273,13 +258,7 @@ func (s ShuguangHpc) DeleteTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
taskResp, err := ConvertType[hpcAC.DeleteJobResp](resp, nil)
if err != nil {
return nil, err
}
return taskResp, nil
return resp, nil
}
func updateRequestByResourceId(resourceId string, req *hpcAC.SubmitJobReq) {

View File

@ -18,8 +18,8 @@ import (
"context"
"errors"
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"strings"
)
@ -27,7 +27,8 @@ import (
type ShuguangAi struct {
ctx context.Context
svcCtx *svc.ServiceContext
participant *models.StorelinkCenter
platform string
participantId int64
}
const (
@ -47,8 +48,8 @@ const (
PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py"
)
func NewShuguangAi(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *ShuguangAi {
return &ShuguangAi{ctx: ctx, svcCtx: svcCtx, participant: participant}
func NewShuguangAi(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ShuguangAi {
return &ShuguangAi{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id}
}
func (s *ShuguangAi) UploadImage(path string) (interface{}, error) {
@ -70,13 +71,7 @@ func (s *ShuguangAi) QueryImageList() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
imgListResp, err := ConvertType[hpcAC.GetImageListAiResp](resp, nil)
if err != nil {
return nil, err
}
return imgListResp, nil
return resp, nil
}
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
@ -133,13 +128,7 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, param
return nil, err
}
//转换成统一返回类型
submitResp, err := ConvertType[hpcAC.SubmitTaskAiResp](resp, nil)
if err != nil {
return nil, err
}
return submitResp, nil
return resp, nil
}
func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) {
@ -152,13 +141,7 @@ func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
taskResp, err := ConvertType[hpcAC.GetPytorchTaskResp](resp, nil)
if err != nil {
return nil, err
}
return taskResp, nil
return resp, nil
}
func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) {
@ -171,13 +154,7 @@ func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) {
return nil, err
}
//转换成统一返回类型
deleteResp, err := ConvertType[hpcAC.DeleteTaskAiResp](resp, nil)
if err != nil {
return nil, err
}
return deleteResp, nil
return resp, nil
}
func (o *ShuguangAi) QuerySpecs() (interface{}, error) {
@ -191,11 +168,9 @@ func (o *ShuguangAi) QuerySpecs() (interface{}, error) {
return nil, err
}
//转换成统一返回类型
specsResp, err := ConvertType[hpcAC.GetResourceSpecResp](specs, o.participant)
if err != nil {
return nil, err
}
return specsResp, nil
return specs, nil
}
func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}

View File

@ -65,6 +65,8 @@ var (
"3": SHUGUANGAI,
"4": SHUGUANGHPC,
}
ERROR_RESP_EMPTY = errors.New("resp empty error")
ERROR_CONVERT_EMPTY = errors.New("convert empty error")
)
type StoreLink struct {
@ -74,16 +76,16 @@ type StoreLink struct {
func NewStoreLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *StoreLink {
switch participant.Type {
case TYPE_OCTOPUS:
linkStruct := NewOctopusLink(ctx, svcCtx, participant)
linkStruct := NewOctopusLink(ctx, svcCtx, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct}
case TYPE_MODELARTS:
linkStruct := NewModelArtsLink(ctx, svcCtx, participant)
linkStruct := NewModelArtsLink(ctx, svcCtx, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct}
case TYPE_SHUGUANGAI:
linkStruct := NewShuguangAi(ctx, svcCtx, participant)
linkStruct := NewShuguangAi(ctx, svcCtx, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct}
case TYPE_SHUGUANGHPC:
linkStruct := NewShuguangHpc(ctx, svcCtx, participant)
linkStruct := NewShuguangHpc(ctx, svcCtx, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct}
default:
return nil
@ -102,7 +104,7 @@ func GetParticipantById(partId int64, dbEngin *gorm.DB) *models.StorelinkCenter
return &participant
}
func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.StorelinkCenter) (interface{}, error) {
func ConvertType(in interface{}, out interface{}, participant *models.StorelinkCenter) (interface{}, error) {
switch (interface{})(in).(type) {
case *octopus.UploadImageResp:
@ -121,18 +123,23 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
case *octopus.DeleteImageResp:
inresp := (interface{})(in).(*octopus.DeleteImageResp)
var resp types.DeleteLinkImageResp
switch (interface{})(out).(type) {
case *types.DeleteLinkImageResp:
resp := (interface{})(out).(*types.DeleteLinkImageResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
return resp, nil
}
return resp, nil
}
return nil, nil
case *octopus.GetUserImageListResp:
inresp := (interface{})(in).(*octopus.GetUserImageListResp)
var resp types.GetLinkImageListResp
switch (interface{})(out).(type) {
case *types.GetLinkImageListResp:
resp := (interface{})(out).(*types.GetLinkImageListResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
@ -148,10 +155,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.Images = append(resp.Images, &image)
}
return resp, nil
}
return nil, nil
case *modelarts.ListReposDetailsResp:
inresp := (interface{})(in).(*modelarts.ListReposDetailsResp)
var resp types.GetLinkImageListResp
switch (interface{})(out).(type) {
case *types.GetLinkImageListResp:
resp := (interface{})(out).(*types.GetLinkImageListResp)
if inresp.Errors != nil {
resp.Success = false
resp.ErrorMsg = inresp.Errors[0].ErrorMessage
@ -170,10 +181,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
}
}
return resp, nil
}
return nil, nil
case *hpcAC.GetImageListAiResp:
inresp := (interface{})(in).(*hpcAC.GetImageListAiResp)
var resp types.GetLinkImageListResp
switch (interface{})(out).(type) {
case *types.GetLinkImageListResp:
resp := (interface{})(out).(*types.GetLinkImageListResp)
if inresp.Code == "0" {
resp.Success = true
for _, img := range inresp.Data {
@ -189,11 +204,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.Images = nil
}
return resp, nil
}
return nil, nil
case *octopus.CreateTrainJobResp:
inresp := (interface{})(in).(*octopus.CreateTrainJobResp)
var resp types.SubmitLinkTaskResp
switch (interface{})(out).(type) {
case *types.SubmitLinkTaskResp:
resp := (interface{})(out).(*types.SubmitLinkTaskResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
@ -203,10 +221,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.TaskId = inresp.Payload.JobId
return resp, nil
}
return nil, nil
case *modelarts.CreateTrainingJobResp:
inresp := (interface{})(in).(*modelarts.CreateTrainingJobResp)
var resp types.SubmitLinkTaskResp
switch (interface{})(out).(type) {
case *types.SubmitLinkTaskResp:
resp := (interface{})(out).(*types.SubmitLinkTaskResp)
if inresp.ErrorMsg != "" {
resp.ErrorMsg = inresp.ErrorMsg
resp.Success = false
@ -216,10 +238,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.TaskId = inresp.Metadata.Id
return resp, nil
}
return nil, nil
case *hpcAC.SubmitTaskAiResp:
inresp := (interface{})(in).(*hpcAC.SubmitTaskAiResp)
var resp types.SubmitLinkTaskResp
switch (interface{})(out).(type) {
case *types.SubmitLinkTaskResp:
resp := (interface{})(out).(*types.SubmitLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
resp.TaskId = inresp.Data
@ -228,10 +254,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.ErrorMsg = inresp.Msg
}
return resp, nil
}
return nil, nil
case *hpcAC.SubmitJobResp:
inresp := (interface{})(in).(*hpcAC.SubmitJobResp)
var resp types.SubmitLinkTaskResp
switch (interface{})(out).(type) {
case *types.SubmitLinkTaskResp:
resp := (interface{})(out).(*types.SubmitLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
resp.TaskId = inresp.Data
@ -240,10 +270,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.ErrorMsg = inresp.Msg
}
return resp, nil
}
return nil, nil
case *octopus.GetTrainJobResp:
inresp := (interface{})(in).(*octopus.GetTrainJobResp)
var resp types.GetLinkTaskResp
switch (interface{})(out).(type) {
case *types.GetLinkTaskResp:
resp := (interface{})(out).(*types.GetLinkTaskResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
@ -259,10 +293,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.Task = &task
return resp, nil
}
return nil, nil
case *modelarts.JobResponse:
inresp := (interface{})(in).(*modelarts.JobResponse)
var resp types.GetLinkTaskResp
switch (interface{})(out).(type) {
case *types.GetLinkTaskResp:
resp := (interface{})(out).(*types.GetLinkTaskResp)
if inresp.ErrorMsg != "" {
resp.ErrorMsg = inresp.ErrorMsg
resp.Success = false
@ -277,10 +315,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.Task.TaskStatus = inresp.Status.Phase
return resp, nil
}
return nil, nil
case *hpcAC.GetPytorchTaskResp:
inresp := (interface{})(in).(*hpcAC.GetPytorchTaskResp)
var resp types.GetLinkTaskResp
switch (interface{})(out).(type) {
case *types.GetLinkTaskResp:
resp := (interface{})(out).(*types.GetLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
var task types.TaskSl
@ -297,10 +339,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
}
return resp, nil
}
return nil, nil
case *hpcAC.GetJobDetailResp:
inresp := (interface{})(in).(*hpcAC.GetJobDetailResp)
var resp types.GetLinkTaskResp
switch (interface{})(out).(type) {
case *types.GetLinkTaskResp:
resp := (interface{})(out).(*types.GetLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
var task types.TaskSl
@ -317,10 +363,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
}
return resp, nil
}
return nil, nil
case *hpcAC.HistoryJobDetailResp:
inresp := (interface{})(in).(*hpcAC.HistoryJobDetailResp)
var resp types.GetLinkTaskResp
switch (interface{})(out).(type) {
case *types.GetLinkTaskResp:
resp := (interface{})(out).(*types.GetLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
var task types.TaskSl
@ -337,10 +387,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
}
return resp, nil
}
return nil, nil
case *octopus.DeleteTrainJobResp:
inresp := (interface{})(in).(*octopus.DeleteTrainJobResp)
var resp types.DeleteLinkTaskResp
switch (interface{})(out).(type) {
case *types.DeleteLinkTaskResp:
resp := (interface{})(out).(*types.DeleteLinkTaskResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ErrorMsg = inresp.Error.Message
@ -348,10 +402,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
}
return resp, nil
}
return nil, nil
case *modelarts.DeleteTrainingJobResp:
inresp := (interface{})(in).(*modelarts.DeleteTrainingJobResp)
var resp types.DeleteLinkTaskResp
switch (interface{})(out).(type) {
case *types.DeleteLinkTaskResp:
resp := (interface{})(out).(*types.DeleteLinkTaskResp)
if inresp.ErrorMsg != "" {
resp.ErrorMsg = inresp.ErrorMsg
resp.Success = false
@ -359,10 +417,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
}
resp.Success = true
return resp, nil
}
return nil, nil
case *hpcAC.DeleteTaskAiResp:
inresp := (interface{})(in).(*hpcAC.DeleteTaskAiResp)
var resp types.DeleteLinkTaskResp
switch (interface{})(out).(type) {
case *types.DeleteLinkTaskResp:
resp := (interface{})(out).(*types.DeleteLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
} else {
@ -370,10 +432,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.ErrorMsg = inresp.Msg
}
return resp, nil
}
return nil, nil
case *hpcAC.DeleteJobResp:
inresp := (interface{})(in).(*hpcAC.DeleteJobResp)
var resp types.DeleteLinkTaskResp
switch (interface{})(out).(type) {
case *types.DeleteLinkTaskResp:
resp := (interface{})(out).(*types.DeleteLinkTaskResp)
if inresp.Code == "0" {
resp.Success = true
} else {
@ -381,10 +447,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.ErrorMsg = inresp.Msg
}
return resp, nil
}
return nil, nil
case *octopus.GetResourceSpecsResp:
inresp := (interface{})(in).(*octopus.GetResourceSpecsResp)
var resp types.GetResourceSpecsResp
switch (interface{})(out).(type) {
case *types.GetResourceSpecsResp:
resp := (interface{})(out).(*types.GetResourceSpecsResp)
resp.Success = inresp.Success
if !resp.Success {
resp.ResourceSpecs = nil
@ -402,10 +472,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
}
return resp, nil
}
return nil, nil
case *hpcAC.GetResourceSpecResp:
inresp := (interface{})(in).(*hpcAC.GetResourceSpecResp)
var resp types.GetResourceSpecsResp
switch (interface{})(out).(type) {
case *types.GetResourceSpecsResp:
resp := (interface{})(out).(*types.GetResourceSpecsResp)
if inresp.Code != "0" {
resp.Success = false
resp.ResourceSpecs = nil
@ -419,9 +493,14 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
resp.ResourceSpecs = append(resp.ResourceSpecs, &spec)
}
return resp, nil
}
return nil, nil
case *modelarts.TrainingJobFlavorsResp:
inresp := (interface{})(in).(*modelarts.TrainingJobFlavorsResp)
var resp types.GetResourceSpecsResp
switch (interface{})(out).(type) {
case *types.GetResourceSpecsResp:
resp := (interface{})(out).(*types.GetResourceSpecsResp)
resp.Success = true
if inresp.Flavors == nil {
@ -441,12 +520,15 @@ func ConvertType2[T any, RESP any](in *T, out *RESP, participant *models.Storeli
}
return resp, nil
}
return nil, nil
default:
return nil, errors.New("type convert fail")
}
}
func ConvertType[T any](in *T, participant *models.StorelinkCenter) (interface{}, error) {
func ConvertTypeOld[T any](in *T, participant *models.StorelinkCenter) (interface{}, error) {
switch (interface{})(in).(type) {
case *octopus.UploadImageResp: