scheduler refactor updated

Former-commit-id: 8d2fffb6f6b50350549876e18a36527231d12dc2
This commit is contained in:
tzwang 2024-01-23 17:51:11 +08:00
parent 0c87a541c5
commit 73c2c43468
8 changed files with 114 additions and 39 deletions

View File

@ -18,7 +18,8 @@ import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor"
)
/*
@ -34,18 +35,18 @@ type AiQueue struct {
// NewAiMq builds an AiQueue that keeps ctx and svcCtx and owns a scheduler
// created by scheduler.NewScheduler2. The scheduler is given one resource
// collector (ShuguangAiCollector) and, in the three-argument call, an executor
// map with a ShuguangAiExecutor under the key "ai"; both use svcCtx.ACRpc.
func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
acCollector := &collector.ShuguangAiCollector{ACRpc: svcCtx.ACRpc}
// NewScheduler2 takes a slice of collectors; only one is registered here.
resourceCollectiors := []collector.ResourceCollector{acCollector}
// Executors are looked up by string key; "ai" is the only entry.
executorMap := make(map[string]executor.Executor)
executorMap["ai"] = &executor.ShuguangAiExecutor{ACRpc: svcCtx.ACRpc}
return &AiQueue{
ctx: ctx,
svcCtx: svcCtx,
// NOTE(review): the two `scheduler:` entries below look like the removed and
// added lines of this diff hunk (two-argument vs. three-argument
// NewScheduler2). A composite literal cannot repeat a field, so only one of
// them can exist in the compiled file — confirm against the repository.
// The nil second argument is the storages slice.
scheduler: scheduler.NewScheduler2(resourceCollectiors, nil),
scheduler: scheduler.NewScheduler2(resourceCollectiors, nil, executorMap),
}
}
func (l *AiQueue) Consume(val string) error {
// Receive the message and filter it by label
aiSchdl := scheduler.NewAiScheduler(val, nil)
//schdl.MatchLabels()
aiSchdl, _ := scheduler.NewAiScheduler(val, l.scheduler)
// Scheduling algorithm
err := l.scheduler.AssignAndSchedule(aiSchdl)

View File

@ -17,7 +17,6 @@ package mqs
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
scheduler2 "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
)
/*
@ -37,24 +36,5 @@ func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq {
}
// Consume handles one queue message val. As rendered here it builds an HPC
// scheduler from val, matches labels, runs AssignAndSchedule, then SaveToDb,
// and returns the first error any of those steps reports (nil otherwise).
//
// NOTE(review): the hunk header for this span reports 24 old lines against 5
// new lines, so most of this body is presumably removed by the commit and the
// resulting function may only return nil — confirm against the repository.
func (l *HpcMq) Consume(val string) error {
// Receive the message and filter it by label
hpcSchdl := scheduler2.NewHpcScheduler(val)
// The participant RPC argument is passed as nil here.
schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin, nil)
if err != nil {
return err
}
schdl.MatchLabels()
// Scheduling algorithm
err = schdl.AssignAndSchedule()
if err != nil {
return err
}
// Persist the data
err = schdl.SaveToDb()
if err != nil {
return err
}
return nil
}

View File

@ -15,23 +15,24 @@
package scheduler
import (
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
)
// AiScheduler holds the state for scheduling one AI task: the raw message
// text handed to NewAiScheduler (yamlString), a task description (task), and,
// in the newer field list, an embedded *Scheduler.
//
// NOTE(review): yamlString and task each appear twice below, which looks like
// the removed and added lines of this diff hunk shown together. The first
// group (with resourceCollectors and the storelink marker) is presumably the
// old layout and the second group (with the embedded *Scheduler) the new one
// — confirm against the repository.
type AiScheduler struct {
yamlString string
resourceCollectors []collector.ResourceCollector
task *response.TaskInfo
//storelink
yamlString string
task *response.TaskInfo
// Embedded so Scheduler's fields and methods are promoted onto AiScheduler.
*Scheduler
}
// NewAiScheduler wraps the raw message val in an AiScheduler.
//
// NOTE(review): two signatures are shown back to back, which looks like the
// removed and added lines of this diff hunk. The first stores a collector
// slice and returns only *AiScheduler; the second stores the shared
// *Scheduler and additionally returns an error, which is always nil in the
// body shown. Confirm which one the repository actually contains.
func NewAiScheduler(val string, resourceCollectors []collector.ResourceCollector) *AiScheduler {
return &AiScheduler{yamlString: val, resourceCollectors: resourceCollectors}
func NewAiScheduler(val string, scheduler *Scheduler) (*AiScheduler, error) {
// scheduler is stored as the embedded *Scheduler; it is not checked for nil.
return &AiScheduler{yamlString: val, Scheduler: scheduler}, nil
}
func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
@ -46,14 +47,55 @@ func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource strin
}
// pickOptimalStrategy chooses the scheduling strategy for this task. It asks
// findProvidersWithResource for the available resource specs; with fewer than
// two it returns a replication strategy, otherwise a pricing strategy built
// from genTaskAndProviders. An error from findProvidersWithResource is
// returned as-is.
func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) {
// NOTE(review): this declaration looks like a removed line of the diff hunk;
// together with the `strategy :=` before the final return it would be a
// redeclaration in the same scope — confirm against the repository.
strategy := strategies.NewReplicationStrategy(nil, 0)
resources, err := as.findProvidersWithResource()
if err != nil {
return nil, err
}
if len(resources) < 2 /*|| as.task */ {
// NOTE(review): pros is filled in but never read afterwards; the
// replication strategy below is still created with (nil, 0).
var pros []entity.Participant
for _, resource := range resources {
// NOTE(review): resource is dereferenced without a nil check — confirm
// that findProvidersWithResource never yields nil entries.
pros = append(pros, entity.Participant{
Participant_id: resource.ParticipantId,
Name: resource.Name,
})
}
strategy := strategies.NewReplicationStrategy(nil, 0)
return strategy, nil
}
task, providerList := as.genTaskAndProviders()
// NOTE(review): err here is still the value from findProvidersWithResource,
// which is already known to be nil at this point; genTaskAndProviders returns
// no error. The branch would also return (nil, nil) — likely unintended.
if err != nil {
return nil, nil
}
strategy := strategies.NewPricingStrategy(task, providerList...)
return strategy, nil
}
// genTaskAndProviders is currently a placeholder: it yields no pricing task
// and no providers, so both results are nil.
func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) {
	var (
		pricingTask *providerPricing.Task
		providers   []*providerPricing.Provider
	)
	return pricingTask, providers
}
// assignTask validates the clusters chosen by a strategy. It rejects a nil
// slice with an error and otherwise reports success; a non-nil slice,
// including an empty one, is accepted.
func (as *AiScheduler) assignTask(clusters []*strategies.AssignedCluster) error {
	if clusters != nil {
		return nil
	}
	return errors.New("clusters is nil")
}
// findProvidersWithResource asks every registered resource collector for its
// resource specs and returns the ones that could be obtained.
//
// Collection is best-effort: a collector that fails is skipped so one
// unavailable provider does not block scheduling on the others. A collector
// that reports success with a nil spec is skipped as well, because callers
// dereference every returned element. If no usable spec remains, the error
// "no resource found" is returned.
func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) {
	specs := make([]*collector.ResourceSpecs, 0, len(as.resourceCollectors))
	for _, rc := range as.resourceCollectors {
		if rc == nil {
			// A nil interface entry would panic on the method call below.
			continue
		}
		spec, err := rc.GetResourceSpecs()
		if err != nil || spec == nil {
			// Deliberately tolerant: skip providers that failed or returned
			// nothing instead of aborting the whole lookup.
			continue
		}
		specs = append(specs, spec)
	}
	if len(specs) == 0 {
		return nil, errors.New("no resource found")
	}
	return specs, nil
}

View File

@ -8,9 +8,10 @@ import "gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
// DCU unit price = queue DCU rate × computing-center DCU unit price
// ShuguangAiCollector is a resource collector backed by the HPC AC RPC
// client.
type ShuguangAiCollector struct {
	// Name labels this collector instance (presumably the provider name —
	// confirm with callers).
	Name string
	// ACRpc is the HPC AC client available to the collector.
	ACRpc hpcacclient.HpcAC
}
// GetResourceSpecs is a stub in the body shown: it returns a nil spec and a
// nil error, so callers must be prepared for a nil *ResourceSpecs even when
// no error is reported.
//
// NOTE(review): the unexported and exported headers below look like the
// removed and added lines of this diff hunk (a rename from getResourceSpecs
// to GetResourceSpecs) — confirm against the repository.
func (a *ShuguangAiCollector) getResourceSpecs() (*ResourceSpecs, error) {
func (a *ShuguangAiCollector) GetResourceSpecs() (*ResourceSpecs, error) {
return nil, nil
}

View File

@ -1,11 +1,12 @@
package collector
// ResourceCollector is implemented by types that can report the resource
// specs of one provider.
//
// NOTE(review): the unexported and exported methods below look like the
// removed and added lines of this diff hunk (a rename that makes the method
// callable from other packages) — confirm which one the repository contains.
type ResourceCollector interface {
getResourceSpecs() (*ResourceSpecs, error)
// GetResourceSpecs returns the provider's resource specs, or an error.
GetResourceSpecs() (*ResourceSpecs, error)
}
type ResourceSpecs struct {
ParticipantId int64
Name string
CpuAvail float64
MemAvail float64
DiskAvail float64

View File

@ -0,0 +1,32 @@
package executor
import (
	"errors"

	"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
)
// ShuguangAiExecutor is the Executor implementation backed by the HPC AC RPC
// client.
type ShuguangAiExecutor struct {
	// Name labels this executor instance (presumably the provider name —
	// confirm with callers).
	Name string
	// ACRpc is the HPC AC client available to the executor.
	ACRpc hpcacclient.HpcAC
}
// QueryImageList is not implemented yet. It returns a nil slice and a
// non-nil error instead of panicking, so a caller that reaches this executor
// gets an ordinary error rather than a crashed process.
func (s ShuguangAiExecutor) QueryImageList() ([]Image, error) {
	// TODO: implement against the AC RPC client.
	return nil, errors.New("shuguang ai executor: QueryImageList not implemented")
}
// SubmitTask is not implemented yet. It ignores its arguments and returns a
// zero Task with a non-nil error instead of panicking, so a caller that
// reaches this executor gets an ordinary error rather than a crashed process.
func (s ShuguangAiExecutor) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string) (Task, error) {
	// TODO: implement against the AC RPC client.
	return Task{}, errors.New("shuguang ai executor: SubmitTask not implemented")
}
// QueryTask is not implemented yet. It ignores taskId and returns a zero
// Task with a non-nil error instead of panicking, so a caller that reaches
// this executor gets an ordinary error rather than a crashed process.
func (s ShuguangAiExecutor) QueryTask(taskId string) (Task, error) {
	// TODO: implement against the AC RPC client.
	return Task{}, errors.New("shuguang ai executor: QueryTask not implemented")
}
// QuerySpecs is not implemented yet. It returns a zero Spec with a non-nil
// error instead of panicking, so a caller that reaches this executor gets an
// ordinary error rather than a crashed process.
func (s ShuguangAiExecutor) QuerySpecs() (Spec, error) {
	// TODO: implement against the AC RPC client.
	return Spec{}, errors.New("shuguang ai executor: QuerySpecs not implemented")
}
// NewShuguangAiExecutor returns a pointer to a ShuguangAiExecutor whose Name
// and ACRpc fields are set from the arguments.
func NewShuguangAiExecutor(name string, acRpc hpcacclient.HpcAC) *ShuguangAiExecutor {
	exec := new(ShuguangAiExecutor)
	exec.Name = name
	exec.ACRpc = acRpc
	return exec
}

View File

@ -0,0 +1,17 @@
package executor
// Executor is the set of operations the scheduler expects from a backend
// that can run tasks. Every method reports failure through its error result.
type Executor interface {
	// QueryImageList returns the images known to the backend.
	QueryImageList() ([]Image, error)

	// SubmitTask submits a task described by an image, a command, environment
	// entries, parameters and a resource identifier, and returns the
	// resulting Task.
	SubmitTask(imageId, cmd string, envs, params []string, resourceId string) (Task, error)

	// QueryTask returns the Task identified by taskId.
	QueryTask(taskId string) (Task, error)

	// QuerySpecs returns the backend's Spec.
	QuerySpecs() (Spec, error)
}
// Payload types used by the Executor interface. All three are empty structs
// for now and carry no data.
type (
	// Image is the element type returned by Executor.QueryImageList.
	Image struct{}

	// Task is the result type of Executor.SubmitTask and Executor.QueryTask.
	Task struct{}

	// Spec is the result type of Executor.QuerySpecs.
	Spec struct{}
)

View File

@ -19,8 +19,9 @@ import (
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gorm.io/gorm"
"sigs.k8s.io/yaml"
@ -36,7 +37,7 @@ type Scheduler struct {
participantRpc participantservice.ParticipantService
resourceCollectors []collector.ResourceCollector
storages []database.Storage
//storelink
aiExecutor map[string]executor.Executor
}
func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
@ -48,8 +49,8 @@ func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB,
return &Scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil
}
// NewScheduler2 builds a Scheduler from pre-constructed dependencies: the
// resource collectors, the storages and, in the three-argument form, a map of
// executors stored in the aiExecutor field. The arguments are stored as given
// and no other Scheduler field is set.
//
// NOTE(review): the two headers and two return statements below look like the
// removed and added lines of this diff hunk (the added form gains the
// aiExecutor parameter) — confirm against the repository.
func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage) *Scheduler {
return &Scheduler{resourceCollectors: resourceCollectors, storages: storages}
func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage, aiExecutor map[string]executor.Executor) *Scheduler {
return &Scheduler{resourceCollectors: resourceCollectors, storages: storages, aiExecutor: aiExecutor}
}
func (s *Scheduler) SpecifyClusters() {