From dd5d975df2f9fc5ca97903010be5aa76a86a2976 Mon Sep 17 00:00:00 2001 From: jagger Date: Mon, 29 Apr 2024 21:46:07 +0800 Subject: [PATCH 1/2] impl cloud Scheduling Algorithm Signed-off-by: jagger Former-commit-id: 7f60b2093407bb7ce871fba74f86a817dc0c756e --- api/desc/core/pcm-core.api | 1 + .../logic/cloud/commitgeneraltasklogic.go | 36 ++- api/internal/mqs/ScheduleCloud.go | 48 ++-- .../scheduler/schedulers/cloudScheduler.go | 227 ++++++++++++------ .../schedulers/option/cloudOption.go | 1 + api/internal/types/types.go | 1 + 6 files changed, 198 insertions(+), 116 deletions(-) diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index be4782d6..64689eb8 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -118,6 +118,7 @@ type ( Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` ReqBody []string `json:"reqBody"` + Replicas int64 `json:"replicas,string"` } ) diff --git a/api/internal/logic/cloud/commitgeneraltasklogic.go b/api/internal/logic/cloud/commitgeneraltasklogic.go index 524d29cd..e2bf7c5b 100644 --- a/api/internal/logic/cloud/commitgeneraltasklogic.go +++ b/api/internal/logic/cloud/commitgeneraltasklogic.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "github.com/pkg/errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" @@ -14,6 +16,7 @@ import ( syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" kyaml "k8s.io/apimachinery/pkg/util/yaml" "sigs.k8s.io/yaml" + "strconv" "strings" "time" @@ -69,15 +72,29 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er } taskCloud := cloud.TaskCloudModel{} //TODO 执行策略返回集群跟 Replica - for _, c := range clusters { + opt := &option.CloudOption{} + utils.Convert(&req, &opt) + sc, err := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient) + if err != nil { + return err + } + results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc) + if err != nil { + return err + } + + rs := (results).([]*schedulers.CloudResult) + for _, r := range rs { for _, s := range req.ReqBody { - sStruct := UnMarshalK8sStruct(s) + sStruct := UnMarshalK8sStruct(s, int64(r.Replica)) unString, _ := sStruct.MarshalJSON() taskCloud.Id = utils.GenSnowflakeIDUint() taskCloud.TaskId = uint(taskModel.Id) - taskCloud.AdapterId = c.AdapterId - taskCloud.ClusterId = c.Id - taskCloud.ClusterName = c.Name + adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64) + clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64) + taskCloud.AdapterId = uint(adapterId) + taskCloud.ClusterId = uint(clusterId) + taskCloud.ClusterName = r.ClusterName taskCloud.Status = "Pending" taskCloud.YamlString = string(unString) taskCloud.Kind = sStruct.GetKind() @@ -89,11 +106,10 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er } } } - return nil } -func UnMarshalK8sStruct(yamlString string) *unstructured.Unstructured { +func UnMarshalK8sStruct(yamlString string, replica int64) *unstructured.Unstructured { unstructuredObj := &unstructured.Unstructured{} d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) var err error @@ -116,11 +132,7 @@ func UnMarshalK8sStruct(yamlString string) *unstructured.Unstructured { } //设置副本数 if unstructuredObj.GetKind() == "Deployment" || unstructuredObj.GetKind() == "StatefulSet" { - unstructured.SetNestedField( - unstructuredObj.Object, - int64(6), - "spec", "replicas", - ) + unstructured.SetNestedField(unstructuredObj.Object, replica, "spec", "replicas") } } return unstructuredObj diff --git a/api/internal/mqs/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go index a379ecbe..a74056ff 100644 --- a/api/internal/mqs/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -16,8 +16,6 @@ package mqs import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" ) @@ -38,28 +36,28 @@ func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq { func (l *CloudMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - cloudScheduler := schedulers.NewCloudScheduler() - schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) - if err != nil { - return err - } - - //检测是否指定了集群列表 - schdl.SpecifyClusters() - - //检测是否指定了nsID - schdl.SpecifyNsID() - - //通过标签匹配筛选出集群范围 - schdl.MatchLabels() - - //todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度 - schdl.TempAssign() - - // 存储数据 - err = schdl.SaveToDb() - if err != nil { - return err - } + //cloudScheduler := schedulers.NewCloudScheduler() + //schdl, err := scheduler.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) + //if err != nil { + // return err + //} + // + ////检测是否指定了集群列表 + //schdl.SpecifyClusters() + // + ////检测是否指定了nsID + //schdl.SpecifyNsID() + // + ////通过标签匹配筛选出集群范围 + //schdl.MatchLabels() + // + ////todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度 + //schdl.TempAssign() + // + //// 存储数据 + //err = schdl.SaveToDb() + //if err != nil { + // return err + //} return nil } diff --git a/api/internal/scheduler/schedulers/cloudScheduler.go b/api/internal/scheduler/schedulers/cloudScheduler.go index e4035574..4f00aaba 100644 --- a/api/internal/scheduler/schedulers/cloudScheduler.go +++ b/api/internal/scheduler/schedulers/cloudScheduler.go @@ -15,106 +15,175 @@ package schedulers import ( - "bytes" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" + "context" + "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "io" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" - kyaml "k8s.io/apimachinery/pkg/util/yaml" + "gorm.io/gorm" + "math" + "time" ) type CloudScheduler struct { - storage database.Storage + yamlString string + task *response.TaskInfo + *scheduler.Scheduler + option *option.CloudOption + ctx context.Context + dbEngin *gorm.DB + promClient tracker.Prometheus + svcCtx *svc.ServiceContext } -func NewCloudScheduler() *CloudScheduler { - return &CloudScheduler{} +type CloudResult struct { + TaskId string + ClusterId string + ClusterName string + Strategy string + Replica int32 + Msg string } -func (cs *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { - //获取所有计算中心 - //调度算法 - strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{}) - return strategy, nil +func NewCloudScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.CloudOption, dbEngin *gorm.DB, promClient tracker.Prometheus) (*CloudScheduler, error) { + return &CloudScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option, dbEngin: dbEngin, promClient: promClient}, nil } -func (cs *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { - cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID) - cloud.Id = utils.GenSnowflakeID() - cloud.NsID = task.NsID - - cloud.ParticipantId = participantId - return cloud, nil -} - -func (cs *CloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64, nsID string) models.Cloud { - var cloud models.Cloud - d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) - var err error - for { - var rawObj runtime.RawExtension - err = d.Decode(&rawObj) - if err == io.EOF { - break - } - if err != nil { - } - obj := &unstructured.Unstructured{} - syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj) - if err != nil { - } - - unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) - if err != nil { - } - - unstructureObj := &unstructured.Unstructured{Object: unstructuredMap} - if len(nsID) != 0 { - unstructureObj.SetNamespace(nsID) - } - cloud = models.Cloud{ - TaskId: taskId, - ApiVersion: unstructureObj.GetAPIVersion(), - Name: unstructureObj.GetName(), - Kind: unstructureObj.GetKind(), - Namespace: unstructureObj.GetNamespace(), - Status: "Saved", - } - // 命名空间为空 设置默认值 - if len(unstructureObj.GetNamespace()) == 0 { - cloud.Namespace = "default" - } - //unstructureObj转成string - unString, _ := unstructureObj.MarshalJSON() - cloud.YamlString = string(unString) +func (as *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { + c := cloud.TaskCloudModel{ + AdapterId: uint(participantId), + TaskId: uint(task.TaskId), + Status: "Pending", + YamlString: as.yamlString, } - return cloud + utils.Convert(task.Metadata, &c) + return c, nil } -func (cs *CloudScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider, error) { - proParams, err := cs.storage.GetProviderParams() +func (as *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { + if len(as.option.ClusterIds) == 1 { + return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil + } + + resources, err := as.findClustersWithResources() + if err != nil { - return nil, nil, nil + return nil, err } - var providerList []*providerPricing.Provider - for _, p := range proParams { - provider := providerPricing.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0) - providerList = append(providerList, provider) + if len(resources) == 0 { + return nil, errors.New("no cluster has resources") } - //replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64) - //t := algorithm.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000) + if len(resources) == 1 { + var cluster strategy.AssignedCluster + cluster.ClusterId = resources[0].ClusterId + cluster.Replicas = 1 + return &strategy.SingleAssignment{Cluster: &cluster}, nil + } - return nil, providerList, nil + params := ¶m.Params{Resources: resources} + + switch as.option.Strategy { + case strategy.REPLICATION: + var clusterIds []string + for _, resource := range resources { + clusterIds = append(clusterIds, resource.ClusterId) + } + strategy := strategy.NewReplicationStrategy(clusterIds, as.option.Replica) + return strategy, nil + case strategy.RESOURCES_PRICING: + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: as.option.Replica}) + return strategy, nil + case strategy.DYNAMIC_RESOURCES: + strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1) + return strategy, nil + case strategy.STATIC_WEIGHT: + //todo resources should match cluster StaticWeightMap + strategy := strategy.NewStaticWeightStrategy(as.option.StaticWeightMap, as.option.Replica) + return strategy, nil + } + + return nil, errors.New("no strategy has been chosen") } -func (cs *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { - return nil, nil +func (as *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { + if clusters == nil { + return nil, errors.New("clusters is nil") + } + + for i := len(clusters) - 1; i >= 0; i-- { + if clusters[i].Replicas == 0 { + clusters = append(clusters[:i], clusters[i+1:]...) + } + } + + if len(clusters) == 0 { + return nil, errors.New("clusters is nil") + } + + var results []*CloudResult + + for _, cluster := range clusters { + cName := "" + as.dbEngin.Table("t_cluster").Select("name").Where("id=?", cluster.ClusterId).Find(&cName) + cr := CloudResult{ + ClusterId: cluster.ClusterId, + ClusterName: cName, + Replica: cluster.Replicas, + } + cr.ClusterId = cluster.ClusterId + cr.Replica = cluster.Replicas + + cr.ClusterName = cName + results = append(results, &cr) + } + + return results, nil +} + +func (as *CloudScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) { + resp := []*collector.ResourceStats{} + //查询集群资源信息 + var rMetrics []tracker.Metric + metrics := []string{"cluster_cpu_utilisation", "cluster_cpu_avail", "cluster_cpu_total", "cluster_memory_total", "cluster_memory_avail", "cluster_memory_utilisation", "cluster_disk_utilisation", "cluster_disk_avail", "cluster_disk_total", "cluster_pod_utilisation"} + var clusterNames []string + as.dbEngin.Table("t_cluster").Select("name").Where("id in ?", as.option.ClusterIds).Find(&clusterNames) + for _, c := range clusterNames { + rMetrics = as.promClient.GetNamedMetrics(metrics, time.Now(), tracker.ClusterOption{ClusterName: c}) + r := collector.ResourceStats{} + var cid string + as.dbEngin.Table("t_cluster").Select("id").Where("name = ?", c).Find(&cid) + r.ClusterId = cid + r.Name = c + for _, metric := range rMetrics { + if metric.MetricName == "cluster_cpu_total" { + r.CpuCoreTotal = int64(metric.MetricData.MetricValues[0].Sample.Value()) + } + if metric.MetricName == "cluster_cpu_avail" { + cpuAvail := metric.MetricData.MetricValues[0].Sample.Value() + r.CpuCoreAvail = int64(math.Round(cpuAvail)) + } + if metric.MetricName == "cluster_memory_total" { + r.MemTotal = metric.MetricData.MetricValues[0].Sample.Value() + } + if metric.MetricName == "cluster_memory_avail" { + r.MemAvail = metric.MetricData.MetricValues[0].Sample.Value() + } + if metric.MetricName == "cluster_disk_total" { + r.DiskTotal = metric.MetricData.MetricValues[0].Sample.Value() + } + if metric.MetricName == "cluster_disk_avail" { + r.DiskAvail = metric.MetricData.MetricValues[0].Sample.Value() + } + } + resp = append(resp, &r) + } + return resp, nil } diff --git a/api/internal/scheduler/schedulers/option/cloudOption.go b/api/internal/scheduler/schedulers/option/cloudOption.go index cf2df437..2654c755 100644 --- a/api/internal/scheduler/schedulers/option/cloudOption.go +++ b/api/internal/scheduler/schedulers/option/cloudOption.go @@ -7,6 +7,7 @@ type CloudOption struct { Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` ReqBody []string `json:"reqBody"` + Replica int32 `json:"replicas,string"` } func (c CloudOption) GetOptionType() string { diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 48baec41..c3884c99 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -105,6 +105,7 @@ type GeneralTaskReq struct { Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` ReqBody []string `json:"reqBody"` + Replicas int64 `json:"replicas,string"` } type DeleteTaskReq struct { From 61338174e8fb5dd02a271cfc60b88da7e66e9589 Mon Sep 17 00:00:00 2001 From: jagger Date: Tue, 30 Apr 2024 16:07:53 +0800 Subject: [PATCH 2/2] fix Signed-off-by: jagger Former-commit-id: 0d2a0bae21f85a108817f73c6f0f5f3657720087 --- api/etc/pcm.yaml | 4 +- .../logic/cloud/commitgeneraltasklogic.go | 99 ++++++++++++------- api/internal/logic/core/pulltaskinfologic.go | 3 +- pkg/constants/task.go | 1 + pkg/models/cloud/task_cloud.go | 23 +++-- 5 files changed, 79 insertions(+), 51 deletions(-) diff --git a/api/etc/pcm.yaml b/api/etc/pcm.yaml index 2a27ffba..ee51794e 100644 --- a/api/etc/pcm.yaml +++ b/api/etc/pcm.yaml @@ -5,8 +5,8 @@ Port: 8999 Timeout: 50000 DB: - DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local - # DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local + DataSource: root:uJpLd6u-J?HC1@(10.206.0.12:3306)/pcm?parseTime=true&loc=Local +# DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local Redis: Host: 10.206.0.12:6379 Pass: redisPW123 diff --git a/api/internal/logic/cloud/commitgeneraltasklogic.go b/api/internal/logic/cloud/commitgeneraltasklogic.go index e2bf7c5b..cf8842b9 100644 --- a/api/internal/logic/cloud/commitgeneraltasklogic.go +++ b/api/internal/logic/cloud/commitgeneraltasklogic.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "github.com/pkg/errors" + clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -15,7 +16,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" kyaml "k8s.io/apimachinery/pkg/util/yaml" - "sigs.k8s.io/yaml" "strconv" "strings" "time" @@ -41,71 +41,98 @@ func NewCommitGeneralTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) } func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) error { - var yamlStr []string - for _, s := range req.ReqBody { - j2, err := yaml.YAMLToJSON([]byte(s)) - if err != nil { - logx.Errorf("Failed to convert yaml to JSON, err: %v", err) - return err + tx := l.svcCtx.DbEngin.Begin() + // 执行回滚或者提交操作 + defer func() { + if p := recover(); p != nil { + tx.Rollback() + logx.Error(p) + } else if tx.Error != nil { + logx.Info("rollback, error", tx.Error) + tx.Rollback() + } else { + tx = tx.Commit() + logx.Info("commit success") } - yamlStr = append(yamlStr, string(j2)) - } - result := strings.Join(yamlStr, ",") - //TODO The namespace is fixed to ns-admin for the time being. Later, the namespace is obtained based on the user - taskModel := models.Task{ - Status: constants.Saved, - Name: req.Name, - CommitTime: time.Now(), - YamlString: "[" + result + "]", - } - // Save the task data to the database - tx := l.svcCtx.DbEngin.Create(&taskModel) - if tx.Error != nil { - return tx.Error - } - + }() + //TODO adapter + adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64) var clusters []*models.CloudModel - err := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error + err := tx.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error if err != nil { logx.Errorf("CommitGeneralTask() => sql execution error: %v", err) return errors.Errorf("the cluster does not match the drive resources. Check the data") } taskCloud := cloud.TaskCloudModel{} - //TODO 执行策略返回集群跟 Replica opt := &option.CloudOption{} utils.Convert(&req, &opt) - sc, err := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient) - if err != nil { - return err - } + sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc) if err != nil { + logx.Errorf("AssignAndSchedule() => execution error: %v", err) return err } rs := (results).([]*schedulers.CloudResult) + + var synergyStatus int64 + if len(rs) > 1 { + synergyStatus = 1 + } + var strategy int64 + sqlStr := `select t_dict_item.item_value + from t_dict + left join t_dict_item on t_dict.id = t_dict_item.dict_id + where item_text = ? + and t_dict.dict_code = 'schedule_Strategy'` + //查询调度策略 + err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error + taskModel := models.Task{ + Id: utils.GenSnowflakeID(), + Status: constants.Pending, + Name: req.Name, + CommitTime: time.Now(), + YamlString: strings.Join(req.ReqBody, "\n---\n"), + TaskTypeDict: 0, + SynergyStatus: synergyStatus, + Strategy: strategy, + } + var taskClouds []cloud.TaskCloudModel for _, r := range rs { for _, s := range req.ReqBody { sStruct := UnMarshalK8sStruct(s, int64(r.Replica)) unString, _ := sStruct.MarshalJSON() taskCloud.Id = utils.GenSnowflakeIDUint() taskCloud.TaskId = uint(taskModel.Id) - adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64) clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64) taskCloud.AdapterId = uint(adapterId) taskCloud.ClusterId = uint(clusterId) taskCloud.ClusterName = r.ClusterName - taskCloud.Status = "Pending" + taskCloud.Status = constants.Pending taskCloud.YamlString = string(unString) taskCloud.Kind = sStruct.GetKind() taskCloud.Namespace = sStruct.GetNamespace() - tx = l.svcCtx.DbEngin.Create(&taskCloud) - if tx.Error != nil { - logx.Errorf("CommitGeneralTask() create taskCloud => sql execution error: %v", err) - return tx.Error - } + taskClouds = append(taskClouds, taskCloud) } } + adapterName := "" + tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName) + noticeInfo := clientCore.NoticeInfo{ + AdapterId: int64(adapterId), + AdapterName: adapterName, + NoticeType: "create", + TaskName: req.Name, + Incident: "任务创建中", + CreatedTime: time.Now(), + } + db := tx.Table("task").Create(&taskModel) + db = tx.Table("task_cloud").Create(&taskClouds) + db = tx.Table("t_notice").Create(¬iceInfo) + if db.Error != nil { + logx.Errorf("Task creation failure, err: %v", db.Error) + return errors.New("task creation failure") + } return nil } diff --git a/api/internal/logic/core/pulltaskinfologic.go b/api/internal/logic/core/pulltaskinfologic.go index ef9b86d9..9581659e 100644 --- a/api/internal/logic/core/pulltaskinfologic.go +++ b/api/internal/logic/core/pulltaskinfologic.go @@ -5,6 +5,7 @@ import ( "github.com/jinzhu/copier" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gorm.io/gorm" @@ -54,7 +55,7 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie } } case 0: - var cloudModelList []models.Cloud + var cloudModelList []cloud.TaskCloudModel err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList) if err != nil { return nil, err diff --git a/pkg/constants/task.go b/pkg/constants/task.go index daf8879f..0ec079f3 100644 --- a/pkg/constants/task.go +++ b/pkg/constants/task.go @@ -26,4 +26,5 @@ const ( WaitRestart = "WaitRestart" WaitPause = "WaitPause" WaitStart = "WaitStart" + Pending = "Pending" ) diff --git a/pkg/models/cloud/task_cloud.go b/pkg/models/cloud/task_cloud.go index 13e8c045..3dec32bc 100644 --- a/pkg/models/cloud/task_cloud.go +++ b/pkg/models/cloud/task_cloud.go @@ -6,18 +6,17 @@ import ( ) type TaskCloudModel struct { - Id uint `json:"id" gorm:"primarykey;not null;comment:id"` - TaskId uint `json:"taskId" gorm:"not null;comment:task表id"` - AdapterId uint `json:"adapterId" gorm:"not null;comment:适配器id"` - ClusterId uint `json:"clusterId" gorm:"not null;comment:集群id"` - ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"` - Kind string `json:"kind" gorm:"comment:种类"` - Status string `json:"status" gorm:"comment:状态"` - StartTime time.Time `json:"startTime" gorm:"comment:开始时间"` - YamlString string `json:"yamlString" gorm:"not null;comment:入参"` - Result string `json:"result" gorm:"comment:运行结果"` - Namespace string `json:"namespace" gorm:"comment:命名空间"` - Replica int `json:"replica" gorm:"not null;comment:副本数"` + Id uint `json:"id" gorm:"primarykey;not null;comment:id"` + TaskId uint `json:"taskId" gorm:"not null;comment:task表id"` + AdapterId uint `json:"adapterId" gorm:"not null;comment:适配器id"` + ClusterId uint `json:"clusterId" gorm:"not null;comment:集群id"` + ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"` + Kind string `json:"kind" gorm:"comment:种类"` + Status string `json:"status" gorm:"comment:状态"` + StartTime *time.Time `json:"startTime" gorm:"comment:开始时间"` + YamlString string `json:"yamlString" gorm:"not null;comment:入参"` + Result string `json:"result" gorm:"comment:运行结果"` + Namespace string `json:"namespace" gorm:"comment:命名空间"` base.BaseModel }