Merge branch 'master' of https://gitlink.org.cn/JointCloud/pcm-coordinator
Former-commit-id: 32aa80967d7e49848cac871a4740bd64f1594fde
Commit: 6d6c7f4e47
@@ -132,6 +132,8 @@ type (
MatchLabels map[string]string `json:"matchLabels,optional"`
servers []ServerCommit `json:"servers,optional"`
platform string `json:"platform,optional"`
AdapterId string `json:"adapterId,optional"`
ClusterType string `json:"clusterType,optional"`
}
ServerCommit {
allCardRunTime string `json:"allCardRunTime"`
@@ -158,7 +160,10 @@ type (
}

commitVmTaskResp {
VmTask []VmTask `json:"vmTask" copier:"VmTask"`
// VmTask []VmTask `json:"vmTask" copier:"VmTask"`
TaskId int64 `json:"taskId"`
Code int32 `json:"code"`
Msg string `json:"msg"`
}
VmTask{
Id string `json:"id" copier:"Id"`
@@ -812,7 +817,6 @@ type (
ItemValue string `json:"itemValue,omitempty"`
Description string `json:"description,omitempty"`
SortOrder string `json:"sortOrder,omitempty"`
Type string `json:"type,omitempty" db:"type"`
ParentId string `json:"parentId,omitempty"`
Status string `json:"status,omitempty" db:"status"`
CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
@@ -838,7 +842,6 @@ type (
ItemValue string `json:"itemValue,optional"`
Description string `json:"description,optional"`
SortOrder string `json:"sortOrder,optional"`
Type string `json:"type,optional"`
ParentId string `json:"parentId,optional"`
Status string `json:"status,optional"`
}
@@ -850,7 +853,6 @@ type (
ItemValue string `json:"itemValue,omitempty"`
Description string `json:"description,omitempty"`
SortOrder string `json:"sortOrder,omitempty"`
Type string `json:"type,omitempty"`
ParentId string `json:"parentId,omitempty"`
Status string `json:"status,omitempty"`
CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"`

@@ -43,10 +43,6 @@ service pcm {
@handler commitVmTaskHandler
post /core/commitVmTask (commitVmTaskReq) returns (commitVmTaskResp)

@doc "Submit VM task (temporary)"
@handler commitVmTaskTempHandler
post /core/commitVmTaskTemp (commitVmTaskReq) returns (commitVmTaskResp)

@doc "Delete task"
@handler deleteTaskHandler
delete /core/deleteTask/:id (deleteTaskReq)
@@ -389,10 +385,14 @@ service pcm {
@handler GetNetworkNumHandler
get /vm/getNetworkNum (ListNetworksReq) returns (NetworkNum)

@doc "Query image list"
@doc "Query image count"
@handler getImageNumHandler
get /vm/getImageNum (ListImagesReq) returns (ImageNum)

@doc "Query VM overview data"
@handler getOpenstackOverviewHandler
get /vm/getOpenstackOverview (OpenstackOverviewReq) returns (OpenstackOverviewResp)

@doc "Query VM list"
@handler ListServerHandler
get /vm/listServer (ListServersReq) returns (ListServersResp)

@@ -75,6 +75,22 @@ type (
Msg string `json:"msg,omitempty"`
ErrorMsg string `json:"errorMsg,omitempty"`
}

OpenstackOverviewReq {
Platform string `form:"platform,optional"`
}
OpenstackOverviewResp {
Data OpenstackOverview `json:"data"`
Code int32 `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
ErrorMsg string `json:"errorMsg,omitempty"`
}
OpenstackOverview {
max_total_cores int32 `json:"max_total_cores"`
max_total_ram_size int32 `json:"max_total_ram_size"`
max_total_volumes int32 `json:"max_total_volumes"`
}

)
/****************** servers start*************************/
type (

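The request/response types above back the new GET /vm/getOpenstackOverview route added in this commit. Below is a minimal client-side sketch of calling it; it is not part of the commit, and the host, port, and platform value are assumptions for illustration.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// Mirrors OpenstackOverviewResp / OpenstackOverview from the diff above.
type openstackOverview struct {
	MaxTotalCores   int32 `json:"max_total_cores"`
	MaxTotalRamSize int32 `json:"max_total_ram_size"`
	MaxTotalVolumes int32 `json:"max_total_volumes"`
}

type overviewResp struct {
	Data openstackOverview `json:"data"`
	Code int32             `json:"code,omitempty"`
	Msg  string            `json:"msg,omitempty"`
}

func main() {
	// "platform" is bound from the query string, matching `form:"platform,optional"`.
	q := url.Values{"platform": {"openstack-cluster-1"}} // hypothetical cluster name
	resp, err := http.Get("http://localhost:8999/vm/getOpenstackOverview?" + q.Encode()) // host/port assumed
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var out overviewResp
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("cores=%d ram=%d volumes=%d\n", out.Data.MaxTotalCores, out.Data.MaxTotalRamSize, out.Data.MaxTotalVolumes)
}
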
@@ -45,11 +45,6 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/core/commitVmTask",
Handler: core.CommitVmTaskHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/core/commitVmTaskTemp",
Handler: core.CommitVmTaskTempHandler(serverCtx),
},
{
Method: http.MethodDelete,
Path: "/core/deleteTask/:id",
@@ -461,6 +456,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/vm/getImageNum",
Handler: vm.GetImageNumHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/vm/getOpenstackOverview",
Handler: vm.GetOpenstackOverviewHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/vm/listServer",

@@ -1,24 +1,24 @@
package core
package vm

import (
"net/http"

"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/vm"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)

func CommitVmTaskTempHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
func GetOpenstackOverviewHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.CommitVmTaskReq
var req types.OpenstackOverviewReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}

l := core.NewCommitVmTaskTempLogic(r.Context(), svcCtx)
resp, err := l.CommitVmTaskTemp(&req)
l := vm.NewGetOpenstackOverviewLogic(r.Context(), svcCtx)
resp, err := l.GetOpenstackOverview(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {

@@ -42,6 +42,14 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type
if tx.Error != nil {
return nil, tx.Error
}

var clusterIds []int64
l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? and label = ?", req.AdapterId, req.ClusterType).Scan(&clusterIds)

if len(clusterIds) == 0 || clusterIds == nil {
return nil, nil
}

vm := models.Vm{}
tool.Convert(req, &vm)
mqInfo := response.TaskInfo{
@@ -52,5 +60,11 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type
}
//req.TaskId = taskModel.Id
mqs.InsQueue.Beta.Add(&mqInfo)
return
tx = l.svcCtx.DbEngin.Create(&mqInfo)
resp = &types.CommitVmTaskResp{
Code: 200,
Msg: "success",
TaskId: taskModel.Id,
}
return resp, nil
}

@@ -1,49 +0,0 @@
package core

import (
"context"
"github.com/jinzhu/copier"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-openstack/openstack"
"k8s.io/apimachinery/pkg/util/json"

"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"

"github.com/zeromicro/go-zero/core/logx"
)

type CommitVmTaskTempLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}

func NewCommitVmTaskTempLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitVmTaskTempLogic {
return &CommitVmTaskTempLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}

func (l *CommitVmTaskTempLogic) CommitVmTaskTemp(req *types.CommitVmTaskReq) (resp *types.CommitVmTaskResp, err error) {
// todo: add your logic here and delete this line
CreateServerReq := &openstack.CreateServerReq{}
err = copier.CopyWithOption(CreateServerReq, req, copier.Option{Converters: utils.Converters})
CreateServerResp, err := l.svcCtx.OpenstackRpc.CreateServer(l.ctx, CreateServerReq)
if err != nil {
return nil, errors.Wrapf(xerr.NewErrMsg("Failed to get Servers list"), "Failed to get db Servers list err : %v ,req:%+v", err, req)
}
marshal, err := json.Marshal(&CreateServerResp)
if err != nil {
return nil, result.NewDefaultError(err.Error())
}
json.Unmarshal(marshal, &resp)
err = copier.CopyWithOption(&resp, &CreateServerResp, copier.Option{Converters: utils.Converters})
return resp, err
return
}

@@ -39,11 +39,10 @@ func (l *AddDictItemLogic) AddDictItem(req *types.DictItemEditReq) (resp *types.
dictItem.ItemValue = req.ItemValue
dictItem.Description = req.Description
dictItem.SortOrder = req.SortOrder
dictItem.Type = req.Type
dictItem.ParentId = "0"
if req.ParentId != "" {
dictItem.ParentId = req.ParentId
}
dictItem.ParentId = "0"
dictItem.Status = req.Status
dictItem.Id = utils.GenSnowflakeIDStr()
dictItem.CreateTime = time.Now().Format("2006-01-02 15:04:05")

@@ -30,7 +30,7 @@ func (l *EditDictItemLogic) EditDictItem(req *types.DictItemEditReq) (resp *type
dictItem := &types.DictItemInfo{}
result := l.svcCtx.DbEngin.Table("t_dict_item").First(&dictItem, req.Id)
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
logx.Errorf("Dictionary data editing failure. errors: %s", err.Error())
logx.Errorf("Dictionary data editing failure. errors: %s", result.Error)
return nil, errors.New("DictItem does not exist")
}
utils.Convert(req, &dictItem)

@@ -31,7 +31,7 @@ func (l *EditDictLogic) EditDict(req *types.DictEditReq) (resp *types.DictResp,
dict := &types.DictInfo{}
result := l.svcCtx.DbEngin.Table("t_dict").First(&dict, req.Id)
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
logx.Errorf("Dictionary editing failure. errors: %s", err.Error())
logx.Errorf("Dictionary editing failure. errors: %s", result.Error)
return nil, errors.New("Dict does not exist")
}
utils.Convert(req, &dict)

@@ -2,6 +2,7 @@ package dictionary

import (
"context"
"github.com/pkg/errors"

"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
@@ -29,12 +30,13 @@ func (l *ListDictItemByCodeLogic) ListDictItemByCode(req *types.DictCodeReq) (re
db := l.svcCtx.DbEngin.Model(&types.DictInfo{}).Table("t_dict")

// left join query
db.Select("t_dict_item.*").Joins("left join t_dict_item on t_dict.id = t_dict_item.dict_id").
err = db.Select("t_dict_item.*").Joins("left join t_dict_item on t_dict.id = t_dict_item.dict_id").
Where("t_dict.dict_code = ?", req.DictCode).
Where("t_dict_item.status", 1).
Order("t_dict_item.sort_order").Scan(&dictList)
Order("t_dict_item.sort_order").Scan(&dictList).Error
if err != nil {
return resp, err
logx.Errorf("ListDictItemByCode()=> failed %s", err.Error())
return nil, errors.New("description Failed to query dictionary entry data")
}
resp.List = dictList
return resp, nil

@@ -0,0 +1,35 @@
package vm

import (
"context"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)

type GetOpenstackOverviewLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}

func NewGetOpenstackOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetOpenstackOverviewLogic {
return &GetOpenstackOverviewLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}

func (l *GetOpenstackOverviewLogic) GetOpenstackOverview(req *types.OpenstackOverviewReq) (resp *types.OpenstackOverviewResp, err error) {
// todo: add your logic here and delete this line
var openstackOverview types.OpenstackOverview
sqlStr := "SELECT t.max_total_cores,t.max_total_ram_size,t.max_total_volumes FROM `vm_openstack_overview` t left join t_cluster tc on t.cluster_id=tc.id where tc.`name` = ?"
l.svcCtx.DbEngin.Raw(sqlStr, req.Platform).Scan(&openstackOverview)
resp = &types.OpenstackOverviewResp{
Code: 200,
Msg: "success",
Data: openstackOverview,
}
return resp, err
}

@@ -5,9 +5,8 @@ import (
)

type Weight struct {
Id int64
Id string
Weight int32
Name string
Replica int32
}

@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
@@ -28,7 +29,6 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"strconv"
"sync"
)

@@ -65,7 +65,7 @@ func (as *AiScheduler) GetNewStructForDb(task *response.TaskInfo, resource strin
func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
if as.option.AiClusterId != "" {
// TODO database operation Find
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ParticipantId: 0, Name: "", Replicas: 1}}, nil
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: "", Replicas: 1}}, nil
}

resources, err := as.findClustersWithResources()
@@ -79,8 +79,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {

if len(resources) == 1 {
var cluster strategy.AssignedCluster
cluster.ParticipantId = resources[0].ParticipantId
cluster.Name = resources[0].Name
cluster.ClusterId = resources[0].ClusterId
cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}
@@ -89,7 +88,11 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {

switch as.option.StrategyName {
case strategy.REPLICATION:
strategy := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: 1})
var clusterIds []string
for _, resource := range resources {
clusterIds = append(clusterIds, resource.ClusterId)
}
strategy := strategy.NewReplicationStrategy(clusterIds, 1)
return strategy, nil
case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
@@ -111,32 +114,46 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
return nil, errors.New("clusters is nil")
}

for i := len(clusters) - 1; i >= 0; i-- {
if clusters[i].Replicas == 0 {
clusters = append(clusters[:i], clusters[i+1:]...)
}
}

if len(clusters) == 0 {
return nil, errors.New("clusters is nil")
}

var wg sync.WaitGroup
var results []*AiResult
var errs []error
var errs []interface{}
var ch = make(chan *AiResult, len(clusters))
var errCh = make(chan error, len(clusters))
var errCh = make(chan interface{}, len(clusters))

executorMap := *as.AiExecutor
for _, cluster := range clusters {
c := cluster
if cluster.Replicas == 0 {
continue
}
wg.Add(1)
go func() {
opt, _ := cloneAiOption(as.option)
resp, err := executorMap[c.Name].Execute(as.ctx, opt)
resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt)

if err != nil {
errCh <- err
e := struct {
err error
clusterId string
}{
err: err,
clusterId: c.ClusterId,
}
errCh <- e
wg.Done()
return
}

result, _ := convertType(resp)
result.Replica = c.Replicas
result.ClusterId = strconv.FormatInt(c.ParticipantId, 10)
result.ClusterId = c.ClusterId

ch <- result
wg.Done()
@@ -150,10 +167,22 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errs = append(errs, e)
}

if len(errs) != 0 {
if len(errs) != len(clusters) {
return nil, errors.New("submit task failed")
}

if len(errs) != 0 {
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
return nil, errors.New(msg)
}

for s := range ch {
// TODO: database operation
results = append(results, s)

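AssignTask above fans submissions out to one goroutine per cluster and now sends an anonymous {err, clusterId} value on errCh so the caller can report which cluster failed. The following self-contained sketch illustrates that pattern with a named struct instead of the anonymous one and with simulated work; it is illustrative only, not the repository's code.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// clusterErr pairs a submission error with the cluster it came from.
type clusterErr struct {
	err       error
	clusterId string
}

func main() {
	clusterIds := []string{"1", "2", "3"} // hypothetical cluster ids

	var wg sync.WaitGroup
	errCh := make(chan clusterErr, len(clusterIds)) // buffered so goroutines never block

	for _, id := range clusterIds {
		id := id
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Simulate a failed submission for one cluster.
			if id == "2" {
				errCh <- clusterErr{err: errors.New("submit failed"), clusterId: id}
			}
		}()
	}
	wg.Wait()
	close(errCh)

	// Join the per-cluster failures into one error message, as AssignTask does.
	var msg string
	for e := range errCh {
		msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
	}
	if msg != "" {
		fmt.Println(errors.New(msg))
	}
}
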
@@ -9,7 +9,7 @@ type AiCollector interface {
}

type ResourceStats struct {
ParticipantId int64
ClusterId string
Name string
CpuCoreAvail int64
CpuCoreTotal int64

@@ -33,15 +33,14 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
for _, res := range ps.resources {
if opt.ResourceType == "cpu" {
if res.CpuCoreHours <= 0 {
cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas}
cluster := &AssignedCluster{ClusterId: res.ClusterId, Replicas: ps.replicas}
results = append(results, cluster)
return results, nil
}

if res.CpuCoreHours > maxCpuCoreHoursAvailable {
maxCpuCoreHoursAvailable = res.CpuCoreHours
assignedCluster.Name = res.Name
assignedCluster.ParticipantId = res.ParticipantId
assignedCluster.ClusterId = res.ClusterId
assignedCluster.Replicas = ps.replicas
}
}
@@ -56,8 +55,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
}
if maxCurrentCardHours > maxCardHoursAvailable {
maxCardHoursAvailable = maxCurrentCardHours
assignedCluster.Name = res.Name
assignedCluster.ParticipantId = res.ParticipantId
assignedCluster.ClusterId = res.ClusterId
assignedCluster.Replicas = ps.replicas
}
}

@@ -1,23 +0,0 @@
package param

import "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"

type ReplicationParams struct {
Replicas int32
*Params
}

func (r *ReplicationParams) GetReplicas() int32 {
return r.Replicas
}

func (r *ReplicationParams) GetParticipants() []*entity.Participant {
var participants []*entity.Participant
for _, resource := range r.Resources {
participants = append(participants, &entity.Participant{
Participant_id: resource.ParticipantId,
Name: resource.Name,
})
}
return participants
}

@@ -2,6 +2,7 @@ package param

import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"strconv"
)

type ResourcePricingParams struct {
@@ -21,8 +22,9 @@ func (r *ResourcePricingParams) GetTask() *providerPricing.Task {
func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider {
var providerList []*providerPricing.Provider
for _, resource := range r.Resources {
id, _ := strconv.ParseInt(resource.ClusterId, 10, 64)
provider := providerPricing.NewProvider(
resource.ParticipantId,
id,
float64(resource.CpuCoreAvail),
resource.MemAvail,
resource.DiskAvail, 0.0, 0.0, 0.0)

@@ -2,33 +2,31 @@ package strategy

import (
"errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
)

type ReplicationStrategy struct {
replicas int32
participants []*entity.Participant
clusterIds []string
}

func NewReplicationStrategy(params *param.ReplicationParams) *ReplicationStrategy {
return &ReplicationStrategy{replicas: params.GetReplicas(),
participants: params.GetParticipants(),
func NewReplicationStrategy(clusterIds []string, replicas int32) *ReplicationStrategy {
return &ReplicationStrategy{clusterIds: clusterIds,
replicas: replicas,
}
}

func (ps *ReplicationStrategy) Schedule() ([]*AssignedCluster, error) {
if ps.replicas < 1 {
func (r *ReplicationStrategy) Schedule() ([]*AssignedCluster, error) {
if r.replicas < 1 {
return nil, errors.New("replicas must be greater than 0")
}

if ps.participants == nil {
return nil, errors.New("participantId must be set")
if len(r.clusterIds) == 0 {
return nil, errors.New("clusterIds must be set")
}

var results []*AssignedCluster
for _, p := range ps.participants {
cluster := &AssignedCluster{ParticipantId: p.Participant_id, Name: p.Name, Replicas: ps.replicas}
for _, c := range r.clusterIds {
cluster := &AssignedCluster{ClusterId: c, Replicas: r.replicas}
results = append(results, cluster)
}
return results, nil

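NewReplicationStrategy now takes a plain slice of cluster ids plus a replica count instead of *param.ReplicationParams. A minimal usage sketch, written as it might appear in a test inside this module (the test name and ids are hypothetical):

package strategy_test

import (
	"testing"

	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
)

func TestReplicationByClusterIds(t *testing.T) {
	// Cluster ids are now plain strings; participant ids and names are no longer needed.
	repl := strategy.NewReplicationStrategy([]string{"1", "2", "3"}, 2)

	assigned, err := repl.Schedule()
	if err != nil {
		t.Fatal(err)
	}
	// One AssignedCluster per cluster id, each carrying the requested replica count.
	for _, c := range assigned {
		t.Logf("clusterId=%s replicas=%d", c.ClusterId, c.Replicas)
	}
}

Each returned AssignedCluster carries only ClusterId and Replicas, matching the slimmed-down struct later in this commit.
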
@@ -18,6 +18,7 @@ type Strategy interface {
"errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"strconv"
)

type PricingStrategy struct {
@@ -154,7 +155,7 @@ func (ps *PricingStrategy) Schedule() ([]*AssignedCluster, error) {
if e == 0 {
continue
}
cluster := &AssignedCluster{ParticipantId: ps.ProviderList[i].Pid, Replicas: int32(e)}
cluster := &AssignedCluster{ClusterId: strconv.FormatInt(ps.ProviderList[i].Pid, 10), Replicas: int32(e)}
results = append(results, cluster)
}

@@ -29,7 +29,7 @@ func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {
weights := make([]*weightDistributing.Weight, 0)
for k, v := range s.staticWeightMap {
weight := &weightDistributing.Weight{
Name: k,
Id: k,
Weight: v,
}
weights = append(weights, weight)
@@ -39,7 +39,7 @@ func (s *StaticWeightStrategy) Schedule() ([]*AssignedCluster, error) {

var results []*AssignedCluster
for _, weight := range weights {
cluster := &AssignedCluster{ParticipantId: weight.Id, Name: weight.Name, Replicas: weight.Replica}
cluster := &AssignedCluster{ClusterId: weight.Id, Replicas: weight.Replica}
results = append(results, cluster)
}

@@ -18,8 +18,7 @@ type Strategy interface {
}

type AssignedCluster struct {
ParticipantId int64
Name string
ClusterId string
Replicas int32
}

@@ -5,7 +5,6 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"testing"
)

@@ -17,14 +16,14 @@ func TestReplication(t *testing.T) {
}
rsc := []*collector.ResourceStats{
{
ParticipantId: 1,
ClusterId: "1",
Name: "test1",
},
{
ParticipantId: 1,
ClusterId: "2",
Name: "test2"},
{
ParticipantId: 1,
ClusterId: "3",
Name: "test3"},
}
tests := []struct {
@@ -47,8 +46,11 @@ func TestReplication(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
params := &param.Params{Resources: rsc}
repl := strategy.NewReplicationStrategy(&param.ReplicationParams{Params: params, Replicas: tt.replica})
var clusterIds []string
for _, stats := range rsc {
clusterIds = append(clusterIds, stats.ClusterId)
}
repl := strategy.NewReplicationStrategy(clusterIds, 0)
schedule, err := repl.Schedule()
if err != nil {
return

@@ -283,7 +283,7 @@ func (o *OctopusLink) GetResourceStats(ctx context.Context) (*collector.Resource
}

resourceStats := &collector.ResourceStats{
ParticipantId: o.participantId,
ClusterId: strconv.FormatInt(o.participantId, 10),
Name: o.platform,
Balance: balance,
CardsAvail: cards,

@@ -284,14 +284,14 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
totalDcu := limitResp.Data.AccountMaxDcu

//disk
//diskReq := &hpcAC.ParaStorQuotaReq{}
//diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
//if err != nil {
// return nil, err
//}
//
//totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
//availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
if err != nil {
return nil, err
}

totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)

//memory
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
@@ -344,13 +344,13 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
}
cards = append(cards, dcu)
resourceStats := &collector.ResourceStats{
ParticipantId: s.participantId,
ClusterId: strconv.FormatInt(s.participantId, 10),
Name: s.platform,
Balance: balance,
CpuCoreTotal: totalCpu,
CpuCoreAvail: CpuCoreAvail,
//DiskTotal: totalDisk,
//DiskAvail: availDisk,
DiskTotal: totalDisk,
DiskAvail: availDisk,
MemTotal: memSize,
MemAvail: MemAvail,
CpuCoreHours: cpuHours,

@@ -117,6 +117,8 @@ type CommitVmTaskReq struct {
MatchLabels map[string]string `json:"matchLabels,optional"`
Servers []ServerCommit `json:"servers,optional"`
Platform string `json:"platform,optional"`
AdapterId string `json:"adapterId,optional"`
ClusterType string `json:"clusterType,optional"`
}

type ServerCommit struct {
@@ -146,7 +148,9 @@ type Block_device_mapping_v2Commit struct {
}

type CommitVmTaskResp struct {
VmTask []VmTask `json:"vmTask" copier:"VmTask"`
TaskId int64 `json:"taskId"`
Code int32 `json:"code"`
Msg string `json:"msg"`
}

type VmTask struct {
@@ -787,7 +791,6 @@ type DictItemInfo struct {
ItemValue string `json:"itemValue,omitempty"`
Description string `json:"description,omitempty"`
SortOrder string `json:"sortOrder,omitempty"`
Type string `json:"type,omitempty" db:"type"`
ParentId string `json:"parentId,omitempty"`
Status string `json:"status,omitempty" db:"status"`
CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
@@ -813,7 +816,6 @@ type DictItemEditReq struct {
ItemValue string `json:"itemValue,optional"`
Description string `json:"description,optional"`
SortOrder string `json:"sortOrder,optional"`
Type string `json:"type,optional"`
ParentId string `json:"parentId,optional"`
Status string `json:"status,optional"`
}
@@ -825,7 +827,6 @@ type DictItemResp struct {
ItemValue string `json:"itemValue,omitempty"`
Description string `json:"description,omitempty"`
SortOrder string `json:"sortOrder,omitempty"`
Type string `json:"type,omitempty"`
ParentId string `json:"parentId,omitempty"`
Status string `json:"status,omitempty"`
CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
@@ -2691,6 +2692,23 @@ type GetVolumeLimitsResp struct {
ErrorMsg string `json:"errorMsg,omitempty"`
}

type OpenstackOverviewReq struct {
Platform string `form:"platform,optional"`
}

type OpenstackOverviewResp struct {
Data OpenstackOverview `json:"data"`
Code int32 `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
ErrorMsg string `json:"errorMsg,omitempty"`
}

type OpenstackOverview struct {
Max_total_cores int32 `json:"max_total_cores"`
Max_total_ram_size int32 `json:"max_total_ram_size"`
Max_total_volumes int32 `json:"max_total_volumes"`
}

type ListServersReq struct {
Limit int32 `form:"limit,optional"`
OffSet int32 `form:"offSet,optional"`
@@ -5316,6 +5334,123 @@ type AiAlgorithmsResp struct {
Algorithms []string `json:"algorithms"`
}

type PullTaskInfoReq struct {
AdapterId int64 `form:"adapterId"`
}

type PullTaskInfoResp struct {
HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty"`
CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty"`
AiInfoList []*AiInfo `json:"AiInfoList,omitempty"`
VmInfoList []*VmInfo `json:"VmInfoList,omitempty"`
}

type HpcInfo struct {
Id int64 `json:"id"` // id
TaskId int64 `json:"task_id"` // task id
JobId string `json:"job_id"` // job id (the job's id in the third-party system)
AdapterId int64 `json:"adapter_id"` // id of the adapter executing the task
ClusterId int64 `json:"cluster_id"` // id of the cluster executing the task
ClusterType string `json:"cluster_type"` // type of the cluster executing the task
Name string `json:"name"` // name
Status string `json:"status"` // status
CmdScript string `json:"cmd_script"`
StartTime string `json:"start_time"` // start time
RunningTime int64 `json:"running_time"` // running time
DerivedEs string `json:"derived_es"`
Cluster string `json:"cluster"`
BlockId int64 `json:"block_id"`
AllocNodes int64 `json:"alloc_nodes"`
AllocCpu int64 `json:"alloc_cpu"`
CardCount int64 `json:"card_count"` // card count
Version string `json:"version"`
Account string `json:"account"`
WorkDir string `json:"work_dir"` // working directory
AssocId int64 `json:"assoc_id"`
ExitCode int64 `json:"exit_code"`
WallTime string `json:"wall_time"` // maximum running time
Result string `json:"result"` // run result
DeletedAt string `json:"deleted_at"` // deletion time
YamlString string `json:"yaml_string"`
AppType string `json:"app_type"` // application type
AppName string `json:"app_name"` // application name
Queue string `json:"queue"` // queue name
SubmitType string `json:"submit_type"` // cmd (command-line mode)
NNode string `json:"n_node"` // number of nodes (when this parameter is specified, GAP_NODE_STRING must be "")
StdOutFile string `json:"std_out_file"` // working directory/std.err.%j
StdErrFile string `json:"std_err_file"` // working directory/std.err.%j
StdInput string `json:"std_input"`
Environment string `json:"environment"`
DeletedFlag int64 `json:"deleted_flag"` // whether deleted (0 - no, 1 - yes)
CreatedBy int64 `json:"created_by"` // created by
CreatedTime string `json:"created_time"` // creation time
UpdatedBy int64 `json:"updated_by"` // updated by
UpdatedTime string `json:"updated_time"` // update time
}

type CloudInfo struct {
Participant int64 `json:"participant,omitempty"`
Id int64 `json:"id,omitempty"`
TaskId int64 `json:"taskId,omitempty"`
ApiVersion string `json:"apiVersion,omitempty"`
Kind string `json:"kind,omitempty"`
Namespace string `json:"namespace,omitempty"`
Name string `json:"name,omitempty"`
Status string `json:"status,omitempty"`
StartTime string `json:"startTime,omitempty"`
RunningTime int64 `json:"runningTime,omitempty"`
Result string `json:"result,omitempty"`
YamlString string `json:"yamlString,omitempty"`
}

type AiInfo struct {
ParticipantId int64 `json:"participantId,omitempty"`
TaskId int64 `json:"taskId,omitempty"`
ProjectId string `json:"project_id,omitempty"`
Name string `json:"name,omitempty"`
Status string `json:"status,omitempty"`
StartTime string `json:"startTime,omitempty"`
RunningTime int64 `json:"runningTime,omitempty"`
Result string `json:"result,omitempty"`
JobId string `json:"jobId,omitempty"`
CreateTime string `json:"createTime,omitempty"`
ImageUrl string `json:"imageUrl,omitempty"`
Command string `json:"command,omitempty"`
FlavorId string `json:"flavorId,omitempty"`
SubscriptionId string `json:"subscriptionId,omitempty"`
ItemVersionId string `json:"itemVersionId,omitempty"`
}

type VmInfo struct {
ParticipantId int64 `json:"participantId,omitempty"`
TaskId int64 `json:"taskId,omitempty"`
Name string `json:"name,omitempty"`
FlavorRef string `json:"flavor_ref,omitempty"`
ImageRef string `json:"image_ref,omitempty"`
NetworkUuid string `json:"network_uuid,omitempty"`
BlockUuid string `json:"block_uuid,omitempty"`
SourceType string `json:"source_type,omitempty"`
DeleteOnTermination bool `json:"delete_on_termination,omitempty"`
State string `json:"state,omitempty"`
}

type PushTaskInfoReq struct {
AdapterId int64 `json:"adapterId"`
HpcInfoList []*HpcInfo `json:"hpcInfoList"`
CloudInfoList []*CloudInfo `json:"cloudInfoList"`
AiInfoList []*AiInfo `json:"aiInfoList"`
VmInfoList []*VmInfo `json:"vmInfoList"`
}

type PushTaskInfoResp struct {
Code int64 `json:"code"`
Msg string `json:"msg"`
}

type PushResourceInfoReq struct {
AdapterId int64 `json:"adapterId"`
}

type CreateAlertRuleReq struct {
CLusterId int64 `json:"clusterId"`
ClusterName string `json:"clusterName"`