调度部分调整
Former-commit-id: 62ca70a4a1aaed22d3badbfefac73a7033a5a545
This commit is contained in:
parent
27cb0d6fbe
commit
81f1a1ce1b
|
@ -28,11 +28,11 @@ func NewRegisterClusterLogic(ctx context.Context, svcCtx *svc.ServiceContext) *R
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *RegisterClusterLogic) RegisterCluster(req *types.RegisterClusterReq) (*types.CloudResp, error) {
|
func (l *RegisterClusterLogic) RegisterCluster(req *types.RegisterClusterReq) (*types.CloudResp, error) {
|
||||||
var ms []models.ScParticipantPhyInfo
|
var phyInfos []models.ScParticipantPhyInfo
|
||||||
var resp types.CloudResp
|
var resp types.CloudResp
|
||||||
|
|
||||||
l.svcCtx.DbEngin.Raw("select * from sc_participant_phy_info where `name` = ?", req.Name).Scan(&ms)
|
l.svcCtx.DbEngin.Raw("select * from sc_participant_phy_info where `name` = ?", req.Name).Scan(&phyInfos)
|
||||||
if len(ms) != 0 {
|
if len(phyInfos) != 0 {
|
||||||
resp.Code = "400"
|
resp.Code = "400"
|
||||||
resp.Msg = "cluster name already exist"
|
resp.Msg = "cluster name already exist"
|
||||||
resp.Data = ""
|
resp.Data = ""
|
||||||
|
@ -40,7 +40,6 @@ func (l *RegisterClusterLogic) RegisterCluster(req *types.RegisterClusterReq) (*
|
||||||
}
|
}
|
||||||
|
|
||||||
participant := models.ScParticipantPhyInfo{}
|
participant := models.ScParticipantPhyInfo{}
|
||||||
|
|
||||||
participant.Token = req.Token
|
participant.Token = req.Token
|
||||||
participant.Name = req.Name
|
participant.Name = req.Name
|
||||||
participant.Address = req.Address
|
participant.Address = req.Address
|
||||||
|
@ -50,10 +49,21 @@ func (l *RegisterClusterLogic) RegisterCluster(req *types.RegisterClusterReq) (*
|
||||||
participant.CreatedTime = time.Now()
|
participant.CreatedTime = time.Now()
|
||||||
participant.UpdatedTime = time.Now()
|
participant.UpdatedTime = time.Now()
|
||||||
|
|
||||||
|
labelInfo := models.ScParticipantLabelInfo{}
|
||||||
|
labelInfo.Id = utils.GenSnowflakeID()
|
||||||
|
labelInfo.ParticipantId = participant.Id
|
||||||
|
labelInfo.CreatedTime = time.Now()
|
||||||
|
labelInfo.Key = "cloud"
|
||||||
|
labelInfo.Value = "sealos"
|
||||||
|
|
||||||
tx := l.svcCtx.DbEngin.Create(&participant)
|
tx := l.svcCtx.DbEngin.Create(&participant)
|
||||||
if tx.Error != nil {
|
if tx.Error != nil {
|
||||||
return nil, tx.Error
|
return nil, tx.Error
|
||||||
}
|
}
|
||||||
|
tx2 := l.svcCtx.DbEngin.Create(&labelInfo)
|
||||||
|
if tx2.Error != nil {
|
||||||
|
return nil, tx.Error
|
||||||
|
}
|
||||||
resp.Code = string(200)
|
resp.Code = string(200)
|
||||||
resp.Msg = "success"
|
resp.Msg = "success"
|
||||||
resp.Data = "participantId:" + strconv.FormatInt(participant.Id, 10)
|
resp.Data = "participantId:" + strconv.FormatInt(participant.Id, 10)
|
||||||
|
|
|
@ -43,6 +43,8 @@ func (l *CloudMq) Consume(val string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//通过标签匹配筛选出集群范围
|
||||||
schdl.MatchLabels()
|
schdl.MatchLabels()
|
||||||
|
|
||||||
// 调度算法
|
// 调度算法
|
||||||
|
|
|
@ -3340,7 +3340,6 @@ type RegisterClusterReq struct {
|
||||||
Name string `form:"name"` // 名称
|
Name string `form:"name"` // 名称
|
||||||
Address string `form:"address"` // 地址
|
Address string `form:"address"` // 地址
|
||||||
Token string `form:"token"` // 数算集群token
|
Token string `form:"token"` // 数算集群token
|
||||||
Type string `form:"type"` // 参与者类型:CLOUD-数算集群;AI-智算集群;HPC-超算集群
|
|
||||||
MetricsUrl string `form:"metricsUrl"` //监控url
|
MetricsUrl string `form:"metricsUrl"` //监控url
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3348,22 +3347,6 @@ type DeleteClusterReq struct {
|
||||||
Name string `form:"name"` // 名称
|
Name string `form:"name"` // 名称
|
||||||
}
|
}
|
||||||
|
|
||||||
type ListParticipantResp struct {
|
|
||||||
Code string `json:"code"`
|
|
||||||
Msg string `json:"msg"`
|
|
||||||
Data []ParticipantResp `json:"data"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ParticipantResp struct {
|
|
||||||
Id int64 `json:"id"` // id
|
|
||||||
Name string `json:"name"` // 名称
|
|
||||||
Address string `json:"address"` // 地址
|
|
||||||
Token string `json:"token"` // 数算集群token
|
|
||||||
Type string `json:"type"` // 参与者类型:CLOUD-数算集群;AI-智算集群;HPC-超算集群
|
|
||||||
ParticipantId int64 `json:"name"` // participant id
|
|
||||||
MetricsUrl string `json:"metricsUrl"` //监控url
|
|
||||||
}
|
|
||||||
|
|
||||||
type CloudResp struct {
|
type CloudResp struct {
|
||||||
Code string `json:"code"`
|
Code string `json:"code"`
|
||||||
Msg string `json:"msg"`
|
Msg string `json:"msg"`
|
||||||
|
|
|
@ -18,10 +18,11 @@ import "fmt"
|
||||||
|
|
||||||
type TaskInfo struct {
|
type TaskInfo struct {
|
||||||
TaskId int64 `json:"taskId,optional"`
|
TaskId int64 `json:"taskId,optional"`
|
||||||
NsID string `json:"nsID"`
|
NsID string `json:"nsID"` //云际平台传入namespace
|
||||||
TaskType string `json:"taskType,optional"`
|
TaskType string `json:"taskType,optional"`
|
||||||
MatchLabels map[string]string `json:"matchLabels"`
|
MatchLabels map[string]string `json:"matchLabels"`
|
||||||
ParticipantId int64 `json:"participantId"`
|
ParticipantId int64 `json:"participantId,optional"` //湘江预留字段
|
||||||
|
Clusters []string `json:"clusters,optional"` //云际平台传入集群名称列表
|
||||||
TenantId int64 `json:"tenantId"`
|
TenantId int64 `json:"tenantId"`
|
||||||
Metadata interface{} `json:"metadata"`
|
Metadata interface{} `json:"metadata"`
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ func NewAiScheduler(val string) *aiScheduler {
|
||||||
return &aiScheduler{yamlString: val}
|
return &aiScheduler{yamlString: val}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error) {
|
func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) {
|
||||||
ai := models.Ai{
|
ai := models.Ai{
|
||||||
ParticipantId: participantId,
|
ParticipantId: participantId,
|
||||||
TaskId: task.TaskId,
|
TaskId: task.TaskId,
|
||||||
|
|
|
@ -193,8 +193,13 @@ func computeHighDegree(task *Task, resourcesolution []int, providerList []*Provi
|
||||||
magnitude2 := mat.Norm(nowLeft, 2)
|
magnitude2 := mat.Norm(nowLeft, 2)
|
||||||
|
|
||||||
// 计算余弦相似度
|
// 计算余弦相似度
|
||||||
cosine_similarity := dot_product / (magnitude1 * magnitude2)
|
//临时处理被除数为0的特殊情况
|
||||||
highDegreeSum += cosine_similarity
|
var cosineSimilarity = 0.0
|
||||||
|
if magnitude1 != 0 && magnitude2 != 0 {
|
||||||
|
cosineSimilarity = dot_product / (magnitude1 * magnitude2)
|
||||||
|
}
|
||||||
|
|
||||||
|
highDegreeSum += cosineSimilarity
|
||||||
}
|
}
|
||||||
|
|
||||||
return highDegreeSum / float64(len(providerList))
|
return highDegreeSum / float64(len(providerList))
|
||||||
|
|
|
@ -37,15 +37,6 @@ func NewCloudScheduler() *cloudScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) {
|
func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) {
|
||||||
////参数为空,返回 nil
|
|
||||||
//if len(providers) == 0 || task == nil {
|
|
||||||
// return nil, errors.New("算法获取参数为空")
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
////仅有一个provider,返回nil
|
|
||||||
//if len(providers) == 1 {
|
|
||||||
// return nil, nil
|
|
||||||
//}
|
|
||||||
|
|
||||||
//调度算法
|
//调度算法
|
||||||
strategy := algo.NewK8sStrategy(task, providers...)
|
strategy := algo.NewK8sStrategy(task, providers...)
|
||||||
|
@ -56,8 +47,9 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg
|
||||||
return taskResult.MaxscoreStrategy, nil
|
return taskResult.MaxscoreStrategy, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error) {
|
func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) {
|
||||||
bytes, err := json.Marshal(task.Metadata)
|
bytes, err := json.Marshal(task.Metadata)
|
||||||
|
//replica 需要替换到yaml中
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -118,7 +110,8 @@ func (cs *cloudScheduler) genTaskAndProviders(task *response.TaskInfo, dbEngin *
|
||||||
providerList = append(providerList, provider)
|
providerList = append(providerList, provider)
|
||||||
}
|
}
|
||||||
|
|
||||||
t := algo.NewTask(0, 1, 2, 75120000, 301214500, 1200, 2, 6, 2000)
|
replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
|
||||||
|
t := algo.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000)
|
||||||
|
|
||||||
return t, providerList
|
return t, providerList
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type scheduleService interface {
|
type scheduleService interface {
|
||||||
getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error)
|
getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error)
|
||||||
pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error)
|
pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error)
|
||||||
genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*algo.Task, []*algo.Provider)
|
genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*algo.Task, []*algo.Provider)
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,7 @@ func NewHpcScheduler(val string) *hpcScheduler {
|
||||||
return &hpcScheduler{yamlString: val}
|
return &hpcScheduler{yamlString: val}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64) (interface{}, error) {
|
func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) {
|
||||||
hpc := models.Hpc{}
|
hpc := models.Hpc{}
|
||||||
utils.Convert(task.Metadata, &hpc)
|
utils.Convert(task.Metadata, &hpc)
|
||||||
hpc.Id = utils.GenSnowflakeID()
|
hpc.Id = utils.GenSnowflakeID()
|
||||||
|
|
|
@ -15,7 +15,6 @@
|
||||||
package scheduler
|
package scheduler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/zeromicro/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
|
@ -47,27 +46,42 @@ func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB,
|
||||||
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc, result: make(map[ParticipantId]Replicas, 0)}, nil
|
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc, result: make(map[ParticipantId]Replicas, 0)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *scheduler) SepcifyClusters() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (s *scheduler) MatchLabels() {
|
func (s *scheduler) MatchLabels() {
|
||||||
// 已指定 ParticipantId
|
|
||||||
|
var ids []int64
|
||||||
|
count := 0
|
||||||
|
|
||||||
|
// 已指定 ParticipantId 直接不走标签匹配
|
||||||
if s.task.ParticipantId != 0 {
|
if s.task.ParticipantId != 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var ids []int64
|
// 如果已指定集群名,通过数据库查询后返回p端ip列表
|
||||||
count := 0
|
if len(s.task.Clusters) != 0 {
|
||||||
|
for i, _ := range s.task.Clusters {
|
||||||
|
clusterName := s.task.Clusters[i]
|
||||||
|
var participantId int64
|
||||||
|
s.dbEngin.Raw("select id from sc_participant_phy_info where `name` = ?", clusterName).Scan(&participantId)
|
||||||
|
s.participantIds = append(s.participantIds, participantId)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//如果均未指定,则通过标签匹配
|
||||||
for key := range s.task.MatchLabels {
|
for key := range s.task.MatchLabels {
|
||||||
var participantIds []int64
|
var participantIds []int64
|
||||||
s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
|
s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
ids = participantIds
|
ids = participantIds
|
||||||
}
|
}
|
||||||
//if len(participantId) == 0 || len(ids) == 0 {
|
|
||||||
// return nil, nil
|
|
||||||
//}
|
|
||||||
ids = intersect(ids, participantIds)
|
ids = intersect(ids, participantIds)
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
s.participantIds = micsSlice(ids, 1)
|
s.participantIds = ids
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *scheduler) AssignAndSchedule() error {
|
func (s *scheduler) AssignAndSchedule() error {
|
||||||
|
@ -75,18 +89,19 @@ func (s *scheduler) AssignAndSchedule() error {
|
||||||
if s.task.ParticipantId != 0 {
|
if s.task.ParticipantId != 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
// 标签匹配以及后,未找到ParticipantIds
|
||||||
// 标签匹配后,未找到ParticipantIds
|
|
||||||
if len(s.participantIds) == 0 {
|
if len(s.participantIds) == 0 {
|
||||||
return errors.New("未找到匹配的ParticipantIds")
|
return errors.New("未找到匹配的ParticipantIds")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParticipantIds 返回唯一值
|
// 指定或者标签匹配的结果只有一个集群,给任务信息指定
|
||||||
if len(s.participantIds) == 1 {
|
if len(s.participantIds) == 1 {
|
||||||
if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) {
|
|
||||||
return errors.Errorf("集群 %d 不可用", s.participantIds[0])
|
|
||||||
}
|
|
||||||
s.task.ParticipantId = s.participantIds[0]
|
s.task.ParticipantId = s.participantIds[0]
|
||||||
|
replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
|
||||||
|
result := make(map[ParticipantId]Replicas)
|
||||||
|
result[ParticipantId(s.participantIds[0])] = Replicas(replicas)
|
||||||
|
s.result = result
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,9 +113,6 @@ func (s *scheduler) AssignAndSchedule() error {
|
||||||
|
|
||||||
//集群数量不满足,指定到标签匹配后第一个集群
|
//集群数量不满足,指定到标签匹配后第一个集群
|
||||||
if len(providerList) < 2 {
|
if len(providerList) < 2 {
|
||||||
if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) {
|
|
||||||
return errors.Errorf("集群 %d 不可用", s.participantIds[0])
|
|
||||||
}
|
|
||||||
s.task.ParticipantId = s.participantIds[0]
|
s.task.ParticipantId = s.participantIds[0]
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -121,18 +133,18 @@ func (s *scheduler) AssignAndSchedule() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *scheduler) SaveToDb() error {
|
func (s *scheduler) SaveToDb() error {
|
||||||
if s.task.ParticipantId == 0 {
|
|
||||||
return errors.New("participantId 为空")
|
for key, value := range s.result {
|
||||||
}
|
structForDb, err := s.scheduleService.getNewStructForDb(s.task, int64(key), int32(value))
|
||||||
structForDb, err := s.scheduleService.getNewStructForDb(s.task, s.task.ParticipantId)
|
if err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
return err
|
}
|
||||||
}
|
|
||||||
tx := s.dbEngin.Create(structForDb)
|
tx := s.dbEngin.Create(structForDb)
|
||||||
if tx.Error != nil {
|
if tx.Error != nil {
|
||||||
// todo 保存失败数据
|
logx.Error(tx.Error)
|
||||||
logx.Error(tx.Error)
|
return tx.Error
|
||||||
return tx.Error
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -144,66 +156,9 @@ func (s *scheduler) obtainParamsforStrategy() (*algo.Task, []*algo.Provider, err
|
||||||
return nil, nil, errors.New("获取集群失败")
|
return nil, nil, errors.New("获取集群失败")
|
||||||
}
|
}
|
||||||
|
|
||||||
// 过滤可用集群
|
|
||||||
err := s.filterAvailableProviders(&providerList)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return task, providerList, nil
|
return task, providerList, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *scheduler) checkIfParticipantAvailable(id ParticipantId) bool {
|
|
||||||
|
|
||||||
workingIds, err := s.getAvailableParticipantIds()
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return contains(workingIds, int64(id))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduler) getAvailableParticipantIds() ([]int64, error) {
|
|
||||||
|
|
||||||
resp, err := s.participantRpc.ListParticipant(context.Background(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.Code != 200 {
|
|
||||||
return nil, errors.New("集群列表查询失败")
|
|
||||||
}
|
|
||||||
|
|
||||||
var workingIds []int64
|
|
||||||
for _, e := range resp.Data {
|
|
||||||
if e.ClientState == "UNKNOWN" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
workingIds = append(workingIds, e.ParticipantId)
|
|
||||||
}
|
|
||||||
|
|
||||||
return workingIds, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduler) filterAvailableProviders(providerList *[]*algo.Provider) error {
|
|
||||||
|
|
||||||
workingIds, err := s.getAvailableParticipantIds()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var tempList []*algo.Provider
|
|
||||||
for _, provider := range *providerList {
|
|
||||||
if contains(workingIds, provider.Pid) && contains(s.participantIds, provider.Pid) {
|
|
||||||
tempList = append(tempList, provider)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
*providerList = tempList
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *scheduler) assignReplicasToResult(strategy *algo.Strategy, providerList []*algo.Provider) error {
|
func (s *scheduler) assignReplicasToResult(strategy *algo.Strategy, providerList []*algo.Provider) error {
|
||||||
|
|
||||||
if len(strategy.Tasksolution) == 0 {
|
if len(strategy.Tasksolution) == 0 {
|
||||||
|
@ -223,12 +178,3 @@ func (s *scheduler) assignReplicasToResult(strategy *algo.Strategy, providerList
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func contains(s []int64, e int64) bool {
|
|
||||||
for _, a := range s {
|
|
||||||
if a == e {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue