scheduler refactor modified

Former-commit-id: 1e70763e7f5008da3ad10efcebb0d5338d8665ca
This commit is contained in:
tzwang 2024-01-24 11:16:59 +08:00
parent 73c2c43468
commit b9e79dc671
16 changed files with 80 additions and 52 deletions

View File

@ -18,8 +18,9 @@ import (
"context" "context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
) )
/* /*
@ -46,7 +47,7 @@ func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
func (l *AiQueue) Consume(val string) error { func (l *AiQueue) Consume(val string) error {
// 接受消息, 根据标签筛选过滤 // 接受消息, 根据标签筛选过滤
aiSchdl, _ := scheduler.NewAiScheduler(val, l.scheduler) aiSchdl, _ := schedulers.NewAiScheduler(val, l.scheduler)
// 调度算法 // 调度算法
err := l.scheduler.AssignAndSchedule(aiSchdl) err := l.scheduler.AssignAndSchedule(aiSchdl)

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/schedulers"
) )
/* /*
@ -37,7 +38,7 @@ func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq {
func (l *CloudMq) Consume(val string) error { func (l *CloudMq) Consume(val string) error {
// 接受消息, 根据标签筛选过滤 // 接受消息, 根据标签筛选过滤
cloudScheduler := scheduler.NewCloudScheduler() cloudScheduler := schedulers.NewCloudScheduler()
schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc)
if err != nil { if err != nil {
return err return err

View File

@ -12,23 +12,23 @@
*/ */
package scheduler package common
import ( import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy"
"math/rand" "math/rand"
"time" "time"
) )
type scheduleService interface { type SubSchedule interface {
getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
pickOptimalStrategy() (strategies.Strategy, error) PickOptimalStrategy() (strategy.Strategy, error)
assignTask(clusters []*strategies.AssignedCluster) error AssignTask(clusters []*strategy.AssignedCluster) error
} }
// 求交集 // 求交集
func intersect(slice1, slice2 []int64) []int64 { func Intersect(slice1, slice2 []int64) []int64 {
m := make(map[int64]int) m := make(map[int64]int)
nn := make([]int64, 0) nn := make([]int64, 0)
for _, v := range slice1 { for _, v := range slice1 {
@ -44,7 +44,7 @@ func intersect(slice1, slice2 []int64) []int64 {
return nn return nn
} }
func micsSlice(origin []int64, count int) []int64 { func MicsSlice(origin []int64, count int) []int64 {
tmpOrigin := make([]int64, len(origin)) tmpOrigin := make([]int64, len(origin))
copy(tmpOrigin, origin) copy(tmpOrigin, origin)
//一定要seed //一定要seed

View File

@ -19,9 +19,10 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/common"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/executor" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/executor"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gorm.io/gorm" "gorm.io/gorm"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
@ -31,26 +32,26 @@ import (
type Scheduler struct { type Scheduler struct {
task *response.TaskInfo task *response.TaskInfo
participantIds []int64 participantIds []int64
scheduleService scheduleService subSchedule common.SubSchedule
dbEngin *gorm.DB dbEngin *gorm.DB
result []string //pID:子任务yamlstring 键值对 result []string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService participantRpc participantservice.ParticipantService
resourceCollectors []collector.ResourceCollector ResourceCollectors []collector.ResourceCollector
storages []database.Storage Storages []database.Storage
aiExecutor map[string]executor.Executor AiExecutor map[string]executor.Executor
} }
func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) { func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
var task *response.TaskInfo var task *response.TaskInfo
err := json.Unmarshal([]byte(val), &task) err := json.Unmarshal([]byte(val), &task)
if err != nil { if err != nil {
return nil, errors.New("create scheduler failed : " + err.Error()) return nil, errors.New("create scheduler failed : " + err.Error())
} }
return &Scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin, participantRpc: participantRpc}, nil
} }
func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage, aiExecutor map[string]executor.Executor) *Scheduler { func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage, aiExecutor map[string]executor.Executor) *Scheduler {
return &Scheduler{resourceCollectors: resourceCollectors, storages: storages, aiExecutor: aiExecutor} return &Scheduler{ResourceCollectors: resourceCollectors, Storages: storages, AiExecutor: aiExecutor}
} }
func (s *Scheduler) SpecifyClusters() { func (s *Scheduler) SpecifyClusters() {
@ -90,7 +91,7 @@ func (s *Scheduler) MatchLabels() {
if count == 0 { if count == 0 {
ids = participantIds ids = participantIds
} }
ids = intersect(ids, participantIds) ids = common.Intersect(ids, participantIds)
count++ count++
} }
s.participantIds = ids s.participantIds = ids
@ -120,7 +121,7 @@ func (s *Scheduler) TempAssign() error {
return nil return nil
} }
func (s *Scheduler) AssignAndSchedule(ss scheduleService) error { func (s *Scheduler) AssignAndSchedule(ss common.SubSchedule) error {
//// 已指定 ParticipantId //// 已指定 ParticipantId
//if s.task.ParticipantId != 0 { //if s.task.ParticipantId != 0 {
// return nil // return nil
@ -141,7 +142,7 @@ func (s *Scheduler) AssignAndSchedule(ss scheduleService) error {
// return nil // return nil
//} //}
strategy, err := ss.pickOptimalStrategy() strategy, err := ss.PickOptimalStrategy()
if err != nil { if err != nil {
return err return err
} }
@ -157,7 +158,7 @@ func (s *Scheduler) AssignAndSchedule(ss scheduleService) error {
// return nil // return nil
//} //}
err = ss.assignTask(clusters) err = ss.AssignTask(clusters)
if err != nil { if err != nil {
return err return err
} }
@ -170,7 +171,7 @@ func (s *Scheduler) SaveToDb() error {
for _, participantId := range s.participantIds { for _, participantId := range s.participantIds {
for _, resource := range s.task.Metadata { for _, resource := range s.task.Metadata {
structForDb, err := s.scheduleService.getNewStructForDb(s.task, resource, participantId) structForDb, err := s.subSchedule.GetNewStructForDb(s.task, resource, participantId)
if err != nil { if err != nil {
return err return err
} }

View File

@ -12,30 +12,31 @@
*/ */
package scheduler package schedulers
import ( import (
"errors" "errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/entity"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/proxy/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/service/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
) )
type AiScheduler struct { type AiScheduler struct {
yamlString string yamlString string
task *response.TaskInfo task *response.TaskInfo
*Scheduler *scheduler.Scheduler
} }
func NewAiScheduler(val string, scheduler *Scheduler) (*AiScheduler, error) { func NewAiScheduler(val string, scheduler *scheduler.Scheduler) (*AiScheduler, error) {
return &AiScheduler{yamlString: val, Scheduler: scheduler}, nil return &AiScheduler{yamlString: val, Scheduler: scheduler}, nil
} }
func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
ai := models.Ai{ ai := models.Ai{
ParticipantId: participantId, ParticipantId: participantId,
TaskId: task.TaskId, TaskId: task.TaskId,
@ -46,7 +47,7 @@ func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource strin
return ai, nil return ai, nil
} }
func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
resources, err := as.findProvidersWithResource() resources, err := as.findProvidersWithResource()
if err != nil { if err != nil {
return nil, err return nil, err
@ -60,7 +61,7 @@ func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) {
Name: resource.Name, Name: resource.Name,
}) })
} }
strategy := strategies.NewReplicationStrategy(nil, 0) strategy := strategy.NewReplicationStrategy(nil, 0)
return strategy, nil return strategy, nil
} }
@ -68,7 +69,7 @@ func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) {
if err != nil { if err != nil {
return nil, nil return nil, nil
} }
strategy := strategies.NewPricingStrategy(task, providerList...) strategy := strategy.NewPricingStrategy(task, providerList...)
return strategy, nil return strategy, nil
} }
@ -77,7 +78,7 @@ func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*provider
return nil, nil return nil, nil
} }
func (as *AiScheduler) assignTask(clusters []*strategies.AssignedCluster) error { func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
if clusters == nil { if clusters == nil {
return errors.New("clusters is nil") return errors.New("clusters is nil")
} }
@ -87,7 +88,7 @@ func (as *AiScheduler) assignTask(clusters []*strategies.AssignedCluster) error
func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) { func (as *AiScheduler) findProvidersWithResource() ([]*collector.ResourceSpecs, error) {
var resourceSpecs []*collector.ResourceSpecs var resourceSpecs []*collector.ResourceSpecs
for _, resourceCollector := range as.resourceCollectors { for _, resourceCollector := range as.ResourceCollectors {
spec, err := resourceCollector.GetResourceSpecs() spec, err := resourceCollector.GetResourceSpecs()
if err != nil { if err != nil {
continue continue

View File

@ -12,7 +12,7 @@
*/ */
package scheduler package schedulers
import ( import (
"bytes" "bytes"
@ -20,7 +20,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"io" "io"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -37,17 +37,17 @@ func NewCloudScheduler() *CloudScheduler {
return &CloudScheduler{} return &CloudScheduler{}
} }
func (cs *CloudScheduler) pickOptimalStrategy() (strategies.Strategy, error) { func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
task, providerList, err := cs.genTaskAndProviders() task, providerList, err := cs.genTaskAndProviders()
if err != nil { if err != nil {
return nil, nil return nil, nil
} }
//调度算法 //调度算法
strategy := strategies.NewPricingStrategy(task, providerList...) strategy := strategy.NewPricingStrategy(task, providerList...)
return strategy, nil return strategy, nil
} }
func (cs *CloudScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { func (cs *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID) cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID)
cloud.Id = utils.GenSnowflakeID() cloud.Id = utils.GenSnowflakeID()
cloud.NsID = task.NsID cloud.NsID = task.NsID
@ -117,6 +117,6 @@ func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*provi
return nil, providerList, nil return nil, providerList, nil
} }
func (cs *CloudScheduler) assignTask(clusters []*strategies.AssignedCluster) error { func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return nil return nil
} }

View File

@ -12,14 +12,14 @@
*/ */
package scheduler package schedulers
import ( import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategies" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
) )
@ -31,7 +31,7 @@ func NewHpcScheduler(val string) *HpcScheduler {
return &HpcScheduler{yamlString: val} return &HpcScheduler{yamlString: val}
} }
func (h *HpcScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { func (h *HpcScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
hpc := models.Hpc{} hpc := models.Hpc{}
utils.Convert(task.Metadata, &hpc) utils.Convert(task.Metadata, &hpc)
hpc.Id = utils.GenSnowflakeID() hpc.Id = utils.GenSnowflakeID()
@ -42,7 +42,7 @@ func (h *HpcScheduler) getNewStructForDb(task *response.TaskInfo, resource strin
return hpc, nil return hpc, nil
} }
func (h *HpcScheduler) pickOptimalStrategy() (strategies.Strategy, error) { func (h *HpcScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return nil, nil return nil, nil
} }
@ -50,6 +50,6 @@ func (h *HpcScheduler) genTaskAndProviders(task *response.TaskInfo) (*providerPr
return nil, nil return nil, nil
} }
func (h *HpcScheduler) assignTask(clusters []*strategies.AssignedCluster) error { func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
return nil return nil
} }

View File

@ -0,0 +1,24 @@
package schedulers
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/strategy"
)
type VmScheduler struct {
}
func (v VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
//TODO implement me
panic("implement me")
}
func (v VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
//TODO implement me
panic("implement me")
}
func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
//TODO implement me
panic("implement me")
}

View File

@ -1,4 +1,4 @@
package strategies package strategy
import ( import (
"github.com/pkg/errors" "github.com/pkg/errors"

View File

@ -12,7 +12,7 @@
*/ */
package strategies package strategy
import ( import (
"errors" "errors"

View File

@ -1,4 +1,4 @@
package strategies package strategy
type StaticWeightStrategy struct { type StaticWeightStrategy struct {
// TODO: add fields // TODO: add fields

View File

@ -1,4 +1,4 @@
package strategies package strategy
type Strategy interface { type Strategy interface {
Schedule() ([]*AssignedCluster, error) Schedule() ([]*AssignedCluster, error)