Merge pull request 'added schedule.api into pcm.api' (#22) from tzwang/pcm-coordinator:master into master

Former-commit-id: 9652fc452b4046735f46279911690deb3735fdaf
This commit is contained in:
tzwang 2024-02-05 11:50:15 +08:00
commit a878233ccf
23 changed files with 505 additions and 78 deletions

View File

@ -8,6 +8,7 @@ import (
"vm/pcm-vm.api"
"cloud/pcm-cloud.api"
"storelink/pcm-storelink.api"
"schedule/pcm-schedule.api"
)
info(
@ -617,4 +618,26 @@ service pcm {
@handler GetClusterHandler
get /adapter/cluster/get (ClusterDelReq) returns (ClusterResp)
}
@server(
prefix: pcm/v1
group : schedule
)
service pcm {
@handler ScheduleGetAiResourceTypesHandler
get /schedule/ai/getResourceTypes returns (AiResourceTypesResp)
@handler ScheduleGetAiTaskTypesHandler
get /schedule/ai/getTaskTypes returns (AiTaskTypesResp)
@handler ScheduleGetDatasetsHandler
get /schedule/ai/getDatasets returns (AiDatasetsResp)
@handler ScheduleGetStrategyHandler
get /schedule/ai/getStrategies returns (AiStrategyResp)
@handler ScheduleSubmitHandler
post /schedule/submit (ScheduleResp) returns (ScheduleResp)
}

View File

@ -0,0 +1,44 @@
syntax = "v1"
info(
title: "schedule"
desc: "调度服务"
author: "tzwang"
email: "tzwang@qq.com"
)
type (
ScheduleReq {
AiOption *AiOption `json:"aiOption,optional"`
}
ScheduleResp {
Success bool `json:"success"`
TaskId string `json:"taskId"`
ClusterId string `json:"clusterId"`
ErrorMsg string `json:"errorMsg"`
}
AiOption {
ResourceType string `json:"resourceType"`
TaskType string `json:"taskType"`
Datasets string `json:"datasets"`
Strategy string `json:"strategy"`
}
AiResourceTypesResp {
ResourceTypes []string `json:"resourceTypes"`
}
AiTaskTypesResp {
TaskTypes []string `json:"taskTypes"`
}
AiDatasetsResp {
Datasets []string `json:"datasets"`
}
AiStrategyResp {
Strategies []string `json:"strategies"`
}
)

View File

@ -67,7 +67,7 @@ func (l *SubmitLinkTaskLogic) SubmitLinkTask(req *types.SubmitLinkTaskReq) (resp
envs = append(envs, env)
}
}
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "")
task, err := storelink.ILinkage.SubmitTask(req.ImageId, req.Cmd, envs, params, req.ResourceId, "pytorch")
if err != nil {
return nil, err
}

View File

@ -37,13 +37,12 @@ func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
return &AiQueue{
ctx: ctx,
svcCtx: svcCtx,
scheduler: scheduler.NewScheduler2(aiCollectorMap, nil, aiExecutorMap),
scheduler: scheduler.NewSchdlr(aiCollectorMap, nil, aiExecutorMap),
}
}
func (l *AiQueue) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
aiSchdl, _ := schedulers.NewAiScheduler(val, l.scheduler)
aiSchdl, _ := schedulers.NewAiScheduler(val, l.scheduler, nil)
// 调度算法
err := l.scheduler.AssignAndSchedule(aiSchdl)

View File

@ -44,6 +44,34 @@ func Intersect(slice1, slice2 []int64) []int64 {
return nn
}
func IntersectString(slice1, slice2 []string) []string {
k := make(map[string]int)
for _, num := range slice1 {
k[num]++
}
var output []string
for _, num := range slice2 {
if k[num] > 0 {
output = append(output, num)
k[num]--
}
}
return output
}
func RemoveDuplicates(slc []string) []string {
keys := make(map[string]bool)
list := []string{}
for _, entry := range slc {
if _, value := keys[entry]; !value {
keys[entry] = true
list = append(list, entry)
}
}
return list
}
func MicsSlice(origin []int64, count int) []int64 {
tmpOrigin := make([]int64, len(origin))
copy(tmpOrigin, origin)

View File

@ -1,8 +1,19 @@
package database
import (
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gorm.io/gorm"
)
type AiStorage struct {
DbEngin *gorm.DB
}
func (s *AiStorage) getParticipants() {
func (s *AiStorage) GetParticipants() {
var resp types.ClusterListResp
tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.Data)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
}
}

View File

@ -37,7 +37,7 @@ type Scheduler struct {
dbEngin *gorm.DB
result []string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService
ResourceCollector *map[string]collector.ResourceCollector
ResourceCollector *map[string]collector.AiCollector
Storages database.Storage
AiExecutor *map[string]executor.AiExecutor
mu sync.RWMutex
@ -52,7 +52,7 @@ func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB,
return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil
}
func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler {
func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages database.Storage, aiExecutor *map[string]executor.AiExecutor) *Scheduler {
return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor}
}

View File

@ -33,8 +33,8 @@ type AiScheduler struct {
option *option.AiOption
}
func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) {
return &AiScheduler{yamlString: val, Scheduler: scheduler}, nil
func NewAiScheduler(val string, scheduler *scheduler.Scheduler, option *option.AiOption) (*AiScheduler, error) {
return &AiScheduler{yamlString: val, Scheduler: scheduler, option: option}, nil
}
func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
@ -49,6 +49,11 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin
}
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
if as.option.AiClusterId != "" {
// TODO database operation Find
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil
}
resources, err := as.findClustersWithResources()
if err != nil {
return nil, err
@ -58,13 +63,20 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
}
params := &param.Params{Resources: resources}
if len(resources) < 2 /*|| as.task */ {
strategy := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params /*, Replicas: 1*/})
if len(resources) == 1 {
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil
}
switch as.option.StrategyName {
case strategy.REPLICATION:
strategy := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: 1})
return strategy, nil
case strategy.RESOURCE_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil
}
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params})
return strategy, nil
return nil, errors.New("no strategy has been chosen")
}
func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
@ -84,10 +96,10 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return nil
}
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceSpecs, error) {
var resourceSpecs []*collector.ResourceSpecs
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
var resourceSpecs []*collector.ResourceStats
for _, resourceCollector := range *as.ResourceCollector {
spec, err := resourceCollector.GetResourceSpecs()
spec, err := resourceCollector.GetResourceStats()
if err != nil {
continue
}

View File

@ -1,13 +1,17 @@
package option
type AiOption struct {
AiType string // shuguangAi/octopus
ResourceType string // cpu/gpu/compute card
TaskType string // pytorch/tensorflow
AiClusterId string // shuguangAi /octopus ClusterId
ResourceType string // cpu/gpu/compute card
TaskType string // pytorch/tensorflow/mindspore
DatasetsName string // mnist/imageNet/iris
StrategyName string
ClusterToStaticWeight map[string]int32
CodeType string
ImageId string
SpecId string
DatasetsId string
ImageId string
SpecId string
//DatasetsId string
CodeId string
ResourceId string
@ -17,4 +21,5 @@ type AiOption struct {
Datasets string
Code string
Model interface{}
}

View File

@ -0,0 +1,5 @@
package option
type Option struct {
Name string
}

View File

@ -9,24 +9,24 @@ import (
)
const (
OCTOPUS = "Octopus"
MODELARTS = "Modelarts"
SHUGUANGAI = "ShuguangAi"
OCTOPUS = "octopus"
MODELARTS = "modelarts"
SHUGUANGAI = "shuguangAi"
)
var (
AiTypeMap = map[string]string{
"Hanwuji": OCTOPUS,
"Suiyan": OCTOPUS,
"Sailingsi": OCTOPUS,
"Modelarts-CloudBrain2": MODELARTS,
"ShuguangAi": SHUGUANGAI,
"hanwuji": OCTOPUS,
"suiyan": OCTOPUS,
"sailingsi": OCTOPUS,
"modelarts-CloudBrain2": MODELARTS,
"shuguangAi": SHUGUANGAI,
}
)
func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.ResourceCollector) {
func InitAiClusterMap(ctx context.Context, svcCtx *svc.ServiceContext) (*map[string]executor.AiExecutor, *map[string]collector.AiCollector) {
executorMap := make(map[string]executor.AiExecutor)
collectorMap := make(map[string]collector.ResourceCollector)
collectorMap := make(map[string]collector.AiCollector)
for k, v := range AiTypeMap {
switch v {
case OCTOPUS:

View File

@ -1,10 +1,11 @@
package collector
type ResourceCollector interface {
GetResourceSpecs() (*ResourceSpecs, error)
type AiCollector interface {
GetResourceStats() (*ResourceStats, error)
GetDatasetsSpecs() ([]*DatasetsSpecs, error)
}
type ResourceSpecs struct {
type ResourceStats struct {
ParticipantId int64
Name string
CpuAvail float64
@ -20,3 +21,8 @@ type Card struct {
Name string
TOpsAtFp16 float64
}
type DatasetsSpecs struct {
Name string
Size string
}

View File

@ -2,10 +2,8 @@ package executor
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/storeLink"
)
type AiExecutor interface {
Execute(option *option.AiOption) (interface{}, error)
storeLink.Linkage
}

View File

@ -5,5 +5,5 @@ import (
)
type Params struct {
Resources []*collector.ResourceSpecs
Resources []*collector.ResourceStats
}

View File

@ -5,13 +5,13 @@ import (
)
type ResourcePricingParams struct {
replicas int32
Replicas int32
task *providerPricing.Task
*Params
}
func (r *ResourcePricingParams) GetReplicas() int32 {
return r.replicas
return r.Replicas
}
func (r *ResourcePricingParams) GetTask() *providerPricing.Task {

View File

@ -1,7 +1,7 @@
package strategy
import (
"github.com/pkg/errors"
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/strategy/param"
)

View File

@ -0,0 +1,11 @@
package strategy
type SingleAssignment struct {
Cluster *AssignedCluster
}
func (s *SingleAssignment) Schedule() ([]*AssignedCluster, error) {
var results []*AssignedCluster
results = append(results, s.Cluster)
return results, nil
}

View File

@ -1,5 +1,16 @@
package strategy
const (
REPLICATION = "replication"
RESOURCE_PRICING = "resourcePricing"
STATIC_WEIGHT = "staticWeight"
DYNAMIC_WEIGHT = "dynamicWeight"
)
var (
strategyNames = []string{REPLICATION, RESOURCE_PRICING}
)
type Strategy interface {
Schedule() ([]*AssignedCluster, error)
}
@ -9,3 +20,7 @@ type AssignedCluster struct {
Name string
Replicas int32
}
func GetStrategyNames() []string {
return strategyNames
}

View File

@ -38,24 +38,24 @@ func NewModelArtsLink(ctx context.Context, svcCtx *svc.ServiceContext, name stri
return &ModelArtsLink{ctx: ctx, svcCtx: svcCtx, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
}
func (o *ModelArtsLink) UploadImage(path string) (interface{}, error) {
func (m *ModelArtsLink) UploadImage(path string) (interface{}, error) {
//TODO modelArts上传镜像
return nil, nil
}
func (o *ModelArtsLink) DeleteImage(imageId string) (interface{}, error) {
func (m *ModelArtsLink) DeleteImage(imageId string) (interface{}, error) {
// TODO modelArts删除镜像
return nil, nil
}
func (o *ModelArtsLink) QueryImageList() (interface{}, error) {
func (m *ModelArtsLink) QueryImageList() (interface{}, error) {
// modelArts获取镜像列表
req := &modelarts.ListRepoReq{
Offset: "0",
Limit: strconv.Itoa(int(o.pageSize)),
Platform: o.platform,
Limit: strconv.Itoa(int(m.pageSize)),
Platform: m.platform,
}
resp, err := o.svcCtx.ModelArtsImgRpc.ListReposDetails(o.ctx, req)
resp, err := m.svcCtx.ModelArtsImgRpc.ListReposDetails(m.ctx, req)
if err != nil {
return nil, err
}
@ -63,7 +63,7 @@ func (o *ModelArtsLink) QueryImageList() (interface{}, error) {
return resp, nil
}
func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
func (m *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// modelArts提交任务
environments := make(map[string]string)
parameters := make([]*modelarts.ParametersTrainJob, 0)
@ -98,9 +98,9 @@ func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, pa
NodeCount: 1,
},
},
Platform: o.platform,
Platform: m.platform,
}
resp, err := o.svcCtx.ModelArtsRpc.CreateTrainingJob(o.ctx, req)
resp, err := m.svcCtx.ModelArtsRpc.CreateTrainingJob(m.ctx, req)
if err != nil {
return nil, err
}
@ -108,13 +108,13 @@ func (o *ModelArtsLink) SubmitTask(imageId string, cmd string, envs []string, pa
return resp, nil
}
func (o *ModelArtsLink) QueryTask(taskId string) (interface{}, error) {
func (m *ModelArtsLink) QueryTask(taskId string) (interface{}, error) {
// 获取任务
req := &modelarts.DetailTrainingJobsReq{
TrainingJobId: taskId,
Platform: o.platform,
Platform: m.platform,
}
resp, err := o.svcCtx.ModelArtsRpc.GetTrainingJobs(o.ctx, req)
resp, err := m.svcCtx.ModelArtsRpc.GetTrainingJobs(m.ctx, req)
if err != nil {
return nil, err
}
@ -122,13 +122,13 @@ func (o *ModelArtsLink) QueryTask(taskId string) (interface{}, error) {
return resp, nil
}
func (o *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) {
func (m *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) {
// 删除任务
req := &modelarts.DeleteTrainingJobReq{
TrainingJobId: taskId,
Platform: o.platform,
Platform: m.platform,
}
resp, err := o.svcCtx.ModelArtsRpc.DeleteTrainingJob(o.ctx, req)
resp, err := m.svcCtx.ModelArtsRpc.DeleteTrainingJob(m.ctx, req)
if err != nil {
return nil, err
}
@ -136,12 +136,12 @@ func (o *ModelArtsLink) DeleteTask(taskId string) (interface{}, error) {
return resp, nil
}
func (o *ModelArtsLink) QuerySpecs() (interface{}, error) {
func (m *ModelArtsLink) QuerySpecs() (interface{}, error) {
// octopus查询资源规格
req := &modelarts.TrainingJobFlavorsReq{
Platform: o.platform,
Platform: m.platform,
}
resp, err := o.svcCtx.ModelArtsRpc.GetTrainingJobFlavors(o.ctx, req)
resp, err := m.svcCtx.ModelArtsRpc.GetTrainingJobFlavors(m.ctx, req)
if err != nil {
return nil, err
}
@ -149,14 +149,74 @@ func (o *ModelArtsLink) QuerySpecs() (interface{}, error) {
return resp, nil
}
func (o *ModelArtsLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
func (m *ModelArtsLink) GetResourceStats() (*collector.ResourceStats, error) {
return nil, nil
}
func (o *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) {
task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
func (m *ModelArtsLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
return nil, nil
}
func (m *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) {
err := m.GenerateSubmitParams(option)
if err != nil {
return nil, err
}
task, err := m.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType)
if err != nil {
return nil, err
}
return task, nil
}
func (m *ModelArtsLink) GenerateSubmitParams(option *option.AiOption) error {
err := m.generateResourceId(option)
if err != nil {
return err
}
err = m.generateImageId(option)
if err != nil {
return err
}
err = m.generateCmd(option)
if err != nil {
return err
}
err = m.generateEnv(option)
if err != nil {
return err
}
err = m.generateParams(option)
if err != nil {
return err
}
return nil
}
func (m *ModelArtsLink) generateResourceId(option *option.AiOption) error {
_, err := m.QuerySpecs()
if err != nil {
return err
}
return nil
}
func (m *ModelArtsLink) generateImageId(option *option.AiOption) error {
return nil
}
func (m *ModelArtsLink) generateCmd(option *option.AiOption) error {
return nil
}
func (m *ModelArtsLink) generateEnv(option *option.AiOption) error {
return nil
}
func (m *ModelArtsLink) generateParams(option *option.AiOption) error {
return nil
}

View File

@ -16,6 +16,7 @@ package storeLink
import (
"context"
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
@ -196,14 +197,88 @@ func (o *OctopusLink) QuerySpecs() (interface{}, error) {
return resp, nil
}
func (o *OctopusLink) GetResourceSpecs() (*collector.ResourceSpecs, error) {
func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) {
return nil, nil
}
func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
req := &octopus.GetMyDatasetListReq{
Platform: o.platform,
PageIndex: o.pageIndex,
PageSize: o.pageSize,
}
resp, err := o.svcCtx.OctopusRpc.GetMyDatasetList(o.ctx, req)
if err != nil {
return nil, err
}
if !resp.Success {
return nil, errors.New(resp.Error.Message)
}
specs := []*collector.DatasetsSpecs{}
for _, dataset := range resp.Payload.Datasets {
spec := &collector.DatasetsSpecs{Name: dataset.Name}
specs = append(specs, spec)
}
return specs, nil
}
func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) {
task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
err := o.GenerateSubmitParams(option)
if err != nil {
return nil, err
}
task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType)
if err != nil {
return nil, err
}
return task, nil
}
func (o *OctopusLink) GenerateSubmitParams(option *option.AiOption) error {
err := o.generateResourceId(option)
if err != nil {
return err
}
err = o.generateImageId(option)
if err != nil {
return err
}
err = o.generateCmd(option)
if err != nil {
return err
}
err = o.generateEnv(option)
if err != nil {
return err
}
err = o.generateParams(option)
if err != nil {
return err
}
return nil
}
func (o *OctopusLink) generateResourceId(option *option.AiOption) error {
return nil
}
func (o *OctopusLink) generateImageId(option *option.AiOption) error {
return nil
}
func (o *OctopusLink) generateCmd(option *option.AiOption) error {
return nil
}
func (o *OctopusLink) generateEnv(option *option.AiOption) error {
return nil
}
func (o *OctopusLink) generateParams(option *option.AiOption) error {
return nil
}

View File

@ -48,6 +48,7 @@ const (
WorkPath = "/work/home/acgnnmfbwo/111111/py/"
TimeoutLimit = "10:00:00"
PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py"
DATASETS_DIR = "/work/home/acgnnmfbwo/pcmv1/dataset"
)
func NewShuguangAi(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *ShuguangAi {
@ -131,14 +132,30 @@ func (s *ShuguangAi) SubmitPytorchTask(imageId string, cmd string, envs []string
return resp, nil
}
func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []string, params []string, resourceId string) (interface{}, error) {
//req := &hpcAC.SubmitTensorflowTaskReq{
// Params: &hpcAC.SubmitTensorflowTaskParams{
//
// }
//}
return nil, nil
}
func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, aiType string) (interface{}, error) {
// shuguangAi提交任务
if aiType == PYTORCH {
switch aiType {
case PYTORCH_TASK:
task, err := s.SubmitPytorchTask(imageId, cmd, envs, params, resourceId)
if err != nil {
return nil, err
}
return task, nil
case TENSORFLOW_TASK:
task, err := s.SubmitTensorflowTask(imageId, cmd, envs, params, resourceId)
if err != nil {
return nil, err
}
return task, nil
}
return nil, errors.New("shuguangAi不支持的任务类型")
}
@ -169,13 +186,13 @@ func (s *ShuguangAi) DeleteTask(taskId string) (interface{}, error) {
return resp, nil
}
func (o *ShuguangAi) QuerySpecs() (interface{}, error) {
func (s *ShuguangAi) QuerySpecs() (interface{}, error) {
// ShuguangAi查询资源规格
req := &hpcAC.GetResourceSpecReq{
AcceleratorType: DCU,
ResourceGroup: RESOURCE_GROUP,
}
specs, err := o.svcCtx.ACRpc.GetResourceSpec(o.ctx, req)
specs, err := s.svcCtx.ACRpc.GetResourceSpec(s.ctx, req)
if err != nil {
return nil, err
}
@ -183,36 +200,106 @@ func (o *ShuguangAi) QuerySpecs() (interface{}, error) {
return specs, nil
}
func (o *ShuguangAi) GetResourceSpecs() (*collector.ResourceSpecs, error) {
func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := o.svcCtx.ACRpc.GetUserInfo(o.ctx, userReq)
userinfo, err := s.svcCtx.ACRpc.GetUserInfo(s.ctx, userReq)
if err != nil {
return nil, err
}
limitReq := &hpcAC.QueueReq{}
_, err = o.svcCtx.ACRpc.QueryUserQuotasLimit(o.ctx, limitReq)
_, err = s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
if err != nil {
return nil, err
}
diskReq := &hpcAC.ParaStorQuotaReq{}
_, err = o.svcCtx.ACRpc.ParaStorQuota(o.ctx, diskReq)
_, err = s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
if err != nil {
return nil, err
}
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
_ = &collector.ResourceSpecs{
ParticipantId: o.participantId,
Name: o.platform,
_ = &collector.ResourceStats{
ParticipantId: s.participantId,
Name: s.platform,
Balance: balance,
}
return nil, nil
}
func (o *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) {
task, err := o.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AiType)
func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
req := &hpcAC.GetFileListReq{Limit: 100, Path: DATASETS_DIR, Start: 0}
list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req)
if err != nil {
return nil, err
}
if list.Code != "0" {
return nil, errors.New(list.Msg)
}
specs := []*collector.DatasetsSpecs{}
for _, file := range list.Data.FileList {
spec := &collector.DatasetsSpecs{Name: file.Name, Size: strconv.FormatInt(file.Size, 10)}
specs = append(specs, spec)
}
return specs, nil
}
func (s *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) {
err := s.GenerateSubmitParams(option)
if err != nil {
return nil, err
}
task, err := s.SubmitTask(option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.TaskType)
if err != nil {
return nil, err
}
return task, nil
}
func (s *ShuguangAi) GenerateSubmitParams(option *option.AiOption) error {
err := s.generateResourceId(option)
if err != nil {
return err
}
err = s.generateImageId(option)
if err != nil {
return err
}
err = s.generateCmd(option)
if err != nil {
return err
}
err = s.generateEnv(option)
if err != nil {
return err
}
err = s.generateParams(option)
if err != nil {
return err
}
return nil
}
func (s *ShuguangAi) generateResourceId(option *option.AiOption) error {
return nil
}
func (s *ShuguangAi) generateImageId(option *option.AiOption) error {
return nil
}
func (s *ShuguangAi) generateCmd(option *option.AiOption) error {
return nil
}
func (s *ShuguangAi) generateEnv(option *option.AiOption) error {
return nil
}
func (s *ShuguangAi) generateParams(option *option.AiOption) error {
return nil
}

View File

@ -18,6 +18,8 @@ import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcAC"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/common"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
@ -51,6 +53,11 @@ const (
MODELARTS = "Modelarts"
SHUGUANGAI = "ShuguangAi"
SHUGUANGHPC = "ShuguangHpc"
CPU = "cpu"
GPU = "gpu"
CARD = "computeCard"
PYTORCH_TASK = "pytorch"
TENSORFLOW_TASK = "tensorflow"
)
var (
@ -65,6 +72,9 @@ var (
"3": SHUGUANGAI,
"4": SHUGUANGHPC,
}
resourceTypes = []string{CPU, GPU, CARD}
taskTypes = []string{PYTORCH_TASK, TENSORFLOW_TASK}
ERROR_RESP_EMPTY = errors.New("resp empty error")
ERROR_CONVERT_EMPTY = errors.New("convert empty error")
)
@ -104,6 +114,44 @@ func GetParticipantById(partId int64, dbEngin *gorm.DB) *models.StorelinkCenter
return &participant
}
func GetResourceTypes() []string {
return resourceTypes
}
func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string, error) {
var names []string
//errCount := 0
colMap := *collectorMap
for _, col := range colMap {
var ns []string
specs, err := col.GetDatasetsSpecs()
if err != nil {
return nil, errors.New("failed to acquire datasets list")
}
for _, spec := range specs {
ns = append(ns, spec.Name)
}
if len(ns) == 0 {
continue
}
if len(names) == 0 {
names = ns
continue
}
names = common.IntersectString(names, ns)
}
//if (len(*collectorMap) - errCount) < 2 {
//
//}
names = common.RemoveDuplicates(names)
return names, nil
}
func GetTaskTypes() []string {
return taskTypes
}
func ConvertType(in interface{}, out interface{}, participant *models.StorelinkCenter) (interface{}, error) {
switch (interface{})(in).(type) {

2
go.mod
View File

@ -26,7 +26,7 @@ require (
github.com/rs/zerolog v1.28.0
github.com/shopspring/decimal v1.3.1
github.com/zeromicro/go-zero v1.6.0
gitlink.org.cn/jcce-pcm/pcm-ac v0.0.0-20231207111119-cdecc6b118c8
gitlink.org.cn/jcce-pcm/pcm-ac v0.0.0-20240201033409-2d4e27a90c39
gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230904090036-24fc730ec87d
gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes v0.0.0-20231214084401-de9ac5db7246
gitlink.org.cn/jcce-pcm/pcm-participant-modelarts v0.0.0-20231101085149-724c7c4cc090