Merge branch 'master' of https://gitlink.org.cn/JointCloud/pcm-coordinator
Former-commit-id: 35b75b870b33edb488aea997810c5f36277f625f
This commit is contained in:
commit
d678f52b1f
|
@ -111,17 +111,17 @@ type HpcInfo struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type CloudInfo struct {
|
type CloudInfo struct {
|
||||||
Id uint `json:"id,omitempty"`
|
Id uint `json:"id,omitempty,optional"`
|
||||||
TaskId int64 `json:"taskId,omitempty"`
|
TaskId int64 `json:"taskId,omitempty,optional"`
|
||||||
AdapterId uint `json:"adapterId,omitempty"`
|
AdapterId uint `json:"adapterId,omitempty,optional"`
|
||||||
ClusterId uint `json:"clusterId,omitempty"`
|
ClusterId uint `json:"clusterId,omitempty,optional"`
|
||||||
ClusterName string `json:"clusterName,omitempty"`
|
ClusterName string `json:"clusterName,omitempty,optional"`
|
||||||
Kind string `json:"kind,omitempty"`
|
Kind string `json:"kind,omitempty,optional"`
|
||||||
Status string `json:"status,omitempty"`
|
Status string `json:"status,omitempty,optional"`
|
||||||
StartTime *time.Time `json:"startTime,omitempty"`
|
StartTime *time.Time `json:"startTime,omitempty,optional,string"`
|
||||||
YamlString string `json:"yamlString,omitempty"`
|
YamlString string `json:"yamlString,omitempty,optional"`
|
||||||
Result string `json:"result,omitempty"`
|
Result string `json:"result,omitempty,optional"`
|
||||||
Namespace string `json:"namespace,omitempty"`
|
Namespace string `json:"namespace,omitempty,optional"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type AiInfo struct {
|
type AiInfo struct {
|
||||||
|
|
|
@ -154,42 +154,116 @@ type (
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
type (
|
type (
|
||||||
commitVmTaskReq {
|
commitVmTaskReq {
|
||||||
Name string `json:"name"`
|
// Name string `json:"name"`
|
||||||
NsID string `json:"nsID"`
|
// NsID string `json:"nsID"`
|
||||||
Replicas int64 `json:"replicas,optional"`
|
// Replicas int64 `json:"replicas,optional"`
|
||||||
MatchLabels map[string]string `json:"matchLabels,optional"`
|
// MatchLabels map[string]string `json:"matchLabels,optional"`
|
||||||
AdapterId string `json:"adapterId,optional"`
|
// AdapterId string `json:"adapterId,optional"`
|
||||||
ClusterType string `json:"clusterType,optional"`
|
// ClusterType string `json:"clusterType,optional"`
|
||||||
//Virtual Machine Section
|
// //Virtual Machine Section
|
||||||
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
|
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
|
||||||
|
VmOption *VmOption `json:"vmOption,optional"`
|
||||||
}
|
}
|
||||||
|
VmOption {
|
||||||
|
AdapterId string `json:"adapterId"`
|
||||||
|
VmClusterIds []string `json:"vmClusterIds"`
|
||||||
|
Replicas int64 `json:"replicas,optional"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
//ResourceType string `json:"resourceType"`
|
||||||
|
//TaskType string `json:"taskType"`
|
||||||
|
Strategy string `json:"strategy"`
|
||||||
|
ClusterToStaticWeight map[string]int32 `json:"clusterToStaticWeight"`
|
||||||
|
MatchLabels map[string]string `json:"matchLabels,optional"`
|
||||||
|
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
|
||||||
|
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
|
||||||
|
// Id int64 `json:"id"`
|
||||||
|
// ParticipantId int64 `json:"participantId"`
|
||||||
|
// TaskId int64 `json:"taskId"`
|
||||||
|
// AdapterId int64 `json:"adapterId"`
|
||||||
|
// ClusterId int64 `json:"clusterId"`
|
||||||
|
// FlavorRef string `json:"flavorRef"`
|
||||||
|
// ImageRef string `json:"imageRef"`
|
||||||
|
// Status string `json:"status"`
|
||||||
|
// Platform string `json:"platform"`
|
||||||
|
// Description string `json:"description"`
|
||||||
|
// AvailabilityZone string `json:"availabilityZone"`
|
||||||
|
// MinCount int64 `json:"minCount"`
|
||||||
|
// Uuid string `json:"uuid"`
|
||||||
|
// StartTime string `json:"startTime"`
|
||||||
|
// RunningTime string `json:"runningTime"`
|
||||||
|
// Result string `json:"result"`
|
||||||
|
// DeletedAt string `json:"deletedAt"`
|
||||||
|
}
|
||||||
|
|
||||||
CreateMulDomainServer {
|
CreateMulDomainServer {
|
||||||
Platform string `json:"platform,optional"`
|
Platform string `json:"platform,optional"`
|
||||||
Name string `json:"name,optional"`
|
name string `json:"name,optional"`
|
||||||
Min_count int64 `json:"min_count,optional"`
|
min_count int64 `json:"min_count,optional"`
|
||||||
ImageRef string `json:"imageRef,optional"`
|
imageRef string `json:"imageRef,optional"`
|
||||||
FlavorRef string `json:"flavorRef,optional"`
|
flavorRef string `json:"flavorRef,optional"`
|
||||||
Uuid string `json:"uuid,optional"`
|
uuid string `json:"uuid,optional"`
|
||||||
|
ClusterId string `json:"clusterId,optional"`
|
||||||
}
|
}
|
||||||
commitVmTaskResp {
|
commitVmTaskResp {
|
||||||
// VmTask []VmTask `json:"vmTask" copier:"VmTask"`
|
|
||||||
TaskId int64 `json:"taskId"`
|
|
||||||
Code int32 `json:"code"`
|
Code int32 `json:"code"`
|
||||||
Msg string `json:"msg"`
|
Msg string `json:"msg"`
|
||||||
}
|
}
|
||||||
VmTask {
|
ScheduleVmResult struct {
|
||||||
Id string `json:"id" copier:"Id"`
|
ClusterId string `json:"clusterId"`
|
||||||
Links []VmLinks `json:"links" copier:"Links"`
|
TaskId string `json:"taskId"`
|
||||||
OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
|
Strategy string `json:"strategy"`
|
||||||
SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"`
|
Replica int32 `json:"replica"`
|
||||||
AdminPass string `json:"adminPass" copier:"AdminPass"`
|
Msg string `json:"msg"`
|
||||||
|
}
|
||||||
|
VmTask{
|
||||||
|
Id string `json:"id" copier:"Id"`
|
||||||
|
Links []VmLinks `json:"links" copier:"Links"`
|
||||||
|
OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
|
||||||
|
SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"`
|
||||||
|
AdminPass string `json:"adminPass" copier:"AdminPass"`
|
||||||
}
|
}
|
||||||
VmLinks {
|
VmLinks {
|
||||||
Href string `json:"href " copier:"Href"`
|
Href string `json:"href " copier:"Href"`
|
||||||
Rel string `json:"rel" copier:"Rel"`
|
Rel string `json:"rel" copier:"Rel"`
|
||||||
}
|
}
|
||||||
|
// commitVmTaskReq {
|
||||||
|
// Name string `json:"name"`
|
||||||
|
// NsID string `json:"nsID"`
|
||||||
|
// Replicas int64 `json:"replicas,optional"`
|
||||||
|
// MatchLabels map[string]string `json:"matchLabels,optional"`
|
||||||
|
// AdapterId string `json:"adapterId,optional"`
|
||||||
|
// ClusterType string `json:"clusterType,optional"`
|
||||||
|
// //Virtual Machine Section
|
||||||
|
// CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
|
||||||
|
// }
|
||||||
|
// CreateMulDomainServer {
|
||||||
|
// Platform string `json:"platform,optional"`
|
||||||
|
// Name string `json:"name,optional"`
|
||||||
|
// Min_count int64 `json:"min_count,optional"`
|
||||||
|
// ImageRef string `json:"imageRef,optional"`
|
||||||
|
// FlavorRef string `json:"flavorRef,optional"`
|
||||||
|
// Uuid string `json:"uuid,optional"`
|
||||||
|
// }
|
||||||
|
// commitVmTaskResp {
|
||||||
|
// // VmTask []VmTask `json:"vmTask" copier:"VmTask"`
|
||||||
|
// TaskId int64 `json:"taskId"`
|
||||||
|
// Code int32 `json:"code"`
|
||||||
|
// Msg string `json:"msg"`
|
||||||
|
// }
|
||||||
|
// VmTask {
|
||||||
|
// Id string `json:"id" copier:"Id"`
|
||||||
|
// Links []VmLinks `json:"links" copier:"Links"`
|
||||||
|
// OSDCFDiskConfig string `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
|
||||||
|
// SecurityGroups []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"`
|
||||||
|
// AdminPass string `json:"adminPass" copier:"AdminPass"`
|
||||||
|
// }
|
||||||
|
// VmLinks {
|
||||||
|
// Href string `json:"href " copier:"Href"`
|
||||||
|
// Rel string `json:"rel" copier:"Rel"`
|
||||||
|
// }
|
||||||
|
|
||||||
VmSecurity_groups_server {
|
VmSecurity_groups_server {
|
||||||
Name string `json:"name" copier:"Name"`
|
Name string `json:"name" copier:"Name"`
|
||||||
|
|
|
@ -3,11 +3,13 @@ package core
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
|
||||||
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||||
"math/rand"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/zeromicro/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
|
@ -29,11 +31,24 @@ func NewCommitVmTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm
|
||||||
|
|
||||||
func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *types.CommitVmTaskResp, err error) {
|
func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *types.CommitVmTaskResp, err error) {
|
||||||
// todo: add your logic here and delete this line
|
// todo: add your logic here and delete this line
|
||||||
|
resp = &types.CommitVmTaskResp{}
|
||||||
//Building the main task structure
|
//Building the main task structure
|
||||||
|
opt := &option.VmOption{
|
||||||
|
AdapterId: req.VmOption.AdapterId,
|
||||||
|
Replicas: req.VmOption.Replicas,
|
||||||
|
Strategy: req.VmOption.Strategy,
|
||||||
|
ClusterToStaticWeight: req.VmOption.StaticWeightMap,
|
||||||
|
Status: constants.Saved,
|
||||||
|
MatchLabels: req.VmOption.MatchLabels,
|
||||||
|
StaticWeightMap: req.VmOption.StaticWeightMap,
|
||||||
|
Name: req.VmOption.Name,
|
||||||
|
CommitTime: time.Now(),
|
||||||
|
}
|
||||||
taskModel := models.Task{
|
taskModel := models.Task{
|
||||||
Status: constants.Saved,
|
Status: constants.Saved,
|
||||||
Name: req.Name,
|
Name: req.VmOption.Name,
|
||||||
CommitTime: time.Now(),
|
CommitTime: time.Now(),
|
||||||
|
Description: "vm task",
|
||||||
}
|
}
|
||||||
// Save task data to database
|
// Save task data to database
|
||||||
tx := l.svcCtx.DbEngin.Create(&taskModel)
|
tx := l.svcCtx.DbEngin.Create(&taskModel)
|
||||||
|
@ -41,38 +56,63 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type
|
||||||
return nil, tx.Error
|
return nil, tx.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, CreateMulServer := range req.CreateMulServer {
|
//var clusters []*models.VmModel
|
||||||
fmt.Println("", req.CreateMulServer)
|
//err2 := l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.VmOption.AdapterId, req.VmOption.VmClusterIds).Scan(&clusters).Error
|
||||||
var clusterIds []int64
|
//if err2 != nil {
|
||||||
l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? and label = ?", req.AdapterId, req.ClusterType).Scan(&clusterIds)
|
// logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
|
||||||
|
// //return errors.Errorf("the cluster does not match the drive resources. Check the data"), nil
|
||||||
|
//}
|
||||||
|
|
||||||
if len(clusterIds) == 0 || clusterIds == nil {
|
taskVm := models.TaskVm{}
|
||||||
return nil, nil
|
//TODO 执行策略返回集群跟 Replica
|
||||||
}
|
/*opt := &option.VmOption{}
|
||||||
|
utils.Convert(&req, &opt)*/
|
||||||
|
// 2、Initialize scheduler
|
||||||
|
vmSchdl, err := schedulers.NewVmScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, l.svcCtx.DbEngin, l.svcCtx.PromClient)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
vmInfo := models.TaskVm{
|
// 3、Return scheduling results
|
||||||
TaskId: taskModel.Id,
|
results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl)
|
||||||
ClusterId: clusterIds[rand.Intn(len(clusterIds))],
|
if err != nil {
|
||||||
Name: taskModel.Name,
|
return nil, err
|
||||||
Status: "Saved",
|
}
|
||||||
StartTime: time.Now().String(),
|
|
||||||
MinCount: CreateMulServer.Min_count,
|
|
||||||
ImageRef: CreateMulServer.ImageRef,
|
|
||||||
FlavorRef: CreateMulServer.FlavorRef,
|
|
||||||
Uuid: CreateMulServer.Uuid,
|
|
||||||
Platform: CreateMulServer.Platform,
|
|
||||||
}
|
|
||||||
|
|
||||||
tx = l.svcCtx.DbEngin.Create(&vmInfo)
|
rs := (results).([]*schedulers.VmResult)
|
||||||
if tx.Error != nil {
|
for _, r := range rs {
|
||||||
return nil, tx.Error
|
for _, CreateMulServer := range req.CreateMulServer {
|
||||||
}
|
if r.Replica > 0 && r.ClusterId == CreateMulServer.ClusterId {
|
||||||
resp = &types.CommitVmTaskResp{
|
fmt.Println("", req.CreateMulServer)
|
||||||
Code: 200,
|
var clusterIds []int64
|
||||||
Msg: "success",
|
l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? ", req.VmOption.AdapterId).Scan(&clusterIds)
|
||||||
TaskId: taskModel.Id,
|
if len(clusterIds) == 0 || clusterIds == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
adapterId, _ := strconv.ParseUint(req.VmOption.AdapterId, 10, 64)
|
||||||
|
taskVm.AdapterId = int64(adapterId)
|
||||||
|
clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64)
|
||||||
|
taskVm.ClusterId = int64(clusterId)
|
||||||
|
taskVm.Name = req.VmOption.Name
|
||||||
|
taskVm.TaskId = taskModel.Id
|
||||||
|
clusterId, _ = strconv.ParseUint(r.ClusterId, 10, 64)
|
||||||
|
taskVm.ClusterId = int64(clusterId)
|
||||||
|
taskVm.Status = "Saved"
|
||||||
|
taskVm.StartTime = time.Now().String()
|
||||||
|
taskVm.MinCount = CreateMulServer.Min_count
|
||||||
|
taskVm.ImageRef = CreateMulServer.ImageRef
|
||||||
|
taskVm.FlavorRef = CreateMulServer.FlavorRef
|
||||||
|
taskVm.Uuid = CreateMulServer.Uuid
|
||||||
|
taskVm.Platform = CreateMulServer.Platform
|
||||||
|
tx = l.svcCtx.DbEngin.Create(&taskVm)
|
||||||
|
if tx.Error != nil {
|
||||||
|
return nil, tx.Error
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
resp.Code = 200
|
||||||
|
resp.Msg = "Success"
|
||||||
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,14 +2,15 @@ package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
|
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
|
||||||
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
"github.com/zeromicro/go-zero/core/logx"
|
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type PushTaskInfoLogic struct {
|
type PushTaskInfoLogic struct {
|
||||||
|
@ -33,9 +34,14 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie
|
||||||
switch kind {
|
switch kind {
|
||||||
case 0:
|
case 0:
|
||||||
for _, cloudInfo := range req.CloudInfoList {
|
for _, cloudInfo := range req.CloudInfoList {
|
||||||
l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?",
|
var taskId uint
|
||||||
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, req.AdapterId, cloudInfo.Id)
|
result := l.svcCtx.DbEngin.Table("task_cloud").Select("task_id").Where("task_id = ?", cloudInfo.TaskId).Find(&taskId)
|
||||||
syncTask(l.svcCtx.DbEngin, cloudInfo.TaskId)
|
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||||
|
return nil, errors.New("Record does not exist")
|
||||||
|
}
|
||||||
|
l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where task_id = ?",
|
||||||
|
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.TaskId)
|
||||||
|
syncTask(l.svcCtx.DbEngin, int64(taskId))
|
||||||
}
|
}
|
||||||
case 2:
|
case 2:
|
||||||
for _, hpcInfo := range req.HpcInfoList {
|
for _, hpcInfo := range req.HpcInfoList {
|
||||||
|
@ -63,7 +69,7 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie
|
||||||
func syncTask(gorm *gorm.DB, taskId int64) {
|
func syncTask(gorm *gorm.DB, taskId int64) {
|
||||||
|
|
||||||
var allStatus string
|
var allStatus string
|
||||||
tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus)
|
tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join task_cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus)
|
||||||
if tx.Error != nil {
|
if tx.Error != nil {
|
||||||
logx.Error(tx.Error)
|
logx.Error(tx.Error)
|
||||||
}
|
}
|
||||||
|
@ -79,7 +85,7 @@ func syncTask(gorm *gorm.DB, taskId int64) {
|
||||||
|
|
||||||
}
|
}
|
||||||
if strings.Contains(allStatus, constants.Running) {
|
if strings.Contains(allStatus, constants.Running) {
|
||||||
updateTask(gorm, taskId, constants.Running)
|
updateTaskRunning(gorm, taskId, constants.Running)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -93,6 +99,16 @@ func updateTask(gorm *gorm.DB, taskId int64, status string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateTaskRunning(gorm *gorm.DB, taskId int64, status string) {
|
||||||
|
var task models.Task
|
||||||
|
gorm.Where("id = ? ", taskId).Find(&task)
|
||||||
|
if task.Status != status {
|
||||||
|
task.Status = status
|
||||||
|
task.StartTime = time.Now().Format("2006-01-02 15:04:05")
|
||||||
|
gorm.Updates(&task)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func removeRepeatedElement(arr []string) (newArr []string) {
|
func removeRepeatedElement(arr []string) (newArr []string) {
|
||||||
newArr = make([]string, 0)
|
newArr = make([]string, 0)
|
||||||
for i := 0; i < len(arr); i++ {
|
for i := 0; i < len(arr); i++ {
|
||||||
|
|
|
@ -2,8 +2,6 @@ package mqs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
|
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
|
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -24,28 +22,28 @@ func NewVmMq(ctx context.Context, svcCtx *svc.ServiceContext) *VmMq {
|
||||||
|
|
||||||
func (l *VmMq) Consume(val string) error {
|
func (l *VmMq) Consume(val string) error {
|
||||||
// 接受消息, 根据标签筛选过滤
|
// 接受消息, 根据标签筛选过滤
|
||||||
vmScheduler := schedulers.NewVmScheduler()
|
//vmScheduler := schedulers.NewVmScheduler()
|
||||||
schdl, err := scheduler.NewScheduler(vmScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc)
|
//schdl, err := scheduler.NewScheduler(vmScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc)
|
||||||
if err != nil {
|
//if err != nil {
|
||||||
return err
|
// return err
|
||||||
}
|
//}
|
||||||
|
//
|
||||||
//检测是否指定了集群列表
|
////检测是否指定了集群列表
|
||||||
schdl.SpecifyClusters()
|
//schdl.SpecifyClusters()
|
||||||
|
//
|
||||||
//检测是否指定了nsID
|
////检测是否指定了nsID
|
||||||
schdl.SpecifyNsID()
|
//schdl.SpecifyNsID()
|
||||||
|
//
|
||||||
//通过标签匹配筛选出集群范围
|
////通过标签匹配筛选出集群范围
|
||||||
schdl.MatchLabels()
|
//schdl.MatchLabels()
|
||||||
|
//
|
||||||
//todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度
|
////todo 屏蔽原调度算法,因为监控数据暂未上报,临时采用随机调度
|
||||||
schdl.TempAssign()
|
//schdl.TempAssign()
|
||||||
|
//
|
||||||
// 存储数据
|
//// 存储数据
|
||||||
err = schdl.SaveToDb()
|
//err = schdl.SaveToDb()
|
||||||
if err != nil {
|
//if err != nil {
|
||||||
return err
|
// return err
|
||||||
}
|
//}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ const (
|
||||||
AI = "ai"
|
AI = "ai"
|
||||||
CLOUD = "cloud"
|
CLOUD = "cloud"
|
||||||
HPC = "hpc"
|
HPC = "hpc"
|
||||||
|
VM = "vm"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Option interface {
|
type Option interface {
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
package option
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type VmOption struct {
|
||||||
|
AdapterId string
|
||||||
|
ClusterIds []string
|
||||||
|
TaskName string
|
||||||
|
ResourceType string // cpu/gpu/compute card
|
||||||
|
TaskType string // pytorch/tensorflow/mindspore
|
||||||
|
Strategy string
|
||||||
|
ClusterToStaticWeight map[string]int32
|
||||||
|
CommitTime time.Time
|
||||||
|
NsID string
|
||||||
|
Replicas int64
|
||||||
|
MatchLabels map[string]string
|
||||||
|
StaticWeightMap map[string]int32
|
||||||
|
CreateMulServer []CreateMulDomainServer
|
||||||
|
Id int64
|
||||||
|
ParticipantId int64
|
||||||
|
TaskId int64
|
||||||
|
Name string
|
||||||
|
ClusterId int64
|
||||||
|
FlavorRef string
|
||||||
|
ImageRef string
|
||||||
|
Status string
|
||||||
|
Platform string
|
||||||
|
Description string
|
||||||
|
AvailabilityZone string
|
||||||
|
MinCount int64
|
||||||
|
Uuid string
|
||||||
|
StartTime string
|
||||||
|
RunningTime string
|
||||||
|
Result string
|
||||||
|
DeletedAt string
|
||||||
|
}
|
||||||
|
|
||||||
|
type CreateMulDomainServer struct {
|
||||||
|
Platform string
|
||||||
|
Name string
|
||||||
|
Min_count int64
|
||||||
|
ImageRef string
|
||||||
|
FlavorRef string
|
||||||
|
Uuid string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a VmOption) GetOptionType() string {
|
||||||
|
return VM
|
||||||
|
}
|
|
@ -1,29 +1,96 @@
|
||||||
package schedulers
|
package schedulers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
|
||||||
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
|
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||||
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
|
||||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
||||||
|
"gorm.io/gorm"
|
||||||
)
|
)
|
||||||
|
|
||||||
type VmScheduler struct {
|
type VmScheduler struct {
|
||||||
storage database.Storage
|
yamlString string
|
||||||
|
storage database.Storage
|
||||||
|
task *response.TaskInfo
|
||||||
|
*scheduler.Scheduler
|
||||||
|
option *option.VmOption
|
||||||
|
ctx context.Context
|
||||||
|
promClient tracker.Prometheus
|
||||||
|
dbEngin *gorm.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVmScheduler() *VmScheduler {
|
type VmResult struct {
|
||||||
return &VmScheduler{}
|
TaskId string
|
||||||
|
ClusterId string
|
||||||
|
ClusterName string
|
||||||
|
Strategy string
|
||||||
|
Replica int32
|
||||||
|
Msg string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewVmScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.VmOption, dbEngin *gorm.DB, promClient tracker.Prometheus) (*VmScheduler, error) {
|
||||||
|
return &VmScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option, dbEngin: dbEngin, promClient: promClient}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/*func NewCloudScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.CloudOption, dbEngin *gorm.DB, promClient tracker.Prometheus) (*CloudScheduler, error) {
|
||||||
|
return &CloudScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option, dbEngin: dbEngin, promClient: promClient}, nil
|
||||||
|
}*/
|
||||||
|
|
||||||
func (vm *VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
|
func (vm *VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
|
||||||
//获取所有计算中心
|
if len(vm.option.ClusterIds) == 1 {
|
||||||
//调度算法
|
// TODO database operation Find
|
||||||
strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{})
|
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: vm.option.ClusterIds[0], Replicas: 1}}, nil
|
||||||
return strategy, nil
|
}
|
||||||
|
//resources, err := vm.findClustersWithResources()
|
||||||
|
|
||||||
|
/* if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}*/
|
||||||
|
|
||||||
|
/* if len(resources) == 0 {
|
||||||
|
return nil, errors.New("no cluster has resources")
|
||||||
|
}*/
|
||||||
|
//
|
||||||
|
//if len(resources) == 1 {
|
||||||
|
// var cluster strategy.AssignedCluster
|
||||||
|
// cluster.ClusterId = resources[0].ClusterId
|
||||||
|
// cluster.Replicas = 1
|
||||||
|
// return &strategy.SingleAssignment{Cluster: &cluster}, nil
|
||||||
|
//}
|
||||||
|
//params := ¶m.Params{Resources: resources}
|
||||||
|
|
||||||
|
switch vm.option.Strategy {
|
||||||
|
/* case strategy.REPLICATION:
|
||||||
|
var clusterIds []string
|
||||||
|
for _, resource := range resources {
|
||||||
|
clusterIds = append(clusterIds, resource.ClusterId)
|
||||||
|
}
|
||||||
|
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
|
||||||
|
return strategy, nil
|
||||||
|
case strategy.RESOURCES_PRICING:
|
||||||
|
strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{Params: params, Replicas: 1})
|
||||||
|
return strategy, nil
|
||||||
|
case strategy.DYNAMIC_RESOURCES:
|
||||||
|
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, vm.option, 1)
|
||||||
|
return strategy, nil*/
|
||||||
|
case strategy.STATIC_WEIGHT:
|
||||||
|
//todo resources should match cluster StaticWeightMap
|
||||||
|
strategy := strategy.NewStaticWeightStrategy(vm.option.ClusterToStaticWeight, 1)
|
||||||
|
return strategy, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/*strategy := strategy.NewPricingStrategy(¶m.ResourcePricingParams{})
|
||||||
|
return strategy, nil*/
|
||||||
|
|
||||||
|
return nil, errors.New("no strategy has been chosen")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
|
func (v *VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
|
||||||
|
@ -41,12 +108,6 @@ func (v *VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string
|
||||||
vm.ParticipantId = participantId*/
|
vm.ParticipantId = participantId*/
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
func (vm *VmScheduler) UnMarshalVmStruct(yamlString string, taskId int64, nsID string) models.vm {
|
|
||||||
var vm models.Vm
|
|
||||||
vm := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096)
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider, error) {
|
func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider, error) {
|
||||||
proParams, err := vm.storage.GetProviderParams()
|
proParams, err := vm.storage.GetProviderParams()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -64,7 +125,38 @@ func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*provider
|
||||||
return nil, providerList, nil
|
return nil, providerList, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) {
|
func (as *VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) {
|
||||||
//TODO implement me
|
//TODO implement me
|
||||||
panic("implement me")
|
if clusters == nil {
|
||||||
|
return nil, errors.New("clusters is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := len(clusters) - 1; i >= 0; i-- {
|
||||||
|
if clusters[i].Replicas == 0 {
|
||||||
|
clusters = append(clusters[:i], clusters[i+1:]...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(clusters) == 0 {
|
||||||
|
return nil, errors.New("clusters is nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
var results []*VmResult
|
||||||
|
|
||||||
|
for _, cluster := range clusters {
|
||||||
|
cName := ""
|
||||||
|
as.dbEngin.Table("t_cluster").Select("name").Where("id=?", cluster.ClusterId).Find(&cName)
|
||||||
|
cr := VmResult{
|
||||||
|
ClusterId: cluster.ClusterId,
|
||||||
|
ClusterName: cName,
|
||||||
|
Replica: cluster.Replicas,
|
||||||
|
}
|
||||||
|
cr.ClusterId = cluster.ClusterId
|
||||||
|
cr.Replica = cluster.Replicas
|
||||||
|
|
||||||
|
cr.ClusterName = cName
|
||||||
|
results = append(results, &cr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,13 +140,20 @@ type TaskYaml struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type CommitVmTaskReq struct {
|
type CommitVmTaskReq struct {
|
||||||
Name string `json:"name"`
|
|
||||||
NsID string `json:"nsID"`
|
|
||||||
Replicas int64 `json:"replicas,optional"`
|
|
||||||
MatchLabels map[string]string `json:"matchLabels,optional"`
|
|
||||||
AdapterId string `json:"adapterId,optional"`
|
|
||||||
ClusterType string `json:"clusterType,optional"`
|
|
||||||
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
|
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
|
||||||
|
VmOption *VmOption `json:"vmOption,optional"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type VmOption struct {
|
||||||
|
AdapterId string `json:"adapterId"`
|
||||||
|
VmClusterIds []string `json:"vmClusterIds"`
|
||||||
|
Replicas int64 `json:"replicas,optional"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Strategy string `json:"strategy"`
|
||||||
|
ClusterToStaticWeight map[string]int32 `json:"clusterToStaticWeight"`
|
||||||
|
MatchLabels map[string]string `json:"matchLabels,optional"`
|
||||||
|
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
|
||||||
|
CreateMulServer []CreateMulDomainServer `json:"createMulServer,optional"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CreateMulDomainServer struct {
|
type CreateMulDomainServer struct {
|
||||||
|
@ -156,12 +163,20 @@ type CreateMulDomainServer struct {
|
||||||
ImageRef string `json:"imageRef,optional"`
|
ImageRef string `json:"imageRef,optional"`
|
||||||
FlavorRef string `json:"flavorRef,optional"`
|
FlavorRef string `json:"flavorRef,optional"`
|
||||||
Uuid string `json:"uuid,optional"`
|
Uuid string `json:"uuid,optional"`
|
||||||
|
ClusterId string `json:"clusterId,optional"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type CommitVmTaskResp struct {
|
type CommitVmTaskResp struct {
|
||||||
TaskId int64 `json:"taskId"`
|
Code int32 `json:"code"`
|
||||||
Code int32 `json:"code"`
|
Msg string `json:"msg"`
|
||||||
Msg string `json:"msg"`
|
}
|
||||||
|
|
||||||
|
type ScheduleVmResult struct {
|
||||||
|
ClusterId string `json:"clusterId"`
|
||||||
|
TaskId string `json:"taskId"`
|
||||||
|
Strategy string `json:"strategy"`
|
||||||
|
Replica int32 `json:"replica"`
|
||||||
|
Msg string `json:"msg"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type VmTask struct {
|
type VmTask struct {
|
||||||
|
|
|
@ -13,7 +13,7 @@ type TaskCloudModel struct {
|
||||||
ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"`
|
ClusterName string `json:"clusterName" gorm:"not null;comment:集群名称"`
|
||||||
Kind string `json:"kind" gorm:"comment:种类"`
|
Kind string `json:"kind" gorm:"comment:种类"`
|
||||||
Status string `json:"status" gorm:"comment:状态"`
|
Status string `json:"status" gorm:"comment:状态"`
|
||||||
StartTime *time.Time `json:"startTime" gorm:"comment:开始时间"`
|
StartTime *time.Time `json:"startTime,string" gorm:"comment:开始时间"`
|
||||||
YamlString string `json:"yamlString" gorm:"not null;comment:入参"`
|
YamlString string `json:"yamlString" gorm:"not null;comment:入参"`
|
||||||
Result string `json:"result" gorm:"comment:运行结果"`
|
Result string `json:"result" gorm:"comment:运行结果"`
|
||||||
Namespace string `json:"namespace" gorm:"comment:命名空间"`
|
Namespace string `json:"namespace" gorm:"comment:命名空间"`
|
||||||
|
|
Loading…
Reference in New Issue