From acd45f79a58b43025468ba207a2b787ebe690b80 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Wed, 29 Nov 2023 19:09:07 +0800 Subject: [PATCH] zw Former-commit-id: 5e7d1dc22ab86886081f0c4988de5045e0ed7216 --- api/desc/core/pcm-core.api | 2 +- api/etc/pcm.yaml | 2 +- .../logic/core/scheduletaskbyyamllogic.go | 1 + api/internal/types/types.go | 2 +- pkg/scheduler/aiScheduler.go | 2 +- pkg/scheduler/cloudScheduler.go | 26 ++---------- pkg/scheduler/common.go | 2 +- pkg/scheduler/hpcScheduler.go | 2 +- pkg/scheduler/scheduler.go | 40 +++++++++---------- pkg/tracker/tracker_test.go | 3 +- rpc/etc/pcmcore.yaml | 4 +- 11 files changed, 33 insertions(+), 53 deletions(-) diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index ede17fc1..73bcc53b 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -180,10 +180,10 @@ type ( Description string `yaml:"description"` tenantId int64 `yaml:"tenantId"` nsID string `yaml:"nsID"` - replicas int64 `yaml:"replicas"` tasks []TaskYaml `yaml:"tasks"` } TaskYaml { + replicas int64 `yaml:"replicas"` TaskId int64 `yaml:"taskId"` nsID string `yaml:"nsID"` taskType string `yaml:"taskType"` diff --git a/api/etc/pcm.yaml b/api/etc/pcm.yaml index 0b1914c6..050cbad2 100644 --- a/api/etc/pcm.yaml +++ b/api/etc/pcm.yaml @@ -7,7 +7,7 @@ NacosConfig: - IpAddr: 119.45.100.73 Port: 8848 ClientConfig: - NamespaceId: zhouqj + NamespaceId: zw TimeoutMs: 5000 NotLoadCacheAtStart: true LogDir: diff --git a/api/internal/logic/core/scheduletaskbyyamllogic.go b/api/internal/logic/core/scheduletaskbyyamllogic.go index 70e5cc1a..5f3ab4a5 100644 --- a/api/internal/logic/core/scheduletaskbyyamllogic.go +++ b/api/internal/logic/core/scheduletaskbyyamllogic.go @@ -66,6 +66,7 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa for _, task := range req.Tasks { task.NsID = req.NsID task.TaskId = taskModel.Id + // 将任务数据转换成消息体 reqMessage, err := json.Marshal(task) if err != nil { diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 2057a921..dc4dc25f 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -161,11 +161,11 @@ type ScheduleTaskByYamlReq struct { Description string `yaml:"description"` TenantId int64 `yaml:"tenantId"` NsID string `yaml:"nsID"` - Replicas int64 `yaml:"replicas"` Tasks []TaskYaml `yaml:"tasks"` } type TaskYaml struct { + Replicas int64 `yaml:"replicas"` TaskId int64 `yaml:"taskId"` NsID string `yaml:"nsID"` TaskType string `yaml:"taskType"` diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/aiScheduler.go index f3ddb7a9..265816ec 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/aiScheduler.go @@ -30,7 +30,7 @@ func NewAiScheduler(val string) *aiScheduler { return &aiScheduler{yamlString: val} } -func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) { +func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) { ai := models.Ai{ ParticipantId: participantId, TaskId: task.TaskId, diff --git a/pkg/scheduler/cloudScheduler.go b/pkg/scheduler/cloudScheduler.go index ff78a7e9..44185711 100644 --- a/pkg/scheduler/cloudScheduler.go +++ b/pkg/scheduler/cloudScheduler.go @@ -23,7 +23,6 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gorm.io/gorm" "io" - v1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" @@ -48,34 +47,15 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg return taskResult.MaxscoreStrategy, nil } -func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) { +func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) { - bytes, err := json.Marshal(task.Metadata) - var bytesNew []byte - //replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) + bytes, err := json.Marshal(resource) if err != nil { return nil, err } cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId, task.NsID) - switch cloud.Kind { - case "Deployment": - deployment := v1.Deployment{} - json.Unmarshal(bytes, &deployment) - deployment.Spec.Replicas = &replica - deployment.Namespace = cloud.NsID - bytesNew, _ = json.Marshal(deployment) - - case "StatefulSet": - statefulSet := v1.StatefulSet{} - json.Unmarshal(bytes, &statefulSet) - statefulSet.Spec.Replicas = &replica - statefulSet.Namespace = cloud.NsID - bytesNew, _ = json.Marshal(statefulSet) - - } - cloud.Replica = replica cloud.Id = utils.GenSnowflakeID() - cloud.YamlString = string(bytesNew) + cloud.YamlString = string(bytes) cloud.NsID = task.NsID cloud.ParticipantId = participantId diff --git a/pkg/scheduler/common.go b/pkg/scheduler/common.go index 46ce7d67..1e66d8de 100644 --- a/pkg/scheduler/common.go +++ b/pkg/scheduler/common.go @@ -23,7 +23,7 @@ import ( ) type scheduleService interface { - getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) + getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*algo.Task, []*algo.Provider) } diff --git a/pkg/scheduler/hpcScheduler.go b/pkg/scheduler/hpcScheduler.go index 99850118..59924e23 100644 --- a/pkg/scheduler/hpcScheduler.go +++ b/pkg/scheduler/hpcScheduler.go @@ -31,7 +31,7 @@ func NewHpcScheduler(val string) *hpcScheduler { return &hpcScheduler{yamlString: val} } -func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) { +func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) { hpc := models.Hpc{} utils.Convert(task.Metadata, &hpc) hpc.Id = utils.GenSnowflakeID() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 2dc98adb..3dc87889 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -16,7 +16,6 @@ package scheduler import ( "encoding/json" - "fmt" "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" @@ -33,7 +32,7 @@ type scheduler struct { participantIds []int64 scheduleService scheduleService dbEngin *gorm.DB - result map[int64]string //pID:子任务yamlstring 键值对 + result []string //pID:子任务yamlstring 键值对 participantRpc participantservice.ParticipantService } @@ -43,7 +42,7 @@ func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, if err != nil { return nil, errors.New("create scheduler failed : " + err.Error()) } - return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc, result: make(map[int64]string, 0)}, nil + return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil } func (s *scheduler) SpecifyClusters() { @@ -100,10 +99,11 @@ func (s *scheduler) TempAssign() error { var resources []interface{} tool.Convert(s.task.Metadata, &resources) - for _, resource := range resources { + for index := range resources { //如果是Deployment,需要对副本数做分发 - if resource.(map[string]interface{})["kind"].(string) == "Deployment" || resource.(map[string]interface{})["kind"].(string) == "Replicaset" { - resource.(map[string]interface{})["spec"].(map[string]interface{})["replicas"] = s.task.Replicas + if resources[index].(map[string]interface{})["kind"].(string) == "Deployment" || resources[index].(map[string]interface{})["kind"].(string) == "Replicaset" || + resources[index].(map[string]interface{})["kind"].(string) == "StatefulSet" { + resources[index].(map[string]interface{})["spec"].(map[string]interface{})["replicas"] = s.task.Replicas } } s.task.Metadata = resources @@ -126,7 +126,7 @@ func (s *scheduler) AssignAndSchedule() error { replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) result := make(map[int64]string) result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64) - s.result = result + //s.result = result return nil } @@ -160,22 +160,22 @@ func (s *scheduler) AssignAndSchedule() error { func (s *scheduler) SaveToDb() error { - for key, value := range s.result { - num, err := strconv.Atoi(value) - if err != nil { - fmt.Println("转换失败:", err) + for _, participantId := range s.participantIds { + var resources []interface{} + tool.Convert(s.task.Metadata, &resources) + for _, resource := range resources { + structForDb, err := s.scheduleService.getNewStructForDb(s.task, resource, int64(participantId)) + if err != nil { + return err + } - } - structForDb, err := s.scheduleService.getNewStructForDb(s.task, int64(key), int32(num)) - if err != nil { - return err + tx := s.dbEngin.Create(structForDb) + if tx.Error != nil { + logx.Error(tx.Error) + return tx.Error + } } - tx := s.dbEngin.Create(structForDb) - if tx.Error != nil { - logx.Error(tx.Error) - return tx.Error - } } return nil } diff --git a/pkg/tracker/tracker_test.go b/pkg/tracker/tracker_test.go index cd6be9cd..6c848cde 100644 --- a/pkg/tracker/tracker_test.go +++ b/pkg/tracker/tracker_test.go @@ -30,8 +30,7 @@ func TestGetNamedMetrics(t *testing.T) { client, _ := NewPrometheus("http://10.105.20.4:30766") result := client.GetNamedMetricsByTime([]string{"pod_cpu_usage", "pod_memory_usage_wo_cache"}, "1700521446", "1700551446", 10*time.Minute, ControllerOption{ - PodsName: "notification-manager-deployment-78664576cb-vkptn|notification-manager-deployment-78664576cb-5m6mt", - Namespace: "kubesphere-monitoring-system", + PodsName: "sealos-task-112703-65c776b4b5-q4jgf", }) println("zzz", result) } diff --git a/rpc/etc/pcmcore.yaml b/rpc/etc/pcmcore.yaml index 65c58022..ee08cf8d 100644 --- a/rpc/etc/pcmcore.yaml +++ b/rpc/etc/pcmcore.yaml @@ -4,10 +4,10 @@ NacosConfig: ServerConfigs: # - IpAddr: 127.0.0.1 # Port: 8848 - - IpAddr: nacos.jcce.dev + - IpAddr: 119.45.100.73 Port: 8848 ClientConfig: - NamespaceId: test + NamespaceId: zw TimeoutMs: 5000 NotLoadCacheAtStart: true LogDir: