diff --git a/api/internal/mqs/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go index 7498a513..45ef84df 100644 --- a/api/internal/mqs/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -18,7 +18,8 @@ import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor" ) /* @@ -34,18 +35,18 @@ type AiQueue struct { func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { acCollector := &collector.ShuguangAiCollector{ACRpc: svcCtx.ACRpc} resourceCollectiors := []collector.ResourceCollector{acCollector} + executorMap := make(map[string]executor.Executor) + executorMap["ai"] = &executor.ShuguangAiExecutor{ACRpc: svcCtx.ACRpc} return &AiQueue{ ctx: ctx, svcCtx: svcCtx, - scheduler: scheduler.NewScheduler2(resourceCollectiors, nil), + scheduler: scheduler.NewScheduler2(resourceCollectiors, nil, executorMap), } } func (l *AiQueue) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - aiSchdl := scheduler.NewAiScheduler(val, nil) - - //schdl.MatchLabels() + aiSchdl, _ := scheduler.NewAiScheduler(val, l.scheduler) // 调度算法 err := l.scheduler.AssignAndSchedule(aiSchdl) diff --git a/api/internal/mqs/ScheduleHpc.go b/api/internal/mqs/ScheduleHpc.go index 1e188e92..f0b56aee 100644 --- a/api/internal/mqs/ScheduleHpc.go +++ b/api/internal/mqs/ScheduleHpc.go @@ -17,7 +17,6 @@ package mqs import ( "context" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" - scheduler2 "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" ) /* @@ -37,24 +36,5 @@ func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq { } func (l *HpcMq) Consume(val string) error { - // 接受消息, 根据标签筛选过滤 - hpcSchdl := scheduler2.NewHpcScheduler(val) - schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin, nil) - if err != nil { - return err - } - schdl.MatchLabels() - - // 调度算法 - err = schdl.AssignAndSchedule() - if err != nil { - return err - } - - // 存储数据 - err = schdl.SaveToDb() - if err != nil { - return err - } return nil } diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/aiScheduler.go index 8f63c39c..d5771faf 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/aiScheduler.go @@ -15,23 +15,24 @@ package scheduler import ( + "errors" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" ) type AiScheduler struct { - yamlString string - resourceCollectors []collector.ResourceCollector - task *response.TaskInfo - //storelink + yamlString string + task *response.TaskInfo + *Scheduler } -func NewAiScheduler(val string, resourceCollectors []collector.ResourceCollector) *AiScheduler { - return &AiScheduler{yamlString: val, resourceCollectors: resourceCollectors} +func NewAiScheduler(val string, scheduler *Scheduler) (*AiScheduler, error) { + return &AiScheduler{yamlString: val, Scheduler: scheduler}, nil } func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { @@ -46,14 +47,55 @@ func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource strin } func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { - strategy := strategies.NewReplicationStrategy(nil, 0) + resources, err := as.findProvidersWithResource() + if err != nil { + return nil, err + } + + if len(resources) < 2 /*|| as.task */ { + var pros []entity.Participant + for _, resource := range resources { + pros = append(pros, entity.Participant{ + Participant_id: resource.ParticipantId, + Name: resource.Name, + }) + } + strategy := strategies.NewReplicationStrategy(nil, 0) + return strategy, nil + } + + task, providerList := as.genTaskAndProviders() + if err != nil { + return nil, nil + } + strategy := strategies.NewPricingStrategy(task, providerList...) return strategy, nil } func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) { + return nil, nil } func (as *AiScheduler) assignTask(clusters []*strategies.AssignedCluster) error { + if clusters == nil { + return errors.New("clusters is nil") + } + return nil } + +func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) { + var resourceSpecs []*collector.ResourceSpecs + for _, resourceCollector := range as.resourceCollectors { + spec, err := resourceCollector.GetResourceSpecs() + if err != nil { + continue + } + resourceSpecs = append(resourceSpecs, spec) + } + if len(resourceSpecs) == 0 { + return nil, errors.New("no resource found") + } + return resourceSpecs, nil +} diff --git a/pkg/scheduler/collector/acCollector.go b/pkg/scheduler/proxy/collector/acCollector.go similarity index 84% rename from pkg/scheduler/collector/acCollector.go rename to pkg/scheduler/proxy/collector/acCollector.go index 8107a4c4..587639cc 100644 --- a/pkg/scheduler/collector/acCollector.go +++ b/pkg/scheduler/proxy/collector/acCollector.go @@ -8,9 +8,10 @@ import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" //DCU单价=队列DCU费率×计算中心DCU单价 type ShuguangAiCollector struct { + Name string ACRpc hpcacclient.HpcAC } -func (a *ShuguangAiCollector) getResourceSpecs() (*ResourceSpecs, error) { +func (a *ShuguangAiCollector) GetResourceSpecs() (*ResourceSpecs, error) { return nil, nil } diff --git a/pkg/scheduler/collector/collector.go b/pkg/scheduler/proxy/collector/collector.go similarity index 83% rename from pkg/scheduler/collector/collector.go rename to pkg/scheduler/proxy/collector/collector.go index 5ce28493..0f1d5720 100644 --- a/pkg/scheduler/collector/collector.go +++ b/pkg/scheduler/proxy/collector/collector.go @@ -1,11 +1,12 @@ package collector type ResourceCollector interface { - getResourceSpecs() (*ResourceSpecs, error) + GetResourceSpecs() (*ResourceSpecs, error) } type ResourceSpecs struct { ParticipantId int64 + Name string CpuAvail float64 MemAvail float64 DiskAvail float64 diff --git a/pkg/scheduler/proxy/executor/acExecutor.go b/pkg/scheduler/proxy/executor/acExecutor.go new file mode 100644 index 00000000..cd7b6916 --- /dev/null +++ b/pkg/scheduler/proxy/executor/acExecutor.go @@ -0,0 +1,32 @@ +package executor + +import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient" + +type ShuguangAiExecutor struct { + Name string + ACRpc hpcacclient.HpcAC +} + +func (s ShuguangAiExecutor) QueryImageList() ([]Image, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (Task, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) QueryTask(taskId string) (Task, error) { + //TODO implement me + panic("implement me") +} + +func (s ShuguangAiExecutor) QuerySpecs() (Spec, error) { + //TODO implement me + panic("implement me") +} + +func NewShuguangAiExecutor(name string, acRpc hpcacclient.HpcAC) *ShuguangAiExecutor { + return &ShuguangAiExecutor{Name: name, ACRpc: acRpc} +} diff --git a/pkg/scheduler/proxy/executor/executor.go b/pkg/scheduler/proxy/executor/executor.go new file mode 100644 index 00000000..61055927 --- /dev/null +++ b/pkg/scheduler/proxy/executor/executor.go @@ -0,0 +1,17 @@ +package executor + +type Executor interface { + QueryImageList() ([]Image, error) + SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (Task, error) + QueryTask(taskId string) (Task, error) + QuerySpecs() (Spec, error) +} + +type Image struct { +} + +type Task struct { +} + +type Spec struct { +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3aae57e9..3efebaba 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -19,8 +19,9 @@ import ( "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" "sigs.k8s.io/yaml" @@ -36,7 +37,7 @@ type Scheduler struct { participantRpc participantservice.ParticipantService resourceCollectors []collector.ResourceCollector storages []database.Storage - //storelink + aiExecutor map[string]executor.Executor } func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { @@ -48,8 +49,8 @@ func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, return &Scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil } -func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage) *Scheduler { - return &Scheduler{resourceCollectors: resourceCollectors, storages: storages} +func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage, aiExecutor map[string]executor.Executor) *Scheduler { + return &Scheduler{resourceCollectors: resourceCollectors, storages: storages, aiExecutor: aiExecutor} } func (s *Scheduler) SpecifyClusters() {