diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index 311bee00..6cc7dc2f 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -19,8 +19,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service" ) /* @@ -34,14 +33,11 @@ type AiQueue struct { } func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { - acCollector := &collector.ShuguangAiCollector{ACRpc: svcCtx.ACRpc} - resourceCollectiors := []collector.ResourceCollector{acCollector} - executorMap := make(map[string]executor.Executor) - executorMap["ai"] = &executor.ShuguangAiExecutor{ACRpc: svcCtx.ACRpc} + aiExecutorMap, aiCollectorMap := service.InitAiClusterMap(svcCtx.ACRpc, svcCtx.ModelArtsRpc, svcCtx.ModelArtsImgRpc, svcCtx.OctopusRpc) return &AiQueue{ ctx: ctx, svcCtx: svcCtx, - scheduler: scheduler.NewScheduler2(resourceCollectiors, nil, executorMap), + scheduler: scheduler.NewScheduler2(aiCollectorMap, nil, aiExecutorMap), } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 1407568e..49d3a150 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -30,15 +30,15 @@ import ( ) type Scheduler struct { - task *response.TaskInfo - participantIds []int64 - subSchedule common.SubSchedule - dbEngin *gorm.DB - result []string //pID:子任务yamlstring 键值对 - participantRpc participantservice.ParticipantService - ResourceCollectors []collector.ResourceCollector - Storages []database.Storage - AiExecutor map[string]executor.Executor + task *response.TaskInfo + participantIds []int64 + subSchedule common.SubSchedule + dbEngin *gorm.DB + result []string //pID:子任务yamlstring 键值对 + participantRpc participantservice.ParticipantService + ResourceCollector *map[string]collector.ResourceCollector + Storages database.Storage + AiExecutor *map[string]executor.Executor } func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { @@ -50,8 +50,8 @@ func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage, aiExecutor map[string]executor.Executor) *Scheduler { - return &Scheduler{ResourceCollectors: resourceCollectors, Storages: storages, AiExecutor: aiExecutor} +func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.Executor) *Scheduler { + return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor} } func (s *Scheduler) SpecifyClusters() { diff --git a/pkg/scheduler/schedulers/aiScheduler.go b/pkg/scheduler/schedulers/aiScheduler.go index feef4e93..d127b266 100644 --- a/pkg/scheduler/schedulers/aiScheduler.go +++ b/pkg/scheduler/schedulers/aiScheduler.go @@ -88,7 +88,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error { func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) { var resourceSpecs []*collector.ResourceSpecs - for _, resourceCollector := range as.ResourceCollectors { + for _, resourceCollector := range *as.ResourceCollector { spec, err := resourceCollector.GetResourceSpecs() if err != nil { continue diff --git a/pkg/scheduler/service/aiService.go b/pkg/scheduler/service/aiService.go new file mode 100644 index 00000000..9fa919dd --- /dev/null +++ b/pkg/scheduler/service/aiService.go @@ -0,0 +1,49 @@ +package service + +import ( + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/impl" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" +) + +const ( + OCTOPUS = "Octopus" + MODELARTS = "Modelarts" + SHUGUANGAI = "ShuguangAi" +) + +var ( + AiTypeMap = map[string]string{ + "Hanwuji": OCTOPUS, + "Suiyan": OCTOPUS, + "Sailingsi": OCTOPUS, + "modelarts-CloudBrain2": MODELARTS, + "ShuguangAi": SHUGUANGAI, + } +) + +func InitAiClusterMap(ACRpc hpcacclient.HpcAC, ModelArtsRpc modelartsservice.ModelArtsService, ModelArtsImgRpc imagesservice.ImagesService, OctopusRpc octopusclient.Octopus) (*map[string]executor.Executor, *map[string]collector.ResourceCollector) { + executorMap := make(map[string]executor.Executor) + collectorMap := make(map[string]collector.ResourceCollector) + for k, v := range AiTypeMap { + switch v { + case OCTOPUS: + octopus := impl.NewOctopusExecutor(OctopusRpc, k) + collectorMap[k] = octopus + executorMap[k] = octopus + case MODELARTS: + modelarts := impl.NewModelartsExecutor(ModelArtsRpc, ModelArtsImgRpc, k) + collectorMap[k] = modelarts + executorMap[k] = modelarts + case SHUGUANGAI: + sgai := impl.NewShuguangAiExecutor(ACRpc, k) + collectorMap[k] = sgai + executorMap[k] = sgai + } + } + return &executorMap, &collectorMap +} diff --git a/pkg/scheduler/service/collector/acCollector.go b/pkg/scheduler/service/collector/acCollector.go deleted file mode 100644 index 587639cc..00000000 --- a/pkg/scheduler/service/collector/acCollector.go +++ /dev/null @@ -1,17 +0,0 @@ -package collector - -import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" - -//单条作业费=作业运行秒数×(CPU核心数*CPU单价+GPU卡数×GPU单价+DCU卡数×DCU单价)/3600 -//CPU单价=队列CPU费率×计算中心CPU单价 -//GPU单价=队列GPU费率×计算中心GPU单价 -//DCU单价=队列DCU费率×计算中心DCU单价 - -type ShuguangAiCollector struct { - Name string - ACRpc hpcacclient.HpcAC -} - -func (a *ShuguangAiCollector) GetResourceSpecs() (*ResourceSpecs, error) { - return nil, nil -} diff --git a/pkg/scheduler/service/executor/acExecutor.go b/pkg/scheduler/service/executor/acExecutor.go deleted file mode 100644 index cd7b6916..00000000 --- a/pkg/scheduler/service/executor/acExecutor.go +++ /dev/null @@ -1,32 +0,0 @@ -package executor - -import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" - -type ShuguangAiExecutor struct { - Name string - ACRpc hpcacclient.HpcAC -} - -func (s ShuguangAiExecutor) QueryImageList() ([]Image, error) { - //TODO implement me - panic("implement me") -} - -func (s ShuguangAiExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (Task, error) { - //TODO implement me - panic("implement me") -} - -func (s ShuguangAiExecutor) QueryTask(taskId string) (Task, error) { - //TODO implement me - panic("implement me") -} - -func (s ShuguangAiExecutor) QuerySpecs() (Spec, error) { - //TODO implement me - panic("implement me") -} - -func NewShuguangAiExecutor(name string, acRpc hpcacclient.HpcAC) *ShuguangAiExecutor { - return &ShuguangAiExecutor{Name: name, ACRpc: acRpc} -} diff --git a/pkg/scheduler/service/impl/modelarts.go b/pkg/scheduler/service/impl/modelarts.go new file mode 100644 index 00000000..76f702b7 --- /dev/null +++ b/pkg/scheduler/service/impl/modelarts.go @@ -0,0 +1,44 @@ +package impl + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice" + "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice" +) + +type ModelArtsExecutor struct { + Name string + pageIndex int32 + pageSize int32 + ModelArtsRpc modelartsservice.ModelArtsService + ModelArtsImgRpc imagesservice.ImagesService +} + +func NewModelartsExecutor(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string) *ModelArtsExecutor { + return &ModelArtsExecutor{Name: name, ModelArtsRpc: modelArtsRpc, ModelArtsImgRpc: modelArtsImgRpc, pageIndex: 1, pageSize: 100} +} + +func (m ModelArtsExecutor) QueryImageList() ([]executor.Image, error) { + //TODO implement me + panic("implement me") +} + +func (m ModelArtsExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (m ModelArtsExecutor) QueryTask(taskId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (m ModelArtsExecutor) QuerySpecs() (executor.Spec, error) { + //TODO implement me + panic("implement me") +} + +func (a *ModelArtsExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) { + return nil, nil +} diff --git a/pkg/scheduler/service/impl/octopus.go b/pkg/scheduler/service/impl/octopus.go new file mode 100644 index 00000000..a4b6c5b8 --- /dev/null +++ b/pkg/scheduler/service/impl/octopus.go @@ -0,0 +1,42 @@ +package impl + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" + "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" +) + +type OctopusExecutor struct { + Name string + pageIndex int32 + pageSize int32 + OctopusRpc octopusclient.Octopus +} + +func NewOctopusExecutor(OctopusRpc octopusclient.Octopus, name string) *OctopusExecutor { + return &OctopusExecutor{OctopusRpc: OctopusRpc, Name: name, pageIndex: 1, pageSize: 100} +} + +func (o OctopusExecutor) QueryImageList() ([]executor.Image, error) { + //TODO implement me + panic("implement me") +} + +func (o OctopusExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (o OctopusExecutor) QueryTask(taskId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (o OctopusExecutor) QuerySpecs() (executor.Spec, error) { + //TODO implement me + panic("implement me") +} + +func (a *OctopusExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) { + return nil, nil +} diff --git a/pkg/scheduler/service/impl/shuguangAi.go b/pkg/scheduler/service/impl/shuguangAi.go new file mode 100644 index 00000000..049455c6 --- /dev/null +++ b/pkg/scheduler/service/impl/shuguangAi.go @@ -0,0 +1,45 @@ +package impl + +import ( + "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor" +) + +//单条作业费=作业运行秒数×(CPU核心数*CPU单价+GPU卡数×GPU单价+DCU卡数×DCU单价)/3600 +//CPU单价=队列CPU费率×计算中心CPU单价 +//GPU单价=队列GPU费率×计算中心GPU单价 +//DCU单价=队列DCU费率×计算中心DCU单价 + +type ShuguangAiExecutor struct { + Name string + ACRpc hpcacclient.HpcAC +} + +func NewShuguangAiExecutor(acRpc hpcacclient.HpcAC, name string) *ShuguangAiExecutor { + return &ShuguangAiExecutor{Name: name, ACRpc: acRpc} +} + +func (s ShuguangAiExecutor) QueryImageList() ([]executor.Image, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) QueryTask(taskId string) (executor.Task, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) QuerySpecs() (executor.Spec, error) { + //TODO implement me + panic("implement me") +} + +func (a *ShuguangAiExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) { + return nil, nil +} diff --git a/pkg/scheduler/strategy/dynamicResources.go b/pkg/scheduler/strategy/dynamicResources.go new file mode 100644 index 00000000..579333f2 --- /dev/null +++ b/pkg/scheduler/strategy/dynamicResources.go @@ -0,0 +1,4 @@ +package strategy + +type DynamicResourcesStrategy struct { +}