From 066250ff72637437eb383e8fb2277df56da69d07 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Wed, 6 Dec 2023 21:03:01 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=8F=90=E4=BA=A4=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 2e52eb0850fe2a17c2937de6478db02ea4aa3c0e --- api/desc/core/pcm-core.api | 10 ++- api/desc/pcm.api | 6 +- .../handler/core/committaskhandler.go | 25 ++++++++ api/internal/handler/routes.go | 5 ++ api/internal/logic/core/commithpctasklogic.go | 2 +- api/internal/logic/core/committasklogic.go | 63 +++++++++++++++++++ api/internal/types/types.go | 10 ++- api/pkg/response/TaskInfo.go | 2 +- pkg/scheduler/aiScheduler.go | 2 +- pkg/scheduler/cloudScheduler.go | 18 ++---- pkg/scheduler/common.go | 2 +- pkg/scheduler/hpcScheduler.go | 2 +- pkg/scheduler/scheduler.go | 34 +++++----- pkg/utils/kubernetes.go | 30 +++++++++ 14 files changed, 174 insertions(+), 37 deletions(-) create mode 100644 api/internal/handler/core/committaskhandler.go create mode 100644 api/internal/logic/core/committasklogic.go create mode 100644 pkg/utils/kubernetes.go diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index e9c24114..1ef8df44 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -174,12 +174,20 @@ type deleteTaskReq { Id int64 `path:"id"` } +type commitTaskReq{ + Name string `json:"name"` + NsID string `json:"nsID"` + Replicas int64 `json:"replicas"` + MatchLabels map[string]string `json:"matchLabels"` + YamlList []string `json:"yamlList"` +} + type ( scheduleTaskByYamlReq { Name string `yaml:"name"` Description string `yaml:"description"` tenantId int64 `yaml:"tenantId"` - nsID string `yaml:"nsID"` + nsID string `form:"nsID"` tasks []TaskYaml `yaml:"tasks"` } TaskYaml { diff --git a/api/desc/pcm.api b/api/desc/pcm.api index 57b97590..f20836a0 100644 --- a/api/desc/pcm.api +++ b/api/desc/pcm.api @@ -30,7 +30,11 @@ service pcm { @doc "yaml提交任务" @handler scheduleTaskByYamlHandler - post /core/scheduleTaskByYaml (scheduleTaskByYamlReq) returns (scheduleTaskByYamlResp) + post /core/scheduleTaskByYaml (scheduleTaskByYamlReq) + + @doc "提交任务" + @handler commitTaskHandler + post /core/commitTask (commitTaskReq) @doc "提交超算任务" @handler commitHpcTaskHandler diff --git a/api/internal/handler/core/committaskhandler.go b/api/internal/handler/core/committaskhandler.go new file mode 100644 index 00000000..77995865 --- /dev/null +++ b/api/internal/handler/core/committaskhandler.go @@ -0,0 +1,25 @@ +package core + +import ( + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result" + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/core" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" +) + +func CommitTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.CommitTaskReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := core.NewCommitTaskLogic(r.Context(), svcCtx) + err := l.CommitTask(&req) + result.HttpResult(r, w, nil, err) + } +} diff --git a/api/internal/handler/routes.go b/api/internal/handler/routes.go index 7015f3b6..7d5182e6 100644 --- a/api/internal/handler/routes.go +++ b/api/internal/handler/routes.go @@ -31,6 +31,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/core/scheduleTaskByYaml", Handler: core.ScheduleTaskByYamlHandler(serverCtx), }, + { + Method: http.MethodPost, + Path: "/core/commitTask", + Handler: core.CommitTaskHandler(serverCtx), + }, { Method: http.MethodPost, Path: "/core/commitHpcTask", diff --git a/api/internal/logic/core/commithpctasklogic.go b/api/internal/logic/core/commithpctasklogic.go index 2f89cd41..5eba672e 100644 --- a/api/internal/logic/core/commithpctasklogic.go +++ b/api/internal/logic/core/commithpctasklogic.go @@ -48,7 +48,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t TaskId: taskModel.Id, TaskType: "hpc", MatchLabels: req.MatchLabels, - Metadata: hpc, + //Metadata: hpc, } req.TaskId = taskModel.Id // 将任务数据转换成消息体 diff --git a/api/internal/logic/core/committasklogic.go b/api/internal/logic/core/committasklogic.go new file mode 100644 index 00000000..c65d3eb4 --- /dev/null +++ b/api/internal/logic/core/committasklogic.go @@ -0,0 +1,63 @@ +package core + +import ( + "context" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" + "k8s.io/apimachinery/pkg/util/json" + "time" + + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type CommitTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewCommitTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitTaskLogic { + return &CommitTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *CommitTaskLogic) CommitTask(req *types.CommitTaskReq) error { + + taskModel := models.Task{ + Status: constants.Saved, + Name: req.Name, + CommitTime: time.Now(), + NsID: req.NsID, + } + // 保存任务数据到数据库 + tx := l.svcCtx.DbEngin.Create(&taskModel) + if tx.Error != nil { + return tx.Error + } + task := response.TaskInfo{ + TaskId: taskModel.Id, + MatchLabels: req.MatchLabels, + NsID: req.NsID, + Metadata: req.YamlList, + Replicas: req.Replicas, + } + // 将任务数据转换成消息体 + reqMessage, err := json.Marshal(task) + if err != nil { + logx.Error(err) + return err + } + publish := l.svcCtx.RedisClient.Publish(context.Background(), "cloud", reqMessage) + if publish.Err() != nil { + return publish.Err() + } + + return nil +} diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 38845aa1..d1071238 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -156,11 +156,19 @@ type DeleteTaskReq struct { Id int64 `path:"id"` } +type CommitTaskReq struct { + Name string `json:"name"` + NsID string `json:"nsID"` + Replicas int64 `json:"replicas"` + MatchLabels map[string]string `json:"matchLabels"` + YamlList []string `json:"yamlList"` +} + type ScheduleTaskByYamlReq struct { Name string `yaml:"name"` Description string `yaml:"description"` TenantId int64 `yaml:"tenantId"` - NsID string `yaml:"nsID"` + NsID string `form:"nsID"` Tasks []TaskYaml `yaml:"tasks"` } diff --git a/api/pkg/response/TaskInfo.go b/api/pkg/response/TaskInfo.go index 93ddf5fa..547ce512 100644 --- a/api/pkg/response/TaskInfo.go +++ b/api/pkg/response/TaskInfo.go @@ -25,7 +25,7 @@ type TaskInfo struct { Clusters []string `json:"clusters,optional"` //云际平台传入集群名称列表 TenantId int64 `json:"tenantId"` Replicas int64 `json:"replicas"` - Metadata interface{} `json:"metadata"` + Metadata []string `json:"metadata"` } func (t *TaskInfo) Validate() error { diff --git a/pkg/scheduler/aiScheduler.go b/pkg/scheduler/aiScheduler.go index 265816ec..dd494cba 100644 --- a/pkg/scheduler/aiScheduler.go +++ b/pkg/scheduler/aiScheduler.go @@ -30,7 +30,7 @@ func NewAiScheduler(val string) *aiScheduler { return &aiScheduler{yamlString: val} } -func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) { +func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { ai := models.Ai{ ParticipantId: participantId, TaskId: task.TaskId, diff --git a/pkg/scheduler/cloudScheduler.go b/pkg/scheduler/cloudScheduler.go index 44185711..915a66df 100644 --- a/pkg/scheduler/cloudScheduler.go +++ b/pkg/scheduler/cloudScheduler.go @@ -16,7 +16,6 @@ package scheduler import ( "bytes" - "encoding/json" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo" @@ -47,15 +46,10 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg return taskResult.MaxscoreStrategy, nil } -func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) { - - bytes, err := json.Marshal(resource) - if err != nil { - return nil, err - } - cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId, task.NsID) +func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { + cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID) cloud.Id = utils.GenSnowflakeID() - cloud.YamlString = string(bytes) + cloud.YamlString = resource cloud.NsID = task.NsID cloud.ParticipantId = participantId @@ -114,8 +108,8 @@ func (cs *cloudScheduler) genTaskAndProviders(task *response.TaskInfo, dbEngin * providerList = append(providerList, provider) } - replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) - t := algo.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000) + //replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) + //t := algo.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000) - return t, providerList + return nil, providerList } diff --git a/pkg/scheduler/common.go b/pkg/scheduler/common.go index 1e66d8de..46343610 100644 --- a/pkg/scheduler/common.go +++ b/pkg/scheduler/common.go @@ -23,7 +23,7 @@ import ( ) type scheduleService interface { - getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) + getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*algo.Task, []*algo.Provider) } diff --git a/pkg/scheduler/hpcScheduler.go b/pkg/scheduler/hpcScheduler.go index 59924e23..45fd3cc1 100644 --- a/pkg/scheduler/hpcScheduler.go +++ b/pkg/scheduler/hpcScheduler.go @@ -31,7 +31,7 @@ func NewHpcScheduler(val string) *hpcScheduler { return &hpcScheduler{yamlString: val} } -func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) { +func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { hpc := models.Hpc{} utils.Convert(task.Metadata, &hpc) hpc.Id = utils.GenSnowflakeID() diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 238180a5..a99ed28d 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -20,10 +20,9 @@ import ( "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo" - tool "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice" "gorm.io/gorm" - "strconv" + "sigs.k8s.io/yaml" "strings" ) @@ -96,17 +95,19 @@ func (s *scheduler) TempAssign() error { //需要判断task中的资源类型,针对metadata中的多个kind做不同处理 //输入副本数和集群列表,最终结果输出为pID对应副本数量列表,针对多个kind需要做拆分和重新拼接组合 - - var resources []interface{} - tool.Convert(s.task.Metadata, &resources) - for index := range resources { - //如果是Deployment,需要对副本数做分发 - if resources[index].(map[string]interface{})["kind"].(string) == "Deployment" || resources[index].(map[string]interface{})["kind"].(string) == "Replicaset" || - resources[index].(map[string]interface{})["kind"].(string) == "StatefulSet" { - resources[index].(map[string]interface{})["spec"].(map[string]interface{})["replicas"] = s.task.Replicas + var meData []string + for _, yamlString := range s.task.Metadata { + var data map[string]interface{} + err := yaml.Unmarshal([]byte(yamlString), &data) + if err != nil { } + + jsonData, err := json.Marshal(data) + if err != nil { + } + meData = append(meData, string(jsonData)) } - s.task.Metadata = resources + s.task.Metadata = meData return nil } @@ -123,9 +124,9 @@ func (s *scheduler) AssignAndSchedule() error { // 指定或者标签匹配的结果只有一个集群,给任务信息指定 if len(s.participantIds) == 1 { s.task.ParticipantId = s.participantIds[0] - replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) - result := make(map[int64]string) - result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64) + //replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) + //result := make(map[int64]string) + //result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64) //s.result = result return nil @@ -161,9 +162,8 @@ func (s *scheduler) AssignAndSchedule() error { func (s *scheduler) SaveToDb() error { for _, participantId := range s.participantIds { - var resources []interface{} - tool.Convert(s.task.Metadata, &resources) - for _, resource := range resources { + + for _, resource := range s.task.Metadata { structForDb, err := s.scheduleService.getNewStructForDb(s.task, resource, participantId) if err != nil { return err diff --git a/pkg/utils/kubernetes.go b/pkg/utils/kubernetes.go new file mode 100644 index 00000000..e3243bbc --- /dev/null +++ b/pkg/utils/kubernetes.go @@ -0,0 +1,30 @@ +package utils + +import ( + "bytes" + "io" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" + "k8s.io/apimachinery/pkg/util/yaml" +) + +func StrToInfo(val string) *unstructured.Unstructured { + d := yaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(val), 4096) + var err error + + var rawObj runtime.RawExtension + err = d.Decode(&rawObj) + if err == io.EOF { + } + if err != nil { + } + obj := &unstructured.Unstructured{} + syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj) + + unstructuredMap, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + + unStructureObj := &unstructured.Unstructured{Object: unstructuredMap} + return unStructureObj + +}