From ea7216e15346c628b24b88ad26a7516a0c151ea7 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 17 Apr 2024 17:57:35 +0800 Subject: [PATCH] added adapterId for ai scheduler submit func Former-commit-id: 94e73c0fd326bf84d26196d97cc70654994ceca2 --- .../schedule/schedulegetalgorithmslogic.go | 2 +- .../schedule/schedulegetdatasetslogic.go | 2 +- api/internal/scheduler/database/aiStorage.go | 15 +++++++ api/internal/scheduler/scheduler.go | 26 ++++++------ .../scheduler/schedulers/aiScheduler.go | 16 ++++---- .../scheduler/schedulers/option/aiOption.go | 3 +- api/internal/scheduler/service/aiService.go | 41 +++++++++++++++++-- api/internal/storeLink/storeLink.go | 12 +++--- api/internal/svc/servicecontext.go | 24 ++++++----- 9 files changed, 96 insertions(+), 45 deletions(-) diff --git a/api/internal/logic/schedule/schedulegetalgorithmslogic.go b/api/internal/logic/schedule/schedulegetalgorithmslogic.go index 2c78efd3..0f26d789 100644 --- a/api/internal/logic/schedule/schedulegetalgorithmslogic.go +++ b/api/internal/logic/schedule/schedulegetalgorithmslogic.go @@ -26,7 +26,7 @@ func NewScheduleGetAlgorithmsLogic(ctx context.Context, svcCtx *svc.ServiceConte func (l *ScheduleGetAlgorithmsLogic) ScheduleGetAlgorithms(req *types.AiAlgorithmsReq) (resp *types.AiAlgorithmsResp, err error) { resp = &types.AiAlgorithmsResp{} - algorithms, err := storeLink.GetAlgorithms(l.ctx, l.svcCtx.Scheduler.ResourceCollector, req.ResourceType, req.TaskType, req.Dataset) + algorithms, err := storeLink.GetAlgorithms(l.ctx, l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap["1777144940459986944"], req.ResourceType, req.TaskType, req.Dataset) if err != nil { return nil, err } diff --git a/api/internal/logic/schedule/schedulegetdatasetslogic.go b/api/internal/logic/schedule/schedulegetdatasetslogic.go index f7aeab14..777d7435 100644 --- a/api/internal/logic/schedule/schedulegetdatasetslogic.go +++ b/api/internal/logic/schedule/schedulegetdatasetslogic.go @@ -25,7 +25,7 @@ func NewScheduleGetDatasetsLogic(ctx context.Context, svcCtx *svc.ServiceContext func (l *ScheduleGetDatasetsLogic) ScheduleGetDatasets() (resp *types.AiDatasetsResp, err error) { resp = &types.AiDatasetsResp{} - names, err := storeLink.GetDatasetsNames(l.ctx, l.svcCtx.Scheduler.ResourceCollector) + names, err := storeLink.GetDatasetsNames(l.ctx, l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap["1777144940459986944"]) if err != nil { return nil, err } diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index c0b706f2..2cf648aa 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -33,6 +33,21 @@ func (s *AiStorage) GetClustersByAdapterId(id string) (*types.ClusterListResp, e return &resp, nil } +func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) { + var list []types.AdapterInfo + var ids []string + db := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter") + db = db.Where("type = ?", adapterType) + err := db.Order("create_time desc").Find(&list).Error + if err != nil { + return nil, err + } + for _, info := range list { + ids = append(ids, info.Id) + } + return ids, nil +} + func (s *AiStorage) SaveTask(name string) error { // 构建主任务结构体 taskModel := models.Task{ diff --git a/api/internal/scheduler/scheduler.go b/api/internal/scheduler/scheduler.go index 75aa115f..d214e76a 100644 --- a/api/internal/scheduler/scheduler.go +++ b/api/internal/scheduler/scheduler.go @@ -20,8 +20,7 @@ import ( "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice" @@ -32,16 +31,15 @@ import ( ) type Scheduler struct { - task *response.TaskInfo - participantIds []int64 - subSchedule SubSchedule - dbEngin *gorm.DB - result []string //pID:子任务yamlstring 键值对 - participantRpc participantservice.ParticipantService - ResourceCollector *map[string]collector.AiCollector - AiStorages *database.AiStorage - AiExecutor *map[string]executor.AiExecutor - mu sync.RWMutex + task *response.TaskInfo + participantIds []int64 + subSchedule SubSchedule + dbEngin *gorm.DB + result []string //pID:子任务yamlstring 键值对 + participantRpc participantservice.ParticipantService + AiStorages *database.AiStorage + AiService *service.AiService + mu sync.RWMutex } type SubSchedule interface { @@ -59,8 +57,8 @@ func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, partici return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func NewSchdlr(resourceCollector *map[string]collector.AiCollector, storages *database.AiStorage, aiExecutor *map[string]executor.AiExecutor) *Scheduler { - return &Scheduler{ResourceCollector: resourceCollector, AiStorages: storages, AiExecutor: aiExecutor} +func NewSchdlr(aiService *service.AiService, storages *database.AiStorage) *Scheduler { + return &Scheduler{AiService: aiService, AiStorages: storages} } func (s *Scheduler) SpecifyClusters() { diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 7026f7e5..a3e3e366 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -64,9 +64,8 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin } func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { - if as.option.AiClusterId != "" { - // TODO database operation Find - return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: "", Replicas: 1}}, nil + if len(as.option.ClusterIds) == 1 { + return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil } resources, err := as.findClustersWithResources() @@ -131,7 +130,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa var ch = make(chan *AiResult, len(clusters)) var errCh = make(chan interface{}, len(clusters)) - executorMap := *as.AiExecutor + executorMap := as.AiService.AiExecutorAdapterMap[as.option.AdapterId] for _, cluster := range clusters { c := cluster wg.Add(1) @@ -202,13 +201,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { var wg sync.WaitGroup - var ch = make(chan *collector.ResourceStats, len(*as.ResourceCollector)) - var errCh = make(chan interface{}, len(*as.ResourceCollector)) + var clustersNum = len(as.AiService.AiCollectorAdapterMap[as.option.AdapterId]) + var ch = make(chan *collector.ResourceStats, clustersNum) + var errCh = make(chan interface{}, clustersNum) var resourceSpecs []*collector.ResourceStats var errs []interface{} - for s, resourceCollector := range *as.ResourceCollector { + for s, resourceCollector := range as.AiService.AiCollectorAdapterMap[as.option.AdapterId] { wg.Add(1) rc := resourceCollector id := s @@ -242,7 +242,7 @@ func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, errs = append(errs, e) } - if len(errs) == len(*as.ResourceCollector) { + if len(errs) == clustersNum { return nil, errors.New("get resources failed") } diff --git a/api/internal/scheduler/schedulers/option/aiOption.go b/api/internal/scheduler/schedulers/option/aiOption.go index 735a8610..f8a6495f 100644 --- a/api/internal/scheduler/schedulers/option/aiOption.go +++ b/api/internal/scheduler/schedulers/option/aiOption.go @@ -1,7 +1,8 @@ package option type AiOption struct { - AiClusterId string // shuguangAi /octopus ClusterId + AdapterId string + ClusterIds []string TaskName string ResourceType string // cpu/gpu/compute card CpuCoreNum int64 diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index 9fa480c7..93188f9d 100644 --- a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -1,11 +1,14 @@ package service import ( + "github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice" "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice" @@ -18,30 +21,60 @@ const ( SHUGUANGAI = "shuguangAi" ) -func InitAiClusterMap(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, aCRpc hpcacclient.HpcAC, storages *database.AiStorage) (*map[string]executor.AiExecutor, *map[string]collector.AiCollector) { - clusters, _ := storages.GetClustersByAdapterId("1777144940459986944") +type AiService struct { + AiExecutorAdapterMap map[string]map[string]executor.AiExecutor + AiCollectorAdapterMap map[string]map[string]collector.AiCollector +} +func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService, error) { + var aiType = "1" + adapterIds, err := storages.GetAdapterIdsByType(aiType) + if err != nil { + return nil, err + } + aiService := &AiService{ + AiExecutorAdapterMap: make(map[string]map[string]executor.AiExecutor), + AiCollectorAdapterMap: make(map[string]map[string]collector.AiCollector), + } + for _, id := range adapterIds { + clusters, err := storages.GetClustersByAdapterId(id) + if err != nil { + return nil, err + } + exeClusterMap, colClusterMap := InitAiClusterMap(conf, clusters.List) + aiService.AiExecutorAdapterMap[id] = exeClusterMap + aiService.AiCollectorAdapterMap[id] = colClusterMap + } + + return aiService, nil +} + +func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) { executorMap := make(map[string]executor.AiExecutor) collectorMap := make(map[string]collector.AiCollector) - for _, c := range clusters.List { + for _, c := range clusters { switch c.Name { case OCTOPUS: id, _ := strconv.ParseInt(c.Id, 10, 64) + octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf)) octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id) collectorMap[c.Id] = octopus executorMap[c.Id] = octopus case MODELARTS: id, _ := strconv.ParseInt(c.Id, 10, 64) + modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf)) + modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf)) modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Nickname, id) collectorMap[c.Id] = modelarts executorMap[c.Id] = modelarts case SHUGUANGAI: id, _ := strconv.ParseInt(c.Id, 10, 64) + aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf)) sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id) collectorMap[c.Id] = sgai executorMap[c.Id] = sgai } } - return &executorMap, &collectorMap + return executorMap, collectorMap } diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index 2cda06f6..55e51b35 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -128,13 +128,13 @@ func GetResourceTypes() []string { return resourceTypes } -func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.AiCollector) ([]string, error) { +func GetDatasetsNames(ctx context.Context, collectorMap map[string]collector.AiCollector) ([]string, error) { var wg sync.WaitGroup - var errCh = make(chan interface{}, len(*collectorMap)) + var errCh = make(chan interface{}, len(collectorMap)) var errs []interface{} var names []string var mu sync.Mutex - colMap := *collectorMap + colMap := collectorMap for s, col := range colMap { wg.Add(1) c := col @@ -200,14 +200,14 @@ func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.Ai return names, nil } -func GetAlgorithms(ctx context.Context, collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) { +func GetAlgorithms(ctx context.Context, collectorMap map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) { var names []string var wg sync.WaitGroup - var errCh = make(chan interface{}, len(*collectorMap)) + var errCh = make(chan interface{}, len(collectorMap)) var errs []interface{} var mu sync.Mutex - colMap := *collectorMap + colMap := collectorMap for s, col := range colMap { wg.Add(1) c := col diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index ee6fc50f..c4291a22 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -116,24 +116,28 @@ func NewServiceContext(c config.Config) *ServiceContext { }) // scheduler - octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)) - aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)) - modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf)) - modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf)) + //octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)) + //aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)) + //modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf)) + //modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf)) storage := &database.AiStorage{DbEngin: dbEngin} - aiExecutor, resourceCollector := service.InitAiClusterMap(octopusRpc, modelArtsRpc, modelArtsImgRpc, aCRpc, storage) - scheduler := scheduler.NewSchdlr(resourceCollector, storage, aiExecutor) + aiService, err := service.NewAiService(&c, storage) + if err != nil { + logx.Error(err.Error()) + return nil + } + scheduler := scheduler.NewSchdlr(aiService, storage) return &ServiceContext{ Cron: cron.New(cron.WithSeconds()), DbEngin: dbEngin, Config: c, RedisClient: redisClient, - ModelArtsRpc: modelArtsRpc, - ModelArtsImgRpc: modelArtsImgRpc, + ModelArtsRpc: modelartsservice.NewModelArtsService(zrpc.MustNewClient(c.ModelArtsRpcConf)), + ModelArtsImgRpc: imagesservice.NewImagesService(zrpc.MustNewClient(c.ModelArtsImgRpcConf)), CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)), - ACRpc: aCRpc, - OctopusRpc: octopusRpc, + ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)), + OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), K8sRpc: kubernetesclient.NewKubernetes(zrpc.MustNewClient(c.K8sNativeConf)), MonitorClient: make(map[int64]tracker.Prometheus),