diff --git a/api/internal/mqs/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go index 437d0e9e..c74435d9 100644 --- a/api/internal/mqs/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -47,11 +47,11 @@ func (l *CloudMq) Consume(val string) error { //通过标签匹配筛选出集群范围 schdl.MatchLabels() - // 调度算法 - err = schdl.AssignAndSchedule() - if err != nil { - return err - } + //todo 屏蔽调度算法, + //err = schdl.AssignAndSchedule() + //if err != nil { + // return err + //} // 存储数据 err = schdl.SaveToDb() diff --git a/pkg/models/cloudmodel_gen.go b/pkg/models/cloudmodel_gen.go index 1a81d6fc..43cb1675 100644 --- a/pkg/models/cloudmodel_gen.go +++ b/pkg/models/cloudmodel_gen.go @@ -49,6 +49,7 @@ type ( YamlString string `db:"yaml_string"` Result string `db:"result"` // 运行结果 NsID string `db:"ns_id"` + Replica int32 `db:"replica"` } ) diff --git a/pkg/scheduler/cloudScheduler.go b/pkg/scheduler/cloudScheduler.go index cc7c9c77..05ac4d6d 100644 --- a/pkg/scheduler/cloudScheduler.go +++ b/pkg/scheduler/cloudScheduler.go @@ -23,6 +23,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gorm.io/gorm" "io" + v1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" @@ -49,14 +50,30 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) { bytes, err := json.Marshal(task.Metadata) - //replica 需要替换到yaml中 + var bytesNew []byte + //replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) if err != nil { return nil, err } cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId) + switch cloud.Kind { + case "Deployment": + deployment := v1.Deployment{} + json.Unmarshal(bytes, &deployment) + deployment.Spec.Replicas = &replica + bytesNew, _ = json.Marshal(deployment) + + case "StatefulSet": + statefulSet := v1.StatefulSet{} + json.Unmarshal(bytes, &statefulSet) + statefulSet.Spec.Replicas = &replica + bytesNew, _ = json.Marshal(statefulSet) + } + cloud.Replica = replica cloud.Id = utils.GenSnowflakeID() - cloud.YamlString = string(bytes) + cloud.YamlString = string(bytesNew) cloud.NsID = task.NsID + cloud.ParticipantId = participantId return cloud, nil } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 44bd7949..b118ceb2 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -22,6 +22,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" + "strings" ) type Replicas int64 @@ -70,18 +71,44 @@ func (s *scheduler) MatchLabels() { } return } + // 如果未指定集群名,通过用户名(nsID)筛选出用户对应的集群 + if len(s.task.NsID) != 0 { + var clusters string + s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).Scan(&clusters) - //如果均未指定,则通过标签匹配 - for key := range s.task.MatchLabels { - var participantIds []int64 - s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds) - if count == 0 { - ids = participantIds + clusterArr := strings.Split(clusters, ",") + + for i, _ := range clusterArr { + clusterName := clusterArr[i] + var participantId int64 + s.dbEngin.Raw("select id from sc_participant_phy_info where `name` = ?", clusterName).Scan(&participantId) + s.participantIds = append(s.participantIds, participantId) } - ids = intersect(ids, participantIds) - count++ + } else { + //如果均未指定,则通过标签匹配 + for key := range s.task.MatchLabels { + var participantIds []int64 + s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds) + if count == 0 { + ids = participantIds + } + ids = intersect(ids, participantIds) + count++ + } + s.participantIds = ids } - s.participantIds = ids + + //todo 屏蔽调度算法,目前直接在matchlabel阶段随机分配到不同集群 + replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) + + array := make([]int, len(s.participantIds)) + for i := range array { + array[i] = int(replicas) / len(s.participantIds) + s.result[ParticipantId(s.participantIds[i])] = Replicas(array[i]) + } + array[0] += int(replicas) % len(s.participantIds) + s.result[ParticipantId(s.participantIds[0])] = Replicas(array[0]) + return } func (s *scheduler) AssignAndSchedule() error {