diff --git a/api/internal/logic/schedule/schedulegetalgorithmslogic.go b/api/internal/logic/schedule/schedulegetalgorithmslogic.go index fe949b41..2c78efd3 100644 --- a/api/internal/logic/schedule/schedulegetalgorithmslogic.go +++ b/api/internal/logic/schedule/schedulegetalgorithmslogic.go @@ -2,6 +2,7 @@ package schedule import ( "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +25,11 @@ func NewScheduleGetAlgorithmsLogic(ctx context.Context, svcCtx *svc.ServiceConte } func (l *ScheduleGetAlgorithmsLogic) ScheduleGetAlgorithms(req *types.AiAlgorithmsReq) (resp *types.AiAlgorithmsResp, err error) { - // todo: add your logic here and delete this line - - return + resp = &types.AiAlgorithmsResp{} + algorithms, err := storeLink.GetAlgorithms(l.ctx, l.svcCtx.Scheduler.ResourceCollector, req.ResourceType, req.TaskType, req.Dataset) + if err != nil { + return nil, err + } + resp.Algorithms = algorithms + return resp, nil } diff --git a/api/internal/logic/schedule/schedulegetdatasetslogic.go b/api/internal/logic/schedule/schedulegetdatasetslogic.go index 94005178..f7aeab14 100644 --- a/api/internal/logic/schedule/schedulegetdatasetslogic.go +++ b/api/internal/logic/schedule/schedulegetdatasetslogic.go @@ -2,9 +2,7 @@ package schedule import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -27,8 +25,7 @@ func NewScheduleGetDatasetsLogic(ctx context.Context, svcCtx *svc.ServiceContext func (l *ScheduleGetDatasetsLogic) ScheduleGetDatasets() (resp *types.AiDatasetsResp, err error) { resp = &types.AiDatasetsResp{} - _, colMap := service.InitAiClusterMap(l.ctx, l.svcCtx) - names, err := storeLink.GetDatasetsNames(colMap) + names, err := storeLink.GetDatasetsNames(l.ctx, l.svcCtx.Scheduler.ResourceCollector) if err != nil { return nil, err } diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index fad58867..fc7469f9 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -2,6 +2,8 @@ package schedule import ( "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" @@ -24,7 +26,28 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc } func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) { - // todo: add your logic here and delete this line + resp = &types.ScheduleResp{} + opt := &option.AiOption{ + ResourceType: req.AiOption.ResourceType, + Tops: 0, + TaskType: req.AiOption.TaskType, + DatasetsName: req.AiOption.Datasets, + AlgorithmName: "cnn", + StrategyName: req.AiOption.Strategy, + ClusterToStaticWeight: nil, + Params: []string{ + "epoch,1", + }, + } + aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt) + if err != nil { + return nil, err + } - return + err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) + if err != nil { + return nil, err + } + + return resp, nil } diff --git a/api/internal/logic/storelink/deletelinkimagelogic.go b/api/internal/logic/storelink/deletelinkimagelogic.go index 501d2596..6b2f91e5 100644 --- a/api/internal/logic/storelink/deletelinkimagelogic.go +++ b/api/internal/logic/storelink/deletelinkimagelogic.go @@ -47,12 +47,12 @@ func (l *DeleteLinkImageLogic) DeleteLinkImage(req *types.DeleteLinkImageReq) (r return resp, nil } - storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } - img, err := storelink.ILinkage.DeleteImage(req.ImageId) + img, err := storelink.ILinkage.DeleteImage(l.ctx, req.ImageId) if err != nil { return nil, err } diff --git a/api/internal/logic/storelink/deletelinktasklogic.go b/api/internal/logic/storelink/deletelinktasklogic.go index c325a60b..1321f983 100644 --- a/api/internal/logic/storelink/deletelinktasklogic.go +++ b/api/internal/logic/storelink/deletelinktasklogic.go @@ -47,12 +47,12 @@ func (l *DeleteLinkTaskLogic) DeleteLinkTask(req *types.DeleteLinkTaskReq) (resp return resp, nil } - storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } - task, err := storelink.ILinkage.DeleteTask(req.TaskId) + task, err := storelink.ILinkage.DeleteTask(l.ctx, req.TaskId) if err != nil { return nil, err } diff --git a/api/internal/logic/storelink/getaispecslogic.go b/api/internal/logic/storelink/getaispecslogic.go index 2b0aa68f..2ba8ee38 100644 --- a/api/internal/logic/storelink/getaispecslogic.go +++ b/api/internal/logic/storelink/getaispecslogic.go @@ -47,12 +47,12 @@ func (l *GetAISpecsLogic) GetAISpecs(req *types.GetResourceSpecsReq) (resp *type return resp, nil } - storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } - specs, err := storelink.ILinkage.QuerySpecs() + specs, err := storelink.ILinkage.QuerySpecs(l.ctx) if err != nil { return nil, err } diff --git a/api/internal/logic/storelink/getlinkimagelistlogic.go b/api/internal/logic/storelink/getlinkimagelistlogic.go index 4fdae263..9e811da8 100644 --- a/api/internal/logic/storelink/getlinkimagelistlogic.go +++ b/api/internal/logic/storelink/getlinkimagelistlogic.go @@ -47,12 +47,12 @@ func (l *GetLinkImageListLogic) GetLinkImageList(req *types.GetLinkImageListReq) return resp, nil } - storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } - list, err := storelink.ILinkage.QueryImageList() + list, err := storelink.ILinkage.QueryImageList(l.ctx) if err != nil { return nil, err } diff --git a/api/internal/logic/storelink/getlinktasklogic.go b/api/internal/logic/storelink/getlinktasklogic.go index 301f7a46..97aab7af 100644 --- a/api/internal/logic/storelink/getlinktasklogic.go +++ b/api/internal/logic/storelink/getlinktasklogic.go @@ -48,12 +48,12 @@ func (l *GetLinkTaskLogic) GetLinkTask(req *types.GetLinkTaskReq) (resp *types.G return resp, nil } - storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } - task, err := storelink.ILinkage.QueryTask(req.TaskId) + task, err := storelink.ILinkage.QueryTask(l.ctx, req.TaskId) if err != nil { return nil, err } diff --git a/api/internal/logic/storelink/submitlinktasklogic.go b/api/internal/logic/storelink/submitlinktasklogic.go index 53536ffe..5e8f9d14 100644 --- a/api/internal/logic/storelink/submitlinktasklogic.go +++ b/api/internal/logic/storelink/submitlinktasklogic.go @@ -48,7 +48,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp return resp, nil } - storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } @@ -67,7 +67,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp envs = append(envs, env) } } - task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "", "", "pytorch") + task, err := storelink.ILinkage.SubmitTask(l.ctx, req.ImageId, req.Cmd, envs, params, req.ResourceId, "", "", "pytorch") if err != nil { return nil, err } diff --git a/api/internal/logic/storelink/uploadlinkimagelogic.go b/api/internal/logic/storelink/uploadlinkimagelogic.go index 95c4a9da..60beade5 100644 --- a/api/internal/logic/storelink/uploadlinkimagelogic.go +++ b/api/internal/logic/storelink/uploadlinkimagelogic.go @@ -48,12 +48,12 @@ func (l *UploadLinkImageLogic) UploadLinkImage(req *types.UploadLinkImageReq) (r return resp, nil } - storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant) + storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant) if storelink == nil { return nil, nil } - img, err := storelink.ILinkage.UploadImage(req.FilePath) + img, err := storelink.ILinkage.UploadImage(l.ctx, req.FilePath) if err != nil { return nil, err } diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index c090a795..61713ad2 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -16,9 +16,7 @@ package mqs import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" ) @@ -27,25 +25,22 @@ import ( Listening to the payment flow status change notification message queue */ type AiQueue struct { - ctx context.Context - svcCtx *svc.ServiceContext - scheduler *scheduler.Scheduler + ctx context.Context + svcCtx *svc.ServiceContext } func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { - aiExecutorMap, aiCollectorMap := service.InitAiClusterMap(ctx, svcCtx) return &AiQueue{ - ctx: ctx, - svcCtx: svcCtx, - scheduler: scheduler.NewSchdlr(aiCollectorMap, nil, aiExecutorMap), + ctx: ctx, + svcCtx: svcCtx, } } func (l *AiQueue) Consume(val string) error { - aiSchdl, _ := schedulers.NewAiScheduler(val, l.scheduler, nil) + aiSchdl, _ := schedulers.NewAiScheduler(l.ctx, val, l.svcCtx.Scheduler, nil) // 调度算法 - err := l.scheduler.AssignAndSchedule(aiSchdl) + err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) if err != nil { return err } diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index aa984f16..edcc5969 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -2,18 +2,44 @@ package database import ( "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gorm.io/gorm" + "time" ) type AiStorage struct { DbEngin *gorm.DB } -func (s *AiStorage) GetParticipants() { +func (s *AiStorage) GetParticipants() (*types.ClusterListResp, error) { var resp types.ClusterListResp tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.List) if tx.Error != nil { logx.Errorf(tx.Error.Error()) + return nil, tx.Error } + return &resp, nil +} + +func (s *AiStorage) SaveTask(cluster strategy.AssignedCluster) error { + // 构建主任务结构体 + taskModel := models.Task{ + Status: constants.Saved, + Description: "ai task", + Name: "testAi", + CommitTime: time.Now(), + } + // 保存任务数据到数据库 + tx := s.DbEngin.Create(&taskModel) + if tx.Error != nil { + return tx.Error + } + return nil +} + +func (s *AiStorage) UpdateTask() error { + return nil } diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index 8c3265db..281788e6 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -39,7 +39,7 @@ type Scheduler struct { result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService ResourceCollector *map[string]collector.AiCollector - Storages database.Storage + AiStorages *database.AiStorage AiExecutor *map[string]executor.AiExecutor mu sync.RWMutex } @@ -59,8 +59,8 @@ func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, partici return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler { - return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor} +func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages *database.AiStorage, aiExecutor *map[string]executor.AiExecutor) *Scheduler { + return &Scheduler{ResourceCollector: resourceCollector, AiStorages: storages, AiExecutor: aiExecutor} } func (s *Scheduler) SpecifyClusters() { diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 1667f8af..024ef7ae 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -15,6 +15,7 @@ package schedulers import ( + "context" "errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" @@ -32,10 +33,11 @@ type AiScheduler struct { task *response.TaskInfo *scheduler.Scheduler option *option.AiOption + ctx context.Context } -func NewAiScheduler(val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) { - return &AiScheduler{yamlString: val, Scheduler: scheduler, option: option}, nil +func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) { + return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil } func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { @@ -56,6 +58,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { } resources, err := as.findClustersWithResources() + if err != nil { return nil, err } @@ -104,7 +107,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { continue } go func() { - _, err := executorMap[c.Name].Execute(as.option) + _, err := executorMap[c.Name].Execute(as.ctx, as.option) if err != nil { // TODO: database operation } @@ -127,7 +130,7 @@ func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, wg.Add(1) rc := resourceCollector go func() { - spec, err := rc.GetResourceStats() + spec, err := rc.GetResourceStats(as.ctx) if err != nil { errCh <- err wg.Done() diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index a3773562..5b21003b 100644 --- a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -1,11 +1,13 @@ package service import ( - "context" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" ) const ( @@ -24,21 +26,21 @@ var ( } ) -func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.AiCollector) { +func InitAiClusterMap(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, aCRpc hpcacclient.HpcAC) (*map[string]executor.AiExecutor, *map[string]collector.AiCollector) { executorMap := make(map[string]executor.AiExecutor) collectorMap := make(map[string]collector.AiCollector) for k, v := range AiTypeMap { switch v { case OCTOPUS: - octopus := storeLink.NewOctopusLink(ctx, svcCtx, k, 0) + octopus := storeLink.NewOctopusLink(octopusRpc, k, 0) collectorMap[k] = octopus executorMap[k] = octopus case MODELARTS: - modelarts := storeLink.NewModelArtsLink(ctx, svcCtx, k, 0) + modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, k, 0) collectorMap[k] = modelarts executorMap[k] = modelarts case SHUGUANGAI: - sgai := storeLink.NewShuguangAi(ctx, svcCtx, k, 0) + sgai := storeLink.NewShuguangAi(aCRpc, k, 0) collectorMap[k] = sgai executorMap[k] = sgai } diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index 8d67175c..39a05e5e 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -1,9 +1,11 @@ package collector +import "context" + type AiCollector interface { - GetResourceStats() (*ResourceStats, error) - GetDatasetsSpecs() ([]*DatasetsSpecs, error) - GetAlgorithms() ([]*Algorithm, error) + GetResourceStats(ctx context.Context) (*ResourceStats, error) + GetDatasetsSpecs(ctx context.Context) ([]*DatasetsSpecs, error) + GetAlgorithms(ctx context.Context) ([]*Algorithm, error) } type ResourceStats struct { diff --git a/api/internal/scheduler/service/executor/aiExecutor.go b/api/internal/scheduler/service/executor/aiExecutor.go index fae0a90a..de3e49da 100644 --- a/api/internal/scheduler/service/executor/aiExecutor.go +++ b/api/internal/scheduler/service/executor/aiExecutor.go @@ -1,9 +1,10 @@ package executor import ( + "context" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" ) type AiExecutor interface { - Execute(option *option.AiOption) (interface{}, error) + Execute(ctx context.Context, option *option.AiOption) (interface{}, error) } diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index 64c3437b..6dffc2ec 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -18,44 +18,45 @@ import ( "context" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice" "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts" "strconv" "strings" ) type ModelArtsLink struct { - ctx context.Context - svcCtx *svc.ServiceContext - platform string - participantId int64 - pageIndex int32 - pageSize int32 + modelArtsRpc modelartsservice.ModelArtsService + modelArtsImgRpc imagesservice.ImagesService + platform string + participantId int64 + pageIndex int32 + pageSize int32 } -func NewModelArtsLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ModelArtsLink { - return &ModelArtsLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100} +func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64) *ModelArtsLink { + return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100} } -func (m *ModelArtsLink) UploadImage(path string) (interface{}, error) { +func (m *ModelArtsLink) UploadImage(ctx context.Context, path string) (interface{}, error) { //TODO modelArts上传镜像 return nil, nil } -func (m *ModelArtsLink) DeleteImage(imageId string) (interface{}, error) { +func (m *ModelArtsLink) DeleteImage(ctx context.Context, imageId string) (interface{}, error) { // TODO modelArts删除镜像 return nil, nil } -func (m *ModelArtsLink) QueryImageList() (interface{}, error) { +func (m *ModelArtsLink) QueryImageList(ctx context.Context) (interface{}, error) { // modelArts获取镜像列表 req := &modelarts.ListRepoReq{ Offset: "0", Limit: strconv.Itoa(int(m.pageSize)), Platform: m.platform, } - resp, err := m.svcCtx.ModelArtsImgRpc.ListReposDetails(m.ctx, req) + resp, err := m.modelArtsImgRpc.ListReposDetails(ctx, req) if err != nil { return nil, err } @@ -63,7 +64,7 @@ func (m *ModelArtsLink) QueryImageList() (interface{}, error) { return resp, nil } -func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { +func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { // modelArts提交任务 environments := make(map[string]string) parameters := make([]*modelarts.ParametersTrainJob, 0) @@ -100,7 +101,7 @@ func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, pa }, Platform: m.platform, } - resp, err := m.svcCtx.ModelArtsRpc.CreateTrainingJob(m.ctx, req) + resp, err := m.modelArtsRpc.CreateTrainingJob(ctx, req) if err != nil { return nil, err } @@ -108,13 +109,13 @@ func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, pa return resp, nil } -func (m *ModelArtsLink) QueryTask(taskId string) (interface{}, error) { +func (m *ModelArtsLink) QueryTask(ctx context.Context, taskId string) (interface{}, error) { // 获取任务 req := &modelarts.DetailTrainingJobsReq{ TrainingJobId: taskId, Platform: m.platform, } - resp, err := m.svcCtx.ModelArtsRpc.GetTrainingJobs(m.ctx, req) + resp, err := m.modelArtsRpc.GetTrainingJobs(ctx, req) if err != nil { return nil, err } @@ -122,13 +123,13 @@ func (m *ModelArtsLink) QueryTask(taskId string) (interface{}, error) { return resp, nil } -func (m *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) { +func (m *ModelArtsLink) DeleteTask(ctx context.Context, taskId string) (interface{}, error) { // 删除任务 req := &modelarts.DeleteTrainingJobReq{ TrainingJobId: taskId, Platform: m.platform, } - resp, err := m.svcCtx.ModelArtsRpc.DeleteTrainingJob(m.ctx, req) + resp, err := m.modelArtsRpc.DeleteTrainingJob(ctx, req) if err != nil { return nil, err } @@ -136,12 +137,12 @@ func (m *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) { return resp, nil } -func (m *ModelArtsLink) QuerySpecs() (interface{}, error) { +func (m *ModelArtsLink) QuerySpecs(ctx context.Context) (interface{}, error) { // octopus查询资源规格 req := &modelarts.TrainingJobFlavorsReq{ Platform: m.platform, } - resp, err := m.svcCtx.ModelArtsRpc.GetTrainingJobFlavors(m.ctx, req) + resp, err := m.modelArtsRpc.GetTrainingJobFlavors(ctx, req) if err != nil { return nil, err } @@ -149,32 +150,32 @@ func (m *ModelArtsLink) QuerySpecs() (interface{}, error) { return resp, nil } -func (m *ModelArtsLink) GetResourceStats() (*collector.ResourceStats, error) { +func (m *ModelArtsLink) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) { return nil, nil } -func (m *ModelArtsLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { +func (m *ModelArtsLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) { return nil, nil } -func (m *ModelArtsLink) GetAlgorithms() ([]*collector.Algorithm, error) { +func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) { return nil, nil } -func (m *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) { - err := m.GenerateSubmitParams(option) +func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { + err := m.GenerateSubmitParams(ctx, option) if err != nil { return nil, err } - task, err := m.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) + task, err := m.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) if err != nil { return nil, err } return task, nil } -func (m *ModelArtsLink) GenerateSubmitParams(option *option.AiOption) error { - err := m.generateResourceId(option) +func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error { + err := m.generateResourceId(ctx, option) if err != nil { return err } @@ -197,8 +198,8 @@ func (m *ModelArtsLink) GenerateSubmitParams(option *option.AiOption) error { return nil } -func (m *ModelArtsLink) generateResourceId(option *option.AiOption) error { - _, err := m.QuerySpecs() +func (m *ModelArtsLink) generateResourceId(ctx context.Context, option *option.AiOption) error { + _, err := m.QuerySpecs(ctx) if err != nil { return err } diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index b3bd546a..2422ada3 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -19,17 +19,16 @@ import ( "errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" "math" "strconv" "strings" ) type OctopusLink struct { - ctx context.Context - svcCtx *svc.ServiceContext + octopusRpc octopusclient.Octopus pageIndex int32 pageSize int32 platform string @@ -66,11 +65,11 @@ var ( } ) -func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink { - return &OctopusLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100} +func NewOctopusLink(octopusRpc octopusclient.Octopus, name string, id int64) *OctopusLink { + return &OctopusLink{octopusRpc: octopusRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100} } -func (o *OctopusLink) UploadImage(path string) (interface{}, error) { +func (o *OctopusLink) UploadImage(ctx context.Context, path string) (interface{}, error) { // octopus创建镜像 createReq := &octopus.CreateImageReq{ Platform: o.platform, @@ -80,7 +79,7 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) { ImageVersion: IMG_VERSION_PREFIX + utils.RandomString(7), }, } - createResp, err := o.svcCtx.OctopusRpc.CreateImage(o.ctx, createReq) + createResp, err := o.octopusRpc.CreateImage(ctx, createReq) if err != nil { return nil, err } @@ -94,7 +93,7 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) { FileName: "", }, } - uploadResp, err := o.svcCtx.OctopusRpc.UploadImage(o.ctx, uploadReq) + uploadResp, err := o.octopusRpc.UploadImage(ctx, uploadReq) if err != nil { return nil, err } @@ -104,13 +103,13 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) { return uploadResp, nil } -func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) { +func (o *OctopusLink) DeleteImage(ctx context.Context, imageId string) (interface{}, error) { // octopus删除镜像 req := &octopus.DeleteImageReq{ Platform: o.platform, ImageId: imageId, } - resp, err := o.svcCtx.OctopusRpc.DeleteImage(o.ctx, req) + resp, err := o.octopusRpc.DeleteImage(ctx, req) if err != nil { return nil, err } @@ -118,14 +117,14 @@ func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) { return resp, nil } -func (o *OctopusLink) QueryImageList() (interface{}, error) { +func (o *OctopusLink) QueryImageList(ctx context.Context) (interface{}, error) { // octopus获取镜像列表 req := &octopus.GetUserImageListReq{ Platform: o.platform, PageIndex: o.pageIndex, PageSize: o.pageSize, } - resp, err := o.svcCtx.OctopusRpc.GetUserImageList(o.ctx, req) + resp, err := o.octopusRpc.GetUserImageList(ctx, req) if err != nil { return nil, err } @@ -133,7 +132,7 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) { return resp, nil } -func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { +func (o *OctopusLink) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { // octopus提交任务 // python参数 @@ -176,7 +175,7 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, para AlgorithmVersion: VERSION, }, } - resp, err := o.svcCtx.OctopusRpc.CreateTrainJob(o.ctx, req) + resp, err := o.octopusRpc.CreateTrainJob(ctx, req) if err != nil { return nil, err } @@ -184,13 +183,13 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, para return resp, nil } -func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) { +func (o *OctopusLink) QueryTask(ctx context.Context, taskId string) (interface{}, error) { // octopus获取任务 req := &octopus.GetTrainJobReq{ Platform: o.platform, Id: taskId, } - resp, err := o.svcCtx.OctopusRpc.GetTrainJob(o.ctx, req) + resp, err := o.octopusRpc.GetTrainJob(ctx, req) if err != nil { return nil, err } @@ -198,13 +197,13 @@ func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) { return resp, nil } -func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) { +func (o *OctopusLink) DeleteTask(ctx context.Context, taskId string) (interface{}, error) { // octopus删除任务 req := &octopus.DeleteTrainJobReq{ Platform: o.platform, JobIds: []string{taskId}, } - resp, err := o.svcCtx.OctopusRpc.DeleteTrainJob(o.ctx, req) + resp, err := o.octopusRpc.DeleteTrainJob(ctx, req) if err != nil { return nil, err } @@ -212,13 +211,13 @@ func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) { return resp, nil } -func (o *OctopusLink) QuerySpecs() (interface{}, error) { +func (o *OctopusLink) QuerySpecs(ctx context.Context) (interface{}, error) { // octopus查询资源规格 req := &octopus.GetResourceSpecsReq{ Platform: o.platform, ResourcePool: RESOURCE_POOL, } - resp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req) + resp, err := o.octopusRpc.GetResourceSpecs(ctx, req) if err != nil { return nil, err } @@ -226,12 +225,12 @@ func (o *OctopusLink) QuerySpecs() (interface{}, error) { return resp, nil } -func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) { +func (o *OctopusLink) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) { req := &octopus.GetResourceSpecsReq{ Platform: o.platform, ResourcePool: RESOURCE_POOL, } - specResp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req) + specResp, err := o.octopusRpc.GetResourceSpecs(ctx, req) if err != nil { return nil, err } @@ -241,7 +240,7 @@ func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) { balanceReq := &octopus.GetUserBalanceReq{ Platform: o.platform, } - balanceResp, err := o.svcCtx.OctopusRpc.GetUserBalance(o.ctx, balanceReq) + balanceResp, err := o.octopusRpc.GetUserBalance(ctx, balanceReq) if err != nil { return nil, err } @@ -294,13 +293,13 @@ func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) { return resourceStats, nil } -func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { +func (o *OctopusLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) { req := &octopus.GetMyDatasetListReq{ Platform: o.platform, PageIndex: o.pageIndex, PageSize: o.pageSize, } - resp, err := o.svcCtx.OctopusRpc.GetMyDatasetList(o.ctx, req) + resp, err := o.octopusRpc.GetMyDatasetList(ctx, req) if err != nil { return nil, err } @@ -315,7 +314,7 @@ func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { return specs, nil } -func (o *OctopusLink) GetAlgorithms() ([]*collector.Algorithm, error) { +func (o *OctopusLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) { var algorithms []*collector.Algorithm req := &octopus.GetMyAlgorithmListReq{ @@ -323,7 +322,7 @@ func (o *OctopusLink) GetAlgorithms() ([]*collector.Algorithm, error) { PageIndex: o.pageIndex, PageSize: o.pageSize, } - resp, err := o.svcCtx.OctopusRpc.GetMyAlgorithmList(o.ctx, req) + resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req) if err != nil { return nil, err } @@ -338,32 +337,32 @@ func (o *OctopusLink) GetAlgorithms() ([]*collector.Algorithm, error) { return algorithms, nil } -func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) { - err := o.GenerateSubmitParams(option) +func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { + err := o.GenerateSubmitParams(ctx, option) if err != nil { return nil, err } - task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) + task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) if err != nil { return nil, err } return task, nil } -func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error { - err := o.generateResourceId(option) +func (o *OctopusLink) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error { + err := o.generateResourceId(ctx, option) if err != nil { return err } - err = o.generateDatasetsId(option) + err = o.generateDatasetsId(ctx, option) if err != nil { return err } - err = o.generateImageId(option) + err = o.generateImageId(ctx, option) if err != nil { return err } - err = o.generateAlgorithmId(option) + err = o.generateAlgorithmId(ctx, option) if err != nil { return err } @@ -382,7 +381,7 @@ func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error { return nil } -func (o *OctopusLink) generateResourceId(option *option.AiOption) error { +func (o *OctopusLink) generateResourceId(ctx context.Context, option *option.AiOption) error { if option.ResourceType == "" { return errors.New("ResourceType not set") } @@ -390,7 +389,7 @@ func (o *OctopusLink) generateResourceId(option *option.AiOption) error { Platform: o.platform, ResourcePool: RESOURCE_POOL, } - specResp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req) + specResp, err := o.octopusRpc.GetResourceSpecs(ctx, req) if err != nil { return err } @@ -418,7 +417,7 @@ func (o *OctopusLink) generateResourceId(option *option.AiOption) error { return errors.New("failed to get ResourceId") } -func (o *OctopusLink) generateDatasetsId(option *option.AiOption) error { +func (o *OctopusLink) generateDatasetsId(ctx context.Context, option *option.AiOption) error { if option.DatasetsName == "" { return errors.New("DatasetsName not set") } @@ -427,7 +426,7 @@ func (o *OctopusLink) generateDatasetsId(option *option.AiOption) error { PageIndex: o.pageIndex, PageSize: o.pageSize, } - resp, err := o.svcCtx.OctopusRpc.GetMyDatasetList(o.ctx, req) + resp, err := o.octopusRpc.GetMyDatasetList(ctx, req) if err != nil { return err } @@ -443,7 +442,7 @@ func (o *OctopusLink) generateDatasetsId(option *option.AiOption) error { return errors.New("failed to get DatasetsId") } -func (o *OctopusLink) generateImageId(option *option.AiOption) error { +func (o *OctopusLink) generateImageId(ctx context.Context, option *option.AiOption) error { if option.TaskType == "" { return errors.New("TaskType not set") } @@ -453,7 +452,7 @@ func (o *OctopusLink) generateImageId(option *option.AiOption) error { PageIndex: o.pageIndex, PageSize: o.pageSize, } - resp, err := o.svcCtx.OctopusRpc.GetUserImageList(o.ctx, req) + resp, err := o.octopusRpc.GetUserImageList(ctx, req) if err != nil { return err } @@ -475,7 +474,7 @@ func (o *OctopusLink) generateImageId(option *option.AiOption) error { PageIndex: o.pageIndex, PageSize: o.pageSize, } - preImgResp, err := o.svcCtx.OctopusRpc.GetPresetImageList(o.ctx, preImgReq) + preImgResp, err := o.octopusRpc.GetPresetImageList(ctx, preImgReq) if err != nil { return err } @@ -495,7 +494,7 @@ func (o *OctopusLink) generateImageId(option *option.AiOption) error { return errors.New("failed to get ImageId") } -func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error { +func (o *OctopusLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error { // temporarily set algorithm to cnn if option.AlgorithmName == "" { switch option.DatasetsName { @@ -511,7 +510,7 @@ func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error { PageIndex: o.pageIndex, PageSize: o.pageSize, } - resp, err := o.svcCtx.OctopusRpc.GetMyAlgorithmList(o.ctx, req) + resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req) if err != nil { return err } diff --git a/api/internal/storeLink/shuguangHpc.go b/api/internal/storeLink/shuguangHpc.go index b4a0768d..d5ffc8ec 100644 --- a/api/internal/storeLink/shuguangHpc.go +++ b/api/internal/storeLink/shuguangHpc.go @@ -4,17 +4,16 @@ import ( "context" "errors" "fmt" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC" + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" "strconv" "strings" ) type ShuguangHpc struct { - ctx context.Context - svcCtx *svc.ServiceContext + aCRpc hpcacclient.HpcAC platform string participantId int64 } @@ -128,23 +127,23 @@ type ResourceSpecHpc struct { GAP_NDCU string } -func NewShuguangHpc(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ShuguangHpc { - return &ShuguangHpc{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id} +func NewShuguangHpc(aCRpc hpcacclient.HpcAC, name string, id int64) *ShuguangHpc { + return &ShuguangHpc{aCRpc: aCRpc, platform: name, participantId: id} } -func (s ShuguangHpc) UploadImage(path string) (interface{}, error) { +func (s ShuguangHpc) UploadImage(ctx context.Context, path string) (interface{}, error) { return nil, nil } -func (s ShuguangHpc) DeleteImage(imageId string) (interface{}, error) { +func (s ShuguangHpc) DeleteImage(ctx context.Context, imageId string) (interface{}, error) { return nil, nil } -func (s ShuguangHpc) QueryImageList() (interface{}, error) { +func (s ShuguangHpc) QueryImageList(ctx context.Context) (interface{}, error) { return nil, nil } -func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { +func (s ShuguangHpc) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { // shuguangHpc提交任务 //判断是否resourceId匹配自定义资源Id @@ -194,7 +193,7 @@ func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, param updateSGHpcRequestByResourceId(resourceId, req) - resp, err := s.svcCtx.ACRpc.SubmitJob(s.ctx, req) + resp, err := s.aCRpc.SubmitJob(ctx, req) if err != nil { return nil, err } @@ -203,12 +202,12 @@ func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, param } -func (s ShuguangHpc) QueryTask(taskId string) (interface{}, error) { +func (s ShuguangHpc) QueryTask(ctx context.Context, taskId string) (interface{}, error) { //实时作业 reqC := &hpcAC.JobDetailReq{ JobId: taskId, } - respC, err := s.svcCtx.ACRpc.GetJobDetail(s.ctx, reqC) + respC, err := s.aCRpc.GetJobDetail(ctx, reqC) if err != nil { return nil, err } @@ -223,7 +222,7 @@ func (s ShuguangHpc) QueryTask(taskId string) (interface{}, error) { JobmanagerId: strconv.Itoa(StrJobManagerID), } - respH, err := s.svcCtx.ACRpc.HistoryJobDetail(s.ctx, reqH) + respH, err := s.aCRpc.HistoryJobDetail(ctx, reqH) if err != nil { return nil, err } @@ -232,7 +231,7 @@ func (s ShuguangHpc) QueryTask(taskId string) (interface{}, error) { } } -func (s ShuguangHpc) QuerySpecs() (interface{}, error) { +func (s ShuguangHpc) QuerySpecs(ctx context.Context) (interface{}, error) { resp := &types.GetResourceSpecsResp{} for k, v := range RESOURCESPECSHPC { @@ -248,12 +247,12 @@ func (s ShuguangHpc) QuerySpecs() (interface{}, error) { return resp, nil } -func (s ShuguangHpc) DeleteTask(taskId string) (interface{}, error) { +func (s ShuguangHpc) DeleteTask(ctx context.Context, taskId string) (interface{}, error) { strJobInfoMap := fmt.Sprintf(StrJobInfoMap, StrJobManagerID, Username, taskId) req := &hpcAC.DeleteJobReq{ StrJobInfoMap: strJobInfoMap, } - resp, err := s.svcCtx.ACRpc.DeleteJob(s.ctx, req) + resp, err := s.aCRpc.DeleteJob(ctx, req) if err != nil { return nil, err } diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 654e12b4..e7da3222 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -20,10 +20,10 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC" + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" "strconv" "strings" ) @@ -91,31 +91,30 @@ type ResourceSpecSGAI struct { } type ShuguangAi struct { - ctx context.Context - svcCtx *svc.ServiceContext + aCRpc hpcacclient.HpcAC platform string participantId int64 } -func NewShuguangAi(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ShuguangAi { - return &ShuguangAi{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id} +func NewShuguangAi(aCRpc hpcacclient.HpcAC, name string, id int64) *ShuguangAi { + return &ShuguangAi{aCRpc: aCRpc, platform: name, participantId: id} } -func (s *ShuguangAi) UploadImage(path string) (interface{}, error) { +func (s *ShuguangAi) UploadImage(ctx context.Context, path string) (interface{}, error) { return nil, nil } -func (s *ShuguangAi) DeleteImage(imageId string) (interface{}, error) { +func (s *ShuguangAi) DeleteImage(ctx context.Context, imageId string) (interface{}, error) { return nil, nil } -func (s *ShuguangAi) QueryImageList() (interface{}, error) { +func (s *ShuguangAi) QueryImageList(ctx context.Context) (interface{}, error) { // shuguangAi获取镜像列表 req := &hpcAC.GetImageListAiReq{ AcceleratorType: DCU, TaskType: PYTORCH, } - resp, err := s.svcCtx.ACRpc.GetImageListAi(s.ctx, req) + resp, err := s.aCRpc.GetImageListAi(ctx, req) if err != nil { return nil, err } @@ -123,7 +122,7 @@ func (s *ShuguangAi) QueryImageList() (interface{}, error) { return resp, nil } -func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) { +func (s *ShuguangAi) SubmitPytorchTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) { //判断是否resourceId匹配自定义资源Id _, isMapContainsKey := RESOURCESPECSAI[resourceId] if !isMapContainsKey { @@ -132,7 +131,7 @@ func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string //根据imageId获取imagePath, version imageReq := &hpcAC.GetImageAiByIdReq{ImageId: imageId} - imageResp, err := s.svcCtx.ACRpc.GetImageAiById(s.ctx, imageReq) + imageResp, err := s.aCRpc.GetImageAiById(ctx, imageReq) if err != nil { return nil, err } @@ -176,7 +175,7 @@ func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string updateSGAIRequestByResourceId(resourceId, req) - resp, err := s.svcCtx.ACRpc.SubmitPytorchTask(s.ctx, req) + resp, err := s.aCRpc.SubmitPytorchTask(ctx, req) if err != nil { return nil, err } @@ -191,7 +190,7 @@ func updateSGAIRequestByResourceId(resourceId string, req *hpcAC.SubmitPytorchTa req.Params.WorkerRamSize = spec.RAM } -func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) { +func (s *ShuguangAi) SubmitTensorflowTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) { //req := &hpcAC.SubmitTensorflowTaskReq{ // Params: &hpcAC.SubmitTensorflowTaskParams{ // @@ -200,7 +199,7 @@ func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []str return nil, nil } -func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { +func (s *ShuguangAi) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { // set algorithmId temporarily for storelink submit if algorithmId == "" { algorithmId = "pytorch-mnist-fcn" @@ -209,13 +208,13 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, param // shuguangAi提交任务 switch aiType { case PYTORCH_TASK: - task, err := s.SubmitPytorchTask(imageId, cmd, envs, params, resourceId, datasetsId, algorithmId) + task, err := s.SubmitPytorchTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId) if err != nil { return nil, err } return task, nil case TENSORFLOW_TASK: - task, err := s.SubmitTensorflowTask(imageId, cmd, envs, params, resourceId, datasetsId, algorithmId) + task, err := s.SubmitTensorflowTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId) if err != nil { return nil, err } @@ -224,12 +223,12 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, param return nil, errors.New("shuguangAi不支持的任务类型") } -func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) { +func (s *ShuguangAi) QueryTask(ctx context.Context, taskId string) (interface{}, error) { // shuguangAi获取任务 req := &hpcAC.GetPytorchTaskReq{ Id: taskId, } - resp, err := s.svcCtx.ACRpc.GetPytorchTask(s.ctx, req) + resp, err := s.aCRpc.GetPytorchTask(ctx, req) if err != nil { return nil, err } @@ -237,12 +236,12 @@ func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) { return resp, nil } -func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) { +func (s *ShuguangAi) DeleteTask(ctx context.Context, taskId string) (interface{}, error) { // shuguangAi删除任务 req := &hpcAC.DeleteTaskAiReq{ Ids: taskId, } - resp, err := s.svcCtx.ACRpc.DeleteTaskAi(s.ctx, req) + resp, err := s.aCRpc.DeleteTaskAi(ctx, req) if err != nil { return nil, err } @@ -250,7 +249,7 @@ func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) { return resp, nil } -func (s *ShuguangAi) QuerySpecs() (interface{}, error) { +func (s *ShuguangAi) QuerySpecs(ctx context.Context) (interface{}, error) { resp := &types.GetResourceSpecsResp{} for k, v := range RESOURCESPECSAI { @@ -266,10 +265,10 @@ func (s *ShuguangAi) QuerySpecs() (interface{}, error) { return resp, nil } -func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { +func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) { //balance userReq := &hpcAC.GetUserInfoReq{} - userinfo, err := s.svcCtx.ACRpc.GetUserInfo(s.ctx, userReq) + userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq) if err != nil { return nil, err } @@ -277,7 +276,7 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { //resource limit limitReq := &hpcAC.QueueReq{} - limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq) + limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq) if err != nil { return nil, err } @@ -286,7 +285,7 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { //disk diskReq := &hpcAC.ParaStorQuotaReq{} - diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq) + diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) if err != nil { return nil, err } @@ -295,14 +294,14 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3) //memory - nodeResp, err := s.svcCtx.ACRpc.GetNodeResources(s.ctx, nil) + nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil) if err != nil { return nil, err } memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES //resources being occupied - memberJobResp, err := s.svcCtx.ACRpc.GetMemberJobs(s.ctx, nil) + memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil) if err != nil { return nil, err } @@ -361,9 +360,9 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) { return resourceStats, nil } -func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { +func (s *ShuguangAi) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) { req := &hpcAC.GetFileListReq{Limit: 100, Path: DATASETS_DIR, Start: 0} - list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req) + list, err := s.aCRpc.GetFileList(ctx, req) if err != nil { return nil, err } @@ -378,12 +377,12 @@ func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) { return specs, nil } -func (s *ShuguangAi) GetAlgorithms() ([]*collector.Algorithm, error) { +func (s *ShuguangAi) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) { var algorithms []*collector.Algorithm for _, t := range GetTaskTypes() { taskType := t req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0} - list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req) + list, err := s.aCRpc.GetFileList(ctx, req) if err != nil { return nil, err } @@ -398,28 +397,28 @@ func (s *ShuguangAi) GetAlgorithms() ([]*collector.Algorithm, error) { return algorithms, nil } -func (s *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) { - err := s.GenerateSubmitParams(option) +func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { + err := s.GenerateSubmitParams(ctx, option) if err != nil { return nil, err } - task, err := s.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) + task, err := s.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) if err != nil { return nil, err } return task, nil } -func (s *ShuguangAi) GenerateSubmitParams(option *option.AiOption) error { +func (s *ShuguangAi) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error { err := s.generateResourceId(option) if err != nil { return err } - err = s.generateImageId(option) + err = s.generateImageId(ctx, option) if err != nil { return err } - err = s.generateAlgorithmId(option) + err = s.generateAlgorithmId(ctx, option) if err != nil { return err } @@ -473,7 +472,7 @@ func (s *ShuguangAi) generateResourceId(option *option.AiOption) error { return errors.New("failed to get ResourceId") } -func (s *ShuguangAi) generateImageId(option *option.AiOption) error { +func (s *ShuguangAi) generateImageId(ctx context.Context, option *option.AiOption) error { if option.TaskType == "" { return errors.New("TaskType not set") } @@ -482,7 +481,7 @@ func (s *ShuguangAi) generateImageId(option *option.AiOption) error { AcceleratorType: DCU, TaskType: taskType, } - resp, err := s.svcCtx.ACRpc.GetImageListAi(s.ctx, req) + resp, err := s.aCRpc.GetImageListAi(ctx, req) if err != nil { return err } @@ -502,13 +501,13 @@ func (s *ShuguangAi) generateImageId(option *option.AiOption) error { return errors.New("failed to get ImageId") } -func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error { +func (s *ShuguangAi) generateAlgorithmId(ctx context.Context, option *option.AiOption) error { if option.DatasetsName == "" { return errors.New("DatasetsName not set") } req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0} - list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req) + list, err := s.aCRpc.GetFileList(ctx, req) if err != nil { return err } diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index a0bb1919..ce00a540 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -19,26 +19,29 @@ import ( "github.com/pkg/errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" "gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC" + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice" "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts" "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" "gorm.io/gorm" "strings" "sync" ) type Linkage interface { - UploadImage(path string) (interface{}, error) - DeleteImage(imageId string) (interface{}, error) - QueryImageList() (interface{}, error) - SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) - QueryTask(taskId string) (interface{}, error) - QuerySpecs() (interface{}, error) - DeleteTask(taskId string) (interface{}, error) + UploadImage(ctx context.Context, path string) (interface{}, error) + DeleteImage(ctx context.Context, imageId string) (interface{}, error) + QueryImageList(ctx context.Context) (interface{}, error) + SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) + QueryTask(ctx context.Context, taskId string) (interface{}, error) + QuerySpecs(ctx context.Context) (interface{}, error) + DeleteTask(ctx context.Context, taskId string) (interface{}, error) } const ( @@ -89,19 +92,19 @@ type StoreLink struct { ILinkage Linkage } -func NewStoreLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *StoreLink { +func NewStoreLink(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, aCRpc hpcacclient.HpcAC, participant *models.StorelinkCenter) *StoreLink { switch participant.Type { case TYPE_OCTOPUS: - linkStruct := NewOctopusLink(ctx, svcCtx, participant.Name, participant.Id) + linkStruct := NewOctopusLink(octopusRpc, participant.Name, participant.Id) return &StoreLink{ILinkage: linkStruct} case TYPE_MODELARTS: - linkStruct := NewModelArtsLink(ctx, svcCtx, participant.Name, participant.Id) + linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id) return &StoreLink{ILinkage: linkStruct} case TYPE_SHUGUANGAI: - linkStruct := NewShuguangAi(ctx, svcCtx, participant.Name, participant.Id) + linkStruct := NewShuguangAi(aCRpc, participant.Name, participant.Id) return &StoreLink{ILinkage: linkStruct} case TYPE_SHUGUANGHPC: - linkStruct := NewShuguangHpc(ctx, svcCtx, participant.Name, participant.Id) + linkStruct := NewShuguangHpc(aCRpc, participant.Name, participant.Id) return &StoreLink{ILinkage: linkStruct} default: return nil @@ -124,7 +127,7 @@ func GetResourceTypes() []string { return resourceTypes } -func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string, error) { +func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.AiCollector) ([]string, error) { var wg sync.WaitGroup var errCh = make(chan error, len(*collectorMap)) var errs []error @@ -136,7 +139,7 @@ func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string, c := col go func() { var ns []string - specs, err := c.GetDatasetsSpecs() + specs, err := c.GetDatasetsSpecs(ctx) if err != nil { errCh <- err wg.Done() @@ -176,7 +179,7 @@ func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string, return names, nil } -func GetAlgorithms(collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) { +func GetAlgorithms(ctx context.Context, collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) { var names []string var wg sync.WaitGroup var errCh = make(chan error, len(*collectorMap)) @@ -189,7 +192,7 @@ func GetAlgorithms(collectorMap *map[string]collector.AiCollector, resourceType c := col go func() { var ns []string - algorithms, err := c.GetAlgorithms() + algorithms, err := c.GetAlgorithms(ctx) if err != nil { errCh <- err wg.Done() diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index 5caebbf0..4bec99a7 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -22,6 +22,9 @@ import ( "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice" @@ -58,6 +61,7 @@ type ServiceContext struct { PromClient tracker.Prometheus AlertClient *alert.AlertmanagerAPI HttpClient *resty.Client + Scheduler *scheduler.Scheduler } func NewServiceContext(c config.Config) *ServiceContext { @@ -110,16 +114,26 @@ func NewServiceContext(c config.Config) *ServiceContext { Addr: c.Redis.Host, Password: c.Redis.Pass, }) + + // scheduler + octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)) + aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)) + modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf)) + modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf)) + aiExecutor, resourceCollector := service.InitAiClusterMap(octopusRpc, modelArtsRpc, modelArtsImgRpc, aCRpc) + storage := &database.AiStorage{DbEngin: dbEngin} + scheduler := scheduler.NewSchdlr(resourceCollector, storage, aiExecutor) + return &ServiceContext{ Cron: cron.New(cron.WithSeconds()), DbEngin: dbEngin, Config: c, RedisClient: redisClient, - ModelArtsRpc: modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf)), - ModelArtsImgRpc: imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf)), + ModelArtsRpc: modelArtsRpc, + ModelArtsImgRpc: modelArtsImgRpc, CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)), - ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)), - OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), + ACRpc: aCRpc, + OctopusRpc: octopusRpc, OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), K8sRpc: kubernetesclient.NewKubernetes(zrpc.MustNewClient(c.K8sNativeConf)), MonitorClient: make(map[int64]tracker.Prometheus), @@ -127,5 +141,6 @@ func NewServiceContext(c config.Config) *ServiceContext { PromClient: promClient, AlertClient: alertClient, HttpClient: httpClient, + Scheduler: scheduler, } }