fix:create vm server

Former-commit-id: d45c7c3228f7cedeaac94d7175d220855cf39c46
This commit is contained in:
qiwang 2024-05-11 08:58:57 +08:00
parent a57e0b7ba8
commit 54dd6a7eb2
6 changed files with 206 additions and 84 deletions

View File

@ -159,7 +159,7 @@ type (
type (
GeneralTaskReq {
Name string `json:"name"`
AdapterIds []string `json:"adapterIds"`
AdapterIds []string `json:"adapterIds"`
ClusterIds []string `json:"clusterIds"`
Strategy string `json:"strategy"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
@ -203,15 +203,30 @@ type (
type (
commitVmTaskReq {
// Name string `json:"name"`
// NsID string `json:"nsID"`
Name string `json:"name"`
AdapterIds []string `json:"adapterIds,optional"`
ClusterIds []string `json:"clusterIds"`
Strategy string `json:"strategy"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
MinCount int64 `json:"min_count,optional"`
ImageRef int64 `json:"imageRef,optional"`
FlavorRef int64 `json:"flavorRef,optional"`
Uuid int64 `json:"uuid,optional"`
//Replicas int64 `json:"replicas,string"`
VmName string `json:"vm_name,optional"`
// Replicas int64 `json:"replicas,optional"`
// MatchLabels map[string]string `json:"matchLabels,optional"`
// AdapterId string `json:"adapterId,optional"`
// ClusterType string `json:"clusterType,optional"`
// //Virtual Machine Section
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
VmOption *VmOption `json:"vmOption,optional"`
//CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
//VmOption *VmOption `json:"vmOption,optional"`
}
TaskVm {
ImageRef string `json:"imageRef"`
FlavorRef string `json:"flavorRef"`
Uuid string `json:"uuid"`
Platform string `json:"platform"`
}
VmOption {
AdapterId string `json:"adapterId"`
@ -225,23 +240,6 @@ type (
MatchLabels map[string]string `json:"matchLabels,optional"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
// Id int64 `json:"id"`
// ParticipantId int64 `json:"participantId"`
// TaskId int64 `json:"taskId"`
// AdapterId int64 `json:"adapterId"`
// ClusterId int64 `json:"clusterId"`
// FlavorRef string `json:"flavorRef"`
// ImageRef string `json:"imageRef"`
// Status string `json:"status"`
// Platform string `json:"platform"`
// Description string `json:"description"`
// AvailabilityZone string `json:"availabilityZone"`
// MinCount int64 `json:"minCount"`
// Uuid string `json:"uuid"`
// StartTime string `json:"startTime"`
// RunningTime string `json:"runningTime"`
// Result string `json:"result"`
// DeletedAt string `json:"deletedAt"`
}
CreateMulDomainServer {

View File

@ -1190,6 +1190,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/schedule/ai/getAlgorithms/:adapterId/:resourceType/:taskType/:dataset",
Handler: schedule.ScheduleGetAlgorithmsHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/schedule/ai/getJobLog/:adapterId/:clusterId/:taskId/:instanceNum",
Handler: schedule.ScheduleGetAiJobLogLogHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/schedule/submit",
@ -1294,7 +1299,7 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
},
{
Method: http.MethodPost,
Path: "/core/syncClusterAlert",
Path: "/monitoring/syncClusterAlert",
Handler: monitoring.SyncClusterAlertHandler(serverCtx),
},
{

View File

@ -3,12 +3,14 @@ package core
import (
"context"
"fmt"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"strconv"
"time"
@ -31,9 +33,27 @@ func NewCommitVmTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm
func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *types.CommitVmTaskResp, err error) {
// todo: add your logic here and delete this line
/*var ImageRef string
var FlavorRef string
var NetworkRef string*/
resp = &types.CommitVmTaskResp{}
tx := l.svcCtx.DbEngin.Begin()
//Building the main task structure
opt := &option.VmOption{
defer func() {
if p := recover(); p != nil {
tx.Rollback()
logx.Error(p)
} else if tx.Error != nil {
logx.Info("rollback, error", tx.Error)
tx.Rollback()
} else {
tx = tx.Commit()
logx.Info("commit success")
}
}()
//TODO adapter
adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64)
/* opt := &option.VmOption{
AdapterId: req.VmOption.AdapterId,
Replicas: req.VmOption.Replicas,
Strategy: req.VmOption.Strategy,
@ -43,32 +63,21 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type
StaticWeightMap: req.VmOption.StaticWeightMap,
Name: req.VmOption.Name,
CommitTime: time.Now(),
}
taskModel := models.Task{
Status: constants.Saved,
Name: req.VmOption.Name,
CommitTime: time.Now(),
Description: "vm task",
}
// Save task data to database
tx := l.svcCtx.DbEngin.Create(&taskModel)
if tx.Error != nil {
return nil, tx.Error
}
}*/
//var clusters []*models.VmModel
//err2 := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.VmOption.AdapterId, req.VmOption.VmClusterIds).Scan(&clusters).Error
//if err2 != nil {
// logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
// //return errors.Errorf("the cluster does not match the drive resources. Check the data"), nil
//}
var clusters []*models.VmModel
err2 := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error
if err2 != nil {
logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
//return errors.Errorf("the cluster does not match the drive resources. Check the data"), nil
}
taskVm := models.TaskVm{}
//TODO 执行策略返回集群跟 Replica
/*opt := &option.VmOption{}
utils.Convert(&req, &opt)*/
opt := &option.VmOption{}
utils.Convert(&req, &opt)
// 2、Initialize scheduler
vmSchdl, err := schedulers.NewVmScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient)
vmSchdl, _ := schedulers.NewVmScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient)
if err != nil {
return nil, err
}
@ -76,43 +85,139 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type
// 3、Return scheduling results
results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl)
if err != nil {
logx.Errorf("AssignAndSchedule() => execution error: %v", err)
return nil, err
}
rs := (results).([]*schedulers.VmResult)
var synergyStatus int64
if len(rs) > 1 {
synergyStatus = 1
}
var strategy int64
sqlStr := `select t_dict_item.item_value
from t_dict
left join t_dict_item on t_dict.id = t_dict_item.dict_id
where item_text = ?
and t_dict.dict_code = 'schedule_Strategy'`
//查询调度策略
err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error
taskModel := models.Task{
Id: utils.GenSnowflakeID(),
Status: constants.Saved,
Name: req.Name,
CommitTime: time.Now(),
Description: "vm task",
AdapterTypeDict: 0,
SynergyStatus: synergyStatus,
Strategy: strategy,
}
var taskVms models.TaskVm
var VmObject types.TaskVm
for _, r := range rs {
for _, CreateMulServer := range req.CreateMulServer {
if r.Replica > 0 && r.ClusterId == CreateMulServer.ClusterId {
fmt.Println("", req.CreateMulServer)
var clusterIds []int64
l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? ", req.VmOption.AdapterId).Scan(&clusterIds)
if len(clusterIds) == 0 || clusterIds == nil {
return nil, nil
for _, clusterId := range req.ClusterIds {
if r.Replica > 0 && r.ClusterId == clusterId {
fmt.Println("", clusterId)
sql := `SELECT vi.image_id as imageRef,vf.flavor_id as flavorRef,vn.network_id as uuid,vi.cluster_name as platform FROM
vm_flavor vf
LEFT JOIN vm_image vi ON vf.cluster_id = vi.cluster_id
LEFT JOIN vm_network vn ON vf.cluster_id = vn.cluster_id
WHERE
vi.cluster_id = ?
AND vf.public_flavor_id = ?
AND vi.public_image_id = ?
AND vn.public_network_id = ?`
// err2 := l.svcCtx.DbEngin.Raw(sql, clusterId, req.FlavorRef, req.ImageRef, req.Uuid).Scan(&taskVm).Error
txVm := l.svcCtx.DbEngin.Raw(sql, clusterId, req.FlavorRef, req.ImageRef, req.Uuid).Scan(&VmObject)
if txVm.Error != nil {
logx.Error(err)
return nil, txVm.Error
}
adapterId, _ := strconv.ParseUint(req.VmOption.AdapterId, 10, 64)
taskVm.AdapterId = int64(adapterId)
clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64)
taskVm.ClusterId = int64(clusterId)
taskVm.Name = req.VmOption.Name
taskVm.TaskId = taskModel.Id
clusterId, _ = strconv.ParseUint(r.ClusterId, 10, 64)
taskVm.ClusterId = int64(clusterId)
if err2 != nil {
logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
//return errors.Errorf("the cluster does not match the drive resources. Check the data"), nil
}
taskVms.Name = req.Name
taskVm.Status = "Saved"
taskVm.StartTime = time.Now().String()
taskVm.MinCount = CreateMulServer.Min_count
taskVm.ImageRef = CreateMulServer.ImageRef
taskVm.FlavorRef = CreateMulServer.FlavorRef
taskVm.Uuid = CreateMulServer.Uuid
taskVm.Platform = CreateMulServer.Platform
taskVm.MinCount = req.MinCount
/* sqlImage := "SELECT image_id FROM `vm_image_dict` vm left join vm_image vi on vm.id=vi.public_image_id where cluster_id =? AND public_image_id = ?"
txImage := l.svcCtx.DbEngin.Raw(sqlImage, clusterId, req.ImageRef).Scan(&ImageRef)
if txImage.Error != nil {
logx.Error(err)
return nil, txImage.Error
}*/
taskVm.ImageRef = VmObject.ImageRef
/* sqlFlavor := "SELECT * FROM `vm_flavor_dict` vm left join vm_flavor vf on vm.id=vf.public_flavor_id where cluster_id =? AND public_flavor_id = ?"
txFlavor := l.svcCtx.DbEngin.Raw(sqlFlavor, clusterId, req.FlavorRef).Scan(&FlavorRef)
if txFlavor.Error != nil {
logx.Error(err)
return nil, txFlavor.Error
}*/
taskVm.FlavorRef = VmObject.FlavorRef
/* sqlNetwork := "SELECT * FROM `vm_network_dict` vm left join vm_network vi on vm.id=vi.public_network_id where cluster_id =? AND public_network_id = ?"
txNetwork := l.svcCtx.DbEngin.Raw(sqlNetwork, clusterId, req.Uuid).Scan(&NetworkRef)
if txNetwork.Error != nil {
logx.Error(err)
return nil, txNetwork.Error
}*/
taskVm.Uuid = VmObject.Uuid
taskVm.Platform = VmObject.Platform
tx = l.svcCtx.DbEngin.Create(&taskVm)
if tx.Error != nil {
return nil, tx.Error
}
//var clusterIds []int64
//l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? ", req.VmOption.AdapterId).Scan(&clusterIds)
//if len(clusterIds) == 0 || clusterIds == nil {
// return nil, nil
//}
//adapterId, _ := strconv.ParseUint(req.VmOption.AdapterId, 10, 64)
//taskVm.AdapterId = int64(adapterId)
//clusterId, _ = strconv.ParseUint(r.ClusterId, 10, 64)
//taskVm.ClusterId = int64(clusterId)
//taskVm.Status = "Saved"
//taskVm.StartTime = time.Now().String()
//taskVm.ImageRef = CreateMulServer.ImageRef
//taskVm.FlavorRef = CreateMulServer.FlavorRef
//taskVm.Uuid = CreateMulServer.Uuid
//taskVm.Platform = CreateMulServer.Platform
//tx = l.svcCtx.DbEngin.Create(&taskVm)
//if tx.Error != nil {
// return nil, tx.Error
//}
}
}
}
adapterName := ""
tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName)
noticeInfo := clientCore.NoticeInfo{
AdapterId: int64(adapterId),
AdapterName: adapterName,
NoticeType: "create",
TaskName: req.Name,
Incident: "任务创建中",
CreatedTime: time.Now(),
}
db := tx.Table("task").Create(&taskModel)
db = tx.Table("task_cloud").Create(&taskVm)
db = tx.Table("t_notice").Create(&noticeInfo)
if db.Error != nil {
logx.Errorf("Task creation failure, err: %v", db.Error)
}
//db = tx.Table("t_notice").Create(&noticeInfo)
// Save task data to database
//tf := l.svcCtx.DbEngin.Create(&taskModel)
//if tf.Error != nil {
// return nil, tf.Error
//}
//tn := l.svcCtx.DbEngin.Create(&noticeInfo)
//if tn.Error != nil {
// return nil, tn.Error
//}
resp.Code = 200
resp.Msg = "Success"
return resp, nil
}

View File

@ -83,7 +83,7 @@ func (vm *VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
return strategy, nil*/
case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap
strategy := strategy.NewStaticWeightStrategy(vm.option.ClusterToStaticWeight, 1)
strategy := strategy.NewStaticWeightStrategy(vm.option.StaticWeightMap, 1)
return strategy, nil
}

View File

@ -182,8 +182,23 @@ type TaskYaml struct {
}
type CommitVmTaskReq struct {
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
VmOption *VmOption `json:"vmOption,optional"`
Name string `json:"name"`
AdapterIds []string `json:"adapterIds,optional"`
ClusterIds []string `json:"clusterIds"`
Strategy string `json:"strategy"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
MinCount int64 `json:"min_count,optional"`
ImageRef int64 `json:"imageRef,optional"`
FlavorRef int64 `json:"flavorRef,optional"`
Uuid int64 `json:"uuid,optional"`
VmName string `json:"vm_name,optional"`
}
type TaskVm struct {
ImageRef string `json:"imageRef"`
FlavorRef string `json:"flavorRef"`
Uuid string `json:"uuid"`
Platform string `json:"platform"`
}
type VmOption struct {

View File

@ -35,17 +35,16 @@ type (
}
TaskVm struct {
Id int64 `db:"id"` // id
ParticipantId int64 `db:"participant_id"` // p端id
TaskId int64 `db:"task_id"` // 任务id
Name string `db:"name"` // 虚拟机名称
AdapterId int64 `db:"adapter_id"` // 执行任务的适配器id
ClusterId int64 `db:"cluster_id"` // 执行任务的集群id
FlavorRef string `db:"flavor_ref"` // 规格索引
ImageRef string `db:"image_ref"` // 镜像索引
Status string `db:"status"` // 状态
Platform string `db:"platform"` // 平台
Description string `db:"description"` // 描述
Id int64 `db:"id"` // id
TaskId int64 `db:"task_id"` // 任务id
Name string `db:"name"` // 虚拟机名称
AdapterId int64 `db:"adapter_id"` // 执行任务的适配器id
ClusterId int64 `db:"cluster_id"` // 执行任务的集群id
FlavorRef string `db:"flavor_ref"` // 规格索引
ImageRef string `db:"image_ref"` // 镜像索引
Status string `db:"status"` // 状态
Platform string `db:"platform"` // 平台
Description string `db:"description"` // 描述
AvailabilityZone string `db:"availability_zone"`
MinCount int64 `db:"min_count"` // 数量
Uuid string `db:"uuid"` // 网络id
@ -91,14 +90,14 @@ func (m *defaultTaskVmModel) FindOne(ctx context.Context, id int64) (*TaskVm, er
}
func (m *defaultTaskVmModel) Insert(ctx context.Context, data *TaskVm) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.ParticipantId, data.TaskId, data.Name, data.AdapterId, data.ClusterId, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt)
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.ClusterId, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt)
return ret, err
}
func (m *defaultTaskVmModel) Update(ctx context.Context, data *TaskVm) error {
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskVmRowsWithPlaceHolder)
_, err := m.conn.ExecCtx(ctx, query, data.ParticipantId, data.TaskId, data.Name, data.AdapterId, data.ClusterId, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.Id)
_, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.ClusterId, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.Id)
return err
}