scheduler refactor modified

Former-commit-id: dbf14eb6b8c151587c8f48319a3d3a7cedbedb06
This commit is contained in:
tzwang 2024-01-24 17:59:33 +08:00
parent b9e79dc671
commit 8882790591
10 changed files with 199 additions and 68 deletions

View File

@ -19,8 +19,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service"
)
/*
@ -34,14 +33,11 @@ type AiQueue struct {
}
func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
acCollector := &collector.ShuguangAiCollector{ACRpc: svcCtx.ACRpc}
resourceCollectiors := []collector.ResourceCollector{acCollector}
executorMap := make(map[string]executor.Executor)
executorMap["ai"] = &executor.ShuguangAiExecutor{ACRpc: svcCtx.ACRpc}
aiExecutorMap, aiCollectorMap := service.InitAiClusterMap(svcCtx.ACRpc, svcCtx.ModelArtsRpc, svcCtx.ModelArtsImgRpc, svcCtx.OctopusRpc)
return &AiQueue{
ctx: ctx,
svcCtx: svcCtx,
scheduler: scheduler.NewScheduler2(resourceCollectiors, nil, executorMap),
scheduler: scheduler.NewScheduler2(aiCollectorMap, nil, aiExecutorMap),
}
}

View File

@ -30,15 +30,15 @@ import (
)
type Scheduler struct {
task *response.TaskInfo
participantIds []int64
subSchedule common.SubSchedule
dbEngin *gorm.DB
result []string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService
ResourceCollectors []collector.ResourceCollector
Storages []database.Storage
AiExecutor map[string]executor.Executor
task *response.TaskInfo
participantIds []int64
subSchedule common.SubSchedule
dbEngin *gorm.DB
result []string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService
ResourceCollector *map[string]collector.ResourceCollector
Storages database.Storage
AiExecutor *map[string]executor.Executor
}
func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
@ -50,8 +50,8 @@ func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB,
return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil
}
func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage, aiExecutor map[string]executor.Executor) *Scheduler {
return &Scheduler{ResourceCollectors: resourceCollectors, Storages: storages, AiExecutor: aiExecutor}
func NewScheduler2(resourceCollector *map[string]collector.ResourceCollector, storages database.Storage, aiExecutor *map[string]executor.Executor) *Scheduler {
return &Scheduler{ResourceCollector: resourceCollector, Storages: storages, AiExecutor: aiExecutor}
}
func (s *Scheduler) SpecifyClusters() {

View File

@ -88,7 +88,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) {
var resourceSpecs []*collector.ResourceSpecs
for _, resourceCollector := range as.ResourceCollectors {
for _, resourceCollector := range *as.ResourceCollector {
spec, err := resourceCollector.GetResourceSpecs()
if err != nil {
continue

View File

@ -0,0 +1,49 @@
package service
import (
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/impl"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
)
const (
OCTOPUS = "Octopus"
MODELARTS = "Modelarts"
SHUGUANGAI = "ShuguangAi"
)
var (
AiTypeMap = map[string]string{
"Hanwuji": OCTOPUS,
"Suiyan": OCTOPUS,
"Sailingsi": OCTOPUS,
"modelarts-CloudBrain2": MODELARTS,
"ShuguangAi": SHUGUANGAI,
}
)
func InitAiClusterMap(ACRpc hpcacclient.HpcAC, ModelArtsRpc modelartsservice.ModelArtsService, ModelArtsImgRpc imagesservice.ImagesService, OctopusRpc octopusclient.Octopus) (*map[string]executor.Executor, *map[string]collector.ResourceCollector) {
executorMap := make(map[string]executor.Executor)
collectorMap := make(map[string]collector.ResourceCollector)
for k, v := range AiTypeMap {
switch v {
case OCTOPUS:
octopus := impl.NewOctopusExecutor(OctopusRpc, k)
collectorMap[k] = octopus
executorMap[k] = octopus
case MODELARTS:
modelarts := impl.NewModelartsExecutor(ModelArtsRpc, ModelArtsImgRpc, k)
collectorMap[k] = modelarts
executorMap[k] = modelarts
case SHUGUANGAI:
sgai := impl.NewShuguangAiExecutor(ACRpc, k)
collectorMap[k] = sgai
executorMap[k] = sgai
}
}
return &executorMap, &collectorMap
}

View File

@ -1,17 +0,0 @@
package collector
import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
//单条作业费=作业运行秒数×(CPU核心数*CPU单价+GPU卡数×GPU单价+DCU卡数×DCU单价)/3600
//CPU单价=队列CPU费率×计算中心CPU单价
//GPU单价=队列GPU费率×计算中心GPU单价
//DCU单价=队列DCU费率×计算中心DCU单价
type ShuguangAiCollector struct {
Name string
ACRpc hpcacclient.HpcAC
}
func (a *ShuguangAiCollector) GetResourceSpecs() (*ResourceSpecs, error) {
return nil, nil
}

View File

@ -1,32 +0,0 @@
package executor
import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
type ShuguangAiExecutor struct {
Name string
ACRpc hpcacclient.HpcAC
}
func (s ShuguangAiExecutor) QueryImageList() ([]Image, error) {
//TODO implement me
panic("implement me")
}
func (s ShuguangAiExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (Task, error) {
//TODO implement me
panic("implement me")
}
func (s ShuguangAiExecutor) QueryTask(taskId string) (Task, error) {
//TODO implement me
panic("implement me")
}
func (s ShuguangAiExecutor) QuerySpecs() (Spec, error) {
//TODO implement me
panic("implement me")
}
func NewShuguangAiExecutor(name string, acRpc hpcacclient.HpcAC) *ShuguangAiExecutor {
return &ShuguangAiExecutor{Name: name, ACRpc: acRpc}
}

View File

@ -0,0 +1,44 @@
package impl
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/imagesservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/client/modelartsservice"
)
type ModelArtsExecutor struct {
Name string
pageIndex int32
pageSize int32
ModelArtsRpc modelartsservice.ModelArtsService
ModelArtsImgRpc imagesservice.ImagesService
}
func NewModelartsExecutor(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string) *ModelArtsExecutor {
return &ModelArtsExecutor{Name: name, ModelArtsRpc: modelArtsRpc, ModelArtsImgRpc: modelArtsImgRpc, pageIndex: 1, pageSize: 100}
}
func (m ModelArtsExecutor) QueryImageList() ([]executor.Image, error) {
//TODO implement me
panic("implement me")
}
func (m ModelArtsExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (m ModelArtsExecutor) QueryTask(taskId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (m ModelArtsExecutor) QuerySpecs() (executor.Spec, error) {
//TODO implement me
panic("implement me")
}
func (a *ModelArtsExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}

View File

@ -0,0 +1,42 @@
package impl
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
)
type OctopusExecutor struct {
Name string
pageIndex int32
pageSize int32
OctopusRpc octopusclient.Octopus
}
func NewOctopusExecutor(OctopusRpc octopusclient.Octopus, name string) *OctopusExecutor {
return &OctopusExecutor{OctopusRpc: OctopusRpc, Name: name, pageIndex: 1, pageSize: 100}
}
func (o OctopusExecutor) QueryImageList() ([]executor.Image, error) {
//TODO implement me
panic("implement me")
}
func (o OctopusExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (o OctopusExecutor) QueryTask(taskId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (o OctopusExecutor) QuerySpecs() (executor.Spec, error) {
//TODO implement me
panic("implement me")
}
func (a *OctopusExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}

View File

@ -0,0 +1,45 @@
package impl
import (
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
)
//单条作业费=作业运行秒数×(CPU核心数*CPU单价+GPU卡数×GPU单价+DCU卡数×DCU单价)/3600
//CPU单价=队列CPU费率×计算中心CPU单价
//GPU单价=队列GPU费率×计算中心GPU单价
//DCU单价=队列DCU费率×计算中心DCU单价
type ShuguangAiExecutor struct {
Name string
ACRpc hpcacclient.HpcAC
}
func NewShuguangAiExecutor(acRpc hpcacclient.HpcAC, name string) *ShuguangAiExecutor {
return &ShuguangAiExecutor{Name: name, ACRpc: acRpc}
}
func (s ShuguangAiExecutor) QueryImageList() ([]executor.Image, error) {
//TODO implement me
panic("implement me")
}
func (s ShuguangAiExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (s ShuguangAiExecutor) QueryTask(taskId string) (executor.Task, error) {
//TODO implement me
panic("implement me")
}
func (s ShuguangAiExecutor) QuerySpecs() (executor.Spec, error) {
//TODO implement me
panic("implement me")
}
func (a *ShuguangAiExecutor) GetResourceSpecs() (*collector.ResourceSpecs, error) {
return nil, nil
}

View File

@ -0,0 +1,4 @@
package strategy
type DynamicResourcesStrategy struct {
}