Merge branch 'master' of https://gitlink.org.cn/JointCloud/pcm-coordinator
# Conflicts: # api/internal/cron/cron.go # api/internal/scheduler/database/aiStorage.go Former-commit-id: 30a9579debd6cd3803bd423c958d0b670625cc7c
This commit is contained in:
commit
83fa3218ab
|
@ -156,23 +156,28 @@ type AiInfo struct {
|
|||
}
|
||||
|
||||
type VmInfo struct {
|
||||
TaskId int64 `json:"taskId,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
AdapterId int64 `json:"adapterId,omitempty,optional"`
|
||||
AdapterName string `json:"adapterName,omitempty,optional"`
|
||||
ClusterId int64 `json:"clusterId,omitempty,optional"`
|
||||
ClusterName string `json:"clusterName,omitempty,optional"`
|
||||
FlavorRef string `json:"flavor_ref,omitempty"`
|
||||
ImageRef string `json:"image_ref,omitempty"`
|
||||
NetworkUuid string `json:"network_uuid,omitempty"`
|
||||
BlockUuid string `json:"block_uuid,omitempty"`
|
||||
SourceType string `json:"source_type,omitempty"`
|
||||
DeleteOnTermination bool `json:"delete_on_termination,omitempty"`
|
||||
Status string `json:"Status,omitempty"`
|
||||
StartTime string `json:"startTime,omitempty"`
|
||||
Platform string `json:"platform,omitempty"`
|
||||
VmName string `json:"vm_name,omitempty"`
|
||||
ServerId string `json:"server_id,omitempty"`
|
||||
Id int64 `json:"id,omitempty"`
|
||||
TaskId int64 `json:"taskId,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
AdapterId int64 `json:"adapterId,omitempty,optional"`
|
||||
AdapterName string `json:"adapterName,omitempty,optional"`
|
||||
ClusterId int64 `json:"clusterId,omitempty,optional"`
|
||||
ClusterName string `json:"clusterName,omitempty,optional"`
|
||||
FlavorRef string `json:"flavorRef,omitempty"`
|
||||
ImageRef string `json:"imageRef,omitempty"`
|
||||
Status string `json:"status,omitempty"`
|
||||
Platform string `json:"platform,omitempty"`
|
||||
Description string `json:"description,omitempty"` // 描述
|
||||
AvailabilityZone string `json:"availabilityZone,omitempty"`
|
||||
MinCount int64 `json:"minCount,omitempty"`
|
||||
Uuid string `json:"uuid,omitempty"`
|
||||
StartTime string `json:"startTime,omitempty"`
|
||||
RunningTime string `json:"runningTime,omitempty"`
|
||||
Result string `json:"result,omitempty"`
|
||||
DeletedAt string `json:"deletedAt,omitempty"`
|
||||
VmName string `json:"vmName,omitempty"`
|
||||
Replicas int64 `json:"replicas,omitempty"`
|
||||
ServerId string `json:"serverId,omitempty"`
|
||||
}
|
||||
|
||||
type ResourceStats struct {
|
||||
|
|
|
@ -215,17 +215,17 @@ type (
|
|||
|
||||
type (
|
||||
commitVmTaskReq {
|
||||
Name string `json:"name"`
|
||||
Name string `json:"name"`
|
||||
AdapterIds []string `json:"adapterIds,optional"`
|
||||
ClusterIds []string `json:"clusterIds"`
|
||||
Strategy string `json:"strategy"`
|
||||
ClusterIds []string `json:"clusterIds"`
|
||||
Strategy string `json:"strategy"`
|
||||
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
|
||||
MinCount int64 `json:"min_count,optional"`
|
||||
ImageRef int64 `json:"imageRef,optional"`
|
||||
FlavorRef int64 `json:"flavorRef,optional"`
|
||||
Uuid int64 `json:"uuid,optional"`
|
||||
Replicas int32 `json:"replicas,string"`
|
||||
VmName string `json:"vm_name,optional"`
|
||||
MinCount int64 `json:"min_count,optional"`
|
||||
ImageRef int64 `json:"imageRef,optional"`
|
||||
FlavorRef int64 `json:"flavorRef,optional"`
|
||||
Uuid int64 `json:"uuid,optional"`
|
||||
Replicas int64 `json:"replicas,string"`
|
||||
VmName string `json:"vm_name,optional"`
|
||||
}
|
||||
TaskVm {
|
||||
Image string `json:"image"`
|
||||
|
@ -1234,12 +1234,24 @@ type TaskStatusResp {
|
|||
Saved int `json:"Saved"`
|
||||
}
|
||||
|
||||
type TaskDetailsResp {
|
||||
Name string `json:"name"`
|
||||
description string `json:"description"`
|
||||
StartTime string `json:"startTime"`
|
||||
EndTime string `json:"endTime"`
|
||||
Strategy int64 `json:"strategy"`
|
||||
SynergyStatus int64 `json:"synergyStatus"`
|
||||
ClusterInfos []*ClusterInfo `json:"clusterInfos"`
|
||||
}
|
||||
type (
|
||||
TaskDetailsResp {
|
||||
Name string `json:"name"`
|
||||
description string `json:"description"`
|
||||
StartTime string `json:"startTime"`
|
||||
EndTime string `json:"endTime"`
|
||||
Strategy int64 `json:"strategy"`
|
||||
SynergyStatus int64 `json:"synergyStatus"`
|
||||
ClusterInfos []*ClusterInfo `json:"clusterInfos"`
|
||||
SubTaskInfos []*SubTaskInfo `json:"subTaskInfos"`
|
||||
}
|
||||
|
||||
SubTaskInfo{
|
||||
Id string `json:"id" db:"id"`
|
||||
Name string `json:"name" db:"name"`
|
||||
ClusterId string `json:"clusterId" db:"cluster_id"`
|
||||
ClusterName string `json:"clusterName" db:"cluster_name"`
|
||||
Status string `json:"status" db:"status"`
|
||||
Remark string `json:"remark" db:"remark"`
|
||||
}
|
||||
)
|
|
@ -42,7 +42,7 @@ ACRpcConf:
|
|||
# Endpoints:
|
||||
# - 127.0.0.1:8888
|
||||
NonBlock: true
|
||||
Timeout: 20000
|
||||
Timeout: 50000
|
||||
|
||||
#rpc
|
||||
CephRpcConf:
|
||||
|
|
|
@ -0,0 +1,479 @@
|
|||
package cron
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"github.com/zeromicro/go-zero/zrpc"
|
||||
"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
|
||||
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
|
||||
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
OCTOPUS = "octopus"
|
||||
MODELARTS = "modelarts"
|
||||
SHUGUANGAI = "shuguangAi"
|
||||
)
|
||||
|
||||
func GetTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) {
|
||||
limit := 10
|
||||
offset := 0
|
||||
var list []*types.TaskModel
|
||||
db := svc.DbEngin.Model(&types.TaskModel{}).Table("task")
|
||||
|
||||
db = db.Where("deleted_at is null")
|
||||
|
||||
//count total
|
||||
var total int64
|
||||
err := db.Count(&total).Error
|
||||
db.Limit(limit).Offset(offset)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = db.Order("created_time desc").Find(&list).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return list, nil
|
||||
}
|
||||
|
||||
func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
|
||||
list := make([]*types.TaskModel, len(tasklist))
|
||||
copy(list, tasklist)
|
||||
for i := len(list) - 1; i >= 0; i-- {
|
||||
if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
|
||||
list = append(list[:i], list[i+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
if len(list) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
task := list[0]
|
||||
for i := range list {
|
||||
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
|
||||
latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
|
||||
if latest.Before(earliest) {
|
||||
task = list[i]
|
||||
}
|
||||
}
|
||||
|
||||
var aiTaskList []*models.TaskAi
|
||||
tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
|
||||
if tx.Error != nil {
|
||||
logx.Errorf(tx.Error.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if len(aiTaskList) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, aitask := range aiTaskList {
|
||||
t := aitask
|
||||
if t.Status == constants.Completed || t.Status == constants.Failed {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
h := http.Request{}
|
||||
trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
|
||||
if err != nil {
|
||||
if status.Code(err) == codes.DeadlineExceeded {
|
||||
msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
|
||||
logx.Errorf(errors.New(msg).Error())
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
|
||||
msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
|
||||
logx.Errorf(errors.New(msg).Error())
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
if trainingTask == nil {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
switch trainingTask.Status {
|
||||
case constants.Running:
|
||||
if t.Status != trainingTask.Status {
|
||||
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中")
|
||||
t.Status = trainingTask.Status
|
||||
}
|
||||
case constants.Failed:
|
||||
if t.Status != trainingTask.Status {
|
||||
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败")
|
||||
t.Status = trainingTask.Status
|
||||
}
|
||||
case constants.Completed:
|
||||
if t.Status != trainingTask.Status {
|
||||
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成")
|
||||
t.Status = trainingTask.Status
|
||||
}
|
||||
default:
|
||||
if t.Status != trainingTask.Status {
|
||||
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending")
|
||||
t.Status = trainingTask.Status
|
||||
}
|
||||
}
|
||||
t.StartTime = trainingTask.Start
|
||||
t.EndTime = trainingTask.End
|
||||
err = svc.Scheduler.AiStorages.UpdateAiTask(t)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
|
||||
logx.Errorf(errors.New(msg).Error())
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
|
||||
list := make([]*types.TaskModel, len(tasklist))
|
||||
copy(list, tasklist)
|
||||
for i := len(list) - 1; i >= 0; i-- {
|
||||
if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
|
||||
list = append(list[:i], list[i+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
if len(list) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
task := list[0]
|
||||
for i := range list {
|
||||
earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime)
|
||||
latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime)
|
||||
if latest.Before(earliest) {
|
||||
task = list[i]
|
||||
}
|
||||
}
|
||||
|
||||
var aiTask []*models.TaskAi
|
||||
tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
|
||||
if tx.Error != nil {
|
||||
logx.Errorf(tx.Error.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if len(aiTask) == 0 {
|
||||
tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
|
||||
if tx.Error != nil {
|
||||
logx.Errorf(tx.Error.Error())
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if len(aiTask) == 1 {
|
||||
if aiTask[0].Status == constants.Completed {
|
||||
task.Status = constants.Succeeded
|
||||
} else {
|
||||
task.Status = aiTask[0].Status
|
||||
}
|
||||
task.StartTime = aiTask[0].StartTime
|
||||
task.EndTime = aiTask[0].EndTime
|
||||
task.UpdatedTime = time.Now().Format(constants.Layout)
|
||||
tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
|
||||
if tx.Error != nil {
|
||||
logx.Errorf(tx.Error.Error())
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for i := len(aiTask) - 1; i >= 0; i-- {
|
||||
if aiTask[i].StartTime == "" {
|
||||
task.Status = aiTask[i].Status
|
||||
aiTask = append(aiTask[:i], aiTask[i+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
if len(aiTask) == 0 {
|
||||
task.UpdatedTime = time.Now().Format(constants.Layout)
|
||||
tx = svc.DbEngin.Table("task").Model(task).Updates(task)
|
||||
if tx.Error != nil {
|
||||
logx.Errorf(tx.Error.Error())
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
|
||||
end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
|
||||
|
||||
var status string
|
||||
var count int
|
||||
for _, a := range aiTask {
|
||||
s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
|
||||
e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
|
||||
|
||||
if s.Before(start) {
|
||||
start = s
|
||||
}
|
||||
|
||||
if e.After(end) {
|
||||
end = e
|
||||
}
|
||||
|
||||
if a.Status == constants.Failed {
|
||||
status = a.Status
|
||||
break
|
||||
}
|
||||
|
||||
if a.Status == constants.Pending {
|
||||
status = a.Status
|
||||
continue
|
||||
}
|
||||
|
||||
if a.Status == constants.Running {
|
||||
status = a.Status
|
||||
continue
|
||||
}
|
||||
|
||||
if a.Status == constants.Completed {
|
||||
count++
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if count == len(aiTask) {
|
||||
status = constants.Succeeded
|
||||
}
|
||||
|
||||
if status != "" {
|
||||
task.Status = status
|
||||
task.StartTime = start.Format(constants.Layout)
|
||||
task.EndTime = end.Format(constants.Layout)
|
||||
}
|
||||
|
||||
task.UpdatedTime = time.Now().Format(constants.Layout)
|
||||
tx = svc.DbEngin.Table("task").Model(task).Updates(task)
|
||||
if tx.Error != nil {
|
||||
logx.Errorf(tx.Error.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateAiAdapterMaps(svc *svc.ServiceContext) {
|
||||
var aiType = "1"
|
||||
adapterIds, err := svc.Scheduler.AiStorages.GetAdapterIdsByType(aiType)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
|
||||
logx.Errorf(errors.New(msg).Error())
|
||||
return
|
||||
}
|
||||
if len(adapterIds) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, id := range adapterIds {
|
||||
clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(id)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
|
||||
logx.Errorf(errors.New(msg).Error())
|
||||
return
|
||||
}
|
||||
if len(clusters.List) == 0 {
|
||||
continue
|
||||
}
|
||||
if isAdapterExist(svc, id, len(clusters.List)) {
|
||||
continue
|
||||
} else {
|
||||
if isAdapterEmpty(svc, id) {
|
||||
exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap
|
||||
svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap
|
||||
} else {
|
||||
UpdateClusterMaps(svc, id, clusters.List)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateClusterMaps(svc *svc.ServiceContext, adapterId string, clusters []types.ClusterInfo) {
|
||||
for _, c := range clusters {
|
||||
_, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id]
|
||||
_, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId][c.Id]
|
||||
if !ok && !ok2 {
|
||||
switch c.Name {
|
||||
case OCTOPUS:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(svc.Config.OctopusRpcConf))
|
||||
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus
|
||||
case MODELARTS:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(svc.Config.ModelArtsRpcConf))
|
||||
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(svc.Config.ModelArtsImgRpcConf))
|
||||
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
|
||||
case SHUGUANGAI:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(svc.Config.ACRpcConf))
|
||||
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai
|
||||
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai
|
||||
}
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
|
||||
emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
|
||||
cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
|
||||
if ok && ok2 {
|
||||
if len(emap) == clusterNum && len(cmap) == clusterNum {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isAdapterEmpty(svc *svc.ServiceContext, id string) bool {
|
||||
_, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
|
||||
_, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
|
||||
if !ok && !ok2 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) {
|
||||
executorMap := make(map[string]executor.AiExecutor)
|
||||
collectorMap := make(map[string]collector.AiCollector)
|
||||
for _, c := range clusters {
|
||||
switch c.Name {
|
||||
case OCTOPUS:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
|
||||
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
|
||||
collectorMap[c.Id] = octopus
|
||||
executorMap[c.Id] = octopus
|
||||
case MODELARTS:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
|
||||
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
|
||||
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
|
||||
collectorMap[c.Id] = modelarts
|
||||
executorMap[c.Id] = modelarts
|
||||
case SHUGUANGAI:
|
||||
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
||||
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
|
||||
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
|
||||
collectorMap[c.Id] = sgai
|
||||
executorMap[c.Id] = sgai
|
||||
}
|
||||
}
|
||||
|
||||
return executorMap, collectorMap
|
||||
}
|
||||
|
||||
func UpdateClusterResource(svc *svc.ServiceContext) {
|
||||
list, err := svc.Scheduler.AiStorages.GetAdaptersByType("1")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for _, adapter := range list {
|
||||
clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, cluster := range clusters.List {
|
||||
c := cluster
|
||||
clusterResource, err := svc.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
_, ok := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id]
|
||||
if !ok {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
h := http.Request{}
|
||||
stat, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(h.Context())
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
if stat == nil {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
var cardTotal int64
|
||||
var topsTotal float64
|
||||
for _, card := range stat.CardsAvail {
|
||||
cardTotal += int64(card.CardNum)
|
||||
topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
|
||||
}
|
||||
|
||||
if (models.TClusterResource{} == *clusterResource) {
|
||||
err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
|
||||
stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
clusterResource.CardTotal = cardTotal
|
||||
clusterResource.CardTopsTotal = topsTotal
|
||||
clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
|
||||
clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
|
||||
clusterResource.MemAvail = stat.MemAvail
|
||||
clusterResource.MemTotal = stat.MemTotal
|
||||
clusterResource.DiskAvail = stat.DiskAvail
|
||||
clusterResource.DiskTotal = stat.DiskTotal
|
||||
|
||||
err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
|
||||
Copyright (c) [2023] [pcm]
|
||||
[pcm-coordinator] is licensed under Mulan PSL v2.
|
||||
You can use this software according to the terms and conditions of the Mulan PSL v2.
|
||||
You may obtain a copy of Mulan PSL v2 at:
|
||||
http://license.coscl.org.cn/MulanPSL2
|
||||
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
See the Mulan PSL v2 for more details.
|
||||
|
||||
*/
|
||||
|
||||
package cron
|
||||
|
||||
import (
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
||||
)
|
||||
|
||||
func AddCronGroup(svc *svc.ServiceContext) {
|
||||
// 删除三天前的监控信息
|
||||
svc.Cron.AddFunc("0 0 0 ? * ? ", func() {
|
||||
ClearMetricsData(svc)
|
||||
})
|
||||
|
||||
// 同步任务信息到core端
|
||||
svc.Cron.AddFunc("*/5 * * * * ?", func() {
|
||||
SyncParticipantRpc(svc)
|
||||
})
|
||||
|
||||
svc.Cron.AddFunc("*/5 * * * * ?", func() {
|
||||
list, err := GetTaskList(svc)
|
||||
if err != nil {
|
||||
logx.Errorf(err.Error())
|
||||
return
|
||||
}
|
||||
UpdateTaskStatus(svc, list)
|
||||
UpdateAiTaskStatus(svc, list)
|
||||
})
|
||||
|
||||
svc.Cron.AddFunc("*/5 * * * * ?", func() {
|
||||
UpdateAiAdapterMaps(svc)
|
||||
})
|
||||
|
||||
svc.Cron.AddFunc("*/59 * * * * ?", func() {
|
||||
UpdateClusterResource(svc)
|
||||
})
|
||||
}
|
|
@ -79,7 +79,6 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
|
|||
case <-time.After(1 * time.Second):
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
|
||||
|
@ -127,7 +126,7 @@ func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan
|
|||
|
||||
mu.Lock()
|
||||
if (models.TClusterResource{} == *clusterResource) {
|
||||
err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
|
||||
err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
|
||||
stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
|
||||
if err != nil {
|
||||
mu.Unlock()
|
||||
|
@ -135,8 +134,19 @@ func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan
|
|||
return
|
||||
}
|
||||
} else {
|
||||
if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
clusterResource.CardTotal = cardTotal
|
||||
clusterResource.CardTopsTotal = topsTotal
|
||||
clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
|
||||
clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
|
||||
clusterResource.MemAvail = stat.MemAvail
|
||||
clusterResource.MemTotal = stat.MemTotal
|
||||
clusterResource.DiskAvail = stat.DiskAvail
|
||||
clusterResource.DiskTotal = stat.DiskTotal
|
||||
|
||||
err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
|
||||
if err != nil {
|
||||
mu.Unlock()
|
||||
|
|
|
@ -106,6 +106,7 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er
|
|||
sStruct := UnMarshalK8sStruct(s, int64(r.Replica))
|
||||
unString, _ := sStruct.MarshalJSON()
|
||||
taskCloud.Id = utils.GenSnowflakeIDUint()
|
||||
taskCloud.Name = sStruct.GetName() + "-" + sStruct.GetKind()
|
||||
taskCloud.TaskId = uint(taskModel.Id)
|
||||
clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64)
|
||||
taskCloud.AdapterId = uint(adapterId)
|
||||
|
|
|
@ -119,7 +119,8 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type
|
|||
logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
|
||||
//return errors.Errorf("the cluster does not match the drive resources. Check the data"), nil
|
||||
}*/
|
||||
taskVm.Name = req.VmName
|
||||
taskVm.Name = req.Name
|
||||
taskVm.TaskId = taskModel.Id
|
||||
taskVm.Status = "Saved"
|
||||
taskVm.StartTime = time.Now().String()
|
||||
taskVm.MinCount = req.MinCount
|
||||
|
@ -133,6 +134,8 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type
|
|||
l.svcCtx.DbEngin.Raw("select name from t_cluster where id= ?", r.ClusterId).Scan(&clusterName)
|
||||
taskVm.ClusterName = clusterName
|
||||
taskVm.ClusterId, err = strconv.ParseInt(clusterId, 10, 64)
|
||||
taskVm.VmName = req.VmName
|
||||
taskVm.Replicas = req.Replicas
|
||||
if err != nil {
|
||||
fmt.Println("Error converting string to int64:", err)
|
||||
return
|
||||
|
|
|
@ -80,6 +80,7 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
|
|||
for _, ch := range chs {
|
||||
select {
|
||||
case <-ch:
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
}
|
||||
return
|
||||
|
|
|
@ -55,12 +55,25 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie
|
|||
}
|
||||
}
|
||||
case 0:
|
||||
var cloudModelList []cloud.TaskCloudModel
|
||||
err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var resourceType int32
|
||||
l.svcCtx.DbEngin.Raw("select resource_type as resourceType from `t_adapter` where id = ?", req.AdapterId).Scan(&resourceType)
|
||||
switch resourceType {
|
||||
case 01:
|
||||
var cloudModelList []cloud.TaskCloudModel
|
||||
err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &cloudModelList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
utils.Convert(cloudModelList, &resp.CloudInfoList)
|
||||
case 02:
|
||||
var vmModelList []models.TaskVm
|
||||
err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &vmModelList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
utils.Convert(vmModelList, &resp.VmInfoList)
|
||||
}
|
||||
utils.Convert(cloudModelList, &resp.CloudInfoList)
|
||||
|
||||
case 1:
|
||||
var aiModelList []models.Ai
|
||||
err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &aiModelList)
|
||||
|
@ -68,13 +81,6 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie
|
|||
return nil, err
|
||||
}
|
||||
utils.Convert(aiModelList, &resp.AiInfoList)
|
||||
case 3:
|
||||
var vmModelList []models.TaskVm
|
||||
err := findModelList(req.AdapterId, l.svcCtx.DbEngin, &vmModelList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
utils.Convert(vmModelList, &resp.VmInfoList)
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
|
|
@ -33,26 +33,46 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie
|
|||
l.svcCtx.DbEngin.Raw("select type as kind from t_adapter where id = ?", req.AdapterId).Scan(&kind)
|
||||
switch kind {
|
||||
case 0:
|
||||
for _, cloudInfo := range req.CloudInfoList {
|
||||
var taskId uint
|
||||
result := l.svcCtx.DbEngin.Table("task_cloud").Select("task_id").Where("task_id = ?", cloudInfo.TaskId).Find(&taskId)
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return nil, errors.New("Record does not exist")
|
||||
var resourceType int32
|
||||
l.svcCtx.DbEngin.Raw("select resource_type as resourceType from `t_adapter` where id = ?", req.AdapterId).Scan(&resourceType)
|
||||
switch resourceType {
|
||||
case 01:
|
||||
for _, cloudInfo := range req.CloudInfoList {
|
||||
var taskId uint
|
||||
result := l.svcCtx.DbEngin.Table("task_cloud").Select("task_id").Where("task_id = ?", cloudInfo.TaskId).Find(&taskId)
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return nil, errors.New("Record does not exist")
|
||||
}
|
||||
l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where task_id = ?",
|
||||
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.TaskId)
|
||||
var taskName string
|
||||
l.svcCtx.DbEngin.Raw("select name as kind from task where id = ?", taskId).Scan(&taskName)
|
||||
noticeInfo := clientCore.NoticeInfo{
|
||||
TaskId: cloudInfo.TaskId,
|
||||
AdapterId: cloudInfo.AdapterId,
|
||||
AdapterName: cloudInfo.AdapterName,
|
||||
ClusterId: cloudInfo.ClusterId,
|
||||
ClusterName: cloudInfo.ClusterName,
|
||||
TaskName: taskName,
|
||||
}
|
||||
syncTask(l.svcCtx.DbEngin, noticeInfo)
|
||||
}
|
||||
l.svcCtx.DbEngin.Exec("update task_cloud set status = ?,start_time = ?,result = ? where task_id = ?",
|
||||
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, cloudInfo.TaskId)
|
||||
var taskName string
|
||||
l.svcCtx.DbEngin.Raw("select name as kind from task where id = ?", taskId).Scan(&taskName)
|
||||
noticeInfo := clientCore.NoticeInfo{
|
||||
TaskId: cloudInfo.TaskId,
|
||||
AdapterId: cloudInfo.AdapterId,
|
||||
AdapterName: cloudInfo.AdapterName,
|
||||
ClusterId: cloudInfo.ClusterId,
|
||||
ClusterName: cloudInfo.ClusterName,
|
||||
TaskName: taskName,
|
||||
case 02:
|
||||
for _, vmInfo := range req.VmInfoList {
|
||||
l.svcCtx.DbEngin.Exec("update task_vm set status = ?,start_time = ? where participant_id = ? and task_id = ? and name = ?",
|
||||
vmInfo.Status, vmInfo.StartTime, req.AdapterId, vmInfo.TaskId, vmInfo.Name)
|
||||
noticeInfo := clientCore.NoticeInfo{
|
||||
TaskId: vmInfo.TaskId,
|
||||
AdapterId: vmInfo.AdapterId,
|
||||
AdapterName: vmInfo.AdapterName,
|
||||
ClusterId: vmInfo.ClusterId,
|
||||
ClusterName: vmInfo.ClusterName,
|
||||
TaskName: vmInfo.Name,
|
||||
}
|
||||
syncTask(l.svcCtx.DbEngin, noticeInfo)
|
||||
}
|
||||
syncTask(l.svcCtx.DbEngin, noticeInfo)
|
||||
}
|
||||
|
||||
case 2:
|
||||
for _, hpcInfo := range req.HpcInfoList {
|
||||
l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?",
|
||||
|
@ -81,20 +101,6 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie
|
|||
}
|
||||
syncTask(l.svcCtx.DbEngin, noticeInfo)
|
||||
}
|
||||
case 3:
|
||||
for _, vmInfo := range req.VmInfoList {
|
||||
l.svcCtx.DbEngin.Exec("update task_vm set status = ?,start_time = ? where participant_id = ? and task_id = ? and name = ?",
|
||||
vmInfo.Status, vmInfo.StartTime, req.AdapterId, vmInfo.TaskId, vmInfo.Name)
|
||||
noticeInfo := clientCore.NoticeInfo{
|
||||
TaskId: vmInfo.TaskId,
|
||||
AdapterId: vmInfo.AdapterId,
|
||||
AdapterName: vmInfo.AdapterName,
|
||||
ClusterId: vmInfo.ClusterId,
|
||||
ClusterName: vmInfo.ClusterName,
|
||||
TaskName: vmInfo.Name,
|
||||
}
|
||||
syncTask(l.svcCtx.DbEngin, noticeInfo)
|
||||
}
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
|
|
@ -32,18 +32,22 @@ func (l *TaskDetailsLogic) TaskDetails(req *types.FId) (resp *types.TaskDetailsR
|
|||
if errors.Is(l.svcCtx.DbEngin.Where("id", req.Id).First(&task).Error, gorm.ErrRecordNotFound) {
|
||||
return nil, errors.New("记录不存在")
|
||||
}
|
||||
clusterIds := make([]int64, 0)
|
||||
clusterIds := make([]string, 0)
|
||||
var cList []*types.ClusterInfo
|
||||
var subList []*types.SubTaskInfo
|
||||
switch task.AdapterTypeDict {
|
||||
case 0:
|
||||
l.svcCtx.DbEngin.Table("task_cloud").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds)
|
||||
if len(clusterIds) <= 0 {
|
||||
l.svcCtx.DbEngin.Table("task_vm").Select("cluster_id").Where("task_id", task.Id).Find(&clusterIds)
|
||||
l.svcCtx.DbEngin.Table("task_cloud").Where("task_id", task.Id).Scan(&subList)
|
||||
if len(subList) <= 0 {
|
||||
l.svcCtx.DbEngin.Table("task_vm").Where("task_id", task.Id).Find(&subList)
|
||||
}
|
||||
case 1:
|
||||
l.svcCtx.DbEngin.Table("task_ai").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds)
|
||||
l.svcCtx.DbEngin.Table("task_ai").Where("task_id", task.Id).Scan(&subList)
|
||||
case 2:
|
||||
l.svcCtx.DbEngin.Table("task_hpc").Select("cluster_id").Where("task_id", task.Id).Scan(&clusterIds)
|
||||
l.svcCtx.DbEngin.Table("task_hpc").Where("task_id", task.Id).Scan(&subList)
|
||||
}
|
||||
for _, sub := range subList {
|
||||
clusterIds = append(clusterIds, sub.ClusterId)
|
||||
}
|
||||
err = l.svcCtx.DbEngin.Table("t_cluster").Where("id in ?", clusterIds).Scan(&cList).Error
|
||||
if err != nil {
|
||||
|
@ -51,5 +55,6 @@ func (l *TaskDetailsLogic) TaskDetails(req *types.FId) (resp *types.TaskDetailsR
|
|||
}
|
||||
utils.Convert(&task, &resp)
|
||||
resp.ClusterInfos = cList
|
||||
resp.SubTaskInfos = subList
|
||||
return
|
||||
}
|
||||
|
|
|
@ -64,7 +64,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
|
|||
synergystatus = 1
|
||||
}
|
||||
strategyCode, err := l.svcCtx.Scheduler.AiStorages.GetStrategyCode(req.AiOption.Strategy)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(rs[0].AdapterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName, strategyCode, synergystatus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -84,11 +90,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
|
|||
|
||||
clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(r.ClusterId)
|
||||
|
||||
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg)
|
||||
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, adapterName, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(r.AdapterId, adapterName, r.ClusterId, clusterName, r.TaskName, "create", "任务创建中")
|
||||
|
||||
resp.Results = append(resp.Results, scheResult)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package database
|
|||
|
||||
import (
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
||||
|
@ -46,6 +47,16 @@ func (s *AiStorage) GetClusterNameById(id string) (string, error) {
|
|||
return name, nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) GetAdapterNameById(id string) (string, error) {
|
||||
var name string
|
||||
tx := s.DbEngin.Raw("select `name` from t_adapter where `id` = ?", id).Scan(&name)
|
||||
if tx.Error != nil {
|
||||
logx.Errorf(tx.Error.Error())
|
||||
return "", tx.Error
|
||||
}
|
||||
return name, nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) {
|
||||
var list []types.AdapterInfo
|
||||
var ids []string
|
||||
|
@ -102,7 +113,7 @@ func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int6
|
|||
return taskModel.Id, nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId string, clusterName string, jobId string, status string, msg string) error {
|
||||
func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, adapterName string, clusterId string, clusterName string, jobId string, status string, msg string) error {
|
||||
// 构建主任务结构体
|
||||
aId, err := strconv.ParseInt(option.AdapterId, 10, 64)
|
||||
if err != nil {
|
||||
|
@ -116,6 +127,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId
|
|||
aiTaskModel := models.TaskAi{
|
||||
TaskId: taskId,
|
||||
AdapterId: aId,
|
||||
AdapterName: adapterName,
|
||||
ClusterId: cId,
|
||||
ClusterName: clusterName,
|
||||
Name: option.TaskName,
|
||||
|
@ -187,13 +199,18 @@ func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterR
|
|||
return &clusterResource, nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
|
||||
func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
|
||||
memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64) error {
|
||||
cId, err := strconv.ParseInt(clusterId, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aId, err := strconv.ParseInt(adapterId, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
clusterResource := models.TClusterResource{
|
||||
AdapterId: aId,
|
||||
ClusterId: cId,
|
||||
ClusterName: clusterName,
|
||||
ClusterType: clusterType,
|
||||
|
@ -212,26 +229,45 @@ func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, c
|
|||
if tx.Error != nil {
|
||||
return tx.Error
|
||||
}
|
||||
|
||||
// prometheus
|
||||
param := tracker.ClusterLoadRecord{
|
||||
ClusterName: clusterName,
|
||||
CpuAvail: cpuAvail,
|
||||
CpuTotal: cpuTotal,
|
||||
MemoryAvail: memAvail,
|
||||
MemoryTotal: memTotal,
|
||||
DiskAvail: diskAvail,
|
||||
DiskTotal: diskTotal,
|
||||
AdapterId: aId,
|
||||
ClusterName: clusterName,
|
||||
CpuAvail: cpuAvail,
|
||||
CpuTotal: cpuTotal,
|
||||
CpuUtilisation: clusterResource.CpuAvail / clusterResource.CpuTotal,
|
||||
MemoryAvail: memAvail,
|
||||
MemoryTotal: memTotal,
|
||||
MemoryUtilisation: clusterResource.MemAvail / clusterResource.MemTotal,
|
||||
DiskAvail: diskAvail,
|
||||
DiskTotal: diskTotal,
|
||||
DiskUtilisation: clusterResource.DiskAvail / clusterResource.DiskTotal,
|
||||
}
|
||||
tracker.SyncClusterLoad(param)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResource) error {
|
||||
tx := s.DbEngin.Model(clusterResource).Updates(clusterResource)
|
||||
tx := s.DbEngin.Where("cluster_id = ?", clusterResource.ClusterId).Updates(clusterResource)
|
||||
|
||||
if tx.Error != nil {
|
||||
return tx.Error
|
||||
}
|
||||
// prometheus
|
||||
param := tracker.ClusterLoadRecord{
|
||||
AdapterId: clusterResource.AdapterId,
|
||||
ClusterName: clusterResource.ClusterName,
|
||||
CpuAvail: clusterResource.CpuAvail,
|
||||
CpuTotal: clusterResource.CpuTotal,
|
||||
CpuUtilisation: clusterResource.CpuAvail / clusterResource.CpuTotal,
|
||||
MemoryAvail: clusterResource.MemAvail,
|
||||
MemoryTotal: clusterResource.MemTotal,
|
||||
MemoryUtilisation: clusterResource.MemAvail / clusterResource.MemTotal,
|
||||
DiskAvail: clusterResource.DiskAvail,
|
||||
DiskTotal: clusterResource.DiskTotal,
|
||||
DiskUtilisation: clusterResource.DiskAvail / clusterResource.DiskTotal,
|
||||
}
|
||||
tracker.SyncClusterLoad(param)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -257,3 +293,28 @@ func (s *AiStorage) GetStrategyCode(name string) (int64, error) {
|
|||
}
|
||||
return strategy, nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) AddNoticeInfo(adapterId string, adapterName string, clusterId string, clusterName string, taskName string, noticeType string, incident string) {
|
||||
aId, err := strconv.ParseInt(adapterId, 10, 64)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
cId, err := strconv.ParseInt(clusterId, 10, 64)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
noticeInfo := clientCore.NoticeInfo{
|
||||
AdapterId: aId,
|
||||
AdapterName: adapterName,
|
||||
ClusterId: cId,
|
||||
ClusterName: clusterName,
|
||||
NoticeType: noticeType,
|
||||
TaskName: taskName,
|
||||
Incident: incident,
|
||||
CreatedTime: time.Now(),
|
||||
}
|
||||
result := s.DbEngin.Table("t_notice").Create(¬iceInfo)
|
||||
if result.Error != nil {
|
||||
logx.Errorf("Task creation failure, err: %v", result.Error)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,6 +45,8 @@ type AiScheduler struct {
|
|||
}
|
||||
|
||||
type AiResult struct {
|
||||
AdapterId string
|
||||
TaskName string
|
||||
JobId string
|
||||
ClusterId string
|
||||
Strategy string
|
||||
|
@ -190,6 +192,8 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
|
|||
result, _ = convertType(resp)
|
||||
mu.Unlock()
|
||||
|
||||
result.AdapterId = opt.AdapterId
|
||||
result.TaskName = opt.TaskName
|
||||
result.Replica = c.Replicas
|
||||
result.ClusterId = c.ClusterId
|
||||
result.Strategy = as.option.StrategyName
|
||||
|
@ -222,6 +226,10 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
|
|||
if err != nil {
|
||||
return nil, errors.New("database add failed: " + err.Error())
|
||||
}
|
||||
adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var errmsg string
|
||||
for _, err := range errs {
|
||||
|
@ -234,7 +242,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
|
|||
|
||||
clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
|
||||
|
||||
err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, clusterName, "", constants.Failed, msg)
|
||||
err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg)
|
||||
if err != nil {
|
||||
return nil, errors.New("database add failed: " + err.Error())
|
||||
}
|
||||
|
@ -246,14 +254,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
|
|||
if s.Msg != "" {
|
||||
msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
|
||||
errmsg += msg
|
||||
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, "", constants.Failed, msg)
|
||||
err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg)
|
||||
if err != nil {
|
||||
return nil, errors.New("database add failed: " + err.Error())
|
||||
}
|
||||
} else {
|
||||
msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
|
||||
errmsg += msg
|
||||
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
|
||||
err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
|
||||
if err != nil {
|
||||
return nil, errors.New("database add failed: " + err.Error())
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
|
||||
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -24,6 +25,8 @@ const (
|
|||
type AiService struct {
|
||||
AiExecutorAdapterMap map[string]map[string]executor.AiExecutor
|
||||
AiCollectorAdapterMap map[string]map[string]collector.AiCollector
|
||||
Storage *database.AiStorage
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService, error) {
|
||||
|
@ -35,12 +38,16 @@ func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService
|
|||
aiService := &AiService{
|
||||
AiExecutorAdapterMap: make(map[string]map[string]executor.AiExecutor),
|
||||
AiCollectorAdapterMap: make(map[string]map[string]collector.AiCollector),
|
||||
Storage: storages,
|
||||
}
|
||||
for _, id := range adapterIds {
|
||||
clusters, err := storages.GetClustersByAdapterId(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(clusters.List) == 0 {
|
||||
continue
|
||||
}
|
||||
exeClusterMap, colClusterMap := InitAiClusterMap(conf, clusters.List)
|
||||
aiService.AiExecutorAdapterMap[id] = exeClusterMap
|
||||
aiService.AiCollectorAdapterMap[id] = colClusterMap
|
||||
|
@ -78,3 +85,11 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
|
|||
|
||||
return executorMap, collectorMap
|
||||
}
|
||||
|
||||
//func (a *AiService) AddCluster() error {
|
||||
//
|
||||
//}
|
||||
//
|
||||
//func (a *AiService) AddAdapter() error {
|
||||
//
|
||||
//}
|
||||
|
|
|
@ -87,7 +87,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
|
|||
NamingStrategy: schema.NamingStrategy{
|
||||
SingularTable: true, // 使用单数表名,启用该选项,此时,`User` 的表名应该是 `t_user`
|
||||
},
|
||||
Logger: logger.Default.LogMode(logger.Error),
|
||||
Logger: logger.Default.LogMode(logger.Info),
|
||||
})
|
||||
if err != nil {
|
||||
logx.Errorf("数据库连接失败, err%v", err)
|
||||
|
|
|
@ -202,7 +202,7 @@ type CommitVmTaskReq struct {
|
|||
ImageRef int64 `json:"imageRef,optional"`
|
||||
FlavorRef int64 `json:"flavorRef,optional"`
|
||||
Uuid int64 `json:"uuid,optional"`
|
||||
Replicas int32 `json:"replicas,string"`
|
||||
Replicas int64 `json:"replicas,string"`
|
||||
VmName string `json:"vm_name,optional"`
|
||||
}
|
||||
|
||||
|
@ -1166,6 +1166,16 @@ type TaskDetailsResp struct {
|
|||
Strategy int64 `json:"strategy"`
|
||||
SynergyStatus int64 `json:"synergyStatus"`
|
||||
ClusterInfos []*ClusterInfo `json:"clusterInfos"`
|
||||
SubTaskInfos []*SubTaskInfo `json:"subTaskInfos"`
|
||||
}
|
||||
|
||||
type SubTaskInfo struct {
|
||||
Id string `json:"id" db:"id"`
|
||||
Name string `json:"name" db:"name"`
|
||||
ClusterId string `json:"clusterId" db:"cluster_id"`
|
||||
ClusterName string `json:"clusterName" db:"cluster_name"`
|
||||
Status string `json:"status" db:"status"`
|
||||
Remark string `json:"remark" db:"remark"`
|
||||
}
|
||||
|
||||
type CommitHpcTaskReq struct {
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
type TaskCloudModel struct {
|
||||
Id uint `json:"id" gorm:"primarykey;not null;comment:id"`
|
||||
Name string `json:"name" gorm:"null;comment:名称"`
|
||||
TaskId uint `json:"taskId" gorm:"not null;comment:task表id"`
|
||||
AdapterId uint `json:"adapterId" gorm:"not null;comment:适配器id"`
|
||||
AdapterName string `json:"adapterName" gorm:"not null;comment:适配器名称"`
|
||||
|
|
|
@ -37,7 +37,7 @@ type (
|
|||
TaskVm struct {
|
||||
Id int64 `db:"id"` // id
|
||||
TaskId int64 `db:"task_id"` // 任务id
|
||||
Name string `db:"name"` // 虚拟机名称
|
||||
Name string `db:"name"` // 任务名称
|
||||
AdapterId int64 `db:"adapter_id"` // 执行任务的适配器id
|
||||
AdapterName string `db:"adapter_name"` // 适配器名称
|
||||
ClusterId int64 `db:"cluster_id"` // 执行任务的集群id
|
||||
|
@ -53,7 +53,10 @@ type (
|
|||
StartTime string `db:"start_time"` // 开始时间
|
||||
RunningTime string `db:"running_time"` // 运行时间
|
||||
Result string `db:"result"` // 运行结果
|
||||
Remark string `db:"remark"` // 备注
|
||||
DeletedAt string `db:"deleted_at"` // 删除时间
|
||||
VmName string `db:"vm_name"` // 虚拟机名称
|
||||
Replicas int64 `db:"replicas"` // 副本数
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -92,14 +95,14 @@ func (m *defaultTaskVmModel) FindOne(ctx context.Context, id int64) (*TaskVm, er
|
|||
}
|
||||
|
||||
func (m *defaultTaskVmModel) Insert(ctx context.Context, data *TaskVm) (sql.Result, error) {
|
||||
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet)
|
||||
ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt)
|
||||
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskVmRowsExpectAutoSet)
|
||||
ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas)
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (m *defaultTaskVmModel) Update(ctx context.Context, data *TaskVm) error {
|
||||
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskVmRowsWithPlaceHolder)
|
||||
_, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.DeletedAt, data.Id)
|
||||
_, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.Name, data.AdapterId, data.AdapterName, data.ClusterId, data.ClusterName, data.FlavorRef, data.ImageRef, data.Status, data.Platform, data.Description, data.AvailabilityZone, data.MinCount, data.Uuid, data.StartTime, data.RunningTime, data.Result, data.Remark, data.DeletedAt, data.VmName, data.Replicas, data.Id)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -48,6 +48,7 @@ type (
|
|||
GpuTotal float64 `db:"gpu_total"`
|
||||
CardTotal int64 `db:"card_total"` // 算力卡数量
|
||||
CardTopsTotal float64 `db:"card_tops_total"` // 算力总量tops
|
||||
AdapterId int64 `db:"adapter_id"`
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -86,14 +87,14 @@ func (m *defaultTClusterResourceModel) FindOne(ctx context.Context, clusterId in
|
|||
}
|
||||
|
||||
func (m *defaultTClusterResourceModel) Insert(ctx context.Context, data *TClusterResource) (sql.Result, error) {
|
||||
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tClusterResourceRowsExpectAutoSet)
|
||||
ret, err := m.conn.ExecCtx(ctx, query, data.ClusterId, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal)
|
||||
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, tClusterResourceRowsExpectAutoSet)
|
||||
ret, err := m.conn.ExecCtx(ctx, query, data.ClusterId, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.AdapterId)
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (m *defaultTClusterResourceModel) Update(ctx context.Context, data *TClusterResource) error {
|
||||
query := fmt.Sprintf("update %s set %s where `cluster_id` = ?", m.table, tClusterResourceRowsWithPlaceHolder)
|
||||
_, err := m.conn.ExecCtx(ctx, query, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.ClusterId)
|
||||
_, err := m.conn.ExecCtx(ctx, query, data.ClusterName, data.ClusterType, data.CpuAvail, data.CpuTotal, data.MemAvail, data.MemTotal, data.DiskAvail, data.DiskTotal, data.GpuAvail, data.GpuTotal, data.CardTotal, data.CardTopsTotal, data.AdapterId, data.ClusterId)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue