scheduler refactor updated

Former-commit-id: 95e2b32695d5f67ecdee979ccc2e483fc8847f40
This commit is contained in:
tzwang 2024-01-23 11:42:10 +08:00
parent 6c8dae6a78
commit 0c87a541c5
4 changed files with 65 additions and 60 deletions

View File

@ -17,7 +17,8 @@ package mqs
import ( import (
"context" "context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
scheduler2 "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector"
) )
/* /*
@ -27,34 +28,30 @@ Listening to the payment flow status change notification message queue
type AiQueue struct { type AiQueue struct {
ctx context.Context ctx context.Context
svcCtx *svc.ServiceContext svcCtx *svc.ServiceContext
scheduler *scheduler.Scheduler
} }
func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
acCollector := &collector.ShuguangAiCollector{ACRpc: svcCtx.ACRpc}
resourceCollectiors := []collector.ResourceCollector{acCollector}
return &AiQueue{ return &AiQueue{
ctx: ctx, ctx: ctx,
svcCtx: svcCtx, svcCtx: svcCtx,
scheduler: scheduler.NewScheduler2(resourceCollectiors, nil),
} }
} }
func (l *AiQueue) Consume(val string) error { func (l *AiQueue) Consume(val string) error {
// 接受消息, 根据标签筛选过滤 // 接受消息, 根据标签筛选过滤
aiSchdl := scheduler2.NewAiScheduler(val) aiSchdl := scheduler.NewAiScheduler(val, nil)
schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin, nil)
if err != nil { //schdl.MatchLabels()
return err
}
schdl.MatchLabels()
// 调度算法 // 调度算法
err = schdl.AssignAndSchedule() err := l.scheduler.AssignAndSchedule(aiSchdl)
if err != nil { if err != nil {
return err return err
} }
// 存储数据
err = schdl.SaveToDb()
if err != nil {
return err
}
return nil return nil
} }

View File

@ -25,11 +25,13 @@ import (
type AiScheduler struct { type AiScheduler struct {
yamlString string yamlString string
collector collector.ResourceCollector resourceCollectors []collector.ResourceCollector
task *response.TaskInfo
//storelink
} }
func NewAiScheduler(val string) *AiScheduler { func NewAiScheduler(val string, resourceCollectors []collector.ResourceCollector) *AiScheduler {
return &AiScheduler{yamlString: val} return &AiScheduler{yamlString: val, resourceCollectors: resourceCollectors}
} }
func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
@ -44,9 +46,8 @@ func (as *AiScheduler) getNewStructForDb(task *response.TaskInfo, resource strin
} }
func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) { func (as *AiScheduler) pickOptimalStrategy() (strategies.Strategy, error) {
//a, b := as.genTaskAndProviders() strategy := strategies.NewReplicationStrategy(nil, 0)
return strategy, nil
return nil, nil
} }
func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) { func (as *AiScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider) {

View File

@ -5,6 +5,7 @@ type ResourceCollector interface {
} }
type ResourceSpecs struct { type ResourceSpecs struct {
ParticipantId int64
CpuAvail float64 CpuAvail float64
MemAvail float64 MemAvail float64
DiskAvail float64 DiskAvail float64

View File

@ -20,13 +20,14 @@ import (
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/collector"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/database"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gorm.io/gorm" "gorm.io/gorm"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
"strings" "strings"
) )
type scheduler struct { type Scheduler struct {
task *response.TaskInfo task *response.TaskInfo
participantIds []int64 participantIds []int64
scheduleService scheduleService scheduleService scheduleService
@ -34,19 +35,24 @@ type scheduler struct {
result []string //pID:子任务yamlstring 键值对 result []string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService participantRpc participantservice.ParticipantService
resourceCollectors []collector.ResourceCollector resourceCollectors []collector.ResourceCollector
storages []database.Storage
//storelink //storelink
} }
func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*scheduler, error) { func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
var task *response.TaskInfo var task *response.TaskInfo
err := json.Unmarshal([]byte(val), &task) err := json.Unmarshal([]byte(val), &task)
if err != nil { if err != nil {
return nil, errors.New("create scheduler failed : " + err.Error()) return nil, errors.New("create scheduler failed : " + err.Error())
} }
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil return &Scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil
} }
func (s *scheduler) SpecifyClusters() { func NewScheduler2(resourceCollectors []collector.ResourceCollector, storages []database.Storage) *Scheduler {
return &Scheduler{resourceCollectors: resourceCollectors, storages: storages}
}
func (s *Scheduler) SpecifyClusters() {
// 如果已指定集群名通过数据库查询后返回p端ip列表 // 如果已指定集群名通过数据库查询后返回p端ip列表
if len(s.task.Clusters) != 0 { if len(s.task.Clusters) != 0 {
s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds) s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds)
@ -54,7 +60,7 @@ func (s *scheduler) SpecifyClusters() {
} }
} }
func (s *scheduler) SpecifyNsID() { func (s *Scheduler) SpecifyNsID() {
// 未指定集群名只指定nsID // 未指定集群名只指定nsID
if len(s.task.Clusters) == 0 { if len(s.task.Clusters) == 0 {
if len(s.task.NsID) != 0 { if len(s.task.NsID) != 0 {
@ -70,7 +76,7 @@ func (s *scheduler) SpecifyNsID() {
} }
} }
func (s *scheduler) MatchLabels() { func (s *Scheduler) MatchLabels() {
var ids []int64 var ids []int64
count := 0 count := 0
@ -93,7 +99,7 @@ func (s *scheduler) MatchLabels() {
} }
// TempAssign todo 屏蔽原调度算法 // TempAssign todo 屏蔽原调度算法
func (s *scheduler) TempAssign() error { func (s *Scheduler) TempAssign() error {
//需要判断task中的资源类型针对metadata中的多个kind做不同处理 //需要判断task中的资源类型针对metadata中的多个kind做不同处理
//输入副本数和集群列表最终结果输出为pID对应副本数量列表针对多个kind需要做拆分和重新拼接组合 //输入副本数和集群列表最终结果输出为pID对应副本数量列表针对多个kind需要做拆分和重新拼接组合
@ -113,28 +119,28 @@ func (s *scheduler) TempAssign() error {
return nil return nil
} }
func (s *scheduler) AssignAndSchedule() error { func (s *Scheduler) AssignAndSchedule(ss scheduleService) error {
// 已指定 ParticipantId //// 已指定 ParticipantId
if s.task.ParticipantId != 0 { //if s.task.ParticipantId != 0 {
return nil // return nil
} //}
// 标签匹配以及后未找到ParticipantIds //// 标签匹配以及后未找到ParticipantIds
if len(s.participantIds) == 0 { //if len(s.participantIds) == 0 {
return errors.New("未找到匹配的ParticipantIds") // return errors.New("未找到匹配的ParticipantIds")
} //}
//
//// 指定或者标签匹配的结果只有一个集群,给任务信息指定
//if len(s.participantIds) == 1 {
// s.task.ParticipantId = s.participantIds[0]
// //replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
// //result := make(map[int64]string)
// //result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64)
// //s.result = result
//
// return nil
//}
// 指定或者标签匹配的结果只有一个集群,给任务信息指定 strategy, err := ss.pickOptimalStrategy()
if len(s.participantIds) == 1 {
s.task.ParticipantId = s.participantIds[0]
//replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
//result := make(map[int64]string)
//result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64)
//s.result = result
return nil
}
strategy, err := s.scheduleService.pickOptimalStrategy()
if err != nil { if err != nil {
return err return err
} }
@ -150,7 +156,7 @@ func (s *scheduler) AssignAndSchedule() error {
// return nil // return nil
//} //}
err = s.scheduleService.assignTask(clusters) err = ss.assignTask(clusters)
if err != nil { if err != nil {
return err return err
} }
@ -158,7 +164,7 @@ func (s *scheduler) AssignAndSchedule() error {
return nil return nil
} }
func (s *scheduler) SaveToDb() error { func (s *Scheduler) SaveToDb() error {
for _, participantId := range s.participantIds { for _, participantId := range s.participantIds {