From dd5d975df2f9fc5ca97903010be5aa76a86a2976 Mon Sep 17 00:00:00 2001 From: jagger Date: Mon, 29 Apr 2024 21:46:07 +0800 Subject: [PATCH] impl cloud Scheduling Algorithm Signed-off-by: jagger Former-commit-id: 7f60b2093407bb7ce871fba74f86a817dc0c756e --- api/desc/core/pcm-core.api | 1 + .../logic/cloud/commitgeneraltasklogic.go | 36 ++- api/internal/mqs/ScheduleCloud.go | 48 ++-- .../scheduler/schedulers/cloudScheduler.go | 227 ++++++++++++------ .../schedulers/option/cloudOption.go | 1 + api/internal/types/types.go | 1 + 6 files changed, 198 insertions(+), 116 deletions(-) diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index be4782d6..64689eb8 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -118,6 +118,7 @@ type ( Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` ReqBody []string `json:"reqBody"` + Replicas int64 `json:"replicas,string"` } ) diff --git a/api/internal/logic/cloud/commitgeneraltasklogic.go b/api/internal/logic/cloud/commitgeneraltasklogic.go index 524d29cd..e2bf7c5b 100644 --- a/api/internal/logic/cloud/commitgeneraltasklogic.go +++ b/api/internal/logic/cloud/commitgeneraltasklogic.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "github.com/pkg/errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" @@ -14,6 +16,7 @@ import ( syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" kyaml "k8s.io/apimachinery/pkg/util/yaml" "sigs.k8s.io/yaml" + "strconv" "strings" "time" @@ -69,15 +72,29 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er } taskCloud := cloud.TaskCloudModel{} //TODO 执行策略返回集群跟 Replica - for _, c := range clusters { + opt := &option.CloudOption{} + utils.Convert(&req, &opt) + sc, err := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient) + if err != nil { + return err + } + results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc) + if err != nil { + return err + } + + rs := (results).([]*schedulers.CloudResult) + for _, r := range rs { for _, s := range req.ReqBody { - sStruct := UnMarshalK8sStruct(s) + sStruct := UnMarshalK8sStruct(s, int64(r.Replica)) unString, _ := sStruct.MarshalJSON() taskCloud.Id = utils.GenSnowflakeIDUint() taskCloud.TaskId = uint(taskModel.Id) - taskCloud.AdapterId = c.AdapterId - taskCloud.ClusterId = c.Id - taskCloud.ClusterName = c.Name + adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64) + clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64) + taskCloud.AdapterId = uint(adapterId) + taskCloud.ClusterId = uint(clusterId) + taskCloud.ClusterName = r.ClusterName taskCloud.Status = "Pending" taskCloud.YamlString = string(unString) taskCloud.Kind = sStruct.GetKind() @@ -89,11 +106,10 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er } } } - return nil } -func UnMarshalK8sStruct(yamlString string) *unstructured.Unstructured { +func UnMarshalK8sStruct(yamlString string, replica int64) *unstructured.Unstructured { unstructuredObj := &unstructured.Unstructured{} d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) var err error @@ -116,11 +132,7 @@ func UnMarshalK8sStruct(yamlString string) *unstructured.Unstructured { } //设置副本数 if unstructuredObj.GetKind() == "Deployment" || unstructuredObj.GetKind() == "StatefulSet" { - unstructured.SetNestedField( - unstructuredObj.Object, - int64(6), - "spec", "replicas", - ) + unstructured.SetNestedField(unstructuredObj.Object, replica, "spec", "replicas") } } return unstructuredObj diff --git a/api/internal/mqs/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go index a379ecbe..a74056ff 100644 --- a/api/internal/mqs/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -16,8 +16,6 @@ package mqs import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" ) @@ -38,28 +36,28 @@ func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq { func (l *CloudMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - cloudScheduler := schedulers.NewCloudScheduler() - schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) - if err != nil { - return err - } - - //检测是否指定了集群列表 - schdl.SpecifyClusters() - - //检测是否指定了nsID - schdl.SpecifyNsID() - - //通过标签匹配筛选出集群范围 - schdl.MatchLabels() - - //todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度 - schdl.TempAssign() - - // 存储数据 - err = schdl.SaveToDb() - if err != nil { - return err - } + //cloudScheduler := schedulers.NewCloudScheduler() + //schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) + //if err != nil { + // return err + //} + // + ////检测是否指定了集群列表 + //schdl.SpecifyClusters() + // + ////检测是否指定了nsID + //schdl.SpecifyNsID() + // + ////通过标签匹配筛选出集群范围 + //schdl.MatchLabels() + // + ////todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度 + //schdl.TempAssign() + // + //// 存储数据 + //err = schdl.SaveToDb() + //if err != nil { + // return err + //} return nil } diff --git a/api/internal/scheduler/schedulers/cloudScheduler.go b/api/internal/scheduler/schedulers/cloudScheduler.go index e4035574..4f00aaba 100644 --- a/api/internal/scheduler/schedulers/cloudScheduler.go +++ b/api/internal/scheduler/schedulers/cloudScheduler.go @@ -15,106 +15,175 @@ package schedulers import ( - "bytes" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" + "context" + "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "io" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" - kyaml "k8s.io/apimachinery/pkg/util/yaml" + "gorm.io/gorm" + "math" + "time" ) type CloudScheduler struct { - storage database.Storage + yamlString string + task *response.TaskInfo + *scheduler.Scheduler + option *option.CloudOption + ctx context.Context + dbEngin *gorm.DB + promClient tracker.Prometheus + svcCtx *svc.ServiceContext } -func NewCloudScheduler() *CloudScheduler { - return &CloudScheduler{} +type CloudResult struct { + TaskId string + ClusterId string + ClusterName string + Strategy string + Replica int32 + Msg string } -func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { - //获取所有计算中心 - //调度算法 - strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{}) - return strategy, nil +func NewCloudScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.CloudOption, dbEngin *gorm.DB, promClient tracker.Prometheus) (*CloudScheduler, error) { + return &CloudScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option, dbEngin: dbEngin, promClient: promClient}, nil } -func (cs *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { - cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID) - cloud.Id = utils.GenSnowflakeID() - cloud.NsID = task.NsID - - cloud.ParticipantId = participantId - return cloud, nil -} - -func (cs *CloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64, nsID string) models.Cloud { - var cloud models.Cloud - d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) - var err error - for { - var rawObj runtime.RawExtension - err = d.Decode(&rawObj) - if err == io.EOF { - break - } - if err != nil { - } - obj := &unstructured.Unstructured{} - syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj) - if err != nil { - } - - unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) - if err != nil { - } - - unstructureObj := &unstructured.Unstructured{Object: unstructuredMap} - if len(nsID) != 0 { - unstructureObj.SetNamespace(nsID) - } - cloud = models.Cloud{ - TaskId: taskId, - ApiVersion: unstructureObj.GetAPIVersion(), - Name: unstructureObj.GetName(), - Kind: unstructureObj.GetKind(), - Namespace: unstructureObj.GetNamespace(), - Status: "Saved", - } - // 命名空间为空 设置默认值 - if len(unstructureObj.GetNamespace()) == 0 { - cloud.Namespace = "default" - } - //unstructureObj转成string - unString, _ := unstructureObj.MarshalJSON() - cloud.YamlString = string(unString) +func (as *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { + c := cloud.TaskCloudModel{ + AdapterId: uint(participantId), + TaskId: uint(task.TaskId), + Status: "Pending", + YamlString: as.yamlString, } - return cloud + utils.Convert(task.Metadata, &c) + return c, nil } -func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider, error) { - proParams, err := cs.storage.GetProviderParams() +func (as *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { + if len(as.option.ClusterIds) == 1 { + return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil + } + + resources, err := as.findClustersWithResources() + if err != nil { - return nil, nil, nil + return nil, err } - var providerList []*providerPricing.Provider - for _, p := range proParams { - provider := providerPricing.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0) - providerList = append(providerList, provider) + if len(resources) == 0 { + return nil, errors.New("no cluster has resources") } - //replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) - //t := algorithm.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000) + if len(resources) == 1 { + var cluster strategy.AssignedCluster + cluster.ClusterId = resources[0].ClusterId + cluster.Replicas = 1 + return &strategy.SingleAssignment{Cluster: &cluster}, nil + } - return nil, providerList, nil + params := ¶m.Params{Resources: resources} + + switch as.option.Strategy { + case strategy.REPLICATION: + var clusterIds []string + for _, resource := range resources { + clusterIds = append(clusterIds, resource.ClusterId) + } + strategy := strategy.NewReplicationStrategy(clusterIds, as.option.Replica) + return strategy, nil + case strategy.RESOURCES_PRICING: + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: as.option.Replica}) + return strategy, nil + case strategy.DYNAMIC_RESOURCES: + strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) + return strategy, nil + case strategy.STATIC_WEIGHT: + //todo resources should match cluster StaticWeightMap + strategy := strategy.NewStaticWeightStrategy(as.option.StaticWeightMap, as.option.Replica) + return strategy, nil + } + + return nil, errors.New("no strategy has been chosen") } -func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { - return nil, nil +func (as *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { + if clusters == nil { + return nil, errors.New("clusters is nil") + } + + for i := len(clusters) - 1; i >= 0; i-- { + if clusters[i].Replicas == 0 { + clusters = append(clusters[:i], clusters[i+1:]...) + } + } + + if len(clusters) == 0 { + return nil, errors.New("clusters is nil") + } + + var results []*CloudResult + + for _, cluster := range clusters { + cName := "" + as.dbEngin.Table("t_cluster").Select("name").Where("id=?", cluster.ClusterId).Find(&cName) + cr := CloudResult{ + ClusterId: cluster.ClusterId, + ClusterName: cName, + Replica: cluster.Replicas, + } + cr.ClusterId = cluster.ClusterId + cr.Replica = cluster.Replicas + + cr.ClusterName = cName + results = append(results, &cr) + } + + return results, nil +} + +func (as *CloudScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { + resp := []*collector.ResourceStats{} + //查询集群资源信息 + var rMetrics []tracker.Metric + metrics := []string{"cluster_cpu_utilisation", "cluster_cpu_avail", "cluster_cpu_total", "cluster_memory_total", "cluster_memory_avail", "cluster_memory_utilisation", "cluster_disk_utilisation", "cluster_disk_avail", "cluster_disk_total", "cluster_pod_utilisation"} + var clusterNames []string + as.dbEngin.Table("t_cluster").Select("name").Where("id in ?", as.option.ClusterIds).Find(&clusterNames) + for _, c := range clusterNames { + rMetrics = as.promClient.GetNamedMetrics(metrics, time.Now(), tracker.ClusterOption{ClusterName: c}) + r := collector.ResourceStats{} + var cid string + as.dbEngin.Table("t_cluster").Select("id").Where("name = ?", c).Find(&cid) + r.ClusterId = cid + r.Name = c + for _, metric := range rMetrics { + if metric.MetricName == "cluster_cpu_total" { + r.CpuCoreTotal = int64(metric.MetricData.MetricValues[0].Sample.Value()) + } + if metric.MetricName == "cluster_cpu_avail" { + cpuAvail := metric.MetricData.MetricValues[0].Sample.Value() + r.CpuCoreAvail = int64(math.Round(cpuAvail)) + } + if metric.MetricName == "cluster_memory_total" { + r.MemTotal = metric.MetricData.MetricValues[0].Sample.Value() + } + if metric.MetricName == "cluster_memory_avail" { + r.MemAvail = metric.MetricData.MetricValues[0].Sample.Value() + } + if metric.MetricName == "cluster_disk_total" { + r.DiskTotal = metric.MetricData.MetricValues[0].Sample.Value() + } + if metric.MetricName == "cluster_disk_avail" { + r.DiskAvail = metric.MetricData.MetricValues[0].Sample.Value() + } + } + resp = append(resp, &r) + } + return resp, nil } diff --git a/api/internal/scheduler/schedulers/option/cloudOption.go b/api/internal/scheduler/schedulers/option/cloudOption.go index cf2df437..2654c755 100644 --- a/api/internal/scheduler/schedulers/option/cloudOption.go +++ b/api/internal/scheduler/schedulers/option/cloudOption.go @@ -7,6 +7,7 @@ type CloudOption struct { Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` ReqBody []string `json:"reqBody"` + Replica int32 `json:"replicas,string"` } func (c CloudOption) GetOptionType() string { diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 48baec41..c3884c99 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -105,6 +105,7 @@ type GeneralTaskReq struct { Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` ReqBody []string `json:"reqBody"` + Replicas int64 `json:"replicas,string"` } type DeleteTaskReq struct {