屏蔽调度算法,采用随机分发策略

Former-commit-id: e9df36b8275c018dd91771a0a80113f9eefcd410
This commit is contained in:
zhouqunjie 2023-11-25 15:17:20 +08:00
parent 56a60801b5
commit 876ce432eb
4 changed files with 61 additions and 16 deletions

View File

@ -47,11 +47,11 @@ func (l *CloudMq) Consume(val string) error {
//通过标签匹配筛选出集群范围
schdl.MatchLabels()
// 调度算法
err = schdl.AssignAndSchedule()
if err != nil {
return err
}
//todo 屏蔽调度算法,
//err = schdl.AssignAndSchedule()
//if err != nil {
// return err
//}
// 存储数据
err = schdl.SaveToDb()

View File

@ -49,6 +49,7 @@ type (
YamlString string `db:"yaml_string"`
Result string `db:"result"` // 运行结果
NsID string `db:"ns_id"`
Replica int32 `db:"replica"`
}
)

View File

@ -23,6 +23,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"io"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
@ -49,14 +50,30 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg
func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) {
bytes, err := json.Marshal(task.Metadata)
//replica 需要替换到yaml中
var bytesNew []byte
//replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
if err != nil {
return nil, err
}
cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId)
switch cloud.Kind {
case "Deployment":
deployment := v1.Deployment{}
json.Unmarshal(bytes, &deployment)
deployment.Spec.Replicas = &replica
bytesNew, _ = json.Marshal(deployment)
case "StatefulSet":
statefulSet := v1.StatefulSet{}
json.Unmarshal(bytes, &statefulSet)
statefulSet.Spec.Replicas = &replica
bytesNew, _ = json.Marshal(statefulSet)
}
cloud.Replica = replica
cloud.Id = utils.GenSnowflakeID()
cloud.YamlString = string(bytes)
cloud.YamlString = string(bytesNew)
cloud.NsID = task.NsID
cloud.ParticipantId = participantId
return cloud, nil
}

View File

@ -22,6 +22,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gorm.io/gorm"
"strings"
)
type Replicas int64
@ -70,18 +71,44 @@ func (s *scheduler) MatchLabels() {
}
return
}
// 如果未指定集群名,通过用户名(nsID)筛选出用户对应的集群
if len(s.task.NsID) != 0 {
var clusters string
s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).Scan(&clusters)
//如果均未指定,则通过标签匹配
for key := range s.task.MatchLabels {
var participantIds []int64
s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
if count == 0 {
ids = participantIds
clusterArr := strings.Split(clusters, ",")
for i, _ := range clusterArr {
clusterName := clusterArr[i]
var participantId int64
s.dbEngin.Raw("select id from sc_participant_phy_info where `name` = ?", clusterName).Scan(&participantId)
s.participantIds = append(s.participantIds, participantId)
}
ids = intersect(ids, participantIds)
count++
} else {
//如果均未指定,则通过标签匹配
for key := range s.task.MatchLabels {
var participantIds []int64
s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds)
if count == 0 {
ids = participantIds
}
ids = intersect(ids, participantIds)
count++
}
s.participantIds = ids
}
s.participantIds = ids
//todo 屏蔽调度算法目前直接在matchlabel阶段随机分配到不同集群
replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
array := make([]int, len(s.participantIds))
for i := range array {
array[i] = int(replicas) / len(s.participantIds)
s.result[ParticipantId(s.participantIds[i])] = Replicas(array[i])
}
array[0] += int(replicas) % len(s.participantIds)
s.result[ParticipantId(s.participantIds[0])] = Replicas(array[0])
return
}
func (s *scheduler) AssignAndSchedule() error {