Merge remote-tracking branch 'upstream/master'
Former-commit-id: 145e24a917b6bf77bf6759dc4bd340c506f18d44
Commit: f685ce4045
@@ -6,6 +6,12 @@ type TaskOptions struct {
 	pushResourceInfoReq PushResourceInfoReq
 }
 
+type Task interface {
+	PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error)
+	PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error)
+	PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) error
+}
+
 type PullTaskInfoReq struct {
 	AdapterId int64 `json:"adapterId"`
 }
@@ -33,9 +39,3 @@ type PushTaskInfoResp struct {
 type PushResourceInfoReq struct {
 	AdapterId int64 `json:"adapterId"`
 }
-
-type Task interface {
-	PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error)
-	PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error)
-	PushResourceInfo(pushResourceInfoReq PushResourceInfoReq)
-}
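The Task interface moves to the top of the file and PushResourceInfo gains an error return. For orientation, a minimal consumer sketch based only on the commented-out example that appears later in this commit; Options, NewClient, and the import path are assumptions, not confirmed by the hunks above:

package main

import (
	"fmt"

	client "gitlink.org.cn/JointCloud/pcm-coordinator/api/client" // assumed import path
)

func main() {
	// NewClient/Options/Task mirror the commented-out usage later in this diff.
	coreCli, err := client.NewClient(client.Options{Url: "http://localhost:8999"})
	if err != nil {
		panic(err)
	}
	coreTask, err := coreCli.Task(client.TaskOptions{})
	if err != nil {
		panic(err)
	}
	resp, err := coreTask.PullTaskInfo(client.PullTaskInfoReq{AdapterId: 1})
	fmt.Println(resp, err)
}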
@@ -1,13 +1,10 @@
 package client
 
 import (
-	"github.com/jinzhu/copier"
-	"github.com/zeromicro/go-zero/core/logx"
-	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
-	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
-	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
-	"gorm.io/gorm"
+	"io/ioutil"
+	"k8s.io/apimachinery/pkg/util/json"
 	"log"
+	"net/http"
 	"strings"
 	"sync"
 )
@@ -29,128 +26,47 @@ func newTask(client *client, options *TaskOptions) (*task, error) {
 	return task, nil
 }
 
-func (t task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) {
-	result := PullTaskInfoResp{}
-	// look up the participant (p-side) type
-	var kind int32
-	t.client.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", pullTaskInfoReq.AdapterId).Scan(&kind)
-	// fetch the pending task lists for cloud / AI / HPC
-	switch kind {
-	case 2:
-		var hpcModelList []models.TaskHpc
-		findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &hpcModelList)
-		utils.Convert(hpcModelList, &result.HpcInfoList)
-		if len(result.HpcInfoList) > 0 {
-			for i, hpcInfo := range hpcModelList {
-				err := copier.CopyWithOption(result.HpcInfoList[i], hpcInfo, copier.Option{Converters: utils.Converters})
-				if err != nil {
-					return nil, err
-				}
-				var clusterType string
-				t.client.DbEngin.Raw("SELECT label FROM `t_cluster` where id = ? ", hpcInfo.ClusterId).Scan(&clusterType)
-				result.HpcInfoList[i].ClusterType = clusterType
-			}
-		}
-	case 0:
-		var cloudModelList []models.Cloud
-		findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &cloudModelList)
-		utils.Convert(cloudModelList, &result.CloudInfoList)
-	case 1:
-		var aiModelList []models.Ai
-		findModelList(pullTaskInfoReq.AdapterId, t.client.DbEngin, &aiModelList)
-		utils.Convert(aiModelList, &result.AiInfoList)
-	}
-	return &result, nil
+func (t *task) PullTaskInfo(pullTaskInfoReq PullTaskInfoReq) (*PullTaskInfoResp, error) {
+	url := t.client.url + "/pcm/v1/core/pullTaskInfo"
+	method := "GET"
+	infoReq := PullTaskInfoReq{AdapterId: pullTaskInfoReq.AdapterId}
+	jsonStr, _ := json.Marshal(infoReq)
+	payload := strings.NewReader(string(jsonStr))
+
+	client := &http.Client{}
+	req, _ := http.NewRequest(method, url, payload)
+	req.Header.Add("Content-Type", "application/json")
+	res, _ := client.Do(req)
+	defer res.Body.Close()
+
+	body, _ := ioutil.ReadAll(res.Body)
+	var resp PullTaskInfoResp
+	json.Unmarshal(body, &resp)
+	return &resp, nil
 }
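Every error in the rewritten transport is discarded, and if client.Do fails, res is nil, so the deferred Close panics. A hedged sketch of the same GET call with errors propagated; pullTaskInfoChecked is a hypothetical name, and only the imports already present in this file are used:

func (t *task) pullTaskInfoChecked(req PullTaskInfoReq) (*PullTaskInfoResp, error) {
	jsonStr, err := json.Marshal(PullTaskInfoReq{AdapterId: req.AdapterId})
	if err != nil {
		return nil, err
	}
	httpReq, err := http.NewRequest(http.MethodGet, t.client.url+"/pcm/v1/core/pullTaskInfo", strings.NewReader(string(jsonStr)))
	if err != nil {
		return nil, err
	}
	httpReq.Header.Add("Content-Type", "application/json")
	res, err := http.DefaultClient.Do(httpReq)
	if err != nil {
		return nil, err // res is nil here; closing it would panic
	}
	defer res.Body.Close()
	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}
	var resp PullTaskInfoResp
	return &resp, json.Unmarshal(body, &resp)
}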
 
-func (t task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) {
-	// look up the participant (p-side) type
-	var kind int32
-	t.client.DbEngin.Raw("select type as kind from t_adapter where id = ?", pushTaskInfoReq.AdapterId).Scan(&kind)
-	switch kind {
-	case 0:
-		for _, cloudInfo := range pushTaskInfoReq.CloudInfoList {
-			t.client.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?",
-				cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, pushTaskInfoReq.AdapterId, cloudInfo.Id)
-			syncTask(t.client.DbEngin, cloudInfo.TaskId)
-		}
-	case 2:
-		for _, hpcInfo := range pushTaskInfoReq.HpcInfoList {
-			t.client.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?",
-				hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, pushTaskInfoReq.AdapterId, hpcInfo.TaskId, hpcInfo.Name)
-			syncTask(t.client.DbEngin, hpcInfo.TaskId)
-		}
-	case 1:
-		for _, aiInfo := range pushTaskInfoReq.AiInfoList {
-			t.client.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
-				aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, pushTaskInfoReq.AdapterId, aiInfo.TaskId, aiInfo.Name)
-			syncTask(t.client.DbEngin, aiInfo.TaskId)
-		}
-	}
-
-	return &PushTaskInfoResp{}, nil
+func (t *task) PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) {
+	url := t.client.url + "/pcm/v1/core/pushTaskInfo"
+	method := "POST"
+	infoReq := PullTaskInfoReq{AdapterId: pushTaskInfoReq.AdapterId}
+	jsonStr, _ := json.Marshal(infoReq)
+	payload := strings.NewReader(string(jsonStr))
+
+	client := &http.Client{}
+	req, _ := http.NewRequest(method, url, payload)
+	req.Header.Add("Content-Type", "application/json")
+	res, _ := client.Do(req)
+	defer res.Body.Close()
+
+	body, _ := ioutil.ReadAll(res.Body)
+	var resp PushTaskInfoResp
+	json.Unmarshal(body, &resp)
+	return &resp, nil
 }
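The rewritten PushTaskInfo serializes a PullTaskInfoReq that carries only the adapter id, so the task lists inside pushTaskInfoReq never reach the wire. If the full payload is intended, the marshal line would presumably read as follows (an assumption; the commit does not confirm it):

	jsonStr, _ := json.Marshal(pushTaskInfoReq) // hypothetical: send the complete PushTaskInfoReq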
 
-func (t task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) {
+func (t *task) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) error {
 	//TODO implement me
 	panic("implement me")
 }
 
-func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error {
-	tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data)
-	if tx.Error != nil {
-		return tx.Error
-	}
-	return nil
-}
-
-func syncTask(gorm *gorm.DB, taskId int64) {
-	var allStatus string
-	tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus)
-	if tx.Error != nil {
-		logx.Error(tx.Error)
-	}
-	// if all sub-task statuses agree, update the main task status
-	statusArray := strings.Split(allStatus, ",")
-	if len(removeRepeatedElement(statusArray)) == 1 {
-		updateTask(gorm, taskId, statusArray[0])
-	}
-	// if any sub-task failed, mark the main task failed
-	if strings.Contains(allStatus, constants.Failed) {
-		updateTask(gorm, taskId, constants.Failed)
-	}
-	if strings.Contains(allStatus, constants.Running) {
-		updateTask(gorm, taskId, constants.Running)
-	}
-}
-
-func updateTask(gorm *gorm.DB, taskId int64, status string) {
-	var task models.Task
-	gorm.Where("id = ? ", taskId).Find(&task)
-	if task.Status != status {
-		task.Status = status
-		gorm.Updates(&task)
-	}
-}
-
-func removeRepeatedElement(arr []string) (newArr []string) {
-	newArr = make([]string, 0)
-	for i := 0; i < len(arr); i++ {
-		repeat := false
-		for j := i + 1; j < len(arr); j++ {
-			if arr[i] == arr[j] {
-				repeat = true
-				break
-			}
-		}
-		if !repeat {
-			newArr = append(newArr, arr[i])
-		}
-	}
-	return
-}
@@ -24,9 +24,9 @@ type (
 	}
 	CenterIndex {
 		name       string `json:"name"`
-		cpu        float32 `json:"cpu"`
-		memory     float32 `json:"memory"`
-		storage    float32 `json:"storage"`
+		cpu        string `json:"cpu"`
+		memory     string `json:"memory"`
+		storage    string `json:"storage"`
 		centerType string `json:"centerType"`
 	}
 )
@@ -35,9 +35,16 @@ type (
 		clusterLoadRecords []ClusterLoadRecord `json:"clusterLoadRecords"`
 	}
 	ClusterLoadRecord {
+		AdapterId   int64   `json:"adapterId"`
 		ClusterName string  `json:"clusterName"`
+		CpuAvail    float64 `json:"cpuAvail"`
+		CpuTotal    float64 `json:"cpuTotal"`
 		CpuUsage    float64 `json:"cpuUsage"`
+		MemoryAvail float64 `json:"memoryAvail"`
 		MemoryUsage float64 `json:"memoryUsage"`
+		MemoryTotal float64 `json:"memoryTotal"`
+		DiskAvail   float64 `json:"diskAvail"`
+		DiskTotal   float64 `json:"diskTotal"`
 		DiskUsage   float64 `json:"diskUsage"`
 	}
 )
@@ -78,6 +85,7 @@ type commitTaskReq {
 	Replicas    int64             `json:"replicas,optional"`
 	MatchLabels map[string]string `json:"matchLabels,optional"`
 	YamlList    []string          `json:"yamlList"`
+	ClusterName string            `json:"clusterName"`
 }
 
 type (
@@ -101,7 +109,11 @@ type (
 
 type (
 	commitVmTaskReq {
-		server   ServerCommit `json:"server,optional"`
+		Name        string            `json:"name"`
+		NsID        string            `json:"nsID"`
+		Replicas    int64             `json:"replicas,optional"`
+		MatchLabels map[string]string `json:"matchLabels,optional"`
+		server      []ServerCommit    `json:"server,optional"`
 		platform string `json:"platform,optional"`
 	}
 	ServerCommit {
@@ -128,7 +140,19 @@ type (
 		uuid string `json:"uuid,optional"`
 	}
 	commitVmTaskResp {
+		Id              string                     `json:"id" copier:"Id"`
+		Links           []VmLinks                  `json:"links" copier:"Links"`
+		OSDCFDiskConfig string                     `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
+		SecurityGroups  []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"`
+		AdminPass       string                     `json:"adminPass" copier:"AdminPass"`
+	}
+	VmLinks {
+		Href string `json:"href" copier:"Href"`
+		Rel  string `json:"rel" copier:"Rel"`
+	}
+	VmSecurity_groups_server {
+		Name string `json:"name" copier:"Name"`
 	}
 )
@@ -362,6 +362,14 @@ service pcm {
 	@handler GetVolumeLimitsHandler
 	get /vm/getVolumeLimits (GetVolumeLimitsReq) returns (GetVolumeLimitsResp)
 
+	@doc "Query network count"
+	@handler GetNetworkNumHandler
+	get /vm/getNetworkNum (ListNetworksReq) returns (NetworkNum)
+
+	@doc "Query image count"
+	@handler getImageNumHandler
+	get /vm/getImageNum (ListImagesReq) returns (ImageNum)
+
 	@doc "Query VM list"
 	@handler ListServerHandler
 	get /vm/listServer (ListServersReq) returns (ListServersResp)
@@ -339,13 +339,6 @@ type (
 	CreNetwork {
 		Uuid string `json:"uuid" copier:"Uuid"`
 	}
-	ServerResp {
-		Id              string                   `json:"id" copier:"Id"`
-		Links           []Links                  `json:"links" copier:"Links"`
-		OSDCFDiskConfig string                   `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
-		SecurityGroups  []Security_groups_server `json:"security_groups" copier:"SecurityGroups"`
-		AdminPass       string                   `json:"adminPass" copier:"AdminPass"`
-	}
 	Security_groups_server {
 		Name string `json:"name" copier:"Name"`
 	}
@@ -356,6 +349,13 @@ type (
 		DestinationType     string `json:"destination_type" copier:"DestinationType"`
 		DeleteOnTermination bool   `json:"delete_on_termination" copier:"DeleteOnTermination"`
 	}
+	ServerResp {
+		Id              string                   `json:"id" copier:"Id"`
+		Links           []Links                  `json:"links" copier:"Links"`
+		OSDCFDiskConfig string                   `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
+		SecurityGroups  []Security_groups_server `json:"security_groups" copier:"SecurityGroups"`
+		AdminPass       string                   `json:"adminPass" copier:"AdminPass"`
+	}
 )
 
 type (
@@ -674,6 +674,25 @@ type (
 	/******************find images end*************************/
 
 	/******************find Networks end*************************/
+type (
+	NetworkNum {
+		NetworkNum int32  `json:"networkNum"`
+		Code       int32  `json:"code,omitempty"`
+		Msg        string `json:"msg,omitempty"`
+		ErrorMsg   string `json:"errorMsg,omitempty"`
+	}
+)
+
+type (
+	ImageNum {
+		ImageNum int32  `json:"imageNum"`
+		Code     int32  `json:"code,omitempty"`
+		Msg      string `json:"msg,omitempty"`
+		ErrorMsg string `json:"errorMsg,omitempty"`
+	}
+)
+
 type (
 	ListNetworksReq {
 		Platform string `json:"platform,optional"`
@@ -15,6 +15,8 @@ Cache:
   - Host: 10.206.0.12:6379
     Pass: redisPW123
 
+PromUrl: http://47.92.39.128:30877
+
 # k8s rpc
 K8sNativeConf:
   # target: nacos://10.206.0.12:8848/pcm.kubenative.rpc?timeout=30s&namespaceid=test&groupname=DEFAULT_GROUP&appName=pcm.core.api

@@ -68,9 +70,6 @@ OctopusRpcConf:
 
 OpenstackRpcConf:
   # target: nacos://10.206.0.12:8848/pcm.openstack.rpc?timeout=30s&namespaceid=test&groupname=DEFAULT_GROUP&appName=pcm.core.api
-  # target: nacos://127.0.0.1:8848/pcm.openstack.rpc?timeout=30s&namespaceid=test&groupname=DEFAULT_GROUP&appName=pcm.core.api
-  #Endpoints:
-  #- 127.0.0.1:2010
   Endpoints:
     - 127.0.0.1:2010
   NonBlock: true
@@ -49,6 +49,7 @@ type Config struct {
 		Password string
 	}
 	SnowflakeConf SnowflakeConf
+	PromUrl       string
 }
 
 // SnowflakeConf holds the snowflake machine-id configuration
@@ -33,7 +33,7 @@ func SyncParticipantRpc(svc *svc.ServiceContext) {
 			if err != nil {
 				return
 			}
-			svc.PromClient[participant.Id] = promClient
+			svc.MonitorClient[participant.Id] = promClient
 		}
 	}
 }
@@ -0,0 +1,28 @@
+package core
+
+import (
+	clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
+	"net/http"
+
+	"github.com/zeromicro/go-zero/rest/httpx"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
+)
+
+func PullTaskInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		var req clientCore.PullTaskInfoReq
+		if err := httpx.Parse(r, &req); err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+			return
+		}
+
+		l := core.NewPullTaskInfoLogic(r.Context(), svcCtx)
+		resp, err := l.PullTaskInfo(&req)
+		if err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+		} else {
+			httpx.OkJsonCtx(r.Context(), w, resp)
+		}
+	}
+}
@@ -0,0 +1,28 @@
+package core
+
+import (
+	clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
+	"net/http"
+
+	"github.com/zeromicro/go-zero/rest/httpx"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/core"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
+)
+
+func PushTaskInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		var req clientCore.PushTaskInfoReq
+		if err := httpx.Parse(r, &req); err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+			return
+		}
+
+		l := core.NewPushTaskInfoLogic(r.Context(), svcCtx)
+		resp, err := l.PushTaskInfo(&req)
+		if err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+		} else {
+			httpx.OkJsonCtx(r.Context(), w, resp)
+		}
+	}
+}
@@ -425,6 +425,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
 			Path:    "/vm/getVolumeLimits",
 			Handler: vm.GetVolumeLimitsHandler(serverCtx),
 		},
+		{
+			Method:  http.MethodGet,
+			Path:    "/vm/getNetworkNum",
+			Handler: vm.GetNetworkNumHandler(serverCtx),
+		},
+		{
+			Method:  http.MethodGet,
+			Path:    "/vm/getImageNum",
+			Handler: vm.GetImageNumHandler(serverCtx),
+		},
 		{
 			Method:  http.MethodGet,
 			Path:    "/vm/listServer",
@@ -0,0 +1,28 @@
+package vm
+
+import (
+	"net/http"
+
+	"github.com/zeromicro/go-zero/rest/httpx"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/vm"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
+)
+
+func GetImageNumHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		var req types.ListImagesReq
+		if err := httpx.Parse(r, &req); err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+			return
+		}
+
+		l := vm.NewGetImageNumLogic(r.Context(), svcCtx)
+		resp, err := l.GetImageNum(&req)
+		if err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+		} else {
+			httpx.OkJsonCtx(r.Context(), w, resp)
+		}
+	}
+}
@@ -0,0 +1,28 @@
+package vm
+
+import (
+	"net/http"
+
+	"github.com/zeromicro/go-zero/rest/httpx"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/vm"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
+)
+
+func GetNetworkNumHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		var req types.ListNetworksReq
+		if err := httpx.Parse(r, &req); err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+			return
+		}
+
+		l := vm.NewGetNetworkNumLogic(r.Context(), svcCtx)
+		resp, err := l.GetNetworkNum(&req)
+		if err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+		} else {
+			httpx.OkJsonCtx(r.Context(), w, resp)
+		}
+	}
+}
@@ -27,14 +27,14 @@ func NewControllerMetricsLogic(ctx context.Context, svcCtx *svc.ServiceContext)
 
 func (l *ControllerMetricsLogic) ControllerMetrics(req *types.ControllerMetricsReq) (resp *types.ControllerMetricsResp, err error) {
 	resp = &types.ControllerMetricsResp{}
-	if _, ok := l.svcCtx.PromClient[req.ParticipantId]; ok {
+	if _, ok := l.svcCtx.MonitorClient[req.ParticipantId]; ok {
 		if len(req.Pod) != 0 {
-			resp.Data = l.svcCtx.PromClient[req.ParticipantId].GetNamedMetricsByTime(req.Metrics, req.Start, req.End, 60*time.Minute, tracker.PodOption{
+			resp.Data = l.svcCtx.MonitorClient[req.ParticipantId].GetNamedMetricsByTime(req.Metrics, req.Start, req.End, 60*time.Minute, tracker.PodOption{
 				PodName: req.Pod,
 			})
 		} else {
-			resp.Data = l.svcCtx.PromClient[req.ParticipantId].GetNamedMetricsByTime(req.Metrics, req.Start, req.End, 60*time.Minute, tracker.ControllerOption{
+			resp.Data = l.svcCtx.MonitorClient[req.ParticipantId].GetNamedMetricsByTime(req.Metrics, req.Start, req.End, 60*time.Minute, tracker.ControllerOption{
 				WorkloadName: req.WorkloadName,
 			})
 		}
@@ -2,6 +2,8 @@ package core
 
 import (
 	"context"
+	"github.com/prometheus/common/model"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
 
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
@@ -24,31 +26,41 @@ func NewCenterResourcesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *C
 }
 
 func (l *CenterResourcesLogic) CenterResources() (resp *types.CenterResourcesResp, err error) {
-	// todo: add your logic here and delete this line
-	centerIndex1 := types.CenterIndex{
-		Name:       "阿里云",
-		Cpu:        float32(12.33),
-		Memory:     float32(64.55),
-		Storage:    float32(33.90),
-		CenterType: "cloud",
-	}
-	centerIndex2 := types.CenterIndex{
-		Name:       "A超算中心",
-		Cpu:        float32(42.36),
-		Memory:     float32(66.55),
-		Storage:    float32(23.231),
-		CenterType: "hpc",
-	}
-	centerIndex3 := types.CenterIndex{
-		Name:       "智算中心",
-		Cpu:        float32(78.33),
-		Memory:     float32(36.55),
-		Storage:    float32(88.93),
-		CenterType: "ai",
-	}
 	resp = &types.CenterResourcesResp{}
-	resp.CentersIndex = append(resp.CentersIndex, centerIndex1)
-	resp.CentersIndex = append(resp.CentersIndex, centerIndex2)
-	resp.CentersIndex = append(resp.CentersIndex, centerIndex3)
+	rawData, err := l.svcCtx.PromClient.GetRawData("resource_top3", tracker.ClusterOption{})
+	if err != nil {
+		return nil, err
+	}
+	var centersIndex []*types.CenterIndex
+	data := rawData.(model.Vector)
+	for _, d := range data {
+		for _, v := range d.Metric {
+			centersIndex = append(centersIndex, &types.CenterIndex{Name: string(v)})
+		}
+	}
+	for _, centerIndex := range centersIndex {
+		// Query the types of resource centers
+		//l.svcCtx.DbEngin.Raw().Scan(&centerIndex.CenterType)
+		cpuRawData, err := l.svcCtx.PromClient.GetRawData("cluster_cpu_usage", tracker.ClusterOption{ClusterName: centerIndex.Name})
+		if err != nil {
+			return nil, err
+		}
+		cpuData := cpuRawData.(model.Vector)
+		centerIndex.Cpu = cpuData[0].Value.String()
+		memoryRawData, err := l.svcCtx.PromClient.GetRawData("cluster_memory_usage", tracker.ClusterOption{ClusterName: centerIndex.Name})
+		if err != nil {
+			return nil, err
+		}
+		memoryData := memoryRawData.(model.Vector)
+		centerIndex.Memory = memoryData[0].Value.String()
+		diskRawData, err := l.svcCtx.PromClient.GetRawData("cluster_disk_usage", tracker.ClusterOption{ClusterName: centerIndex.Name})
+		if err != nil {
+			return nil, err
+		}
+		diskData := diskRawData.(model.Vector)
+		centerIndex.Storage = diskData[0].Value.String()
+		resp.CentersIndex = append(resp.CentersIndex, *centerIndex)
+	}
 	return resp, nil
 }
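One caveat: each GetRawData result is type-asserted to model.Vector and indexed at [0] without a length check, so a cluster with no samples would panic. A hedged guard, using the same github.com/prometheus/common/model types as above (firstValue is a hypothetical helper, not part of this commit):

func firstValue(raw interface{}) (string, error) {
	vec, ok := raw.(model.Vector)
	if !ok || vec.Len() == 0 {
		return "", fmt.Errorf("monitor query returned no samples")
	}
	return vec[0].Value.String(), nil
}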
@@ -2,9 +2,13 @@ package core
 
 import (
 	"context"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/mqs"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
+	"time"
 
 	"github.com/zeromicro/go-zero/core/logx"
 )
@@ -25,6 +29,26 @@ func NewCommitVmTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm
 
 func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *types.CommitVmTaskResp, err error) {
 	// todo: add your logic here and delete this line
+	// Build the main task structure
+	taskModel := models.Task{
+		Status:     constants.Saved,
+		Name:       req.Name,
+		CommitTime: time.Now(),
+		NsID:       req.NsID,
+	}
+	// Save task data to database
+	tx := l.svcCtx.DbEngin.Create(&taskModel)
+	if tx.Error != nil {
+		return nil, tx.Error
+	}
+	/* hpc := models.Hpc{}
+	tool.Convert(req, &hpc)*/
+	mqInfo := response.TaskInfo{
+		TaskId:      taskModel.Id,
+		TaskType:    "vm",
+		MatchLabels: req.MatchLabels,
+		NsID:        req.NsID,
+	}
+	mqs.InsQueue.Beta.Add(&mqInfo)
 	return
 }
@@ -0,0 +1,94 @@
+package core
+
+import (
+	"context"
+	"github.com/jinzhu/copier"
+	clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
+	"gorm.io/gorm"
+
+	"github.com/zeromicro/go-zero/core/logx"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
+)
+
+type PullTaskInfoLogic struct {
+	logx.Logger
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+}
+
+func NewPullTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PullTaskInfoLogic {
+	return &PullTaskInfoLogic{
+		Logger: logx.WithContext(ctx),
+		ctx:    ctx,
+		svcCtx: svcCtx,
+	}
+}
+
+func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clientCore.PullTaskInfoResp, error) {
+	//opt := clientPCM.Options{
+	//	Url: "http://localhost:8999",
+	//	DataSource: "root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local",
+	//}
+	//coreCli, _ := clientPCM.NewClient(opt)
+	//taskOpt := clientPCM.TaskOptions{}
+	//coreTask, _ := coreCli.Task(taskOpt)
+	//adapterId := 1706858330967773111
+	//// query the task list dispatched by the core side
+	//pullReq := types.PullTaskInfoReq{
+	//	AdapterId: int64(adapterId),
+	//}
+	//hpcList, _ := coreTask.PullTaskInfo(pullReq)
+	//println(hpcList)
+	// look up the participant (p-side) type
+	resp := clientCore.PullTaskInfoResp{}
+	var kind int32
+	l.svcCtx.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", req.AdapterId).Scan(&kind)
+	// fetch the pending task lists for cloud / AI / HPC
+	switch kind {
+	case 2:
+		var hpcModelList []models.TaskHpc
+		err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &hpcModelList)
+		if err != nil {
+			return nil, err
+		}
+		utils.Convert(hpcModelList, &resp.HpcInfoList)
+		if len(resp.HpcInfoList) > 0 {
+			for i, hpcInfo := range hpcModelList {
+				err := copier.CopyWithOption(resp.HpcInfoList[i], hpcInfo, copier.Option{Converters: utils.Converters})
+				if err != nil {
+					return nil, err
+				}
+				var clusterType string
+				l.svcCtx.DbEngin.Raw("SELECT label FROM `t_cluster` where id = ? ", hpcInfo.ClusterId).Scan(&clusterType)
+				resp.HpcInfoList[i].ClusterType = clusterType
+			}
+		}
+	case 0:
+		var cloudModelList []models.Cloud
+		err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList)
+		if err != nil {
+			return nil, err
+		}
+		utils.Convert(cloudModelList, &resp.CloudInfoList)
+	case 1:
+		var aiModelList []models.Ai
+		err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &aiModelList)
+		if err != nil {
+			return nil, err
+		}
+		utils.Convert(aiModelList, &resp.AiInfoList)
+	}
+	return &resp, nil
+}
+
+func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error {
+	tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data)
+	if tx.Error != nil {
+		return tx.Error
+	}
+	return nil
+}
@@ -0,0 +1,105 @@
+package core
+
+import (
+	"context"
+	clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
+	"gorm.io/gorm"
+	"strings"
+
+	"github.com/zeromicro/go-zero/core/logx"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
+)
+
+type PushTaskInfoLogic struct {
+	logx.Logger
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+}
+
+func NewPushTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PushTaskInfoLogic {
+	return &PushTaskInfoLogic{
+		Logger: logx.WithContext(ctx),
+		ctx:    ctx,
+		svcCtx: svcCtx,
+	}
+}
+
+func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clientCore.PushTaskInfoResp, error) {
+	resp := clientCore.PushTaskInfoResp{}
+	var kind int32
+	l.svcCtx.DbEngin.Raw("select type as kind from t_adapter where id = ?", req.AdapterId).Scan(&kind)
+	switch kind {
+	case 0:
+		for _, cloudInfo := range req.CloudInfoList {
+			l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?",
+				cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, req.AdapterId, cloudInfo.Id)
+			syncTask(l.svcCtx.DbEngin, cloudInfo.TaskId)
+		}
+	case 2:
+		for _, hpcInfo := range req.HpcInfoList {
+			l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?",
+				hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, req.AdapterId, hpcInfo.TaskId, hpcInfo.Name)
+			syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId)
+		}
+	case 1:
+		for _, aiInfo := range req.AiInfoList {
+			l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
+				aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, req.AdapterId, aiInfo.TaskId, aiInfo.Name)
+			syncTask(l.svcCtx.DbEngin, aiInfo.TaskId)
+		}
+	}
+
+	return &resp, nil
+}
+
+func syncTask(gorm *gorm.DB, taskId int64) {
+	var allStatus string
+	tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus)
+	if tx.Error != nil {
+		logx.Error(tx.Error)
+	}
+	// if all sub-task statuses agree, update the main task status
+	statusArray := strings.Split(allStatus, ",")
+	if len(removeRepeatedElement(statusArray)) == 1 {
+		updateTask(gorm, taskId, statusArray[0])
+	}
+	// if any sub-task failed, mark the main task failed
+	if strings.Contains(allStatus, constants.Failed) {
+		updateTask(gorm, taskId, constants.Failed)
+	}
+	if strings.Contains(allStatus, constants.Running) {
+		updateTask(gorm, taskId, constants.Running)
+	}
+}
+
+func updateTask(gorm *gorm.DB, taskId int64, status string) {
+	var task models.Task
+	gorm.Where("id = ? ", taskId).Find(&task)
+	if task.Status != status {
+		task.Status = status
+		gorm.Updates(&task)
+	}
+}
+
+func removeRepeatedElement(arr []string) (newArr []string) {
+	newArr = make([]string, 0)
+	for i := 0; i < len(arr); i++ {
+		repeat := false
+		for j := i + 1; j < len(arr); j++ {
+			if arr[i] == arr[j] {
+				repeat = true
+				break
+			}
+		}
+		if !repeat {
+			newArr = append(newArr, arr[i])
+		}
+	}
+	return
+}
@@ -5,6 +5,7 @@ import (
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
+	"strconv"
 
 	"github.com/zeromicro/go-zero/core/logx"
 )
@@ -26,9 +27,17 @@ func NewSyncClusterLoadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *S
 func (l *SyncClusterLoadLogic) SyncClusterLoad(req *types.SyncClusterLoadReq) error {
 	if len(req.ClusterLoadRecords) != 0 {
 		for _, record := range req.ClusterLoadRecords {
-			tracker.ClusterCpuGauge.WithLabelValues(record.ClusterName).Set(record.CpuUsage)
-			tracker.ClusterMemoryGauge.WithLabelValues(record.ClusterName).Set(record.MemoryUsage)
-			tracker.ClusterDiskGauge.WithLabelValues(record.ClusterName).Set(record.DiskUsage)
+			tracker.ClusterCpuUsageGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuUsage)
+			tracker.ClusterCpuAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuAvail)
+			tracker.ClusterCpuTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuTotal)
+
+			tracker.ClusterMemoryUsageGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryUsage)
+			tracker.ClusterMemoryAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryAvail)
+			tracker.ClusterMemoryTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryTotal)
+
+			tracker.ClusterDiskUsageGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskUsage)
+			tracker.ClusterDiskAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskAvail)
+			tracker.ClusterDiskTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskTotal)
 		}
 	}
 	return nil
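The replacement gauges carry an adapter-id label next to the cluster name, mirroring the new AdapterId field on ClusterLoadRecord. The commit does not show the tracker package itself; assuming it wraps the standard Prometheus client, one of these gauges would be declared roughly like this (a sketch, not the actual tracker source):

package tracker

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Sketch: gauge labeled by cluster name and adapter id. The metric name
// "cluster_cpu_usage" matches the query used by CenterResources above.
var ClusterCpuUsageGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Name: "cluster_cpu_usage",
	Help: "CPU usage reported by a participant cluster.",
}, []string{"cluster_name", "adapter_id"})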
@@ -0,0 +1,49 @@
+package vm
+
+import (
+	"context"
+	"fmt"
+	"github.com/jinzhu/copier"
+	"github.com/pkg/errors"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
+	"gitlink.org.cn/JointCloud/pcm-openstack/openstack"
+
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
+
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type GetImageNumLogic struct {
+	logx.Logger
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+}
+
+func NewGetImageNumLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetImageNumLogic {
+	return &GetImageNumLogic{
+		Logger: logx.WithContext(ctx),
+		ctx:    ctx,
+		svcCtx: svcCtx,
+	}
+}
+
+func (l *GetImageNumLogic) GetImageNum(req *types.ListImagesReq) (resp *types.ImageNum, err error) {
+	// todo: add your logic here and delete this line
+	resp = &types.ImageNum{}
+	ListImagesReq := &openstack.ListImagesReq{}
+	err = copier.CopyWithOption(ListImagesReq, req, copier.Option{Converters: utils.Converters})
+	ListImagesResp, err := l.svcCtx.OpenstackRpc.ListImages(l.ctx, ListImagesReq)
+	if err != nil {
+		return nil, errors.Wrapf(xerr.NewErrMsg("Failed to get Images list"), "Failed to get db Images list err : %v ,req:%+v", err, req)
+	}
+	var count int = len(ListImagesResp.Images)
+	resp.ImageNum = int32(count)
+	fmt.Println(count)
+	if err != nil {
+		return nil, result.NewDefaultError(err.Error())
+	}
+	return resp, err
+}
@@ -0,0 +1,49 @@
+package vm
+
+import (
+	"context"
+	"fmt"
+	"github.com/jinzhu/copier"
+	"github.com/pkg/errors"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/xerr"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
+	"gitlink.org.cn/JointCloud/pcm-openstack/openstack"
+
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
+
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type GetNetworkNumLogic struct {
+	logx.Logger
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+}
+
+func NewGetNetworkNumLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetNetworkNumLogic {
+	return &GetNetworkNumLogic{
+		Logger: logx.WithContext(ctx),
+		ctx:    ctx,
+		svcCtx: svcCtx,
+	}
+}
+
+func (l *GetNetworkNumLogic) GetNetworkNum(req *types.ListNetworksReq) (resp *types.NetworkNum, err error) {
+	// todo: add your logic here and delete this line
+	resp = &types.NetworkNum{}
+	ListNetworksReq := &openstack.ListNetworksReq{}
+	err = copier.CopyWithOption(ListNetworksReq, req, copier.Option{Converters: utils.Converters})
+	ListNetworksResp, err := l.svcCtx.OpenstackRpc.ListNetworks(l.ctx, ListNetworksReq)
+	if err != nil {
+		return nil, errors.Wrapf(xerr.NewErrMsg("Failed to get Networks list"), "Failed to get db Networks list err : %v ,req:%+v", err, req)
+	}
+	var count int = len(ListNetworksResp.Networks)
+	resp.NetworkNum = int32(count)
+	fmt.Println(count)
+	if err != nil {
+		return nil, result.NewDefaultError(err.Error())
+	}
+	return resp, err
+}
@@ -49,7 +49,7 @@ func (l *ListNetworksLogic) ListNetworks(req *types.ListNetworksReq) (resp *type
 	err = copier.CopyWithOption(ListNetworksReq, req, copier.Option{Converters: utils.Converters})
 	ListNetworksResp, err := l.svcCtx.OpenstackRpc.ListNetworks(l.ctx, ListNetworksReq)
 	if err != nil {
-		return nil, errors.Wrapf(xerr.NewErrMsg("Failed to get Servers list"), "Failed to get db Servers list err : %v ,req:%+v", err, req)
+		return nil, errors.Wrapf(xerr.NewErrMsg("Failed to get Networks list"), "Failed to get db Servers list err : %v ,req:%+v", err, req)
 	}
 	marshal, err := json.Marshal(&ListNetworksResp)
 	if err != nil {
@@ -0,0 +1,51 @@
+package mqs
+
+import (
+	"context"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
+)
+
+/*
+*
+ */
+type VmMq struct {
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+}
+
+func NewVmMq(ctx context.Context, svcCtx *svc.ServiceContext) *VmMq {
+	return &VmMq{
+		ctx:    ctx,
+		svcCtx: svcCtx,
+	}
+}
+
+func (l *VmMq) Consume(val string) error {
+	// receive the message and filter it by labels
+	vmScheduler := schedulers.NewVmScheduler()
+	schdl, err := scheduler.NewScheduler(vmScheduler, val, l.svcCtx.DbEngin, l.svcCtx.ParticipantRpc)
+	if err != nil {
+		return err
+	}
+
+	// check whether a cluster list was specified
+	schdl.SpecifyClusters()
+
+	// check whether an nsID was specified
+	schdl.SpecifyNsID()
+
+	// narrow the candidate clusters by label matching
+	schdl.MatchLabels()
+
+	// todo: the original scheduling algorithm is bypassed because monitoring
+	// data is not yet reported; fall back to random scheduling for now
+	schdl.TempAssign()
+
+	// persist the result
+	err = schdl.SaveToDb()
+	if err != nil {
+		return err
+	}
+	return nil
+}
@@ -15,18 +15,11 @@
 package common
 
 import (
-	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
-	"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
+	"math"
 	"math/rand"
 	"time"
 )
 
-type SubSchedule interface {
-	GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
-	PickOptimalStrategy() (strategy.Strategy, error)
-	AssignTask(clusters []*strategy.AssignedCluster) error
-}
-
 // Intersect returns the intersection of two slices
 func Intersect(slice1, slice2 []int64) []int64 {
 	m := make(map[int64]int)

@@ -90,3 +83,8 @@ func MicsSlice(origin []int64, count int) []int64 {
 	}
 	return result
 }
+
+func RoundFloat(val float64, precision uint) float64 {
+	ratio := math.Pow(10, float64(precision))
+	return math.Round(val*ratio) / ratio
+}
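RoundFloat rounds half away from zero at the given number of decimal places. A small usage sketch; common is an internal package, so this compiles only inside the module and is shown for illustration:

package main

import (
	"fmt"

	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
)

func main() {
	fmt.Println(common.RoundFloat(2.6457, 2))  // 2.65
	fmt.Println(common.RoundFloat(3.14159, 3)) // 3.142
}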
@@ -22,6 +22,7 @@ import (
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/rpc/client/participantservice"
 	"gorm.io/gorm"

@@ -33,7 +34,7 @@ import (
 type Scheduler struct {
 	task           *response.TaskInfo
 	participantIds []int64
-	subSchedule    common.SubSchedule
+	subSchedule    SubSchedule
 	dbEngin        *gorm.DB
 	result         []string // pID -> sub-task yaml string pairs
 	participantRpc participantservice.ParticipantService

@@ -43,7 +44,13 @@ type Scheduler struct {
 	mu sync.RWMutex
 }
 
-func NewScheduler(subSchedule common.SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
+type SubSchedule interface {
+	GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
+	PickOptimalStrategy() (strategy.Strategy, error)
+	AssignTask(clusters []*strategy.AssignedCluster) error
+}
+
+func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*Scheduler, error) {
 	var task *response.TaskInfo
 	err := json.Unmarshal([]byte(val), &task)
 	if err != nil {

@@ -123,7 +130,7 @@ func (s *Scheduler) TempAssign() error {
 	return nil
 }
 
-func (s *Scheduler) AssignAndSchedule(ss common.SubSchedule) error {
+func (s *Scheduler) AssignAndSchedule(ss SubSchedule) error {
 	//// ParticipantId already specified
 	//if s.task.ParticipantId != 0 {
 	//	return nil
@@ -74,6 +74,9 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
 	case strategy.RESOURCES_PRICING:
 		strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: 1})
 		return strategy, nil
+	case strategy.DYNAMIC_RESOURCES:
+		strategy := strategy.NewDynamicResourcesStrategy(resources, as.option, 1)
+		return strategy, nil
 	}
 
 	return nil, errors.New("no strategy has been chosen")
@@ -4,6 +4,7 @@ type AiOption struct {
 	AiClusterId  string // shuguangAi / octopus ClusterId
 	TaskName     string
 	ResourceType string // cpu / gpu / compute card
+	CpuCoreNum   int64
 	TaskType     string // pytorch / tensorflow / mindspore
 	DatasetsName string // mnist / imageNet / iris
 	StrategyName string

@@ -29,3 +30,7 @@ type AiOption struct {
 	Image string
 	Model interface{}
 }
+
+func (a AiOption) GetOptionType() string {
+	return AI
+}
@@ -1,5 +1,11 @@
 package option
 
-type Option struct {
-	Name string
+const (
+	AI    = "ai"
+	CLOUD = "cloud"
+	HPC   = "hpc"
+)
+
+type Option interface {
+	GetOptionType() string
 }
@@ -1,21 +1,67 @@
 package schedulers
 
 import (
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/algorithm/providerPricing"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/database"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
 )
 
 type VmScheduler struct {
+	storage database.Storage
 }
 
-func (v VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
-	//TODO implement me
-	panic("implement me")
+func NewVmScheduler() *VmScheduler {
+	return &VmScheduler{}
 }
 
-func (v VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
+func (vm *VmScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
+	// fetch all compute centers
+	// run the scheduling algorithm
+	strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{})
+	return strategy, nil
+}
+
+func (v *VmScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
 	//TODO implement me
-	panic("implement me")
+	vm := models.Vm{}
+	utils.Convert(task.Metadata, &vm)
+	vm.Id = utils.GenSnowflakeID()
+	vm.TaskId = vm.TaskId
+	vm.Status = constants.Saved
+	vm.ParticipantId = participantId
+	return vm, nil
+	//vm.YamlString = v.yamlString
+	/* vm. = utils.GenSnowflakeID()
+	vm.NsID = task.NsID
+	vm.ParticipantId = participantId*/
+}
+
+/*
+func (vm *VmScheduler) UnMarshalVmStruct(yamlString string, taskId int64, nsID string) models.vm {
+	var vm models.Vm
+	vm := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096)
+}
+*/
+func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*providerPricing.Provider, error) {
+	proParams, err := vm.storage.GetProviderParams()
+	if err != nil {
+		return nil, nil, nil
+	}
+	var providerList []*providerPricing.Provider
+	for _, p := range proParams {
+		provider := providerPricing.NewProvider(p.Participant_id, p.Cpu_avail, p.Mem_avail, p.Disk_avail, 0.0, 0.0, 0.0)
+		providerList = append(providerList, provider)
+	}
+
+	//replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
+	//t := algorithm.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000)
+
+	return nil, providerList, nil
 }
 
 func (v VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) error {
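The new GetNewStructForDb contains a self-assignment (vm.TaskId = vm.TaskId) that has no effect. Judging from the commented-out sketch beneath it, the intended line is presumably the following; this is an assumption, not confirmed by the commit:

	vm.TaskId = task.TaskId // hypothetical: copy the id from the incoming TaskInfo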
@@ -8,20 +8,22 @@ type AiCollector interface {
type ResourceStats struct {
	ParticipantId int64
	Name          string
-	CpuAvail      float64
+	CpuCoreAvail  int64
	MemAvail      float64
	DiskAvail     float64
-	GpuAvail      float64
-	CardToHours   map[Card]float64
-	CpuToHours    map[int]float64
+	GpuAvail      int64
+	CardsAvail    []*Card
+	CpuCoreHours  float64
	Balance       float64
}

type Card struct {
+	Platform   string
	Type       string
	Name       string
	TOpsAtFp16 float64
-	Price      int32
+	CardHours  float64
+	Num        int32
}

type DatasetsSpecs struct {
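A minimal sketch of how a collector might populate the reshaped struct; every value below, including the cluster name and card spec, is illustrative rather than taken from a real deployment:

// Sketch only: assumes the collector package above; all numbers are made up.
func exampleStats() *collector.ResourceStats {
	return &collector.ResourceStats{
		ParticipantId: 1,
		Name:          "example-cluster",
		CpuCoreAvail:  64,
		GpuAvail:      4,
		CpuCoreHours:  1200.0,
		Balance:       300.0,
		CardsAvail: []*collector.Card{
			{Platform: "Octopus", Type: "CARD", Name: "MLU290", TOpsAtFp16: 128, CardHours: 150, Num: 4},
		},
	}
}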
@@ -1,8 +1,70 @@
package strategy

+import (
+	"errors"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
+)

type DynamicResourcesStrategy struct {
+	replicas  int32
+	resources []*collector.ResourceStats
+	opt       option.Option
+}

+func NewDynamicResourcesStrategy(resources []*collector.ResourceStats, opt option.Option, replica int32) *DynamicResourcesStrategy {
+	return &DynamicResourcesStrategy{resources: resources, opt: opt, replicas: replica}
}

func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
-	return nil, nil
+	if ps.replicas < 1 {
+		return nil, errors.New("replicas must be greater than 0")
+	}
+
+	switch ps.opt.GetOptionType() {
+	case option.AI:
+		opt := (interface{})(ps.opt).(*option.AiOption)
+
+		var maxCardHoursAvailable float64
+		var maxCpuCoreHoursAvailable float64
+		assignedCluster := &AssignedCluster{} // initialized; a nil *AssignedCluster would be dereferenced below
+		var results []*AssignedCluster
+		for _, res := range ps.resources {
+			if opt.ResourceType == "" {
+				if res.CpuCoreHours <= 0 {
+					cluster := &AssignedCluster{ParticipantId: res.ParticipantId, Name: res.Name, Replicas: ps.replicas}
+					results = append(results, cluster)
+					return results, nil
+				}
+
+				if res.CpuCoreHours > maxCpuCoreHoursAvailable {
+					maxCpuCoreHoursAvailable = res.CpuCoreHours
+					assignedCluster.Name = res.Name
+					assignedCluster.ParticipantId = res.ParticipantId
+					assignedCluster.Replicas = ps.replicas
+				}
+			}
+
+			if opt.ResourceType == "" {
+				var maxCurrentCardHours float64
+				for _, card := range res.CardsAvail {
+					cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3)
+					if cardHours > maxCurrentCardHours {
+						maxCurrentCardHours = cardHours
+					}
+				}
+				if maxCurrentCardHours > maxCardHoursAvailable {
+					maxCardHoursAvailable = maxCurrentCardHours
+					assignedCluster.Name = res.Name
+					assignedCluster.ParticipantId = res.ParticipantId
+					assignedCluster.Replicas = ps.replicas
+				}
+			}
+		}
+		results = append(results, assignedCluster)
+		return results, nil
+	}
+
+	return nil, errors.New("failed to apply DynamicResourcesStrategy")
}
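A hedged usage sketch for the new constructor and Schedule path; aiOpt stands in for however the caller builds its option.Option, and the "log" import and print calls are only for illustration:

// Sketch: feed collected resource stats into the strategy and print the pick.
func scheduleExample(stats []*collector.ResourceStats, aiOpt option.Option) {
	s := NewDynamicResourcesStrategy(stats, aiOpt, 1)
	clusters, err := s.Schedule()
	if err != nil {
		log.Fatal(err)
	}
	for _, c := range clusters {
		log.Printf("assign %d replica(s) to %s (participant %d)", c.Replicas, c.Name, c.ParticipantId)
	}
}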
@@ -23,7 +23,7 @@ func (r *ResourcePricingParams) GetProviders() []*providerPricing.Provider {
	for _, resource := range r.Resources {
		provider := providerPricing.NewProvider(
			resource.ParticipantId,
-			resource.CpuAvail,
+			float64(resource.CpuCoreAvail),
			resource.MemAvail,
			resource.DiskAvail, 0.0, 0.0, 0.0)
		providerList = append(providerList, provider)
@@ -60,6 +60,10 @@ var (
		MLU: CAMBRICON,
		GCU: ENFLAME,
	}
+	cardTopsMap = map[string]float64{
+		MLU: CAMBRICONMLU290,
+		GCU: EnflameT20,
+	}
)

func NewOctopusLink(ctx context.Context, svcCtx *svc.ServiceContext, name string, id int64) *OctopusLink {
@@ -245,13 +249,49 @@ func (o *OctopusLink) GetResourceStats() (*collector.ResourceStats, error) {
		return nil, errors.New(balanceResp.Error.Message)
	}

-	//resourceStat := collector.ResourceStats{}
-	//
-	//for _, spec := range specResp.TrainResourceSpecs {
-	//
-	//}
-
-	return nil, nil
+	var cards []*collector.Card
+	balance := float64(balanceResp.Payload.BillingUser.Amount)
+	var cpuHours float64
+	for _, spec := range specResp.TrainResourceSpecs {
+		if spec.Price == 0 {
+			ns := strings.Split(spec.Name, COMMA)
+			if len(ns) == 2 {
+				nss := strings.Split(ns[0], COLON)
+				if nss[0] == CPU {
+					cpuHours = -1
+				}
+			}
+		}
+
+		if spec.Price == 1 {
+			ns := strings.Split(spec.Name, COMMA)
+			cardSpecs := strings.Split(ns[0], STAR)
+
+			cardTops, isMapContainsKey := cardTopsMap[cardSpecs[1]]
+			if !isMapContainsKey {
+				continue
+			}
+
+			card := &collector.Card{
+				Platform:   OCTOPUS,
+				Type:       CARD,
+				Name:       cardSpecs[1],
+				TOpsAtFp16: cardTops,
+				CardHours:  balance / spec.Price,
+			}
+			cards = append(cards, card)
+		}
+	}
+
+	resourceStats := &collector.ResourceStats{
+		ParticipantId: o.participantId,
+		Name:          o.platform,
+		Balance:       balance,
+		CardsAvail:    cards,
+		CpuCoreHours:  cpuHours,
+	}
+
+	return resourceStats, nil
}

func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
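The card-hours arithmetic above reduces to the billing balance divided by the per-hour price, which DynamicResourcesStrategy later weights by TOps; a worked example with invented numbers:

// Illustrative only: a balance of 100 units at 1 unit per card-hour.
cardHours := 100.0 / 1.0   // balance / spec.Price = 100 card-hours
score := 128.0 * cardHours // TOpsAtFp16 * CardHours = 12800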
@@ -349,6 +389,7 @@ func (o *OctopusLink) generateResourceId(option *option.AiOption) error {
		if err != nil {
			return err
		}
+		return nil
	}

	return errors.New("failed to get ResourceId")
@@ -433,7 +474,14 @@ func (o *OctopusLink) generateImageId(option *option.AiOption) error {

func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error {
	// temporarily set algorithm to cnn
-	option.AlgorithmName = "cnn"
+	if option.AlgorithmName == "" {
+		switch option.DatasetsName {
+		case "cifar10":
+			option.AlgorithmName = "cnn"
+		case "mnist":
+			option.AlgorithmName = "fcn"
+		}
+	}

	req := &octopus.GetMyAlgorithmListReq{
		Platform: o.platform,
@@ -457,14 +505,26 @@ func (o *OctopusLink) generateAlgorithmId(option *option.AiOption) error {
		if ns[1] != option.AlgorithmName {
			continue
		}
-		if ns[2] != option.ResourceType {
-			continue
-		}
+		switch option.ResourceType {
+		case CPU:
+			if ns[2] != CPU {
+				continue
+			}
+		case CARD:
+			if ns[2] != strings.ToLower(option.ComputeCard) {
+				continue
+			}
+		}

		option.AlgorithmId = algorithm.AlgorithmId
		return nil
	}
}

+	if option.AlgorithmId == "" {
+		return errors.New("Algorithm does not exist")
+	}
+
	return errors.New("failed to get AlgorithmId")
}
@@ -487,7 +547,10 @@ func (o *OctopusLink) generateEnv(option *option.AiOption) error {
}

func (o *OctopusLink) generateParams(option *option.AiOption) error {
+	if len(option.Params) == 0 {
+		epoch := "epoch" + COMMA + "1"
+		option.Params = append(option.Params, epoch)
+	}
	return nil
}
@@ -17,6 +17,7 @@ package storeLink
import (
	"context"
	"errors"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
@@ -42,6 +43,8 @@ const (
	DATASETS_DIR  = "/work/home/acgnnmfbwo/pcmv1/dataset"
	ALGORITHM_DIR = "/work/home/acgnnmfbwo/pcmv1/algorithm"
	TRAIN_FILE    = "train.py"
+	CPUCOREPRICEPERHOUR = 0.09
+	DCUPRICEPERHOUR     = 2.0
)

var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{
@@ -197,9 +200,9 @@ func (s *ShuguangAi) SubmitTensorflowTask(imageId string, cmd string, envs []str
}

func (s *ShuguangAi) SubmitTask(imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
-	// set algorithmId temporarily
+	// set algorithmId temporarily for storelink submit
	if algorithmId == "" {
-		algorithmId = "pytorch-mnist-fully_connected_network"
+		algorithmId = "pytorch-mnist-fcn"
	}

	// submit the task to ShuguangAi
@@ -268,24 +271,41 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
	if err != nil {
		return nil, err
	}
-	limitReq := &hpcAC.QueueReq{}
-	_, err = s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
-	if err != nil {
-		return nil, err
-	}
-	diskReq := &hpcAC.ParaStorQuotaReq{}
-	_, err = s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
-	if err != nil {
-		return nil, err
-	}

+	//limitReq := &hpcAC.QueueReq{}
+	//limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
+	//if err != nil {
+	//	return nil, err
+	//}
+
+	//diskReq := &hpcAC.ParaStorQuotaReq{}
+	//diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
+	//if err != nil {
+	//	return nil, err
+	//}
+
+	var cards []*collector.Card
	balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
-	_ = &collector.ResourceStats{
+	cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
+	cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)
+
+	dcu := &collector.Card{
+		Platform:   SHUGUANGAI,
+		Type:       CARD,
+		Name:       DCU,
+		TOpsAtFp16: DCU_TOPS,
+		CardHours:  cardHours,
+	}
+	cards = append(cards, dcu)
+	resourceStats := &collector.ResourceStats{
		ParticipantId: s.participantId,
		Name:          s.platform,
		Balance:       balance,
+		CardsAvail:    cards,
+		CpuCoreHours:  cpuHours,
	}
-	return nil, nil
+
+	return resourceStats, nil
}

func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
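With the pricing constants introduced above, the hours computation is plain division; for example (numbers invented):

// A balance of 300 buys 300/2.0 = 150 DCU card-hours and
// 300/0.09 ≈ 3333.333 CPU core-hours after RoundFloat(..., 3).
balance := 300.0
cardHours := balance / DCUPRICEPERHOUR    // 150
cpuHours := balance / CPUCOREPRICEPERHOUR // 3333.333...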
@@ -413,6 +433,7 @@ func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error {
	if option.DatasetsName == "" {
		return errors.New("DatasetsName not set")
	}

	req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0}
	list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req)
	if err != nil {
@@ -426,11 +447,32 @@ func (s *ShuguangAi) generateAlgorithmId(option *option.AiOption) error {
	for _, file := range list.Data.FileList {
		ns := strings.Split(file.Name, DASH)
		if ns[0] == option.DatasetsName {
-			algorithmId = option.TaskType + DASH + file.Name
-			option.AlgorithmId = algorithmId
-			option.AlgorithmName = ns[1]
-			return nil
+			algoName := ns[1]
+			if option.AlgorithmName == "" {
+				switch option.DatasetsName {
+				case "cifar10":
+					algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "cnn"
+					option.AlgorithmId = algorithmId
+					option.AlgorithmName = algoName
+					return nil
+				case "mnist":
+					algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "fcn"
+					option.AlgorithmId = algorithmId
+					option.AlgorithmName = algoName
+					return nil
+				}
+			} else {
+				if algoName == option.AlgorithmName {
+					algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + algoName
+					option.AlgorithmId = algorithmId
+					return nil
+				}
+			}
		}
	}

+	if algorithmId == "" {
+		return errors.New("Algorithm does not exist")
+	}

	return errors.New("failed to get AlgorithmId")
@@ -451,8 +493,10 @@ func (s *ShuguangAi) generateParams(option *option.AiOption) error {
		return errors.New("ResourceType not set")
	}

-	//epoch := "epoch" + COMMA + "1"
-	//option.Params = append(option.Params, epoch)
+	if len(option.Params) == 0 {
+		epoch := "epoch" + COMMA + "1"
+		option.Params = append(option.Params, epoch)
+	}

	switch option.ResourceType {
	case CPU:
@@ -59,8 +59,9 @@ type ServiceContext struct {
	Downloader     *s3manager.Downloader
	Uploader       *s3manager.Uploader
	K8sRpc         kubernetesclient.Kubernetes
-	PromClient     map[int64]tracker.Prometheus
+	MonitorClient  map[int64]tracker.Prometheus
	ParticipantRpc participantservice.ParticipantService
+	PromClient     tracker.Prometheus
}

func NewServiceContext(c config.Config) *ServiceContext {
@@ -72,8 +73,14 @@ func NewServiceContext(c config.Config) *ServiceContext {
		DisableSSL:       aws.Bool(false), // do not disable SSL, i.e. keep using HTTPS
		S3ForcePathStyle: aws.Bool(true),  // path-style instead of virtual-hosted-style addressing; see https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
	})
+	promClient, err := tracker.NewPrometheus(c.PromUrl)
+	if err != nil {
+		logx.Errorf("InitPrometheus err: %v", err)
+		panic("InitPrometheus err")
+	}

	// add snowflake support
-	err := utils.InitSnowflake(c.SnowflakeConf.MachineId)
+	err = utils.InitSnowflake(c.SnowflakeConf.MachineId)
	if err != nil {
		logx.Errorf("InitSnowflake err: %v", err)
		panic("InitSnowflake err")
@@ -122,10 +129,11 @@ func NewServiceContext(c config.Config) *ServiceContext {
		OctopusRpc:     octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
		OpenstackRpc:   openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
		K8sRpc:         kubernetesclient.NewKubernetes(zrpc.MustNewClient(c.K8sNativeConf)),
-		PromClient:     make(map[int64]tracker.Prometheus),
+		MonitorClient:  make(map[int64]tracker.Prometheus),
		ParticipantRpc: participantservice.NewParticipantService(zrpc.MustNewClient(c.PcmCoreRpcConf)),
		DockerClient:   dockerClient,
		Downloader:     downloader,
		Uploader:       uploader,
+		PromClient:     promClient,
	}
}
@@ -18,9 +18,9 @@ type CenterResourcesResp struct {

type CenterIndex struct {
	Name       string  `json:"name"`
-	Cpu        float32 `json:"cpu"`
-	Memory     float32 `json:"memory"`
-	Storage    float32 `json:"storage"`
+	Cpu        string  `json:"cpu"`
+	Memory     string  `json:"memory"`
+	Storage    string  `json:"storage"`
	CenterType string  `json:"centerType"`
}
@@ -29,9 +29,16 @@ type SyncClusterLoadReq struct {
}

type ClusterLoadRecord struct {
+	AdapterId   int64   `json:"adapterId"`
	ClusterName string  `json:"clusterName"`
+	CpuAvail    float64 `json:"cpuAvail"`
+	CpuTotal    float64 `json:"cpuTotal"`
	CpuUsage    float64 `json:"cpuUsage"`
+	MemoryAvail float64 `json:"memoryAvail"`
	MemoryUsage float64 `json:"memoryUsage"`
+	MemoryTotal float64 `json:"memoryTotal"`
+	DiskAvail   float64 `json:"diskAvail"`
+	DiskTotal   float64 `json:"diskTotal"`
	DiskUsage   float64 `json:"diskUsage"`
}
@@ -67,6 +74,7 @@ type CommitTaskReq struct {
	Replicas    int64             `json:"replicas,optional"`
	MatchLabels map[string]string `json:"matchLabels,optional"`
	YamlList    []string          `json:"yamlList"`
+	ClusterName string            `json:"clusterName"`
}

type ScheduleTaskByYamlReq struct {
@@ -88,7 +96,11 @@ type TaskYaml struct {
}

type CommitVmTaskReq struct {
-	Server   ServerCommit `json:"server,optional"`
+	Name        string            `json:"name"`
+	NsID        string            `json:"nsID"`
+	Replicas    int64             `json:"replicas,optional"`
+	MatchLabels map[string]string `json:"matchLabels,optional"`
+	Server      []ServerCommit    `json:"server,optional"`
	Platform    string            `json:"platform,optional"`
}
@@ -119,6 +131,20 @@ type Block_device_mapping_v2Commit struct {
}

type CommitVmTaskResp struct {
+	Id              string                     `json:"id" copier:"Id"`
+	Links           []VmLinks                  `json:"links" copier:"Links"`
+	OSDCFDiskConfig string                     `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
+	SecurityGroups  []VmSecurity_groups_server `json:"security_groups" copier:"SecurityGroups"`
+	AdminPass       string                     `json:"adminPass" copier:"AdminPass"`
+}
+
+type VmLinks struct {
+	Href string `json:"href" copier:"Href"`
+	Rel  string `json:"rel" copier:"Rel"`
+}
+
+type VmSecurity_groups_server struct {
+	Name string `json:"name" copier:"Name"`
}

type ScheduleTaskByYamlResp struct {
@@ -2853,14 +2879,6 @@ type CreNetwork struct {
	Uuid string `json:"uuid" copier:"Uuid"`
}

-type ServerResp struct {
-	Id              string                   `json:"id" copier:"Id"`
-	Links           []Links                  `json:"links" copier:"Links"`
-	OSDCFDiskConfig string                   `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
-	SecurityGroups  []Security_groups_server `json:"security_groups" copier:"SecurityGroups"`
-	AdminPass       string                   `json:"adminPass" copier:"AdminPass"`
-}
-
type Security_groups_server struct {
	Name string `json:"name" copier:"Name"`
}
@@ -2873,6 +2891,14 @@ type Block_device_mapping_v2 struct {
	DeleteOnTermination bool `json:"delete_on_termination" copier:"DeleteOnTermination"`
}

+type ServerResp struct {
+	Id              string                   `json:"id" copier:"Id"`
+	Links           []Links                  `json:"links" copier:"Links"`
+	OSDCFDiskConfig string                   `json:"OS_DCF_diskConfig" copier:"OSDCFDiskConfig"`
+	SecurityGroups  []Security_groups_server `json:"security_groups" copier:"SecurityGroups"`
+	AdminPass       string                   `json:"adminPass" copier:"AdminPass"`
+}
+
type RebuildServerReq struct {
	ServerId string `json:"server_id" copier:"ServerId"`
	Platform string `json:"platform,optional"`
@@ -3180,6 +3206,20 @@ type DeleteImageResp struct {
	ErrorMsg string `json:"errorMsg,omitempty"`
}

+type NetworkNum struct {
+	NetworkNum int32  `json:"networkNum"`
+	Code       int32  `json:"code,omitempty"`
+	Msg        string `json:"msg,omitempty"`
+	ErrorMsg   string `json:"errorMsg,omitempty"`
+}
+
+type ImageNum struct {
+	ImageNum int32  `json:"imageNum"`
+	Code     int32  `json:"code,omitempty"`
+	Msg      string `json:"msg,omitempty"`
+	ErrorMsg string `json:"errorMsg,omitempty"`
+}
+
type ListNetworksReq struct {
	Platform string `json:"platform,optional"`
}
@@ -0,0 +1,24 @@
+package models
+
+import "github.com/zeromicro/go-zero/core/stores/sqlx"
+
+var _ VmModel = (*customVmModel)(nil)
+
+type (
+	// VmModel is an interface to be customized, add more methods here,
+	// and implement the added methods in customVmModel.
+	VmModel interface {
+		vmModel
+	}
+
+	customVmModel struct {
+		*defaultVmModel
+	}
+)
+
+// NewVmModel returns a model for the database table.
+func NewVmModel(conn sqlx.SqlConn) VmModel {
+	return &customVmModel{
+		defaultVmModel: newVmModel(conn),
+	}
+}
@@ -0,0 +1,101 @@
+// Code generated by goctl. DO NOT EDIT.
+
+package models
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"strings"
+
+	"github.com/zeromicro/go-zero/core/stores/builder"
+	"github.com/zeromicro/go-zero/core/stores/sqlc"
+	"github.com/zeromicro/go-zero/core/stores/sqlx"
+	"github.com/zeromicro/go-zero/core/stringx"
+)
+
+var (
+	vmFieldNames          = builder.RawFieldNames(&Vm{})
+	vmRows                = strings.Join(vmFieldNames, ",")
+	vmRowsExpectAutoSet   = strings.Join(stringx.Remove(vmFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",")
+	vmRowsWithPlaceHolder = strings.Join(stringx.Remove(vmFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?"
+)
+
+type (
+	vmModel interface {
+		Insert(ctx context.Context, data *Vm) (sql.Result, error)
+		FindOne(ctx context.Context, id int64) (*Vm, error)
+		Update(ctx context.Context, data *Vm) error
+		Delete(ctx context.Context, id int64) error
+	}
+
+	defaultVmModel struct {
+		conn  sqlx.SqlConn
+		table string
+	}
+
+	Vm struct {
+		Id            int64          `db:"id"`             // id
+		TaskId        int64          `db:"task_id"`        // task id
+		ParticipantId int64          `db:"participant_id"` // participant (P-side) id
+		ApiVersion    sql.NullString `db:"api_version"`    // api version
+		Name          sql.NullString `db:"name"`           // name
+		Namespace     sql.NullString `db:"namespace"`      // namespace
+		Kind          sql.NullString `db:"kind"`           // kind
+		CreatedBy     sql.NullInt64  `db:"created_by"`     // creator
+		CreatedTime   sql.NullTime   `db:"created_time"`   // creation time
+		UpdateBy      sql.NullInt64  `db:"update_by"`      // last editor
+		UpdateTime    sql.NullTime   `db:"update_time"`    // update time
+		Status        string         `db:"status"`         // status
+	}
+)
+
+func newVmModel(conn sqlx.SqlConn) *defaultVmModel {
+	return &defaultVmModel{
+		conn:  conn,
+		table: "`vm`",
+	}
+}
+
+func (m *defaultVmModel) withSession(session sqlx.Session) *defaultVmModel {
+	return &defaultVmModel{
+		conn:  sqlx.NewSqlConnFromSession(session),
+		table: "`vm`",
+	}
+}
+
+func (m *defaultVmModel) Delete(ctx context.Context, id int64) error {
+	query := fmt.Sprintf("delete from %s where `id` = ?", m.table)
+	_, err := m.conn.ExecCtx(ctx, query, id)
+	return err
+}
+
+func (m *defaultVmModel) FindOne(ctx context.Context, id int64) (*Vm, error) {
+	query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", vmRows, m.table)
+	var resp Vm
+	err := m.conn.QueryRowCtx(ctx, &resp, query, id)
+	switch err {
+	case nil:
+		return &resp, nil
+	case sqlc.ErrNotFound:
+		return nil, ErrNotFound
+	default:
+		return nil, err
+	}
+}
+
+func (m *defaultVmModel) Insert(ctx context.Context, data *Vm) (sql.Result, error) {
+	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, vmRowsExpectAutoSet)
+	ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.ParticipantId, data.ApiVersion, data.Name, data.Namespace, data.Kind, data.CreatedBy, data.CreatedTime, data.UpdateBy, data.Status)
+	return ret, err
+}
+
+func (m *defaultVmModel) Update(ctx context.Context, data *Vm) error {
+	query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, vmRowsWithPlaceHolder)
+	_, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.ParticipantId, data.ApiVersion, data.Name, data.Namespace, data.Kind, data.CreatedBy, data.CreatedTime, data.UpdateBy, data.Status, data.Id)
+	return err
+}
+
+func (m *defaultVmModel) tableName() string {
+	return m.table
+}
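A minimal usage sketch for the generated model; the DSN is a placeholder, and go-zero's sqlx.NewMysql plus a "log" import are assumed:

// Sketch: open a connection and insert one Vm row through the model.
conn := sqlx.NewMysql("user:pass@tcp(127.0.0.1:3306)/pcm") // placeholder DSN
vmModel := models.NewVmModel(conn)
_, err := vmModel.Insert(context.Background(), &models.Vm{
	TaskId:        1,
	ParticipantId: 2,
	Status:        "Saved",
})
if err != nil {
	log.Fatal(err)
}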
@@ -14,7 +14,10 @@

package tracker

-import "time"
+import (
+	"github.com/prometheus/common/model"
+	"time"
+)

type Interface interface {
	//GetMetric(expr string, time time.Time) Metric

@@ -27,4 +30,5 @@ type Interface interface {
	//// meter
	//GetNamedMeters(meters []string, time time.Time, opts []QueryOption) []Metric
	//GetNamedMetersOverTime(metrics []string, start, end time.Time, step time.Duration, opts []QueryOption) []Metric
+	GetRawData(expr string, o QueryOption) (model.Value, error)
}
@@ -27,35 +27,13 @@ const (

var promQLTemplates = map[string]string{
-	//namespace
+	"cluster_cpu_usage":    "sum by (cluster_name)(cluster_cpu_usage{$1})",
+	"cluster_memory_usage": "sum by (cluster_name)(cluster_memory_usage{$1})",
+	"cluster_disk_usage":   "sum by (cluster_name)(cluster_disk_usage{$1})",
+	"resource_top3":        "topk(3,sum by (cluster_name)(cluster_cpu_usage +cluster_memory_usage +cluster_disk_usage)/3)",
	"namespace_cpu_usage":             `round(namespace:container_cpu_usage_seconds_total:sum_rate{namespace!="", $1}, 0.001)`,
	"namespace_memory_usage":          `namespace:container_memory_usage_bytes:sum{namespace!="", $1}`,
	"namespace_memory_usage_wo_cache": `namespace:container_memory_usage_bytes_wo_cache:sum{namespace!="", $1}`,
-	"namespace_net_bytes_transmitted": `sum by (namespace) (irate(container_network_transmit_bytes_total{namespace!="", pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m]) * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(namespace) max by(namespace) (kube_namespace_labels{$1} * 0)`,
-	"namespace_net_bytes_received":    `sum by (namespace) (irate(container_network_receive_bytes_total{namespace!="", pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m]) * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(namespace) max by(namespace) (kube_namespace_labels{$1} * 0)`,
-	"namespace_pod_count":             `sum by (namespace) (kube_pod_status_phase{phase!~"Failed|Succeeded", namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(namespace) max by(namespace) (kube_namespace_labels{$1} * 0)`,
-	"namespace_pod_running_count":     `sum by (namespace) (kube_pod_status_phase{phase="Running", namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(namespace) max by(namespace) (kube_namespace_labels{$1} * 0)`,
-	"namespace_pod_succeeded_count":   `sum by (namespace) (kube_pod_status_phase{phase="Succeeded", namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1}) or on(namespace) max by(namespace) (kube_namespace_labels{$1} * 0)`,
-	"namespace_pod_abnormal_count":    `namespace:pod_abnormal:count{namespace!="", $1}`,
-	"namespace_pod_abnormal_ratio":    `namespace:pod_abnormal:ratio{namespace!="", $1}`,
-	"namespace_memory_limit_hard":     `min by (namespace) (kube_resourcequota{resourcequota!="quota", type="hard", namespace!="", resource="limits.memory"} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_cpu_limit_hard":        `min by (namespace) (kube_resourcequota{resourcequota!="quota", type="hard", namespace!="", resource="limits.cpu"} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_pod_count_hard":        `min by (namespace) (kube_resourcequota{resourcequota!="quota", type="hard", namespace!="", resource="count/pods"} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_cronjob_count":         `sum by (namespace) (kube_cronjob_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_pvc_count":             `sum by (namespace) (kube_persistentvolumeclaim_info{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_daemonset_count":       `sum by (namespace) (kube_daemonset_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_deployment_count":      `sum by (namespace) (kube_deployment_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_endpoint_count":        `sum by (namespace) (kube_endpoint_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_hpa_count":             `sum by (namespace) (kube_horizontalpodautoscaler_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_job_count":             `sum by (namespace) (kube_job_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_statefulset_count":     `sum by (namespace) (kube_statefulset_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_replicaset_count":      `count by (namespace) (kube_replicaset_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_service_count":         `sum by (namespace) (kube_service_info{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_secret_count":          `sum by (namespace) (kube_secret_info{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_configmap_count":       `sum by (namespace) (kube_configmap_info{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_ingresses_extensions_count": `sum by (namespace) (kube_ingress_labels{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,
-	"namespace_s2ibuilder_count":      `sum by (namespace) (s2i_s2ibuilder_created{namespace!=""} * on (namespace) group_left(workspace) kube_namespace_labels{$1})`,

	"controller_cpu_usage_rate":    `sum( node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{}* on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{$1}) by (workload)/sum( kube_pod_container_resource_limits{job="kube-state-metrics", resource="cpu"}* on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{ }) by (workload)`,
	"controller_memory_usage_rate": `sum( container_memory_working_set_bytes{job="kubelet", metrics_path="/metrics/cadvisor", container!="", image!=""} * on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{$1}) by (workload)/sum( kube_pod_container_resource_limits{job="kube-state-metrics", resource="memory"}* on(namespace,pod) group_left(workload) namespace_workload_pod:kube_pod_owner:relabel{ }) by (workload)`,
	// pod
@@ -81,7 +59,7 @@ func makeExpr(metric string, opts QueryOptions) string {
	tmpl := promQLTemplates[metric]
	switch opts.Level {
	case LevelCluster:
-		return tmpl
+		return makeClusterMetricExpr(tmpl, opts)
	case LevelNode:
		return makeNodeMetricExpr(tmpl, opts)
	case LevelWorkspace:

@@ -105,6 +83,14 @@ func makeExpr(metric string, opts QueryOptions) string {
	}
}

+func makeClusterMetricExpr(tmpl string, o QueryOptions) string {
+	var clusterSelector string
+	if o.ClusterName != "" {
+		clusterSelector = fmt.Sprintf(`cluster_name="%s"`, o.ClusterName)
+	}
+	return strings.Replace(tmpl, "$1", clusterSelector, -1)
+}
+
func makeNodeMetricExpr(tmpl string, o QueryOptions) string {
	var nodeSelector string
	if o.NodeName != "" {
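Substitution works the same way as the node variant: the $1 placeholder in the template is replaced by the cluster selector. For example, within this package:

// For ClusterName "c1", the cluster CPU template
//   sum by (cluster_name)(cluster_cpu_usage{$1})
// becomes
//   sum by (cluster_name)(cluster_cpu_usage{cluster_name="c1"})
expr := makeClusterMetricExpr(promQLTemplates["cluster_cpu_usage"], QueryOptions{ClusterName: "c1"})
_ = expr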
@@ -177,19 +163,12 @@ func makePVCMetricExpr(tmpl string, o QueryOptions) string {
	// GET /namespaces/{namespace}/persistentvolumeclaims/{persistentvolumeclaim} or
	// GET /namespaces/{namespace}/persistentvolumeclaims
	if o.Namespace != "" {
-		if o.PersistentVolumeClaimName != "" {
-			pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim="%s"`, o.Namespace, o.PersistentVolumeClaimName)
-		} else {
-			pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim=~"%s"`, o.Namespace, o.ResourceFilter)
-		}
+		pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim=~"%s"`, o.Namespace, o.ResourceFilter)
		return strings.Replace(tmpl, "$1", pvcSelector, -1)
	}

-	// For monitoring persistentvolumeclaims of the specific storageclass
-	// GET /storageclasses/{storageclass}/persistentvolumeclaims
-	if o.StorageClassName != "" {
-		pvcSelector = fmt.Sprintf(`storageclass="%s", persistentvolumeclaim=~"%s"`, o.StorageClassName, o.ResourceFilter)
-	}
	return strings.Replace(tmpl, "$1", pvcSelector, -1)
}
@@ -68,6 +68,7 @@ type QueryOptions struct {
	NamespacedResourcesFilter string
	QueryType                 string
	ResourceFilter            string
+	ClusterName               string
	NodeName                  string
	WorkspaceName             string
	Namespace                 string

@@ -77,10 +78,6 @@ type QueryOptions struct {
	PodName                   string
	PodsName                  string
	ContainerName             string
-	StorageClassName          string
-	PersistentVolumeClaimName string
-	PVCFilter                 string
-	ApplicationName           string
	ServiceName               string
	Ingress                   string
	Job                       string
@@ -92,10 +89,13 @@ func NewQueryOptions() *QueryOptions {
	return &QueryOptions{}
}

-type ClusterOption struct{}
+type ClusterOption struct {
+	ClusterName string
+}

-func (_ ClusterOption) Apply(o *QueryOptions) {
+func (c ClusterOption) Apply(o *QueryOptions) {
	o.Level = LevelCluster
+	o.ClusterName = c.ClusterName
}

type NodeOption struct {
@@ -110,8 +110,6 @@ func (no NodeOption) Apply(o *QueryOptions) {
	o.Level = LevelNode
	o.ResourceFilter = no.ResourceFilter
	o.NodeName = no.NodeName
-	o.PVCFilter = no.PVCFilter
-	o.StorageClassName = no.StorageClassName
	o.QueryType = no.QueryType
}

@@ -126,8 +124,6 @@ func (wo WorkspaceOption) Apply(o *QueryOptions) {
	o.Level = LevelWorkspace
	o.ResourceFilter = wo.ResourceFilter
	o.WorkspaceName = wo.WorkspaceName
-	o.PVCFilter = wo.PVCFilter
-	o.StorageClassName = wo.StorageClassName
}

type NamespaceOption struct {

@@ -143,8 +139,6 @@ func (no NamespaceOption) Apply(o *QueryOptions) {
	o.ResourceFilter = no.ResourceFilter
	o.WorkspaceName = no.WorkspaceName
	o.Namespace = no.NamespaceName
-	o.PVCFilter = no.PVCFilter
-	o.StorageClassName = no.StorageClassName
}

type ApplicationsOption struct {

@@ -183,8 +177,6 @@ type ApplicationOption struct {
func (ao ApplicationOption) Apply(o *QueryOptions) {
	o.Level = LevelApplication
	o.Namespace = ao.NamespaceName
-	o.ApplicationName = ao.Application
-	o.StorageClassName = ao.StorageClassName

	app_components := strings.Join(ao.ApplicationComponents[:], "|")

@@ -303,11 +295,6 @@ func (po PVCOption) Apply(o *QueryOptions) {
	o.Level = LevelPVC
	o.ResourceFilter = po.ResourceFilter
	o.Namespace = po.NamespaceName
-	o.StorageClassName = po.StorageClassName
-	o.PersistentVolumeClaimName = po.PersistentVolumeClaimName
-
-	// for meter
-	o.PVCFilter = po.PersistentVolumeClaimName
}

type IngressOption struct {
@@ -27,22 +27,53 @@ import (
)

var (
-	ClusterCpuGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+	ClusterCpuUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "cluster_cpu_usage",
		Help: "Cluster CPU Utilization Rate.",
-	}, []string{"cluster_name"})
+	}, []string{"cluster_name", "adapter_id"})
+	ClusterCpuAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "cluster_cpu_avail",
+		Help: "Cluster CPU Available.",
+	}, []string{"cluster_name", "adapter_id"})
+	ClusterCpuTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "cluster_cpu_total",
+		Help: "Cluster CPU Total.",
+	}, []string{"cluster_name", "adapter_id"})
-	ClusterMemoryGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+	ClusterMemoryUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "cluster_memory_usage",
		Help: "Cluster Memory Utilization Rate.",
-	}, []string{"cluster_name"})
+	}, []string{"cluster_name", "adapter_id"})
+	ClusterMemoryAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "cluster_memory_avail",
+		Help: "Cluster Memory Available.",
+	}, []string{"cluster_name", "adapter_id"})
+	ClusterMemoryTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "cluster_memory_total",
+		Help: "Cluster Memory Total.",
+	}, []string{"cluster_name", "adapter_id"})
-	ClusterDiskGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+	ClusterDiskUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "cluster_disk_usage",
		Help: "Cluster Disk Utilization Rate.",
-	}, []string{"cluster_name"})
+	}, []string{"cluster_name", "adapter_id"})
+	ClusterDiskAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "cluster_disk_avail",
+		Help: "Cluster Disk Available.",
+	}, []string{"cluster_name", "adapter_id"})
+	ClusterDiskTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "cluster_disk_total",
+		Help: "Cluster Disk Total.",
+	}, []string{"cluster_name", "adapter_id"})

	metrics = []prometheus.Collector{
-		ClusterCpuGauge,
-		ClusterMemoryGauge,
-		ClusterDiskGauge,
+		ClusterCpuUsageGauge,
+		ClusterCpuAvailGauge,
+		ClusterCpuTotalGauge,
+		ClusterMemoryUsageGauge,
+		ClusterMemoryAvailGauge,
+		ClusterMemoryTotalGauge,
+		ClusterDiskUsageGauge,
+		ClusterDiskAvailGauge,
+		ClusterDiskTotalGauge,
	}
)
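A short sketch of reporting through the relabeled gauges; the label values are illustrative:

// Record one cluster's CPU usage ratio under the new adapter_id label.
ClusterCpuUsageGauge.With(prometheus.Labels{
	"cluster_name": "c1",
	"adapter_id":   "42",
}).Set(0.73)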
@@ -225,3 +256,13 @@ func genMetricFilter(o QueryOption) func(metric model.Metric) bool {
		return true
	}
}

+func (p Prometheus) GetRawData(expr string, o QueryOption) (model.Value, error) {
+	opts := NewQueryOptions()
+	o.Apply(opts)
+	value, _, err := p.client.Query(context.Background(), makeExpr(expr, *opts), time.Now())
+	if err != nil {
+		return nil, err
+	}
+	return value, nil
+}
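A usage sketch for the new method, assuming p is a tracker.Prometheus built elsewhere (for example via tracker.NewPrometheus, as in the ServiceContext change above) and a "log" import:

// Query the cluster CPU usage series through the Interface method.
value, err := p.GetRawData("cluster_cpu_usage", ClusterOption{ClusterName: "c1"})
if err != nil {
	log.Fatal(err)
}
log.Println(value.Type(), value.String())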