pcm-coordinator/api/internal/pkg/scheduler/scheduler.go

130 lines
3.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package scheduler
import (
"encoding/json"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/algo"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gorm.io/gorm"
)
// scheduler carries the state needed to place one task on a participant:
// the decoded task, the candidate participants found for it, and the
// collaborators used to choose among them and to persist the result.
type scheduler struct {
// task is the task being scheduled; NewScheduler decodes it from JSON.
task *types.TaskInfo
// participantIds is the candidate list written by MatchLabels and read by
// AssignAndSchedule.
participantIds []int64
// scheduleService supplies strategy selection (pickOptimalStrategy) and the
// record to store (getNewStructForDb).
scheduleService scheduleService
// dbEngin is the gorm handle used for the raw queries and the final insert.
dbEngin *gorm.DB
}
// NewScheduler decodes val, a JSON-encoded task, and returns a scheduler bound
// to the given schedule service and database handle.
//
// It returns an error when val cannot be decoded into types.TaskInfo, or when
// it decodes to no task at all (for example the JSON literal null). The other
// scheduler methods dereference the task unconditionally, so a nil task must
// be rejected here rather than surfacing later as a panic.
func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB) (*scheduler, error) {
	var task *types.TaskInfo
	if err := json.Unmarshal([]byte(val), &task); err != nil {
		return nil, errors.New("create scheduler failed : " + err.Error())
	}
	// json.Unmarshal sets a pointer target to nil for a JSON null and reports
	// no error, so the nil case has to be checked explicitly.
	if task == nil {
		return nil, errors.New("create scheduler failed : task is empty")
	}
	return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin}, nil
}
// MatchLabels looks up the participants whose labels satisfy the task's
// MatchLabels and stores the outcome in s.participantIds.
//
// It does nothing when the task already names a ParticipantId. Otherwise it
// queries sc_participant_label_info once per label, combines the per-label id
// lists with intersect, and stores micsSlice(ids, 1) of the combined list.
// A failed label query is logged instead of being silently dropped; matching
// then continues with whatever ids the scan produced.
func (s *scheduler) MatchLabels() {
	// A participant has already been specified; nothing to match.
	if s.task.ParticipantId != 0 {
		return
	}

	var ids []int64
	first := true
	for key, value := range s.task.MatchLabels {
		var participantIds []int64
		tx := s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, value).Scan(&participantIds)
		if tx.Error != nil {
			// The method has no error return, so the failure is at least recorded.
			logx.Error(tx.Error)
		}
		if first {
			// Seed the running result with the first label's matches.
			ids = participantIds
			first = false
		}
		// Kept for the first label as well: intersect is applied to every
		// label's result exactly as before.
		ids = intersect(ids, participantIds)
	}
	s.participantIds = micsSlice(ids, 1)
}
// AssignAndSchedule decides which participant the task runs on and records it
// in s.task.ParticipantId.
//
// Resolution order:
//   - a ParticipantId already set on the task is kept as is;
//   - no candidates in s.participantIds is an error;
//   - a single candidate is used directly;
//   - otherwise the schedule service is asked for a strategy. A nil strategy
//     falls back to the first candidate. For a non-nil strategy, every row of
//     ResourcePerTask that contains a 1 assigns providerList[row index].Pid,
//     so the last such row wins; if no row contains a 1 the ParticipantId is
//     left unchanged and nil is returned.
func (s *scheduler) AssignAndSchedule() error {
	switch {
	case s.task.ParticipantId != 0:
		// A participant has already been specified.
		return nil
	case len(s.participantIds) == 0:
		// Label matching produced no candidates.
		return errors.New("未找到匹配的ParticipantIds")
	case len(s.participantIds) == 1:
		// Exactly one candidate: no scheduling decision to make.
		s.task.ParticipantId = s.participantIds[0]
		return nil
	}

	// Build the inputs the scheduling algorithm needs.
	algoTask, providerList := s.genTaskAndProviders()
	strategy, err := s.scheduleService.pickOptimalStrategy(algoTask, providerList...)
	if err != nil {
		return err
	}
	if strategy == nil {
		s.task.ParticipantId = s.participantIds[0]
		return nil
	}

	// Translate the scheduling result into a ParticipantId.
	for idx, row := range strategy.ResourcePerTask {
		for _, flag := range row {
			if flag != 1 {
				continue
			}
			s.task.ParticipantId = providerList[idx].Pid
			break
		}
	}
	return nil
}
// SaveToDb persists the scheduled task through the gorm handle.
//
// It returns an error when no participant has been assigned (ParticipantId is
// still 0), when the schedule service cannot build the record to store, or
// when the insert fails. Insert failures are logged before being returned.
func (s *scheduler) SaveToDb() error {
	pid := s.task.ParticipantId
	if pid == 0 {
		return errors.New("participantId 为空")
	}

	record, err := s.scheduleService.getNewStructForDb(s.task, pid)
	if err != nil {
		return err
	}

	if dbErr := s.dbEngin.Create(record).Error; dbErr != nil {
		logx.Error(dbErr)
		return dbErr
	}
	return nil
}
// genTaskAndProviders builds the inputs passed to the schedule service's
// strategy picker: one algo.Task and one algo.Provider per participant.
//
// Providers come from sc_node_avail_info: rows whose created_time matches the
// latest created_time of a non-deleted (participant_id, node_name) group are
// summed per participant into available disk, memory and CPU
// (cpu_total * cpu_usable). A failed query is logged instead of being silently
// dropped; providers are then built from whatever rows were scanned.
//
// NOTE(review): the query is not restricted to s.participantIds, and the task
// is built from fixed literals rather than from s.task. Confirm both are
// intended, and check the meaning of the NewTask arguments against the algo
// package.
func (s *scheduler) genTaskAndProviders() (*algo.Task, []*algo.Provider) {
	var proParams []providerParams
	sqlstr := "SELECT SUM(a.disk_avail) as disk_avail,SUM(a.mem_avail) as mem_avail,SUM(a.cpu_total * a.cpu_usable) as cpu_avail,participant_id from (SELECT * from sc_node_avail_info where created_time in (SELECT MAX(created_time) as time from sc_node_avail_info where deleted_flag = 0 GROUP BY participant_id,node_name)) a GROUP BY a.participant_id"
	if tx := s.dbEngin.Raw(sqlstr).Scan(&proParams); tx.Error != nil {
		// The method has no error return, so the failure is at least recorded.
		logx.Error(tx.Error)
	}

	// Left nil when there are no rows, matching what callers received before.
	var providerList []*algo.Provider
	for _, p := range proParams {
		provider := algo.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0)
		providerList = append(providerList, provider)
	}

	t := algo.NewTask(0, 1, 2, 75120000, 301214500, 1200, 2, 6, 2000)
	return t, providerList
}