diff --git a/api/etc/pcm.yaml b/api/etc/pcm.yaml index 84d70409..0b1914c6 100644 --- a/api/etc/pcm.yaml +++ b/api/etc/pcm.yaml @@ -4,10 +4,10 @@ NacosConfig: ServerConfigs: # - IpAddr: 127.0.0.1 # Port: 8848 - - IpAddr: nacos.jcce.dev + - IpAddr: 119.45.100.73 Port: 8848 ClientConfig: - NamespaceId: test + NamespaceId: zhouqj TimeoutMs: 5000 NotLoadCacheAtStart: true LogDir: diff --git a/api/internal/mqs/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go index c74435d9..a7237f7b 100644 --- a/api/internal/mqs/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -44,10 +44,17 @@ func (l *CloudMq) Consume(val string) error { return err } + //检测是否指定了集群列表 + schdl.SpecifyClusters() + + //检测是否指定了nsID + schdl.SpecifyNsID() + //通过标签匹配筛选出集群范围 schdl.MatchLabels() - //todo 屏蔽调度算法, + //todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度 + schdl.TempAssign() //err = schdl.AssignAndSchedule() //if err != nil { // return err diff --git a/pkg/scheduler/cloudScheduler.go b/pkg/scheduler/cloudScheduler.go index df15875e..ff78a7e9 100644 --- a/pkg/scheduler/cloudScheduler.go +++ b/pkg/scheduler/cloudScheduler.go @@ -49,6 +49,7 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg } func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) { + bytes, err := json.Marshal(task.Metadata) var bytesNew []byte //replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) @@ -61,13 +62,16 @@ func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participant deployment := v1.Deployment{} json.Unmarshal(bytes, &deployment) deployment.Spec.Replicas = &replica + deployment.Namespace = cloud.NsID bytesNew, _ = json.Marshal(deployment) case "StatefulSet": statefulSet := v1.StatefulSet{} json.Unmarshal(bytes, &statefulSet) statefulSet.Spec.Replicas = &replica + statefulSet.Namespace = cloud.NsID bytesNew, _ = json.Marshal(statefulSet) + } cloud.Replica = replica cloud.Id = utils.GenSnowflakeID() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index b118ceb2..5e6c782d 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -16,25 +16,26 @@ package scheduler import ( "encoding/json" + "fmt" "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo" + tool "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" + "math/rand" + "strconv" "strings" + "time" ) -type Replicas int64 - -type ParticipantId int64 - type scheduler struct { task *response.TaskInfo participantIds []int64 scheduleService scheduleService dbEngin *gorm.DB - result map[ParticipantId]Replicas + result map[int64]string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService } @@ -44,48 +45,40 @@ func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, if err != nil { return nil, errors.New("create scheduler failed : " + err.Error()) } - return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc, result: make(map[ParticipantId]Replicas, 0)}, nil + return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc, result: make(map[int64]string, 0)}, nil } -func (s *scheduler) SepcifyClusters() { +func (s *scheduler) SpecifyClusters() { + // 如果已指定集群名,通过数据库查询后返回p端ip列表 + if len(s.task.Clusters) != 0 { + s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", s.task.Clusters).Scan(&s.participantIds) + return + } +} +func (s *scheduler) SpecifyNsID() { + // 未指定集群名,只指定nsID + if len(s.task.Clusters) == 0 { + if len(s.task.NsID) != 0 { + var clusters string + s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).Scan(&clusters) + + clusterArr := strings.Split(clusters, ",") + + s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", clusterArr).Scan(&s.participantIds) + } + } else { + return + } } func (s *scheduler) MatchLabels() { var ids []int64 count := 0 - - // 已指定 ParticipantId 直接不走标签匹配 - if s.task.ParticipantId != 0 { - return - } - - // 如果已指定集群名,通过数据库查询后返回p端ip列表 - if len(s.task.Clusters) != 0 { - for i, _ := range s.task.Clusters { - clusterName := s.task.Clusters[i] - var participantId int64 - s.dbEngin.Raw("select id from sc_participant_phy_info where `name` = ?", clusterName).Scan(&participantId) - s.participantIds = append(s.participantIds, participantId) - } - return - } - // 如果未指定集群名,通过用户名(nsID)筛选出用户对应的集群 - if len(s.task.NsID) != 0 { - var clusters string - s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).Scan(&clusters) - - clusterArr := strings.Split(clusters, ",") - - for i, _ := range clusterArr { - clusterName := clusterArr[i] - var participantId int64 - s.dbEngin.Raw("select id from sc_participant_phy_info where `name` = ?", clusterName).Scan(&participantId) - s.participantIds = append(s.participantIds, participantId) - } - } else { - //如果均未指定,则通过标签匹配 + // 集群和nsID都未指定,则通过标签匹配 + if len(s.task.Clusters) == 0 && len(s.task.NsID) == 0 { + //如果集群列表或nsID均未指定 for key := range s.task.MatchLabels { var participantIds []int64 s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, s.task.MatchLabels[key]).Scan(&participantIds) @@ -96,19 +89,54 @@ func (s *scheduler) MatchLabels() { count++ } s.participantIds = ids + } else { + return + } +} + +// TempAssign todo 屏蔽原调度算法 +func (s *scheduler) TempAssign() error { + + //需要判断task中的资源类型,针对metadata中的多个kind做不同处理 + //输入副本数和集群列表,最终结果输出为pID对应副本数量列表,针对多个kind需要做拆分和重新拼接组合 + + var resources []interface{} + tool.Convert(s.task.Metadata, &resources) + for _, resource := range resources { + //如果是Deployment,需要对副本数做分发 + if resource.(map[string]interface{})["kind"].(string) == "Deployment" || resource.(map[string]interface{})["kind"].(string) == "Replicaset" { + //replicas := int(resource.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)) + + rand.Seed(time.Now().UnixNano()) + + //// 生成pID对应副本数 数组 + //arrReplica := make(map[int64]int, len(s.participantIds)) + //for i := 0; i < len(s.participantIds)-1; i++ { + // arrReplica[s.participantIds[i]] = rand.Intn(replicas) + // replicas -= arrReplica[s.participantIds[i]] // 更新剩余的和 + //} + //arrReplica[s.participantIds[len(s.participantIds)-1]] = replicas + // + ////将副本数依次写入新的yaml中并生成result数据 + //yamlArray := make(map[int64]string, len(s.participantIds)) + // + //for i := 0; i < len(s.participantIds)-1; i++ { + // //调整yaml + // yamlArray[s.participantIds[i]] = "sds" + //} + + } + + s.result[s.participantIds[0]] = "" + } - //todo 屏蔽调度算法,目前直接在matchlabel阶段随机分配到不同集群 - replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) + //var crd = s.task.Metadata + //for i := 0; i < len(s.task.Metadata); i++ { + // + //} - array := make([]int, len(s.participantIds)) - for i := range array { - array[i] = int(replicas) / len(s.participantIds) - s.result[ParticipantId(s.participantIds[i])] = Replicas(array[i]) - } - array[0] += int(replicas) % len(s.participantIds) - s.result[ParticipantId(s.participantIds[0])] = Replicas(array[0]) - return + return nil } func (s *scheduler) AssignAndSchedule() error { @@ -125,8 +153,8 @@ func (s *scheduler) AssignAndSchedule() error { if len(s.participantIds) == 1 { s.task.ParticipantId = s.participantIds[0] replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) - result := make(map[ParticipantId]Replicas) - result[ParticipantId(s.participantIds[0])] = Replicas(replicas) + result := make(map[int64]string) + result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64) s.result = result return nil @@ -162,7 +190,12 @@ func (s *scheduler) AssignAndSchedule() error { func (s *scheduler) SaveToDb() error { for key, value := range s.result { - structForDb, err := s.scheduleService.getNewStructForDb(s.task, int64(key), int32(value)) + num, err := strconv.Atoi(value) + if err != nil { + fmt.Println("转换失败:", err) + + } + structForDb, err := s.scheduleService.getNewStructForDb(s.task, int64(key), int32(num)) if err != nil { return err } @@ -196,7 +229,7 @@ func (s *scheduler) assignReplicasToResult(strategy *algo.Strategy, providerList if e == 0 { continue } - s.result[ParticipantId(providerList[i].Pid)] = Replicas(int64(e)) + s.result[providerList[i].Pid] = string(e) } if len(s.result) == 0 {