Merge pull request 'fix scheduler cycle import error' (#88) from tzwang/pcm-coordinator:master into master
Former-commit-id: da86729a030c64630460e6203469f8e0f6c41e48
This commit is contained in:
commit
1de507e136
|
@ -2,6 +2,7 @@ package schedule
|
|||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
|
||||
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||
|
@ -24,7 +25,11 @@ func NewScheduleGetAlgorithmsLogic(ctx context.Context, svcCtx *svc.ServiceConte
|
|||
}
|
||||
|
||||
func (l *ScheduleGetAlgorithmsLogic) ScheduleGetAlgorithms(req *types.AiAlgorithmsReq) (resp *types.AiAlgorithmsResp, err error) {
|
||||
// todo: add your logic here and delete this line
|
||||
|
||||
return
|
||||
resp = &types.AiAlgorithmsResp{}
|
||||
algorithms, err := storeLink.GetAlgorithms(l.ctx, l.svcCtx.Scheduler.ResourceCollector, req.ResourceType, req.TaskType, req.Dataset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp.Algorithms = algorithms
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -2,9 +2,7 @@ package schedule
|
|||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
|
||||
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||
|
||||
|
@ -27,8 +25,7 @@ func NewScheduleGetDatasetsLogic(ctx context.Context, svcCtx *svc.ServiceContext
|
|||
|
||||
func (l *ScheduleGetDatasetsLogic) ScheduleGetDatasets() (resp *types.AiDatasetsResp, err error) {
|
||||
resp = &types.AiDatasetsResp{}
|
||||
_, colMap := service.InitAiClusterMap(l.ctx, l.svcCtx)
|
||||
names, err := storeLink.GetDatasetsNames(colMap)
|
||||
names, err := storeLink.GetDatasetsNames(l.ctx, l.svcCtx.Scheduler.ResourceCollector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -2,6 +2,8 @@ package schedule
|
|||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
||||
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||
|
@ -24,7 +26,28 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc
|
|||
}
|
||||
|
||||
func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) {
|
||||
// todo: add your logic here and delete this line
|
||||
resp = &types.ScheduleResp{}
|
||||
opt := &option.AiOption{
|
||||
ResourceType: req.AiOption.ResourceType,
|
||||
Tops: 0,
|
||||
TaskType: req.AiOption.TaskType,
|
||||
DatasetsName: req.AiOption.Datasets,
|
||||
AlgorithmName: "cnn",
|
||||
StrategyName: req.AiOption.Strategy,
|
||||
ClusterToStaticWeight: nil,
|
||||
Params: []string{
|
||||
"epoch,1",
|
||||
},
|
||||
}
|
||||
aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return
|
||||
err = l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -47,12 +47,12 @@ func (l *DeleteLinkImageLogic) DeleteLinkImage(req *types.DeleteLinkImageReq) (r
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
|
||||
if storelink == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
img, err := storelink.ILinkage.DeleteImage(req.ImageId)
|
||||
img, err := storelink.ILinkage.DeleteImage(l.ctx, req.ImageId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -47,12 +47,12 @@ func (l *DeleteLinkTaskLogic) DeleteLinkTask(req *types.DeleteLinkTaskReq) (resp
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
|
||||
if storelink == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
task, err := storelink.ILinkage.DeleteTask(req.TaskId)
|
||||
task, err := storelink.ILinkage.DeleteTask(l.ctx, req.TaskId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -47,12 +47,12 @@ func (l *GetAISpecsLogic) GetAISpecs(req *types.GetResourceSpecsReq) (resp *type
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
|
||||
if storelink == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
specs, err := storelink.ILinkage.QuerySpecs()
|
||||
specs, err := storelink.ILinkage.QuerySpecs(l.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -47,12 +47,12 @@ func (l *GetLinkImageListLogic) GetLinkImageList(req *types.GetLinkImageListReq)
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
|
||||
if storelink == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
list, err := storelink.ILinkage.QueryImageList()
|
||||
list, err := storelink.ILinkage.QueryImageList(l.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -48,12 +48,12 @@ func (l *GetLinkTaskLogic) GetLinkTask(req *types.GetLinkTaskReq) (resp *types.G
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
|
||||
if storelink == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
task, err := storelink.ILinkage.QueryTask(req.TaskId)
|
||||
task, err := storelink.ILinkage.QueryTask(l.ctx, req.TaskId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
|
||||
if storelink == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -67,7 +67,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp
|
|||
envs = append(envs, env)
|
||||
}
|
||||
}
|
||||
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "", "", "pytorch")
|
||||
task, err := storelink.ILinkage.SubmitTask(l.ctx, req.ImageId, req.Cmd, envs, params, req.ResourceId, "", "", "pytorch")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -48,12 +48,12 @@ func (l *UploadLinkImageLogic) UploadLinkImage(req *types.UploadLinkImageReq) (r
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
storelink := storeLink.NewStoreLink(l.ctx, l.svcCtx, participant)
|
||||
storelink := storeLink.NewStoreLink(l.svcCtx.OctopusRpc, l.svcCtx.ModelArtsRpc, l.svcCtx.ModelArtsImgRpc, l.svcCtx.ACRpc, participant)
|
||||
if storelink == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
img, err := storelink.ILinkage.UploadImage(req.FilePath)
|
||||
img, err := storelink.ILinkage.UploadImage(l.ctx, req.FilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -16,9 +16,7 @@ package mqs
|
|||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
)
|
||||
|
||||
|
@ -27,25 +25,22 @@ import (
|
|||
Listening to the payment flow status change notification message queue
|
||||
*/
|
||||
type AiQueue struct {
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
scheduler *scheduler.Scheduler
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
|
||||
aiExecutorMap, aiCollectorMap := service.InitAiClusterMap(ctx, svcCtx)
|
||||
return &AiQueue{
|
||||
ctx: ctx,
|
||||
svcCtx: svcCtx,
|
||||
scheduler: scheduler.NewSchdlr(aiCollectorMap, nil, aiExecutorMap),
|
||||
ctx: ctx,
|
||||
svcCtx: svcCtx,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *AiQueue) Consume(val string) error {
|
||||
aiSchdl, _ := schedulers.NewAiScheduler(val, l.scheduler, nil)
|
||||
aiSchdl, _ := schedulers.NewAiScheduler(l.ctx, val, l.svcCtx.Scheduler, nil)
|
||||
|
||||
// 调度算法
|
||||
err := l.scheduler.AssignAndSchedule(aiSchdl)
|
||||
err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -2,18 +2,44 @@ package database
|
|||
|
||||
import (
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||
"gorm.io/gorm"
|
||||
"time"
|
||||
)
|
||||
|
||||
// AiStorage bundles the database-backed helpers of the AI scheduler around a
// single gorm handle.
type AiStorage struct {
|
||||
// DbEngin is the gorm database handle the storage methods issue their queries through.
DbEngin *gorm.DB
|
||||
}
|
||||
|
||||
func (s *AiStorage) GetParticipants() {
|
||||
// GetParticipants reads every row of t_cluster whose deleted_at is NULL,
// ordered by create_time descending, and returns them in a
// types.ClusterListResp. When the query fails the error is logged and
// returned together with a nil response.
func (s *AiStorage) GetParticipants() (*types.ClusterListResp, error) {
|
||||
var resp types.ClusterListResp
|
||||
// Raw SQL query; the result rows are scanned straight into resp.List.
tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.List)
|
||||
if tx.Error != nil {
|
||||
// NOTE(review): the error text is used as the format string here, so a '%'
// inside the message would be treated as a verb — consider logx.Error(tx.Error).
logx.Errorf(tx.Error.Error())
|
||||
return nil, tx.Error
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
// SaveTask inserts a new models.Task row with status constants.Saved and the
// current time as CommitTime. The cluster argument is not read by the current
// body; Name and Description are fixed literals. It returns the error reported
// by gorm's Create, or nil on success.
func (s *AiStorage) SaveTask(cluster strategy.AssignedCluster) error {
|
||||
// Build the main task struct.
|
||||
taskModel := models.Task{
|
||||
Status: constants.Saved,
|
||||
Description: "ai task",
|
||||
Name: "testAi",
|
||||
CommitTime: time.Now(),
|
||||
}
|
||||
// Persist the task record to the database.
|
||||
tx := s.DbEngin.Create(&taskModel)
|
||||
if tx.Error != nil {
|
||||
return tx.Error
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateTask is a stub: it performs no work and always returns nil.
func (s *AiStorage) UpdateTask() error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -39,7 +39,7 @@ type Scheduler struct {
|
|||
result []string //pID:子任务yamlstring 键值对
|
||||
participantRpc participantservice.ParticipantService
|
||||
ResourceCollector *map[string]collector.AiCollector
|
||||
Storages database.Storage
|
||||
AiStorages *database.AiStorage
|
||||
AiExecutor *map[string]executor.AiExecutor
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
@ -59,8 +59,8 @@ func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, partici
|
|||
return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil
|
||||
}
|
||||
|
||||
func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler {
|
||||
return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor}
|
||||
// NewSchdlr builds a Scheduler that holds the given collector map, AI storage
// and executor map. Every other Scheduler field is left at its zero value.
func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages *database.AiStorage, aiExecutor *map[string]executor.AiExecutor) *Scheduler {
|
||||
return &Scheduler{ResourceCollector: resourceCollector, AiStorages: storages, AiExecutor: aiExecutor}
|
||||
}
|
||||
|
||||
func (s *Scheduler) SpecifyClusters() {
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package schedulers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
||||
|
@ -32,10 +33,11 @@ type AiScheduler struct {
|
|||
task *response.TaskInfo
|
||||
*scheduler.Scheduler
|
||||
option *option.AiOption
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewAiScheduler(val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
|
||||
return &AiScheduler{yamlString: val, Scheduler: scheduler, option: option}, nil
|
||||
// NewAiScheduler wraps the shared Scheduler in an AiScheduler, storing the
// caller's context, val (kept as yamlString) and the AI options. The returned
// error is always nil in this implementation.
func NewAiScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
|
||||
return &AiScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option}, nil
|
||||
}
|
||||
|
||||
func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
|
||||
|
@ -56,6 +58,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
|
|||
}
|
||||
|
||||
resources, err := as.findClustersWithResources()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -104,7 +107,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
|
|||
continue
|
||||
}
|
||||
go func() {
|
||||
_, err := executorMap[c.Name].Execute(as.option)
|
||||
_, err := executorMap[c.Name].Execute(as.ctx, as.option)
|
||||
if err != nil {
|
||||
// TODO: database operation
|
||||
}
|
||||
|
@ -127,7 +130,7 @@ func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats,
|
|||
wg.Add(1)
|
||||
rc := resourceCollector
|
||||
go func() {
|
||||
spec, err := rc.GetResourceStats()
|
||||
spec, err := rc.GetResourceStats(as.ctx)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
wg.Done()
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -24,21 +26,21 @@ var (
|
|||
}
|
||||
)
|
||||
|
||||
func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.AiCollector) {
|
||||
func InitAiClusterMap(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, aCRpc hpcacclient.HpcAC) (*map[string]executor.AiExecutor, *map[string]collector.AiCollector) {
|
||||
executorMap := make(map[string]executor.AiExecutor)
|
||||
collectorMap := make(map[string]collector.AiCollector)
|
||||
for k, v := range AiTypeMap {
|
||||
switch v {
|
||||
case OCTOPUS:
|
||||
octopus := storeLink.NewOctopusLink(ctx, svcCtx, k, 0)
|
||||
octopus := storeLink.NewOctopusLink(octopusRpc, k, 0)
|
||||
collectorMap[k] = octopus
|
||||
executorMap[k] = octopus
|
||||
case MODELARTS:
|
||||
modelarts := storeLink.NewModelArtsLink(ctx, svcCtx, k, 0)
|
||||
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, k, 0)
|
||||
collectorMap[k] = modelarts
|
||||
executorMap[k] = modelarts
|
||||
case SHUGUANGAI:
|
||||
sgai := storeLink.NewShuguangAi(ctx, svcCtx, k, 0)
|
||||
sgai := storeLink.NewShuguangAi(aCRpc, k, 0)
|
||||
collectorMap[k] = sgai
|
||||
executorMap[k] = sgai
|
||||
}
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package collector
|
||||
|
||||
import "context"
|
||||
|
||||
type AiCollector interface {
|
||||
GetResourceStats() (*ResourceStats, error)
|
||||
GetDatasetsSpecs() ([]*DatasetsSpecs, error)
|
||||
GetAlgorithms() ([]*Algorithm, error)
|
||||
GetResourceStats(ctx context.Context) (*ResourceStats, error)
|
||||
GetDatasetsSpecs(ctx context.Context) ([]*DatasetsSpecs, error)
|
||||
GetAlgorithms(ctx context.Context) ([]*Algorithm, error)
|
||||
}
|
||||
|
||||
type ResourceStats struct {
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
package executor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
||||
)
|
||||
|
||||
type AiExecutor interface {
|
||||
Execute(option *option.AiOption) (interface{}, error)
|
||||
Execute(ctx context.Context, option *option.AiOption) (interface{}, error)
|
||||
}
|
||||
|
|
|
@ -18,44 +18,45 @@ import (
|
|||
"context"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type ModelArtsLink struct {
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
platform string
|
||||
participantId int64
|
||||
pageIndex int32
|
||||
pageSize int32
|
||||
modelArtsRpc modelartsservice.ModelArtsService
|
||||
modelArtsImgRpc imagesservice.ImagesService
|
||||
platform string
|
||||
participantId int64
|
||||
pageIndex int32
|
||||
pageSize int32
|
||||
}
|
||||
|
||||
func NewModelArtsLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ModelArtsLink {
|
||||
return &ModelArtsLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
|
||||
// NewModelArtsLink returns a ModelArtsLink that stores the two RPC clients,
// uses name as the platform and id as the participant id, and starts with
// pageIndex 1 and pageSize 100.
func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64) *ModelArtsLink {
|
||||
return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) UploadImage(path string) (interface{}, error) {
|
||||
// UploadImage is not implemented for ModelArts yet: it ignores ctx and path
// and returns (nil, nil).
func (m *ModelArtsLink) UploadImage(ctx context.Context, path string) (interface{}, error) {
|
||||
// TODO: upload an image to modelArts
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) DeleteImage(imageId string) (interface{}, error) {
|
||||
// DeleteImage is not implemented for ModelArts yet: it ignores ctx and
// imageId and returns (nil, nil).
func (m *ModelArtsLink) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
|
||||
// TODO: delete an image on modelArts
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) QueryImageList() (interface{}, error) {
|
||||
func (m *ModelArtsLink) QueryImageList(ctx context.Context) (interface{}, error) {
|
||||
// modelArts获取镜像列表
|
||||
req := &modelarts.ListRepoReq{
|
||||
Offset: "0",
|
||||
Limit: strconv.Itoa(int(m.pageSize)),
|
||||
Platform: m.platform,
|
||||
}
|
||||
resp, err := m.svcCtx.ModelArtsImgRpc.ListReposDetails(m.ctx, req)
|
||||
resp, err := m.modelArtsImgRpc.ListReposDetails(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -63,7 +64,7 @@ func (m *ModelArtsLink) QueryImageList() (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
// modelArts提交任务
|
||||
environments := make(map[string]string)
|
||||
parameters := make([]*modelarts.ParametersTrainJob, 0)
|
||||
|
@ -100,7 +101,7 @@ func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, pa
|
|||
},
|
||||
Platform: m.platform,
|
||||
}
|
||||
resp, err := m.svcCtx.ModelArtsRpc.CreateTrainingJob(m.ctx, req)
|
||||
resp, err := m.modelArtsRpc.CreateTrainingJob(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -108,13 +109,13 @@ func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, pa
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) QueryTask(taskId string) (interface{}, error) {
|
||||
func (m *ModelArtsLink) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
|
||||
// 获取任务
|
||||
req := &modelarts.DetailTrainingJobsReq{
|
||||
TrainingJobId: taskId,
|
||||
Platform: m.platform,
|
||||
}
|
||||
resp, err := m.svcCtx.ModelArtsRpc.GetTrainingJobs(m.ctx, req)
|
||||
resp, err := m.modelArtsRpc.GetTrainingJobs(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -122,13 +123,13 @@ func (m *ModelArtsLink) QueryTask(taskId string) (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) {
|
||||
func (m *ModelArtsLink) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
|
||||
// 删除任务
|
||||
req := &modelarts.DeleteTrainingJobReq{
|
||||
TrainingJobId: taskId,
|
||||
Platform: m.platform,
|
||||
}
|
||||
resp, err := m.svcCtx.ModelArtsRpc.DeleteTrainingJob(m.ctx, req)
|
||||
resp, err := m.modelArtsRpc.DeleteTrainingJob(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -136,12 +137,12 @@ func (m *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) QuerySpecs() (interface{}, error) {
|
||||
func (m *ModelArtsLink) QuerySpecs(ctx context.Context) (interface{}, error) {
|
||||
// octopus查询资源规格
|
||||
req := &modelarts.TrainingJobFlavorsReq{
|
||||
Platform: m.platform,
|
||||
}
|
||||
resp, err := m.svcCtx.ModelArtsRpc.GetTrainingJobFlavors(m.ctx, req)
|
||||
resp, err := m.modelArtsRpc.GetTrainingJobFlavors(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -149,32 +150,32 @@ func (m *ModelArtsLink) QuerySpecs() (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) GetResourceStats() (*collector.ResourceStats, error) {
|
||||
// GetResourceStats is a placeholder for ModelArts: it ignores ctx and returns
// (nil, nil), so callers receive no stats and no error.
func (m *ModelArtsLink) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
|
||||
// GetDatasetsSpecs is a placeholder for ModelArts: it ignores ctx and returns
// a nil slice with a nil error.
func (m *ModelArtsLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) GetAlgorithms() ([]*collector.Algorithm, error) {
|
||||
// GetAlgorithms is a placeholder for ModelArts: it ignores ctx and returns a
// nil slice with a nil error.
func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) {
|
||||
err := m.GenerateSubmitParams(option)
|
||||
func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
|
||||
err := m.GenerateSubmitParams(ctx, option)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
task, err := m.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
|
||||
task, err := m.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return task, nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) GenerateSubmitParams(option *option.AiOption) error {
|
||||
err := m.generateResourceId(option)
|
||||
func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error {
|
||||
err := m.generateResourceId(ctx, option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -197,8 +198,8 @@ func (m *ModelArtsLink) GenerateSubmitParams(option *option.AiOption) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *ModelArtsLink) generateResourceId(option *option.AiOption) error {
|
||||
_, err := m.QuerySpecs()
|
||||
func (m *ModelArtsLink) generateResourceId(ctx context.Context, option *option.AiOption) error {
|
||||
_, err := m.QuerySpecs(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -19,17 +19,16 @@ import (
|
|||
"errors"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type OctopusLink struct {
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
octopusRpc octopusclient.Octopus
|
||||
pageIndex int32
|
||||
pageSize int32
|
||||
platform string
|
||||
|
@ -66,11 +65,11 @@ var (
|
|||
}
|
||||
)
|
||||
|
||||
func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink {
|
||||
return &OctopusLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
|
||||
// NewOctopusLink returns an OctopusLink that stores the Octopus RPC client,
// uses name as the platform and id as the participant id, and starts with
// pageIndex 1 and pageSize 100.
func NewOctopusLink(octopusRpc octopusclient.Octopus, name string, id int64) *OctopusLink {
|
||||
return &OctopusLink{octopusRpc: octopusRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
|
||||
}
|
||||
|
||||
func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
|
||||
func (o *OctopusLink) UploadImage(ctx context.Context, path string) (interface{}, error) {
|
||||
// octopus创建镜像
|
||||
createReq := &octopus.CreateImageReq{
|
||||
Platform: o.platform,
|
||||
|
@ -80,7 +79,7 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
|
|||
ImageVersion: IMG_VERSION_PREFIX + utils.RandomString(7),
|
||||
},
|
||||
}
|
||||
createResp, err := o.svcCtx.OctopusRpc.CreateImage(o.ctx, createReq)
|
||||
createResp, err := o.octopusRpc.CreateImage(ctx, createReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -94,7 +93,7 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
|
|||
FileName: "",
|
||||
},
|
||||
}
|
||||
uploadResp, err := o.svcCtx.OctopusRpc.UploadImage(o.ctx, uploadReq)
|
||||
uploadResp, err := o.octopusRpc.UploadImage(ctx, uploadReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -104,13 +103,13 @@ func (o *OctopusLink) UploadImage(path string) (interface{}, error) {
|
|||
return uploadResp, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) {
|
||||
func (o *OctopusLink) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
|
||||
// octopus删除镜像
|
||||
req := &octopus.DeleteImageReq{
|
||||
Platform: o.platform,
|
||||
ImageId: imageId,
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.DeleteImage(o.ctx, req)
|
||||
resp, err := o.octopusRpc.DeleteImage(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -118,14 +117,14 @@ func (o *OctopusLink) DeleteImage(imageId string) (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) QueryImageList() (interface{}, error) {
|
||||
func (o *OctopusLink) QueryImageList(ctx context.Context) (interface{}, error) {
|
||||
// octopus获取镜像列表
|
||||
req := &octopus.GetUserImageListReq{
|
||||
Platform: o.platform,
|
||||
PageIndex: o.pageIndex,
|
||||
PageSize: o.pageSize,
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.GetUserImageList(o.ctx, req)
|
||||
resp, err := o.octopusRpc.GetUserImageList(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -133,7 +132,7 @@ func (o *OctopusLink) QueryImageList() (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
func (o *OctopusLink) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
// octopus提交任务
|
||||
|
||||
// python参数
|
||||
|
@ -176,7 +175,7 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, para
|
|||
AlgorithmVersion: VERSION,
|
||||
},
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.CreateTrainJob(o.ctx, req)
|
||||
resp, err := o.octopusRpc.CreateTrainJob(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -184,13 +183,13 @@ func (o *OctopusLink) SubmitTask(imageId string, cmd string, envs []string, para
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) {
|
||||
func (o *OctopusLink) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
|
||||
// octopus获取任务
|
||||
req := &octopus.GetTrainJobReq{
|
||||
Platform: o.platform,
|
||||
Id: taskId,
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.GetTrainJob(o.ctx, req)
|
||||
resp, err := o.octopusRpc.GetTrainJob(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -198,13 +197,13 @@ func (o *OctopusLink) QueryTask(taskId string) (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) {
|
||||
func (o *OctopusLink) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
|
||||
// octopus删除任务
|
||||
req := &octopus.DeleteTrainJobReq{
|
||||
Platform: o.platform,
|
||||
JobIds: []string{taskId},
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.DeleteTrainJob(o.ctx, req)
|
||||
resp, err := o.octopusRpc.DeleteTrainJob(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -212,13 +211,13 @@ func (o *OctopusLink) DeleteTask(taskId string) (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) QuerySpecs() (interface{}, error) {
|
||||
func (o *OctopusLink) QuerySpecs(ctx context.Context) (interface{}, error) {
|
||||
// octopus查询资源规格
|
||||
req := &octopus.GetResourceSpecsReq{
|
||||
Platform: o.platform,
|
||||
ResourcePool: RESOURCE_POOL,
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req)
|
||||
resp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -226,12 +225,12 @@ func (o *OctopusLink) QuerySpecs() (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) {
|
||||
func (o *OctopusLink) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
|
||||
req := &octopus.GetResourceSpecsReq{
|
||||
Platform: o.platform,
|
||||
ResourcePool: RESOURCE_POOL,
|
||||
}
|
||||
specResp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req)
|
||||
specResp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -241,7 +240,7 @@ func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) {
|
|||
balanceReq := &octopus.GetUserBalanceReq{
|
||||
Platform: o.platform,
|
||||
}
|
||||
balanceResp, err := o.svcCtx.OctopusRpc.GetUserBalance(o.ctx, balanceReq)
|
||||
balanceResp, err := o.octopusRpc.GetUserBalance(ctx, balanceReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -294,13 +293,13 @@ func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) {
|
|||
return resourceStats, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
|
||||
func (o *OctopusLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
|
||||
req := &octopus.GetMyDatasetListReq{
|
||||
Platform: o.platform,
|
||||
PageIndex: o.pageIndex,
|
||||
PageSize: o.pageSize,
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.GetMyDatasetList(o.ctx, req)
|
||||
resp, err := o.octopusRpc.GetMyDatasetList(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -315,7 +314,7 @@ func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
|
|||
return specs, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) GetAlgorithms() ([]*collector.Algorithm, error) {
|
||||
func (o *OctopusLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
|
||||
var algorithms []*collector.Algorithm
|
||||
|
||||
req := &octopus.GetMyAlgorithmListReq{
|
||||
|
@ -323,7 +322,7 @@ func (o *OctopusLink) GetAlgorithms() ([]*collector.Algorithm, error) {
|
|||
PageIndex: o.pageIndex,
|
||||
PageSize: o.pageSize,
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.GetMyAlgorithmList(o.ctx, req)
|
||||
resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -338,32 +337,32 @@ func (o *OctopusLink) GetAlgorithms() ([]*collector.Algorithm, error) {
|
|||
return algorithms, nil
|
||||
}
|
||||
|
||||
// Execute resolves the submit parameters for option through
// GenerateSubmitParams and then submits the task through SubmitTask, passing
// the image, command, envs, params, resource, dataset, algorithm and task
// type taken from option. It returns the submitted task, or the first error.
// NOTE(review): this is a diff rendering whose +/- markers were lost; the
// signature and calls without ctx appear to be the removed lines and the
// ctx-taking ones their replacements — confirm against the commit.
func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) {
|
||||
err := o.GenerateSubmitParams(option)
|
||||
func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
|
||||
err := o.GenerateSubmitParams(ctx, option)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
|
||||
task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return task, nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error {
|
||||
err := o.generateResourceId(option)
|
||||
func (o *OctopusLink) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error {
|
||||
err := o.generateResourceId(ctx, option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = o.generateDatasetsId(option)
|
||||
err = o.generateDatasetsId(ctx, option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = o.generateImageId(option)
|
||||
err = o.generateImageId(ctx, option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = o.generateAlgorithmId(option)
|
||||
err = o.generateAlgorithmId(ctx, option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -382,7 +381,7 @@ func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (o *OctopusLink) generateResourceId(option *option.AiOption) error {
|
||||
func (o *OctopusLink) generateResourceId(ctx context.Context, option *option.AiOption) error {
|
||||
if option.ResourceType == "" {
|
||||
return errors.New("ResourceType not set")
|
||||
}
|
||||
|
@ -390,7 +389,7 @@ func (o *OctopusLink) generateResourceId(option *option.AiOption) error {
|
|||
Platform: o.platform,
|
||||
ResourcePool: RESOURCE_POOL,
|
||||
}
|
||||
specResp, err := o.svcCtx.OctopusRpc.GetResourceSpecs(o.ctx, req)
|
||||
specResp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -418,7 +417,7 @@ func (o *OctopusLink) generateResourceId(option *option.AiOption) error {
|
|||
return errors.New("failed to get ResourceId")
|
||||
}
|
||||
|
||||
func (o *OctopusLink) generateDatasetsId(option *option.AiOption) error {
|
||||
func (o *OctopusLink) generateDatasetsId(ctx context.Context, option *option.AiOption) error {
|
||||
if option.DatasetsName == "" {
|
||||
return errors.New("DatasetsName not set")
|
||||
}
|
||||
|
@ -427,7 +426,7 @@ func (o *OctopusLink) generateDatasetsId(option *option.AiOption) error {
|
|||
PageIndex: o.pageIndex,
|
||||
PageSize: o.pageSize,
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.GetMyDatasetList(o.ctx, req)
|
||||
resp, err := o.octopusRpc.GetMyDatasetList(ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -443,7 +442,7 @@ func (o *OctopusLink) generateDatasetsId(option *option.AiOption) error {
|
|||
return errors.New("failed to get DatasetsId")
|
||||
}
|
||||
|
||||
func (o *OctopusLink) generateImageId(option *option.AiOption) error {
|
||||
func (o *OctopusLink) generateImageId(ctx context.Context, option *option.AiOption) error {
|
||||
if option.TaskType == "" {
|
||||
return errors.New("TaskType not set")
|
||||
}
|
||||
|
@ -453,7 +452,7 @@ func (o *OctopusLink) generateImageId(option *option.AiOption) error {
|
|||
PageIndex: o.pageIndex,
|
||||
PageSize: o.pageSize,
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.GetUserImageList(o.ctx, req)
|
||||
resp, err := o.octopusRpc.GetUserImageList(ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -475,7 +474,7 @@ func (o *OctopusLink) generateImageId(option *option.AiOption) error {
|
|||
PageIndex: o.pageIndex,
|
||||
PageSize: o.pageSize,
|
||||
}
|
||||
preImgResp, err := o.svcCtx.OctopusRpc.GetPresetImageList(o.ctx, preImgReq)
|
||||
preImgResp, err := o.octopusRpc.GetPresetImageList(ctx, preImgReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -495,7 +494,7 @@ func (o *OctopusLink) generateImageId(option *option.AiOption) error {
|
|||
return errors.New("failed to get ImageId")
|
||||
}
|
||||
|
||||
func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error {
|
||||
func (o *OctopusLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
|
||||
// temporarily set algorithm to cnn
|
||||
if option.AlgorithmName == "" {
|
||||
switch option.DatasetsName {
|
||||
|
@ -511,7 +510,7 @@ func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error {
|
|||
PageIndex: o.pageIndex,
|
||||
PageSize: o.pageSize,
|
||||
}
|
||||
resp, err := o.svcCtx.OctopusRpc.GetMyAlgorithmList(o.ctx, req)
|
||||
resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -4,17 +4,16 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// ShuguangHpc is the link type for the Shuguang HPC platform; its methods
// issue job RPCs (e.g. SubmitJob, GetJobDetail, DeleteJob).
// NOTE(review): this is a diff rendering whose +/- markers were lost; ctx and
// svcCtx appear to be the removed fields and aCRpc their replacement — confirm
// against the commit.
type ShuguangHpc struct {
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
// aCRpc is the HpcAC RPC client the methods call.
aCRpc hpcacclient.HpcAC
|
||||
// platform is set from the constructor's name argument.
platform string
|
||||
// participantId is set from the constructor's id argument.
participantId int64
|
||||
}
|
||||
|
@ -128,23 +127,23 @@ type ResourceSpecHpc struct {
|
|||
GAP_NDCU string
|
||||
}
|
||||
|
||||
// NewShuguangHpc builds a ShuguangHpc whose platform is name and whose
// participantId is id.
// NOTE(review): two signature/return pairs appear because this is a diff
// rendering with the +/- markers lost. The first pair stores ctx and svcCtx on
// the struct; the second stores only the HpcAC client and appears to be the
// replacement — confirm against the commit.
func NewShuguangHpc(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ShuguangHpc {
|
||||
return &ShuguangHpc{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id}
|
||||
func NewShuguangHpc(aCRpc hpcacclient.HpcAC, name string, id int64) *ShuguangHpc {
|
||||
return &ShuguangHpc{aCRpc: aCRpc, platform: name, participantId: id}
|
||||
}
|
||||
|
||||
// UploadImage is a stub on ShuguangHpc: it does not use its arguments and
// always returns (nil, nil).
// NOTE(review): two signatures appear because this is a diff rendering with
// the +/- markers lost; the ctx-taking one appears to be the replacement — confirm.
func (s ShuguangHpc) UploadImage(path string) (interface{}, error) {
|
||||
func (s ShuguangHpc) UploadImage(ctx context.Context, path string) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// DeleteImage is a stub on ShuguangHpc: it does not use its arguments and
// always returns (nil, nil).
// NOTE(review): two signatures appear because this is a diff rendering with
// the +/- markers lost; the ctx-taking one appears to be the replacement — confirm.
func (s ShuguangHpc) DeleteImage(imageId string) (interface{}, error) {
|
||||
func (s ShuguangHpc) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// QueryImageList is a stub on ShuguangHpc: it performs no query and always
// returns (nil, nil).
// NOTE(review): two signatures appear because this is a diff rendering with
// the +/- markers lost; the ctx-taking one appears to be the replacement — confirm.
func (s ShuguangHpc) QueryImageList() (interface{}, error) {
|
||||
func (s ShuguangHpc) QueryImageList(ctx context.Context) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
func (s ShuguangHpc) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
// shuguangHpc提交任务
|
||||
|
||||
//判断是否resourceId匹配自定义资源Id
|
||||
|
@ -194,7 +193,7 @@ func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, param
|
|||
|
||||
updateSGHpcRequestByResourceId(resourceId, req)
|
||||
|
||||
resp, err := s.svcCtx.ACRpc.SubmitJob(s.ctx, req)
|
||||
resp, err := s.aCRpc.SubmitJob(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -203,12 +202,12 @@ func (s ShuguangHpc) SubmitTask(imageId string, cmd string, envs []string, param
|
|||
|
||||
}
|
||||
|
||||
func (s ShuguangHpc) QueryTask(taskId string) (interface{}, error) {
|
||||
func (s ShuguangHpc) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
|
||||
//实时作业
|
||||
reqC := &hpcAC.JobDetailReq{
|
||||
JobId: taskId,
|
||||
}
|
||||
respC, err := s.svcCtx.ACRpc.GetJobDetail(s.ctx, reqC)
|
||||
respC, err := s.aCRpc.GetJobDetail(ctx, reqC)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -223,7 +222,7 @@ func (s ShuguangHpc) QueryTask(taskId string) (interface{}, error) {
|
|||
JobmanagerId: strconv.Itoa(StrJobManagerID),
|
||||
}
|
||||
|
||||
respH, err := s.svcCtx.ACRpc.HistoryJobDetail(s.ctx, reqH)
|
||||
respH, err := s.aCRpc.HistoryJobDetail(ctx, reqH)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -232,7 +231,7 @@ func (s ShuguangHpc) QueryTask(taskId string) (interface{}, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s ShuguangHpc) QuerySpecs() (interface{}, error) {
|
||||
func (s ShuguangHpc) QuerySpecs(ctx context.Context) (interface{}, error) {
|
||||
resp := &types.GetResourceSpecsResp{}
|
||||
|
||||
for k, v := range RESOURCESPECSHPC {
|
||||
|
@ -248,12 +247,12 @@ func (s ShuguangHpc) QuerySpecs() (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (s ShuguangHpc) DeleteTask(taskId string) (interface{}, error) {
|
||||
func (s ShuguangHpc) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
|
||||
strJobInfoMap := fmt.Sprintf(StrJobInfoMap, StrJobManagerID, Username, taskId)
|
||||
req := &hpcAC.DeleteJobReq{
|
||||
StrJobInfoMap: strJobInfoMap,
|
||||
}
|
||||
resp, err := s.svcCtx.ACRpc.DeleteJob(s.ctx, req)
|
||||
resp, err := s.aCRpc.DeleteJob(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -20,10 +20,10 @@ import (
|
|||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
@ -91,31 +91,30 @@ type ResourceSpecSGAI struct {
|
|||
}
|
||||
|
||||
// ShuguangAi is the link type for the Shuguang AI platform; its methods issue
// RPCs such as GetImageListAi, SubmitPytorchTask and GetFileList.
// NOTE(review): this is a diff rendering whose +/- markers were lost; ctx and
// svcCtx appear to be the removed fields and aCRpc their replacement — confirm
// against the commit.
type ShuguangAi struct {
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
// aCRpc is the HpcAC RPC client the methods call.
aCRpc hpcacclient.HpcAC
|
||||
// platform is set from the constructor's name argument.
platform string
|
||||
// participantId is set from the constructor's id argument.
participantId int64
|
||||
}
|
||||
|
||||
// NewShuguangAi builds a ShuguangAi whose platform is name and whose
// participantId is id.
// NOTE(review): two signature/return pairs appear because this is a diff
// rendering with the +/- markers lost. The first pair stores ctx and svcCtx on
// the struct; the second stores only the HpcAC client and appears to be the
// replacement — confirm against the commit.
func NewShuguangAi(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ShuguangAi {
|
||||
return &ShuguangAi{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id}
|
||||
func NewShuguangAi(aCRpc hpcacclient.HpcAC, name string, id int64) *ShuguangAi {
|
||||
return &ShuguangAi{aCRpc: aCRpc, platform: name, participantId: id}
|
||||
}
|
||||
|
||||
// UploadImage is a stub on ShuguangAi: it does not use its arguments and
// always returns (nil, nil).
// NOTE(review): two signatures appear because this is a diff rendering with
// the +/- markers lost; the ctx-taking one appears to be the replacement — confirm.
func (s *ShuguangAi) UploadImage(path string) (interface{}, error) {
|
||||
func (s *ShuguangAi) UploadImage(ctx context.Context, path string) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// DeleteImage is a stub on ShuguangAi: it does not use its arguments and
// always returns (nil, nil).
// NOTE(review): two signatures appear because this is a diff rendering with
// the +/- markers lost; the ctx-taking one appears to be the replacement — confirm.
func (s *ShuguangAi) DeleteImage(imageId string) (interface{}, error) {
|
||||
func (s *ShuguangAi) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) QueryImageList() (interface{}, error) {
|
||||
func (s *ShuguangAi) QueryImageList(ctx context.Context) (interface{}, error) {
|
||||
// shuguangAi获取镜像列表
|
||||
req := &hpcAC.GetImageListAiReq{
|
||||
AcceleratorType: DCU,
|
||||
TaskType: PYTORCH,
|
||||
}
|
||||
resp, err := s.svcCtx.ACRpc.GetImageListAi(s.ctx, req)
|
||||
resp, err := s.aCRpc.GetImageListAi(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -123,7 +122,7 @@ func (s *ShuguangAi) QueryImageList() (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) {
|
||||
func (s *ShuguangAi) SubmitPytorchTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) {
|
||||
//判断是否resourceId匹配自定义资源Id
|
||||
_, isMapContainsKey := RESOURCESPECSAI[resourceId]
|
||||
if !isMapContainsKey {
|
||||
|
@ -132,7 +131,7 @@ func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string
|
|||
|
||||
//根据imageId获取imagePath, version
|
||||
imageReq := &hpcAC.GetImageAiByIdReq{ImageId: imageId}
|
||||
imageResp, err := s.svcCtx.ACRpc.GetImageAiById(s.ctx, imageReq)
|
||||
imageResp, err := s.aCRpc.GetImageAiById(ctx, imageReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -176,7 +175,7 @@ func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string
|
|||
|
||||
updateSGAIRequestByResourceId(resourceId, req)
|
||||
|
||||
resp, err := s.svcCtx.ACRpc.SubmitPytorchTask(s.ctx, req)
|
||||
resp, err := s.aCRpc.SubmitPytorchTask(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -191,7 +190,7 @@ func updateSGAIRequestByResourceId(resourceId string, req *hpcAC.SubmitPytorchTa
|
|||
req.Params.WorkerRamSize = spec.RAM
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) {
|
||||
func (s *ShuguangAi) SubmitTensorflowTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) {
|
||||
//req := &hpcAC.SubmitTensorflowTaskReq{
|
||||
// Params: &hpcAC.SubmitTensorflowTaskParams{
|
||||
//
|
||||
|
@ -200,7 +199,7 @@ func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []str
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
func (s *ShuguangAi) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
|
||||
// set algorithmId temporarily for storelink submit
|
||||
if algorithmId == "" {
|
||||
algorithmId = "pytorch-mnist-fcn"
|
||||
|
@ -209,13 +208,13 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, param
|
|||
// shuguangAi提交任务
|
||||
switch aiType {
|
||||
case PYTORCH_TASK:
|
||||
task, err := s.SubmitPytorchTask(imageId, cmd, envs, params, resourceId, datasetsId, algorithmId)
|
||||
task, err := s.SubmitPytorchTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return task, nil
|
||||
case TENSORFLOW_TASK:
|
||||
task, err := s.SubmitTensorflowTask(imageId, cmd, envs, params, resourceId, datasetsId, algorithmId)
|
||||
task, err := s.SubmitTensorflowTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -224,12 +223,12 @@ func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, param
|
|||
return nil, errors.New("shuguangAi不支持的任务类型")
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) {
|
||||
func (s *ShuguangAi) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
|
||||
// shuguangAi获取任务
|
||||
req := &hpcAC.GetPytorchTaskReq{
|
||||
Id: taskId,
|
||||
}
|
||||
resp, err := s.svcCtx.ACRpc.GetPytorchTask(s.ctx, req)
|
||||
resp, err := s.aCRpc.GetPytorchTask(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -237,12 +236,12 @@ func (s *ShuguangAi) QueryTask(taskId string) (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) {
|
||||
func (s *ShuguangAi) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
|
||||
// shuguangAi删除任务
|
||||
req := &hpcAC.DeleteTaskAiReq{
|
||||
Ids: taskId,
|
||||
}
|
||||
resp, err := s.svcCtx.ACRpc.DeleteTaskAi(s.ctx, req)
|
||||
resp, err := s.aCRpc.DeleteTaskAi(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -250,7 +249,7 @@ func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) QuerySpecs() (interface{}, error) {
|
||||
func (s *ShuguangAi) QuerySpecs(ctx context.Context) (interface{}, error) {
|
||||
resp := &types.GetResourceSpecsResp{}
|
||||
|
||||
for k, v := range RESOURCESPECSAI {
|
||||
|
@ -266,10 +265,10 @@ func (s *ShuguangAi) QuerySpecs() (interface{}, error) {
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
|
||||
func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
|
||||
//balance
|
||||
userReq := &hpcAC.GetUserInfoReq{}
|
||||
userinfo, err := s.svcCtx.ACRpc.GetUserInfo(s.ctx, userReq)
|
||||
userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -277,7 +276,7 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
|
|||
|
||||
//resource limit
|
||||
limitReq := &hpcAC.QueueReq{}
|
||||
limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
|
||||
limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -286,7 +285,7 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
|
|||
|
||||
//disk
|
||||
diskReq := &hpcAC.ParaStorQuotaReq{}
|
||||
diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
|
||||
diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -295,14 +294,14 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
|
|||
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
|
||||
|
||||
//memory
|
||||
nodeResp, err := s.svcCtx.ACRpc.GetNodeResources(s.ctx, nil)
|
||||
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES
|
||||
|
||||
//resources being occupied
|
||||
memberJobResp, err := s.svcCtx.ACRpc.GetMemberJobs(s.ctx, nil)
|
||||
memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -361,9 +360,9 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
|
|||
return resourceStats, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
|
||||
func (s *ShuguangAi) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
|
||||
req := &hpcAC.GetFileListReq{Limit: 100, Path: DATASETS_DIR, Start: 0}
|
||||
list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req)
|
||||
list, err := s.aCRpc.GetFileList(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -378,12 +377,12 @@ func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
|
|||
return specs, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) GetAlgorithms() ([]*collector.Algorithm, error) {
|
||||
func (s *ShuguangAi) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
|
||||
var algorithms []*collector.Algorithm
|
||||
for _, t := range GetTaskTypes() {
|
||||
taskType := t
|
||||
req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0}
|
||||
list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req)
|
||||
list, err := s.aCRpc.GetFileList(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -398,28 +397,28 @@ func (s *ShuguangAi) GetAlgorithms() ([]*collector.Algorithm, error) {
|
|||
return algorithms, nil
|
||||
}
|
||||
|
||||
// Execute resolves the submit parameters for option through
// GenerateSubmitParams and then submits the task through SubmitTask, passing
// the image, command, envs, params, resource, dataset, algorithm and task
// type taken from option. It returns the submitted task, or the first error.
// NOTE(review): this is a diff rendering whose +/- markers were lost; the
// signature and calls without ctx appear to be the removed lines and the
// ctx-taking ones their replacements — confirm against the commit.
func (s *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) {
|
||||
err := s.GenerateSubmitParams(option)
|
||||
func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
|
||||
err := s.GenerateSubmitParams(ctx, option)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
task, err := s.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
|
||||
task, err := s.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return task, nil
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) GenerateSubmitParams(option *option.AiOption) error {
|
||||
func (s *ShuguangAi) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error {
|
||||
err := s.generateResourceId(option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.generateImageId(option)
|
||||
err = s.generateImageId(ctx, option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.generateAlgorithmId(option)
|
||||
err = s.generateAlgorithmId(ctx, option)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -473,7 +472,7 @@ func (s *ShuguangAi) generateResourceId(option *option.AiOption) error {
|
|||
return errors.New("failed to get ResourceId")
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) generateImageId(option *option.AiOption) error {
|
||||
func (s *ShuguangAi) generateImageId(ctx context.Context, option *option.AiOption) error {
|
||||
if option.TaskType == "" {
|
||||
return errors.New("TaskType not set")
|
||||
}
|
||||
|
@ -482,7 +481,7 @@ func (s *ShuguangAi) generateImageId(option *option.AiOption) error {
|
|||
AcceleratorType: DCU,
|
||||
TaskType: taskType,
|
||||
}
|
||||
resp, err := s.svcCtx.ACRpc.GetImageListAi(s.ctx, req)
|
||||
resp, err := s.aCRpc.GetImageListAi(ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -502,13 +501,13 @@ func (s *ShuguangAi) generateImageId(option *option.AiOption) error {
|
|||
return errors.New("failed to get ImageId")
|
||||
}
|
||||
|
||||
func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error {
|
||||
func (s *ShuguangAi) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
|
||||
if option.DatasetsName == "" {
|
||||
return errors.New("DatasetsName not set")
|
||||
}
|
||||
|
||||
req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0}
|
||||
list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req)
|
||||
list, err := s.aCRpc.GetFileList(ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -19,26 +19,29 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
|
||||
"gorm.io/gorm"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Linkage lists the image operations (upload, delete, list), task operations
// (submit, query, delete) and the resource-spec query that a platform link
// provides; StoreLink holds one as its ILinkage field.
// NOTE(review): every method appears twice because this is a diff rendering
// with the +/- markers lost. The first seven signatures (no ctx) appear to be
// the removed set and the seven ctx-taking ones their replacements — confirm
// against the commit.
type Linkage interface {
|
||||
UploadImage(path string) (interface{}, error)
|
||||
DeleteImage(imageId string) (interface{}, error)
|
||||
QueryImageList() (interface{}, error)
|
||||
SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error)
|
||||
QueryTask(taskId string) (interface{}, error)
|
||||
QuerySpecs() (interface{}, error)
|
||||
DeleteTask(taskId string) (interface{}, error)
|
||||
// The signatures below take a context.Context as their first parameter.
UploadImage(ctx context.Context, path string) (interface{}, error)
|
||||
DeleteImage(ctx context.Context, imageId string) (interface{}, error)
|
||||
QueryImageList(ctx context.Context) (interface{}, error)
|
||||
SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error)
|
||||
QueryTask(ctx context.Context, taskId string) (interface{}, error)
|
||||
QuerySpecs(ctx context.Context) (interface{}, error)
|
||||
DeleteTask(ctx context.Context, taskId string) (interface{}, error)
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -89,19 +92,19 @@ type StoreLink struct {
|
|||
ILinkage Linkage
|
||||
}
|
||||
|
||||
func NewStoreLink(ctx context.Context, svcCtx *svc.ServiceContext, participant *models.StorelinkCenter) *StoreLink {
|
||||
func NewStoreLink(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, aCRpc hpcacclient.HpcAC, participant *models.StorelinkCenter) *StoreLink {
|
||||
switch participant.Type {
|
||||
case TYPE_OCTOPUS:
|
||||
linkStruct := NewOctopusLink(ctx, svcCtx, participant.Name, participant.Id)
|
||||
linkStruct := NewOctopusLink(octopusRpc, participant.Name, participant.Id)
|
||||
return &StoreLink{ILinkage: linkStruct}
|
||||
case TYPE_MODELARTS:
|
||||
linkStruct := NewModelArtsLink(ctx, svcCtx, participant.Name, participant.Id)
|
||||
linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id)
|
||||
return &StoreLink{ILinkage: linkStruct}
|
||||
case TYPE_SHUGUANGAI:
|
||||
linkStruct := NewShuguangAi(ctx, svcCtx, participant.Name, participant.Id)
|
||||
linkStruct := NewShuguangAi(aCRpc, participant.Name, participant.Id)
|
||||
return &StoreLink{ILinkage: linkStruct}
|
||||
case TYPE_SHUGUANGHPC:
|
||||
linkStruct := NewShuguangHpc(ctx, svcCtx, participant.Name, participant.Id)
|
||||
linkStruct := NewShuguangHpc(aCRpc, participant.Name, participant.Id)
|
||||
return &StoreLink{ILinkage: linkStruct}
|
||||
default:
|
||||
return nil
|
||||
|
@ -124,7 +127,7 @@ func GetResourceTypes() []string {
|
|||
return resourceTypes
|
||||
}
|
||||
|
||||
func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string, error) {
|
||||
func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.AiCollector) ([]string, error) {
|
||||
var wg sync.WaitGroup
|
||||
var errCh = make(chan error, len(*collectorMap))
|
||||
var errs []error
|
||||
|
@ -136,7 +139,7 @@ func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string,
|
|||
c := col
|
||||
go func() {
|
||||
var ns []string
|
||||
specs, err := c.GetDatasetsSpecs()
|
||||
specs, err := c.GetDatasetsSpecs(ctx)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
wg.Done()
|
||||
|
@ -176,7 +179,7 @@ func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string,
|
|||
return names, nil
|
||||
}
|
||||
|
||||
func GetAlgorithms(collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
|
||||
func GetAlgorithms(ctx context.Context, collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
|
||||
var names []string
|
||||
var wg sync.WaitGroup
|
||||
var errCh = make(chan error, len(*collectorMap))
|
||||
|
@ -189,7 +192,7 @@ func GetAlgorithms(collectorMap *map[string]collector.AiCollector, resourceType
|
|||
c := col
|
||||
go func() {
|
||||
var ns []string
|
||||
algorithms, err := c.GetAlgorithms()
|
||||
algorithms, err := c.GetAlgorithms(ctx)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
wg.Done()
|
||||
|
|
|
@ -22,6 +22,9 @@ import (
|
|||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/zrpc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice"
|
||||
|
@ -58,6 +61,7 @@ type ServiceContext struct {
|
|||
PromClient tracker.Prometheus
|
||||
AlertClient *alert.AlertmanagerAPI
|
||||
HttpClient *resty.Client
|
||||
Scheduler *scheduler.Scheduler
|
||||
}
|
||||
|
||||
func NewServiceContext(c config.Config) *ServiceContext {
|
||||
|
@ -110,16 +114,26 @@ func NewServiceContext(c config.Config) *ServiceContext {
|
|||
Addr: c.Redis.Host,
|
||||
Password: c.Redis.Pass,
|
||||
})
|
||||
|
||||
// scheduler
|
||||
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf))
|
||||
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf))
|
||||
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf))
|
||||
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf))
|
||||
aiExecutor, resourceCollector := service.InitAiClusterMap(octopusRpc, modelArtsRpc, modelArtsImgRpc, aCRpc)
|
||||
storage := &database.AiStorage{DbEngin: dbEngin}
|
||||
scheduler := scheduler.NewSchdlr(resourceCollector, storage, aiExecutor)
|
||||
|
||||
return &ServiceContext{
|
||||
Cron: cron.New(cron.WithSeconds()),
|
||||
DbEngin: dbEngin,
|
||||
Config: c,
|
||||
RedisClient: redisClient,
|
||||
ModelArtsRpc: modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf)),
|
||||
ModelArtsImgRpc: imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf)),
|
||||
ModelArtsRpc: modelArtsRpc,
|
||||
ModelArtsImgRpc: modelArtsImgRpc,
|
||||
CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)),
|
||||
ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)),
|
||||
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
|
||||
ACRpc: aCRpc,
|
||||
OctopusRpc: octopusRpc,
|
||||
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
|
||||
K8sRpc: kubernetesclient.NewKubernetes(zrpc.MustNewClient(c.K8sNativeConf)),
|
||||
MonitorClient: make(map[int64]tracker.Prometheus),
|
||||
|
@ -127,5 +141,6 @@ func NewServiceContext(c config.Config) *ServiceContext {
|
|||
PromClient: promClient,
|
||||
AlertClient: alertClient,
|
||||
HttpClient: httpClient,
|
||||
Scheduler: scheduler,
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue