Merge pull request 'Create virtual machine task' (#135) from qiwang/pcm-coordinator:wizard_vm into master

Former-commit-id: d80828a656bab574f9850544e16b7da659a5e401
This commit is contained in:
qiwang 2024-05-06 15:32:51 +08:00
commit 6bc4512013
7 changed files with 372 additions and 103 deletions

View File

@ -154,42 +154,116 @@ type (
}
)
type (
commitVmTaskReq {
Name string `json:"name"`
NsID string `json:"nsID"`
Replicas int64 `json:"replicas,optional"`
MatchLabels map[string]string `json:"matchLabels,optional"`
AdapterId string `json:"adapterId,optional"`
ClusterType string `json:"clusterType,optional"`
//Virtual Machine Section
// Name string `json:"name"`
// NsID string `json:"nsID"`
// Replicas int64 `json:"replicas,optional"`
// MatchLabels map[string]string `json:"matchLabels,optional"`
// AdapterId string `json:"adapterId,optional"`
// ClusterType string `json:"clusterType,optional"`
// //Virtual Machine Section
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
VmOption *VmOption `json:"vmOption,optional"`
}
VmOption {
AdapterId string `json:"adapterId"`
VmClusterIds []string `json:"vmClusterIds"`
Replicas int64 `json:"replicas,optional"`
Name string `json:"name"`
//ResourceType string `json:"resourceType"`
//TaskType string `json:"taskType"`
Strategy string `json:"strategy"`
ClusterToStaticWeight map[string]int32 `json:"clusterToStaticWeight"`
MatchLabels map[string]string `json:"matchLabels,optional"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
// Id int64 `json:"id"`
// ParticipantId int64 `json:"participantId"`
// TaskId int64 `json:"taskId"`
// AdapterId int64 `json:"adapterId"`
// ClusterId int64 `json:"clusterId"`
// FlavorRef string `json:"flavorRef"`
// ImageRef string `json:"imageRef"`
// Status string `json:"status"`
// Platform string `json:"platform"`
// Description string `json:"description"`
// AvailabilityZone string `json:"availabilityZone"`
// MinCount int64 `json:"minCount"`
// Uuid string `json:"uuid"`
// StartTime string `json:"startTime"`
// RunningTime string `json:"runningTime"`
// Result string `json:"result"`
// DeletedAt string `json:"deletedAt"`
}
CreateMulDomainServer {
Platform string `json:"platform,optional"`
Name string `json:"name,optional"`
Min_count int64 `json:"min_count,optional"`
ImageRef string `json:"imageRef,optional"`
FlavorRef string `json:"flavorRef,optional"`
Uuid string `json:"uuid,optional"`
Platform string `json:"platform,optional"`
name string `json:"name,optional"`
min_count int64 `json:"min_count,optional"`
imageRef string `json:"imageRef,optional"`
flavorRef string `json:"flavorRef,optional"`
uuid string `json:"uuid,optional"`
ClusterId string `json:"clusterId,optional"`
}
commitVmTaskResp {
// VmTask []VmTask `json:"vmTask" copier:"VmTask"`
TaskId int64 `json:"taskId"`
Code int32 `json:"code"`
Msg string `json:"msg"`
}
VmTask {
Id string `json:"id" copier:"Id"`
Links []VmLinks `json:"links" copier:"Links"`
OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"`
AdminPass string `json:"adminPass" copier:"AdminPass"`
ScheduleVmResult struct {
ClusterId string `json:"clusterId"`
TaskId string `json:"taskId"`
Strategy string `json:"strategy"`
Replica int32 `json:"replica"`
Msg string `json:"msg"`
}
VmTask{
Id string `json:"id" copier:"Id"`
Links []VmLinks `json:"links" copier:"Links"`
OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"`
AdminPass string `json:"adminPass" copier:"AdminPass"`
}
VmLinks {
Href string `json:"href " copier:"Href"`
Rel string `json:"rel" copier:"Rel"`
Rel string `json:"rel" copier:"Rel"`
}
// commitVmTaskReq {
// Name string `json:"name"`
// NsID string `json:"nsID"`
// Replicas int64 `json:"replicas,optional"`
// MatchLabels map[string]string `json:"matchLabels,optional"`
// AdapterId string `json:"adapterId,optional"`
// ClusterType string `json:"clusterType,optional"`
// //Virtual Machine Section
// CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
// }
// CreateMulDomainServer {
// Platform string `json:"platform,optional"`
// Name string `json:"name,optional"`
// Min_count int64 `json:"min_count,optional"`
// ImageRef string `json:"imageRef,optional"`
// FlavorRef string `json:"flavorRef,optional"`
// Uuid string `json:"uuid,optional"`
// }
// commitVmTaskResp {
// // VmTask []VmTask `json:"vmTask" copier:"VmTask"`
// TaskId int64 `json:"taskId"`
// Code int32 `json:"code"`
// Msg string `json:"msg"`
// }
// VmTask {
// Id string `json:"id" copier:"Id"`
// Links []VmLinks `json:"links" copier:"Links"`
// OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
// SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"`
// AdminPass string `json:"adminPass" copier:"AdminPass"`
// }
// VmLinks {
// Href string `json:"href " copier:"Href"`
// Rel string `json:"rel" copier:"Rel"`
// }
VmSecurity_groups_server {
Name string `json:"name" copier:"Name"`

View File

@ -3,11 +3,13 @@ package core
import (
"context"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"math/rand"
"strconv"
"time"
"github.com/zeromicro/go-zero/core/logx"
@ -29,11 +31,24 @@ func NewCommitVmTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm
func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *types.CommitVmTaskResp, err error) {
// todo: add your logic here and delete this line
resp = &types.CommitVmTaskResp{}
//Building the main task structure
opt := &option.VmOption{
AdapterId: req.VmOption.AdapterId,
Replicas: req.VmOption.Replicas,
Strategy: req.VmOption.Strategy,
ClusterToStaticWeight: req.VmOption.StaticWeightMap,
Status: constants.Saved,
MatchLabels: req.VmOption.MatchLabels,
StaticWeightMap: req.VmOption.StaticWeightMap,
Name: req.VmOption.Name,
CommitTime: time.Now(),
}
taskModel := models.Task{
Status: constants.Saved,
Name: req.Name,
CommitTime: time.Now(),
Status: constants.Saved,
Name: req.VmOption.Name,
CommitTime: time.Now(),
Description: "vm task",
}
// Save task data to database
tx := l.svcCtx.DbEngin.Create(&taskModel)
@ -41,38 +56,63 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type
return nil, tx.Error
}
for _, CreateMulServer := range req.CreateMulServer {
fmt.Println("", req.CreateMulServer)
var clusterIds []int64
l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? and label = ?", req.AdapterId, req.ClusterType).Scan(&clusterIds)
//var clusters []*models.VmModel
//err2 := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.VmOption.AdapterId, req.VmOption.VmClusterIds).Scan(&clusters).Error
//if err2 != nil {
// logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
// //return errors.Errorf("the cluster does not match the drive resources. Check the data"), nil
//}
if len(clusterIds) == 0 || clusterIds == nil {
return nil, nil
}
taskVm := models.TaskVm{}
//TODO 执行策略返回集群跟 Replica
/*opt := &option.VmOption{}
utils.Convert(&req, &opt)*/
// 2、Initialize scheduler
vmSchdl, err := schedulers.NewVmScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient)
if err != nil {
return nil, err
}
vmInfo := models.TaskVm{
TaskId: taskModel.Id,
ClusterId: clusterIds[rand.Intn(len(clusterIds))],
Name: taskModel.Name,
Status: "Saved",
StartTime: time.Now().String(),
MinCount: CreateMulServer.Min_count,
ImageRef: CreateMulServer.ImageRef,
FlavorRef: CreateMulServer.FlavorRef,
Uuid: CreateMulServer.Uuid,
Platform: CreateMulServer.Platform,
}
// 3、Return scheduling results
results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl)
if err != nil {
return nil, err
}
tx = l.svcCtx.DbEngin.Create(&vmInfo)
if tx.Error != nil {
return nil, tx.Error
}
resp = &types.CommitVmTaskResp{
Code: 200,
Msg: "success",
TaskId: taskModel.Id,
rs := (results).([]*schedulers.VmResult)
for _, r := range rs {
for _, CreateMulServer := range req.CreateMulServer {
if r.Replica > 0 && r.ClusterId == CreateMulServer.ClusterId {
fmt.Println("", req.CreateMulServer)
var clusterIds []int64
l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? ", req.VmOption.AdapterId).Scan(&clusterIds)
if len(clusterIds) == 0 || clusterIds == nil {
return nil, nil
}
adapterId, _ := strconv.ParseUint(req.VmOption.AdapterId, 10, 64)
taskVm.AdapterId = int64(adapterId)
clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64)
taskVm.ClusterId = int64(clusterId)
taskVm.Name = req.VmOption.Name
taskVm.TaskId = taskModel.Id
clusterId, _ = strconv.ParseUint(r.ClusterId, 10, 64)
taskVm.ClusterId = int64(clusterId)
taskVm.Status = "Saved"
taskVm.StartTime = time.Now().String()
taskVm.MinCount = CreateMulServer.Min_count
taskVm.ImageRef = CreateMulServer.ImageRef
taskVm.FlavorRef = CreateMulServer.FlavorRef
taskVm.Uuid = CreateMulServer.Uuid
taskVm.Platform = CreateMulServer.Platform
tx = l.svcCtx.DbEngin.Create(&taskVm)
if tx.Error != nil {
return nil, tx.Error
}
}
}
}
resp.Code = 200
resp.Msg = "Success"
return resp, nil
}

View File

@ -2,8 +2,6 @@ package mqs
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
)
@ -24,28 +22,28 @@ func NewVmMq(ctx context.Context, svcCtx *svc.ServiceContext) *VmMq {
func (l *VmMq) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
vmScheduler := schedulers.NewVmScheduler()
schdl, err := scheduler.NewScheduler(vmScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc)
if err != nil {
return err
}
//检测是否指定了集群列表
schdl.SpecifyClusters()
//检测是否指定了nsID
schdl.SpecifyNsID()
//通过标签匹配筛选出集群范围
schdl.MatchLabels()
//todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度
schdl.TempAssign()
// 存储数据
err = schdl.SaveToDb()
if err != nil {
return err
}
//vmScheduler := schedulers.NewVmScheduler()
//schdl, err := scheduler.NewScheduler(vmScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc)
//if err != nil {
// return err
//}
//
////检测是否指定了集群列表
//schdl.SpecifyClusters()
//
////检测是否指定了nsID
//schdl.SpecifyNsID()
//
////通过标签匹配筛选出集群范围
//schdl.MatchLabels()
//
////todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度
//schdl.TempAssign()
//
//// 存储数据
//err = schdl.SaveToDb()
//if err != nil {
// return err
//}
return nil
}

View File

@ -4,6 +4,7 @@ const (
AI = "ai"
CLOUD = "cloud"
HPC = "hpc"
VM = "vm"
)
type Option interface {

View File

@ -0,0 +1,49 @@
package option
import "time"
type VmOption struct {
AdapterId string
ClusterIds []string
TaskName string
ResourceType string // cpu/gpu/compute card
TaskType string // pytorch/tensorflow/mindspore
Strategy string
ClusterToStaticWeight map[string]int32
CommitTime time.Time
NsID string
Replicas int64
MatchLabels map[string]string
StaticWeightMap map[string]int32
CreateMulServer []CreateMulDomainServer
Id int64
ParticipantId int64
TaskId int64
Name string
ClusterId int64
FlavorRef string
ImageRef string
Status string
Platform string
Description string
AvailabilityZone string
MinCount int64
Uuid string
StartTime string
RunningTime string
Result string
DeletedAt string
}
type CreateMulDomainServer struct {
Platform string
Name string
Min_count int64
ImageRef string
FlavorRef string
Uuid string
}
func (a VmOption) GetOptionType() string {
return VM
}

View File

@ -1,29 +1,96 @@
package schedulers
import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
)
type VmScheduler struct {
storage database.Storage
yamlString string
storage database.Storage
task *response.TaskInfo
*scheduler.Scheduler
option *option.VmOption
ctx context.Context
promClient tracker.Prometheus
dbEngin *gorm.DB
}
func NewVmScheduler() *VmScheduler {
return &VmScheduler{}
type VmResult struct {
TaskId string
ClusterId string
ClusterName string
Strategy string
Replica int32
Msg string
}
func NewVmScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.VmOption, dbEngin *gorm.DB, promClient tracker.Prometheus) (*VmScheduler, error) {
return &VmScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option, dbEngin: dbEngin, promClient: promClient}, nil
}
/*func NewCloudScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.CloudOption, dbEngin *gorm.DB, promClient tracker.Prometheus) (*CloudScheduler, error) {
return &CloudScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option, dbEngin: dbEngin, promClient: promClient}, nil
}*/
func (vm *VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
//获取所有计算中心
//调度算法
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{})
return strategy, nil
if len(vm.option.ClusterIds) == 1 {
// TODO database operation Find
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: vm.option.ClusterIds[0], Replicas: 1}}, nil
}
//resources, err := vm.findClustersWithResources()
/* if err != nil {
return nil, err
}*/
/* if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}*/
//
//if len(resources) == 1 {
// var cluster strategy.AssignedCluster
// cluster.ClusterId = resources[0].ClusterId
// cluster.Replicas = 1
// return &strategy.SingleAssignment{Cluster: &cluster}, nil
//}
//params := &param.Params{Resources: resources}
switch vm.option.Strategy {
/* case strategy.REPLICATION:
var clusterIds []string
for _, resource := range resources {
clusterIds = append(clusterIds, resource.ClusterId)
}
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
return strategy, nil
case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, vm.option, 1)
return strategy, nil*/
case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap
strategy := strategy.NewStaticWeightStrategy(vm.option.ClusterToStaticWeight, 1)
return strategy, nil
}
/*strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{})
return strategy, nil*/
return nil, errors.New("no strategy has been chosen")
}
func (v *VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
@ -41,12 +108,6 @@ func (v *VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string
vm.ParticipantId = participantId*/
}
/*
func (vm *VmScheduler) UnMarshalVmStruct(yamlString string, taskId int64, nsID string) models.vm {
var vm models.Vm
vm := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096)
}
*/
func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider, error) {
proParams, err := vm.storage.GetProviderParams()
if err != nil {
@ -64,7 +125,38 @@ func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*provider
return nil, providerList, nil
}
func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) {
func (as *VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) {
//TODO implement me
panic("implement me")
if clusters == nil {
return nil, errors.New("clusters is nil")
}
for i := len(clusters) - 1; i >= 0; i-- {
if clusters[i].Replicas == 0 {
clusters = append(clusters[:i], clusters[i+1:]...)
}
}
if len(clusters) == 0 {
return nil, errors.New("clusters is nil")
}
var results []*VmResult
for _, cluster := range clusters {
cName := ""
as.dbEngin.Table("t_cluster").Select("name").Where("id=?", cluster.ClusterId).Find(&cName)
cr := VmResult{
ClusterId: cluster.ClusterId,
ClusterName: cName,
Replica: cluster.Replicas,
}
cr.ClusterId = cluster.ClusterId
cr.Replica = cluster.Replicas
cr.ClusterName = cName
results = append(results, &cr)
}
return results, nil
}

View File

@ -140,13 +140,20 @@ type TaskYaml struct {
}
type CommitVmTaskReq struct {
Name string `json:"name"`
NsID string `json:"nsID"`
Replicas int64 `json:"replicas,optional"`
MatchLabels map[string]string `json:"matchLabels,optional"`
AdapterId string `json:"adapterId,optional"`
ClusterType string `json:"clusterType,optional"`
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
VmOption *VmOption `json:"vmOption,optional"`
}
type VmOption struct {
AdapterId string `json:"adapterId"`
VmClusterIds []string `json:"vmClusterIds"`
Replicas int64 `json:"replicas,optional"`
Name string `json:"name"`
Strategy string `json:"strategy"`
ClusterToStaticWeight map[string]int32 `json:"clusterToStaticWeight"`
MatchLabels map[string]string `json:"matchLabels,optional"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
}
type CreateMulDomainServer struct {
@ -156,12 +163,20 @@ type CreateMulDomainServer struct {
ImageRef string `json:"imageRef,optional"`
FlavorRef string `json:"flavorRef,optional"`
Uuid string `json:"uuid,optional"`
ClusterId string `json:"clusterId,optional"`
}
type CommitVmTaskResp struct {
TaskId int64 `json:"taskId"`
Code int32 `json:"code"`
Msg string `json:"msg"`
Code int32 `json:"code"`
Msg string `json:"msg"`
}
type ScheduleVmResult struct {
ClusterId string `json:"clusterId"`
TaskId string `json:"taskId"`
Strategy string `json:"strategy"`
Replica int32 `json:"replica"`
Msg string `json:"msg"`
}
type VmTask struct {