调度结构修改6

Former-commit-id: f8bd112cafe5adf9aea373ac9f884a02ca44d58e
This commit is contained in:
tzwang 2023-08-28 17:39:03 +08:00
parent 61a3db1551
commit a49f365a41
6 changed files with 57 additions and 21 deletions

View File

@ -25,11 +25,11 @@ func NewScheduleAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleA
func (l *ScheduleAiMq) Consume(_, val string) error {
// 接受消息, 根据标签筛选过滤
aiSchdl := scheduler.NewAiScheduler(val)
schdl, err := scheduler.NewScheduler(aiSchdl, val)
schdl, err := scheduler.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin)
if err != nil {
return err
}
schdl.MatchLabels(l.svcCtx.DbEngin)
schdl.MatchLabels()
// 调度算法
err = schdl.AssignAndSchedule()
@ -38,7 +38,7 @@ func (l *ScheduleAiMq) Consume(_, val string) error {
}
// 存储数据
err = schdl.SaveToDb(l.svcCtx.DbEngin)
err = schdl.SaveToDb()
if err != nil {
return err
}

View File

@ -25,11 +25,11 @@ func NewScheduleCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *Schedu
func (l *ScheduleCloudMq) Consume(_, val string) error {
// 接受消息, 根据标签筛选过滤
cloudScheduler := scheduler.NewCloudScheduler()
schdl, err := scheduler.NewScheduler(cloudScheduler, val)
schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin)
if err != nil {
return err
}
schdl.MatchLabels(l.svcCtx.DbEngin)
schdl.MatchLabels()
// 调度算法
err = schdl.AssignAndSchedule()
@ -38,7 +38,7 @@ func (l *ScheduleCloudMq) Consume(_, val string) error {
}
// 存储数据
err = schdl.SaveToDb(l.svcCtx.DbEngin)
err = schdl.SaveToDb()
if err != nil {
return err
}

View File

@ -25,11 +25,11 @@ func NewScheduleHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *Schedule
func (l *ScheduleHpcMq) Consume(_, val string) error {
// 接受消息, 根据标签筛选过滤
hpcSchdl := scheduler.NewHpcScheduler(val)
schdl, err := scheduler.NewScheduler(hpcSchdl, val)
schdl, err := scheduler.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin)
if err != nil {
return err
}
schdl.MatchLabels(l.svcCtx.DbEngin)
schdl.MatchLabels()
// 调度算法
err = schdl.AssignAndSchedule()
@ -38,7 +38,7 @@ func (l *ScheduleHpcMq) Consume(_, val string) error {
}
// 存储数据
err = schdl.SaveToDb(l.svcCtx.DbEngin)
err = schdl.SaveToDb()
if err != nil {
return err
}

View File

@ -22,6 +22,12 @@ func NewCloudScheduler() *cloudScheduler {
}
func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) {
//参数为空返回 nil
if len(providers) == 0 || task == nil {
return nil, nil
}
//调度算法
strategy := algo.NewK8sStrategy(task, providers...)
taskResult, err := algo.ScheduleWithFullCollaboration(strategy, strategy.ProviderList)
if err != nil {

View File

@ -12,6 +12,13 @@ type scheduleService interface {
pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error)
}
type providerParams struct {
Disk_avail float64
Mem_avail float64
Cpu_avail float64
Participant_id int64
}
// 求交集
func intersect(slice1, slice2 []int64) []int64 {
m := make(map[int64]int)

View File

@ -13,18 +13,19 @@ type scheduler struct {
task *types.TaskInfo
participantIds []int64
scheduleService scheduleService
dbEngin *gorm.DB
}
func NewScheduler(scheduleService scheduleService, val string) (*scheduler, error) {
func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB) (*scheduler, error) {
var task *types.TaskInfo
err := json.Unmarshal([]byte(val), &task)
if err != nil {
return nil, errors.New("create scheduler failed : " + err.Error())
}
return &scheduler{task: task, scheduleService: scheduleService}, nil
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin}, nil
}
func (s *scheduler) MatchLabels(dbEngin *gorm.DB) {
func (s *scheduler) MatchLabels() {
// 已指定 ParticipantId
if s.task.ParticipantId != 0 {
return
@ -34,7 +35,7 @@ func (s *scheduler) MatchLabels(dbEngin *gorm.DB) {
count := 0
for key := range s.task.MatchLabels {
var participantIds []int64
dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
if count == 0 {
ids = participantIds
}
@ -66,10 +67,25 @@ func (s *scheduler) AssignAndSchedule() error {
return nil
}
//调度结果 ParticipantId
for i, e := range strategy.ResourcePerTask {
if len(e) != 0 {
for _, ej := range e {
if ej == 1 {
s.task.ParticipantId = providerList[i].Pid
break
}
}
} else {
continue
}
}
return nil
}
func (s *scheduler) SaveToDb(dbEngin *gorm.DB) error {
func (s *scheduler) SaveToDb() error {
if s.task.ParticipantId == 0 {
return errors.New("participantId 为空")
}
@ -77,7 +93,7 @@ func (s *scheduler) SaveToDb(dbEngin *gorm.DB) error {
if err != nil {
return err
}
tx := dbEngin.Create(structForDb)
tx := s.dbEngin.Create(structForDb)
if tx.Error != nil {
logx.Error(tx.Error)
return tx.Error
@ -86,10 +102,17 @@ func (s *scheduler) SaveToDb(dbEngin *gorm.DB) error {
}
func (s *scheduler) genTaskAndProviders() (*algo.Task, []*algo.Provider) {
//var providerList []*algo.Provider
//for _, id := range s.participantIds {
// provider := algo.NewProvider(id, 100, 200, 200, 0.0, 0.0, 0.0)
//
//}
return nil, nil
proParams := []providerParams{}
sqlstr := "SELECT SUM(a.disk_avail) as disk_avail,SUM(a.mem_avail) as mem_avail,SUM(a.cpu_total * a.cpu_usable) as cpu_avail,participant_id from (SELECT * from sc_node_avail_info where created_time in (SELECT MAX(created_time) as time from sc_node_avail_info where deleted_flag = 0 GROUP BY participant_id,node_name)) a GROUP BY a.participant_id"
s.dbEngin.Raw(sqlstr).Scan(&proParams)
var providerList []*algo.Provider
for _, p := range proParams {
provider := algo.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0)
providerList = append(providerList, provider)
}
t := algo.NewTask(0, 1, 2, 75120000, 301214500, 1200, 2, 6, 2000)
return t, providerList
}