From ff7aae0a88284823def65d1547cacc48a48351f6 Mon Sep 17 00:00:00 2001 From: jagger Date: Tue, 30 Apr 2024 19:19:02 +0800 Subject: [PATCH 1/3] fix Signed-off-by: jagger Former-commit-id: 7ea02a9f7d2e00567c7fd8d9a6a09fa29fc9e3ce --- api/client/types.go | 22 +++++++------- api/internal/logic/core/pushtaskinfologic.go | 32 +++++++++++++++----- pkg/models/cloud/task_cloud.go | 2 +- 3 files changed, 36 insertions(+), 20 deletions(-) diff --git a/api/client/types.go b/api/client/types.go index 4bbce2e9..940c88df 100644 --- a/api/client/types.go +++ b/api/client/types.go @@ -111,17 +111,17 @@ type HpcInfo struct { } type CloudInfo struct { - Id uint `json:"id,omitempty"` - TaskId int64 `json:"taskId,omitempty"` - AdapterId uint `json:"adapterId,omitempty"` - ClusterId uint `json:"clusterId,omitempty"` - ClusterName string `json:"clusterName,omitempty"` - Kind string `json:"kind,omitempty"` - Status string `json:"status,omitempty"` - StartTime *time.Time `json:"startTime,omitempty"` - YamlString string `json:"yamlString,omitempty"` - Result string `json:"result,omitempty"` - Namespace string `json:"namespace,omitempty"` + Id uint `json:"id,omitempty,optional"` + TaskId int64 `json:"taskId,omitempty,optional"` + AdapterId uint `json:"adapterId,omitempty,optional"` + ClusterId uint `json:"clusterId,omitempty,optional"` + ClusterName string `json:"clusterName,omitempty,optional"` + Kind string `json:"kind,omitempty,optional"` + Status string `json:"status,omitempty,optional"` + StartTime *time.Time `json:"startTime,omitempty,optional,string"` + YamlString string `json:"yamlString,omitempty,optional"` + Result string `json:"result,omitempty,optional"` + Namespace string `json:"namespace,omitempty,optional"` } type AiInfo struct { diff --git a/api/internal/logic/core/pushtaskinfologic.go b/api/internal/logic/core/pushtaskinfologic.go index 1c056a86..dbc93f3a 100644 --- a/api/internal/logic/core/pushtaskinfologic.go +++ b/api/internal/logic/core/pushtaskinfologic.go @@ -2,14 +2,15 @@ package core import ( "context" + "github.com/pkg/errors" + "github.com/zeromicro/go-zero/core/logx" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gorm.io/gorm" "strings" - - "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "time" ) type PushTaskInfoLogic struct { @@ -33,9 +34,14 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie switch kind { case 0: for _, cloudInfo := range req.CloudInfoList { - l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?", - cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, req.AdapterId, cloudInfo.Id) - syncTask(l.svcCtx.DbEngin, cloudInfo.TaskId) + var taskId uint + result := l.svcCtx.DbEngin.Table("task_cloud").Select("task_id").Where("cluster_name=? and adapter_id=? and kind=?", cloudInfo.ClusterName, cloudInfo.AdapterId, cloudInfo.Kind).Find(&taskId) + if errors.Is(result.Error, gorm.ErrRecordNotFound) { + return nil, errors.New("Record does not exist") + } + l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where cluster_name = ? and adapter_id = ?", + cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.ClusterName, cloudInfo.AdapterId) + syncTask(l.svcCtx.DbEngin, int64(taskId)) } case 2: for _, hpcInfo := range req.HpcInfoList { @@ -63,7 +69,7 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie func syncTask(gorm *gorm.DB, taskId int64) { var allStatus string - tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus) + tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join task_cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus) if tx.Error != nil { logx.Error(tx.Error) } @@ -79,7 +85,7 @@ func syncTask(gorm *gorm.DB, taskId int64) { } if strings.Contains(allStatus, constants.Running) { - updateTask(gorm, taskId, constants.Running) + updateTaskRunning(gorm, taskId, constants.Running) } } @@ -93,6 +99,16 @@ func updateTask(gorm *gorm.DB, taskId int64, status string) { } } +func updateTaskRunning(gorm *gorm.DB, taskId int64, status string) { + var task models.Task + gorm.Where("id = ? ", taskId).Find(&task) + if task.Status != status { + task.Status = status + task.StartTime = time.Now().Format("2006-01-02 15:04:05") + gorm.Updates(&task) + } +} + func removeRepeatedElement(arr []string) (newArr []string) { newArr = make([]string, 0) for i := 0; i < len(arr); i++ { diff --git a/pkg/models/cloud/task_cloud.go b/pkg/models/cloud/task_cloud.go index 3dec32bc..d60c236e 100644 --- a/pkg/models/cloud/task_cloud.go +++ b/pkg/models/cloud/task_cloud.go @@ -13,7 +13,7 @@ type TaskCloudModel struct { ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"` Kind string `json:"kind" gorm:"comment:种类"` Status string `json:"status" gorm:"comment:状态"` - StartTime *time.Time `json:"startTime" gorm:"comment:开始时间"` + StartTime *time.Time `json:"startTime,string" gorm:"comment:开始时间"` YamlString string `json:"yamlString" gorm:"not null;comment:入参"` Result string `json:"result" gorm:"comment:运行结果"` Namespace string `json:"namespace" gorm:"comment:命名空间"` From f4d1b72c179227d8459b3e24edc820c1882d369d Mon Sep 17 00:00:00 2001 From: qiwang <1364512070@qq.com> Date: Mon, 6 May 2024 14:52:46 +0800 Subject: [PATCH 2/3] =?UTF-8?q?fix=EF=BC=9ACreate=20virtual=20machine=20ta?= =?UTF-8?q?sk?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 86a445db5f8b53773486ebe0fa866ad3471b40a9 --- api/desc/core/pcm-core.api | 118 +++++++++++++---- api/internal/logic/core/commitvmtasklogic.go | 102 +++++++++----- api/internal/mqs/ScheduleVm.go | 48 ++++--- .../scheduler/schedulers/option/option.go | 1 + .../scheduler/schedulers/option/vmOption.go | 49 +++++++ .../scheduler/schedulers/vmScheduler.go | 124 +++++++++++++++--- api/internal/types/types.go | 33 +++-- 7 files changed, 372 insertions(+), 103 deletions(-) create mode 100644 api/internal/scheduler/schedulers/option/vmOption.go diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index 64689eb8..fb7fcfce 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -154,42 +154,116 @@ type ( } ) + type ( commitVmTaskReq { - Name string `json:"name"` - NsID string `json:"nsID"` - Replicas int64 `json:"replicas,optional"` - MatchLabels map[string]string `json:"matchLabels,optional"` - AdapterId string `json:"adapterId,optional"` - ClusterType string `json:"clusterType,optional"` - //Virtual Machine Section + // Name string `json:"name"` + // NsID string `json:"nsID"` + // Replicas int64 `json:"replicas,optional"` + // MatchLabels map[string]string `json:"matchLabels,optional"` + // AdapterId string `json:"adapterId,optional"` + // ClusterType string `json:"clusterType,optional"` + // //Virtual Machine Section CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"` + VmOption *VmOption `json:"vmOption,optional"` } + VmOption { + AdapterId string `json:"adapterId"` + VmClusterIds []string `json:"vmClusterIds"` + Replicas int64 `json:"replicas,optional"` + Name string `json:"name"` + //ResourceType string `json:"resourceType"` + //TaskType string `json:"taskType"` + Strategy string `json:"strategy"` + ClusterToStaticWeight map[string]int32 `json:"clusterToStaticWeight"` + MatchLabels map[string]string `json:"matchLabels,optional"` + StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` + CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"` + // Id int64 `json:"id"` + // ParticipantId int64 `json:"participantId"` + // TaskId int64 `json:"taskId"` + // AdapterId int64 `json:"adapterId"` + // ClusterId int64 `json:"clusterId"` + // FlavorRef string `json:"flavorRef"` + // ImageRef string `json:"imageRef"` + // Status string `json:"status"` + // Platform string `json:"platform"` + // Description string `json:"description"` + // AvailabilityZone string `json:"availabilityZone"` + // MinCount int64 `json:"minCount"` + // Uuid string `json:"uuid"` + // StartTime string `json:"startTime"` + // RunningTime string `json:"runningTime"` + // Result string `json:"result"` + // DeletedAt string `json:"deletedAt"` + } + CreateMulDomainServer { - Platform string `json:"platform,optional"` - Name string `json:"name,optional"` - Min_count int64 `json:"min_count,optional"` - ImageRef string `json:"imageRef,optional"` - FlavorRef string `json:"flavorRef,optional"` - Uuid string `json:"uuid,optional"` + Platform string `json:"platform,optional"` + name string `json:"name,optional"` + min_count int64 `json:"min_count,optional"` + imageRef string `json:"imageRef,optional"` + flavorRef string `json:"flavorRef,optional"` + uuid string `json:"uuid,optional"` + ClusterId string `json:"clusterId,optional"` } commitVmTaskResp { - // VmTask []VmTask `json:"vmTask" copier:"VmTask"` - TaskId int64 `json:"taskId"` Code int32 `json:"code"` Msg string `json:"msg"` } - VmTask { - Id string `json:"id" copier:"Id"` - Links []VmLinks `json:"links" copier:"Links"` - OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"` - SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"` - AdminPass string `json:"adminPass" copier:"AdminPass"` + ScheduleVmResult struct { + ClusterId string `json:"clusterId"` + TaskId string `json:"taskId"` + Strategy string `json:"strategy"` + Replica int32 `json:"replica"` + Msg string `json:"msg"` + } + VmTask{ + Id string `json:"id" copier:"Id"` + Links []VmLinks `json:"links" copier:"Links"` + OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"` + SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"` + AdminPass string `json:"adminPass" copier:"AdminPass"` } VmLinks { Href string `json:"href " copier:"Href"` - Rel string `json:"rel" copier:"Rel"` + Rel string `json:"rel" copier:"Rel"` } +// commitVmTaskReq { +// Name string `json:"name"` +// NsID string `json:"nsID"` +// Replicas int64 `json:"replicas,optional"` +// MatchLabels map[string]string `json:"matchLabels,optional"` +// AdapterId string `json:"adapterId,optional"` +// ClusterType string `json:"clusterType,optional"` +// //Virtual Machine Section +// CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"` +// } +// CreateMulDomainServer { +// Platform string `json:"platform,optional"` +// Name string `json:"name,optional"` +// Min_count int64 `json:"min_count,optional"` +// ImageRef string `json:"imageRef,optional"` +// FlavorRef string `json:"flavorRef,optional"` +// Uuid string `json:"uuid,optional"` +// } +// commitVmTaskResp { +// // VmTask []VmTask `json:"vmTask" copier:"VmTask"` +// TaskId int64 `json:"taskId"` +// Code int32 `json:"code"` +// Msg string `json:"msg"` +// } +// VmTask { +// Id string `json:"id" copier:"Id"` +// Links []VmLinks `json:"links" copier:"Links"` +// OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"` +// SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"` +// AdminPass string `json:"adminPass" copier:"AdminPass"` +// } +// VmLinks { +// Href string `json:"href " copier:"Href"` +// Rel string `json:"rel" copier:"Rel"` +// } VmSecurity_groups_server { Name string `json:"name" copier:"Name"` diff --git a/api/internal/logic/core/commitvmtasklogic.go b/api/internal/logic/core/commitvmtasklogic.go index 3e11b7ac..ede94b86 100644 --- a/api/internal/logic/core/commitvmtasklogic.go +++ b/api/internal/logic/core/commitvmtasklogic.go @@ -3,11 +3,13 @@ package core import ( "context" "fmt" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" - "math/rand" + "strconv" "time" "github.com/zeromicro/go-zero/core/logx" @@ -29,11 +31,24 @@ func NewCommitVmTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *types.CommitVmTaskResp, err error) { // todo: add your logic here and delete this line + resp = &types.CommitVmTaskResp{} //Building the main task structure + opt := &option.VmOption{ + AdapterId: req.VmOption.AdapterId, + Replicas: req.VmOption.Replicas, + Strategy: req.VmOption.Strategy, + ClusterToStaticWeight: req.VmOption.StaticWeightMap, + Status: constants.Saved, + MatchLabels: req.VmOption.MatchLabels, + StaticWeightMap: req.VmOption.StaticWeightMap, + Name: req.VmOption.Name, + CommitTime: time.Now(), + } taskModel := models.Task{ - Status: constants.Saved, - Name: req.Name, - CommitTime: time.Now(), + Status: constants.Saved, + Name: req.VmOption.Name, + CommitTime: time.Now(), + Description: "vm task", } // Save task data to database tx := l.svcCtx.DbEngin.Create(&taskModel) @@ -41,38 +56,63 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type return nil, tx.Error } - for _, CreateMulServer := range req.CreateMulServer { - fmt.Println("", req.CreateMulServer) - var clusterIds []int64 - l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? and label = ?", req.AdapterId, req.ClusterType).Scan(&clusterIds) + //var clusters []*models.VmModel + //err2 := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.VmOption.AdapterId, req.VmOption.VmClusterIds).Scan(&clusters).Error + //if err2 != nil { + // logx.Errorf("CommitGeneralTask() => sql execution error: %v", err) + // //return errors.Errorf("the cluster does not match the drive resources. Check the data"), nil + //} - if len(clusterIds) == 0 || clusterIds == nil { - return nil, nil - } + taskVm := models.TaskVm{} + //TODO 执行策略返回集群跟 Replica + /*opt := &option.VmOption{} + utils.Convert(&req, &opt)*/ + // 2、Initialize scheduler + vmSchdl, err := schedulers.NewVmScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient) + if err != nil { + return nil, err + } - vmInfo := models.TaskVm{ - TaskId: taskModel.Id, - ClusterId: clusterIds[rand.Intn(len(clusterIds))], - Name: taskModel.Name, - Status: "Saved", - StartTime: time.Now().String(), - MinCount: CreateMulServer.Min_count, - ImageRef: CreateMulServer.ImageRef, - FlavorRef: CreateMulServer.FlavorRef, - Uuid: CreateMulServer.Uuid, - Platform: CreateMulServer.Platform, - } + // 3、Return scheduling results + results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl) + if err != nil { + return nil, err + } - tx = l.svcCtx.DbEngin.Create(&vmInfo) - if tx.Error != nil { - return nil, tx.Error - } - resp = &types.CommitVmTaskResp{ - Code: 200, - Msg: "success", - TaskId: taskModel.Id, + rs := (results).([]*schedulers.VmResult) + for _, r := range rs { + for _, CreateMulServer := range req.CreateMulServer { + if r.Replica > 0 && r.ClusterId == CreateMulServer.ClusterId { + fmt.Println("", req.CreateMulServer) + var clusterIds []int64 + l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? ", req.VmOption.AdapterId).Scan(&clusterIds) + if len(clusterIds) == 0 || clusterIds == nil { + return nil, nil + } + adapterId, _ := strconv.ParseUint(req.VmOption.AdapterId, 10, 64) + taskVm.AdapterId = int64(adapterId) + clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64) + taskVm.ClusterId = int64(clusterId) + taskVm.Name = req.VmOption.Name + taskVm.TaskId = taskModel.Id + clusterId, _ = strconv.ParseUint(r.ClusterId, 10, 64) + taskVm.ClusterId = int64(clusterId) + taskVm.Status = "Saved" + taskVm.StartTime = time.Now().String() + taskVm.MinCount = CreateMulServer.Min_count + taskVm.ImageRef = CreateMulServer.ImageRef + taskVm.FlavorRef = CreateMulServer.FlavorRef + taskVm.Uuid = CreateMulServer.Uuid + taskVm.Platform = CreateMulServer.Platform + tx = l.svcCtx.DbEngin.Create(&taskVm) + if tx.Error != nil { + return nil, tx.Error + } + } } } + resp.Code = 200 + resp.Msg = "Success" return resp, nil } diff --git a/api/internal/mqs/ScheduleVm.go b/api/internal/mqs/ScheduleVm.go index 9cf4c203..6c362807 100644 --- a/api/internal/mqs/ScheduleVm.go +++ b/api/internal/mqs/ScheduleVm.go @@ -2,8 +2,6 @@ package mqs import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" ) @@ -24,28 +22,28 @@ func NewVmMq(ctx context.Context, svcCtx *svc.ServiceContext) *VmMq { func (l *VmMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 - vmScheduler := schedulers.NewVmScheduler() - schdl, err := scheduler.NewScheduler(vmScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) - if err != nil { - return err - } - - //检测是否指定了集群列表 - schdl.SpecifyClusters() - - //检测是否指定了nsID - schdl.SpecifyNsID() - - //通过标签匹配筛选出集群范围 - schdl.MatchLabels() - - //todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度 - schdl.TempAssign() - - // 存储数据 - err = schdl.SaveToDb() - if err != nil { - return err - } + //vmScheduler := schedulers.NewVmScheduler() + //schdl, err := scheduler.NewScheduler(vmScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc) + //if err != nil { + // return err + //} + // + ////检测是否指定了集群列表 + //schdl.SpecifyClusters() + // + ////检测是否指定了nsID + //schdl.SpecifyNsID() + // + ////通过标签匹配筛选出集群范围 + //schdl.MatchLabels() + // + ////todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度 + //schdl.TempAssign() + // + //// 存储数据 + //err = schdl.SaveToDb() + //if err != nil { + // return err + //} return nil } diff --git a/api/internal/scheduler/schedulers/option/option.go b/api/internal/scheduler/schedulers/option/option.go index f7c9eef1..111904b4 100644 --- a/api/internal/scheduler/schedulers/option/option.go +++ b/api/internal/scheduler/schedulers/option/option.go @@ -4,6 +4,7 @@ const ( AI = "ai" CLOUD = "cloud" HPC = "hpc" + VM = "vm" ) type Option interface { diff --git a/api/internal/scheduler/schedulers/option/vmOption.go b/api/internal/scheduler/schedulers/option/vmOption.go new file mode 100644 index 00000000..54006a83 --- /dev/null +++ b/api/internal/scheduler/schedulers/option/vmOption.go @@ -0,0 +1,49 @@ +package option + +import "time" + +type VmOption struct { + AdapterId string + ClusterIds []string + TaskName string + ResourceType string // cpu/gpu/compute card + TaskType string // pytorch/tensorflow/mindspore + Strategy string + ClusterToStaticWeight map[string]int32 + CommitTime time.Time + NsID string + Replicas int64 + MatchLabels map[string]string + StaticWeightMap map[string]int32 + CreateMulServer []CreateMulDomainServer + Id int64 + ParticipantId int64 + TaskId int64 + Name string + ClusterId int64 + FlavorRef string + ImageRef string + Status string + Platform string + Description string + AvailabilityZone string + MinCount int64 + Uuid string + StartTime string + RunningTime string + Result string + DeletedAt string +} + +type CreateMulDomainServer struct { + Platform string + Name string + Min_count int64 + ImageRef string + FlavorRef string + Uuid string +} + +func (a VmOption) GetOptionType() string { + return VM +} diff --git a/api/internal/scheduler/schedulers/vmScheduler.go b/api/internal/scheduler/schedulers/vmScheduler.go index 4bb626f3..7aa20f63 100644 --- a/api/internal/scheduler/schedulers/vmScheduler.go +++ b/api/internal/scheduler/schedulers/vmScheduler.go @@ -1,29 +1,96 @@ package schedulers import ( + "context" + "github.com/pkg/errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" - "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gorm.io/gorm" ) type VmScheduler struct { - storage database.Storage + yamlString string + storage database.Storage + task *response.TaskInfo + *scheduler.Scheduler + option *option.VmOption + ctx context.Context + promClient tracker.Prometheus + dbEngin *gorm.DB } -func NewVmScheduler() *VmScheduler { - return &VmScheduler{} +type VmResult struct { + TaskId string + ClusterId string + ClusterName string + Strategy string + Replica int32 + Msg string } +func NewVmScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.VmOption, dbEngin *gorm.DB, promClient tracker.Prometheus) (*VmScheduler, error) { + return &VmScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option, dbEngin: dbEngin, promClient: promClient}, nil +} + +/*func NewCloudScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.CloudOption, dbEngin *gorm.DB, promClient tracker.Prometheus) (*CloudScheduler, error) { + return &CloudScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option, dbEngin: dbEngin, promClient: promClient}, nil +}*/ + func (vm *VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) { - //获取所有计算中心 - //调度算法 - strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{}) - return strategy, nil + if len(vm.option.ClusterIds) == 1 { + // TODO database operation Find + return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: vm.option.ClusterIds[0], Replicas: 1}}, nil + } + //resources, err := vm.findClustersWithResources() + + /* if err != nil { + return nil, err + }*/ + + /* if len(resources) == 0 { + return nil, errors.New("no cluster has resources") + }*/ + // + //if len(resources) == 1 { + // var cluster strategy.AssignedCluster + // cluster.ClusterId = resources[0].ClusterId + // cluster.Replicas = 1 + // return &strategy.SingleAssignment{Cluster: &cluster}, nil + //} + //params := ¶m.Params{Resources: resources} + + switch vm.option.Strategy { + /* case strategy.REPLICATION: + var clusterIds []string + for _, resource := range resources { + clusterIds = append(clusterIds, resource.ClusterId) + } + strategy := strategy.NewReplicationStrategy(clusterIds, 1) + return strategy, nil + case strategy.RESOURCES_PRICING: + strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1}) + return strategy, nil + case strategy.DYNAMIC_RESOURCES: + strategy := strategy.NewDynamicResourcesStrategy(params.Resources, vm.option, 1) + return strategy, nil*/ + case strategy.STATIC_WEIGHT: + //todo resources should match cluster StaticWeightMap + strategy := strategy.NewStaticWeightStrategy(vm.option.ClusterToStaticWeight, 1) + return strategy, nil + } + + /*strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{}) + return strategy, nil*/ + + return nil, errors.New("no strategy has been chosen") } func (v *VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) { @@ -41,12 +108,6 @@ func (v *VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string vm.ParticipantId = participantId*/ } -/* - func (vm *VmScheduler) UnMarshalVmStruct(yamlString string, taskId int64, nsID string) models.vm { - var vm models.Vm - vm := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) - } -*/ func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider, error) { proParams, err := vm.storage.GetProviderParams() if err != nil { @@ -64,7 +125,38 @@ func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*provider return nil, providerList, nil } -func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { +func (as *VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { //TODO implement me - panic("implement me") + if clusters == nil { + return nil, errors.New("clusters is nil") + } + + for i := len(clusters) - 1; i >= 0; i-- { + if clusters[i].Replicas == 0 { + clusters = append(clusters[:i], clusters[i+1:]...) + } + } + + if len(clusters) == 0 { + return nil, errors.New("clusters is nil") + } + + var results []*VmResult + + for _, cluster := range clusters { + cName := "" + as.dbEngin.Table("t_cluster").Select("name").Where("id=?", cluster.ClusterId).Find(&cName) + cr := VmResult{ + ClusterId: cluster.ClusterId, + ClusterName: cName, + Replica: cluster.Replicas, + } + cr.ClusterId = cluster.ClusterId + cr.Replica = cluster.Replicas + + cr.ClusterName = cName + results = append(results, &cr) + } + + return results, nil } diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 321e6755..3f8820ad 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -140,13 +140,20 @@ type TaskYaml struct { } type CommitVmTaskReq struct { - Name string `json:"name"` - NsID string `json:"nsID"` - Replicas int64 `json:"replicas,optional"` - MatchLabels map[string]string `json:"matchLabels,optional"` - AdapterId string `json:"adapterId,optional"` - ClusterType string `json:"clusterType,optional"` CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"` + VmOption *VmOption `json:"vmOption,optional"` +} + +type VmOption struct { + AdapterId string `json:"adapterId"` + VmClusterIds []string `json:"vmClusterIds"` + Replicas int64 `json:"replicas,optional"` + Name string `json:"name"` + Strategy string `json:"strategy"` + ClusterToStaticWeight map[string]int32 `json:"clusterToStaticWeight"` + MatchLabels map[string]string `json:"matchLabels,optional"` + StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` + CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"` } type CreateMulDomainServer struct { @@ -156,12 +163,20 @@ type CreateMulDomainServer struct { ImageRef string `json:"imageRef,optional"` FlavorRef string `json:"flavorRef,optional"` Uuid string `json:"uuid,optional"` + ClusterId string `json:"clusterId,optional"` } type CommitVmTaskResp struct { - TaskId int64 `json:"taskId"` - Code int32 `json:"code"` - Msg string `json:"msg"` + Code int32 `json:"code"` + Msg string `json:"msg"` +} + +type ScheduleVmResult struct { + ClusterId string `json:"clusterId"` + TaskId string `json:"taskId"` + Strategy string `json:"strategy"` + Replica int32 `json:"replica"` + Msg string `json:"msg"` } type VmTask struct { From cbf44ff18410d09a74c7bd0f1663148b36ddcc4f Mon Sep 17 00:00:00 2001 From: jagger Date: Mon, 6 May 2024 15:00:50 +0800 Subject: [PATCH 3/3] fix Signed-off-by: jagger Former-commit-id: 153446e456a61b354ff1aed88e41860d3883a38b --- api/internal/logic/core/pushtaskinfologic.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/internal/logic/core/pushtaskinfologic.go b/api/internal/logic/core/pushtaskinfologic.go index dbc93f3a..df1635d0 100644 --- a/api/internal/logic/core/pushtaskinfologic.go +++ b/api/internal/logic/core/pushtaskinfologic.go @@ -35,12 +35,12 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie case 0: for _, cloudInfo := range req.CloudInfoList { var taskId uint - result := l.svcCtx.DbEngin.Table("task_cloud").Select("task_id").Where("cluster_name=? and adapter_id=? and kind=?", cloudInfo.ClusterName, cloudInfo.AdapterId, cloudInfo.Kind).Find(&taskId) + result := l.svcCtx.DbEngin.Table("task_cloud").Select("task_id").Where("task_id = ?", cloudInfo.TaskId).Find(&taskId) if errors.Is(result.Error, gorm.ErrRecordNotFound) { return nil, errors.New("Record does not exist") } - l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where cluster_name = ? and adapter_id = ?", - cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.ClusterName, cloudInfo.AdapterId) + l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where task_id = ?", + cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.TaskId) syncTask(l.svcCtx.DbEngin, int64(taskId)) } case 2: