Former-commit-id: f3af07e360c1c219b0c66df0a5109dd3c3d1836b
This commit is contained in:
zhangwei 2024-03-27 14:37:33 +08:00
commit b1b7fbe0e2
25 changed files with 534 additions and 242 deletions

View File

@ -37,6 +37,7 @@ type HpcInfo struct {
Id int64 `json:"id"` // id
TaskId int64 `json:"task_id"` // 任务id
JobId string `json:"job_id"` // 作业id(在第三方系统中的作业id)
AdapterId int64 `json:"adapter_id"` // 执行任务的适配器id
ClusterId int64 `json:"cluster_id"` // 执行任务的集群id
ClusterType string `json:"cluster_type"` // 执行任务的集群类型
Name string `json:"name"` // 名称

View File

@ -727,6 +727,8 @@ type ClusterRelationInfo {
CLabel string `json:"cLabel,omitempty" db:"label"`
COwnerId string `json:"cOwnerId,omitempty" db:"owner_id"`
CAuthType string `json:"cAuthType,omitempty" db:"auth_type"`
CRegionDict string `json:"cRegionDict,omitempty" db:"-"`
CProducerDict string `json:"cProducerDict,omitempty" db:"-"`
CCreateTime string `json:"cCreateTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
}

View File

@ -8,29 +8,20 @@ info(
version: "type version here"
)
type Job {
SlurmVersion string `json:"slurmVersion"`
name string `json:"name"`
JobStartTime string `json:"JobStartTime"`
JobRunTime string `json:"JobRunTime"`
StateofJob string `json:"StateofJob"`
}
type (
commitHpcTaskReq {
Name string `json:"name"` // paratera:jobName
Name string `json:"name"` // paratera:jobName
Description string `json:"description,optional"`
tenantId int64 `json:"tenantId,optional"`
TaskId int64 `json:"taskId,optional"`
AdapterId int64 `json:"adapterId,optional"`
MatchLabels map[string]string `json:"matchLabels,optional"`
CardCount int64 `json:"cardCount,optional"`
WorkDir string `json:"workDir,optional"` //paratera:workingDir
WorkDir string `json:"workDir,optional"` //paratera:workingDir
WallTime string `json:"wallTime,optional"`
CmdScript string `json:"cmdScript,optional"` // paratera:bootScript
AppType string `json:"appType,optional"`
AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname
AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname
Queue string `json:"queue,optional"`
NNode string `json:"nNode,optional"`
SubmitType string `json:"submitType,optional"`
@ -40,9 +31,7 @@ type (
Environment map[string]string `json:"environment,optional"`
ClusterType string `json:"clusterType,optional"`
}
)
type (
commitHpcTaskResp {
TaskId int64 `json:"taskId"`
Code int32 `json:"code"`
@ -51,35 +40,76 @@ type (
)
type (
listJobReq {
hpcOverViewReq {
}
listJobResp {
hpcOverViewResp {
Code int32 `json:"code"`
Msg string `json:"msg"`
RecordCount int32 `json:"recordCount"`
Jobs []Job `json:"jobInfos"`
Data HPCOverView `json:"data"`
}
HPCOverView {
AdapterCount int32 `json:"adapterCount"`
StackCount int32 `json:"stackCount"`
ClusterCount int32 `json:"clusterCount"`
TaskCount int32 `json:"taskCount"`
}
)
type HistoryJob {
SlurmVersion string `json:"slurmVersion"`
name string `json:"name"`
JobStartTime string `json:"JobStartTime"`
JobRunTime string `json:"JobRunTime"`
StateofJob string `json:"StateofJob"`
}
type (
listHistoryJobReq {
hpcAdapterSummaryReq {
}
listHistoryJobResp {
hpcAdapterSummaryResp {
Code int32 `json:"code"`
Msg string `json:"msg"`
RecordCount int32 `json:"recordCount"`
HistoryJobs []HistoryJob `json:"jobInfoDbs"`
Data []HPCAdapterSummary `json:"data"`
}
HPCAdapterSummary {
AdapterName string `json:"adapterName"`
StackCount int32 `json:"stackCount"`
ClusterCount int32 `json:"clusterCount"`
TaskCount int32 `json:"taskCount"`
}
)
type (
hpcJobReq {
}
hpcJobResp {
Code int32 `json:"code"`
Msg string `json:"msg"`
Data []Job `json:"data"`
}
Job {
JobName string `json:"jobName"`
JobDesc string `json:"jobDesc"`
SubmitTime string `json:"submitTime"`
JobStatus string `json:"jobStatus"`
AdapterName string `json:"adapterName"`
ClusterName string `json:"clusterName"`
ClusterType string `json:"clusterType"`
}
)
type (
hpcResourceReq {
}
hpcResourceResp {
Code int32 `json:"code"`
Msg string `json:"msg"`
HPCResource HPCResource `json:"hpcResource"`
}
HPCResource {
GPUCardsTotal int32 `json:"gpuCoresTotal"`
CPUCoresTotal int32 `json:"cpuCoresTotal"`
RAMTotal int32 `json:"ramTotal"`
GPUCardsUsed int32 `json:"gpuCoresUsed"`
CPUCoresUsed int32 `json:"cpuCoresUsed"`
RAMUsed int32 `json:"ramUsed"`
GPURate float32 `json:"gpuRate"`
CPURate float32 `json:"cpuRate"`
RAMRate float32 `json:"ramRate"`
}
)
type QueueAssetsResp {
QueueAssets []QueueAsset `json:"queueAsset"`

View File

@ -23,6 +23,7 @@ type HpcInfo struct {
Id int64 `json:"id"` // id
TaskId int64 `json:"task_id"` // 任务id
JobId string `json:"job_id"` // 作业id(在第三方系统中的作业id)
AdapterId int64 `json:"adapter_id"` // 执行任务的适配器id
ClusterId int64 `json:"cluster_id"` // 执行任务的集群id
ClusterType string `json:"cluster_type"` // 执行任务的集群类型
Name string `json:"name"` // 名称

View File

@ -141,13 +141,17 @@ service pcm {
@handler commitHpcTaskHandler
post /hpc/commitHpcTask (commitHpcTaskReq) returns (commitHpcTaskResp)
@doc "超算查询任务列表"
@handler listJobHandler
get /hpc/listJob (listJobReq) returns (listJobResp)
@doc "超算总览"
@handler overViewHandler
get /hpc/overview (hpcOverViewReq) returns (hpcOverViewResp)
@doc "超算查询历史任务列表"
@handler listHistoryJobHandler
get /hpc/listHistoryJob (listHistoryJobReq) returns (listHistoryJobResp)
@doc "超算适配器列表"
@handler adapterSummaryHandler
get /hpc/adapterSummary (hpcAdapterSummaryReq) returns (hpcAdapterSummaryResp)
@doc "超算查询任务列表"
@handler jobHandler
get /hpc/job (hpcJobReq) returns (hpcJobResp)
@doc "超算查询资产列表"
@handler queueAssetsHandler
@ -891,6 +895,9 @@ service pcm {
@handler ScheduleGetStrategyHandler
get /schedule/ai/getStrategies returns (AiStrategyResp)
@handler ScheduleGetAlgorithmsHandler
get /schedule/ai/getAlgorithms (AiAlgorithmsReq) returns (AiAlgorithmsResp)
@handler ScheduleSubmitHandler
post /schedule/submit (ScheduleReq) returns (ScheduleResp)
}

View File

@ -41,4 +41,14 @@ type (
AiStrategyResp {
Strategies []string `json:"strategies"`
}
AiAlgorithmsReq {
ResourceType string `json:"resourceType"`
TaskType string `json:"taskType"`
Dataset string `json:"dataset"`
}
AiAlgorithmsResp {
Algorithms []string `json:"algorithms"`
}
)

View File

@ -5,7 +5,7 @@ Port: 8999
Timeout: 50000
DB:
DataSource: root:uJpLd6u-J?HC1@(10.206.0.12:3306)/pcm?parseTime=true&loc=Local
DataSource: root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local
Redis:
Host: 10.206.0.12:6379

View File

@ -0,0 +1,28 @@
package hpc
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/hpc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)
// AdapterSummaryHandler builds the HTTP handler for the HPC adapter
// summary endpoint: it parses the request, delegates to the logic
// layer, and writes either the JSON payload or the error response.
func AdapterSummaryHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		var req types.HpcAdapterSummaryReq
		if err := httpx.Parse(r, &req); err != nil {
			httpx.ErrorCtx(ctx, w, err)
			return
		}
		logic := hpc.NewAdapterSummaryLogic(ctx, svcCtx)
		resp, err := logic.AdapterSummary(&req)
		if err != nil {
			httpx.ErrorCtx(ctx, w, err)
			return
		}
		httpx.OkJsonCtx(ctx, w, resp)
	}
}

View File

@ -1,24 +1,28 @@
package hpc
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/hpc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
)
func ListJobHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
func JobHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ListJobReq
var req types.HpcJobReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := hpc.NewListJobLogic(r.Context(), svcCtx)
resp, err := l.ListJob(&req)
result.HttpResult(r, w, resp, err)
l := hpc.NewJobLogic(r.Context(), svcCtx)
resp, err := l.Job(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -0,0 +1,28 @@
package hpc
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/hpc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
)
// OverViewHandler builds the HTTP handler for the HPC overview
// endpoint: it parses the request, calls the overview logic, and
// writes either the JSON payload or the error response.
func OverViewHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		var req types.HpcOverViewReq
		if err := httpx.Parse(r, &req); err != nil {
			httpx.ErrorCtx(ctx, w, err)
			return
		}
		logic := hpc.NewOverViewLogic(ctx, svcCtx)
		resp, err := logic.OverView(&req)
		if err != nil {
			httpx.ErrorCtx(ctx, w, err)
			return
		}
		httpx.OkJsonCtx(ctx, w, resp)
	}
}

View File

@ -167,13 +167,18 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
},
{
Method: http.MethodGet,
Path: "/hpc/listJob",
Handler: hpc.ListJobHandler(serverCtx),
Path: "/hpc/overview",
Handler: hpc.OverViewHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/hpc/listHistoryJob",
Handler: hpc.ListHistoryJobHandler(serverCtx),
Path: "/hpc/adapterSummary",
Handler: hpc.AdapterSummaryHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/hpc/job",
Handler: hpc.JobHandler(serverCtx),
},
{
Method: http.MethodGet,
@ -1104,6 +1109,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/schedule/ai/getStrategies",
Handler: schedule.ScheduleGetStrategyHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/schedule/ai/getAlgorithms",
Handler: schedule.ScheduleGetAlgorithmsHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/schedule/submit",

View File

@ -1,24 +1,24 @@
package hpc
package schedule
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/hpc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/schedule"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
)
func ListHistoryJobHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
func ScheduleGetAlgorithmsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ListHistoryJobReq
var req types.AiAlgorithmsReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := hpc.NewListHistoryJobLogic(r.Context(), svcCtx)
resp, err := l.ListHistoryJob(&req)
l := schedule.NewScheduleGetAlgorithmsLogic(r.Context(), svcCtx)
resp, err := l.ScheduleGetAlgorithms(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -75,6 +75,8 @@ func (l *GetAdapterRelationLogic) GetAdapterRelation(req *types.AdapterRelationQ
cr.CLabel = c.Label
cr.COwnerId = c.OwnerId
cr.CAuthType = c.AuthType
cr.CRegionDict = c.RegionDict
cr.CProducerDict = c.ProducerDict
cr.CCreateTime = c.CreateTime
rlist = append(rlist, cr)
}

View File

@ -0,0 +1,37 @@
package hpc
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// AdapterSummaryLogic carries the request-scoped context and shared
// service dependencies used to compute the HPC adapter summary.
type AdapterSummaryLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// NewAdapterSummaryLogic constructs an AdapterSummaryLogic bound to the
// given request context and service context.
func NewAdapterSummaryLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AdapterSummaryLogic {
return &AdapterSummaryLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
// AdapterSummary aggregates, per HPC adapter (t_adapter.type = 2), the
// number of distinct stacks (cluster labels), clusters, and submitted
// HPC tasks, and returns them as a summary list.
//
// Fix: the raw query's error was previously ignored, so a failing query
// silently produced an empty "success" response; it is now propagated.
func (l *AdapterSummaryLogic) AdapterSummary(req *types.HpcAdapterSummaryReq) (resp *types.HpcAdapterSummaryResp, err error) {
	var hpcAdapterSummary []types.HPCAdapterSummary
	tx := l.svcCtx.DbEngin.Raw("SELECT ta.NAME AS adapter_name,count( DISTINCT label ) as stack_count,count( DISTINCT tc.id ) as cluster_count,count( DISTINCT th.id) as task_count FROM t_adapter ta LEFT JOIN t_cluster tc ON ta.id = tc.adapter_id LEFT JOIN task_hpc th ON ta.id = th.adapter_id WHERE ta.type = 2 GROUP BY ta.id").Scan(&hpcAdapterSummary)
	if tx.Error != nil {
		logx.Error(tx.Error)
		return nil, tx.Error
	}
	resp = &types.HpcAdapterSummaryResp{
		Code: 200,
		Msg:  "success",
		Data: hpcAdapterSummary,
	}
	return resp, nil
}

View File

@ -0,0 +1,36 @@
package hpc
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// JobLogic carries the request-scoped context and shared service
// dependencies used to list HPC jobs.
type JobLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// NewJobLogic constructs a JobLogic bound to the given request context
// and service context.
func NewJobLogic(ctx context.Context, svcCtx *svc.ServiceContext) *JobLogic {
return &JobLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
// Job lists HPC jobs joined with their task description, cluster, and
// adapter names.
//
// Fix: the raw query's error was previously ignored, so a failing query
// silently produced an empty "success" response; it is now propagated.
func (l *JobLogic) Job(req *types.HpcJobReq) (resp *types.HpcJobResp, err error) {
	var job []types.Job
	tx := l.svcCtx.DbEngin.Raw("SELECT th.NAME as job_name,t.description as job_desc,t.commit_time as submit_time,th.STATUS as job_status,ta.name as adapter_name,tc.name as cluster_name,tc.label as cluster_type FROM task_hpc th LEFT JOIN task t ON t.id = th.task_id JOIN t_cluster tc on th.cluster_id = tc.id JOIN t_adapter ta on tc.adapter_id = ta.id").Scan(&job)
	if tx.Error != nil {
		logx.Error(tx.Error)
		return nil, tx.Error
	}
	resp = &types.HpcJobResp{
		Code: 200,
		Msg:  "success",
		Data: job,
	}
	return resp, nil
}

View File

@ -1,77 +0,0 @@
/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package hpc
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/enum"
"strings"
"github.com/zeromicro/go-zero/core/logx"
)
// ListHistoryJobLogic carries the request-scoped context and shared
// service dependencies used to list completed (history) HPC jobs.
type ListHistoryJobLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// NewListHistoryJobLogic constructs a ListHistoryJobLogic bound to the
// given request context and service context.
func NewListHistoryJobLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ListHistoryJobLogic {
return &ListHistoryJobLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
// ListHistoryJob returns completed HPC tasks as history jobs.
//
// It joins the hpc and task tables on status 'Completed', translates
// each comma-separated service name to its partner label via
// enum.Partner, and sets Code/Msg/RecordCount on the response.
//
// Fix: on a query failure this previously logged the named return err,
// which is always nil at that point; it now logs the actual tx.Error.
func (l *ListHistoryJobLogic) ListHistoryJob(req *types.ListHistoryJobReq) (resp *types.ListHistoryJobResp, err error) {
	resp = &types.ListHistoryJobResp{}
	var tasks []types.HistoryJob
	// Fetch completed tasks.
	tx := l.svcCtx.DbEngin.Raw("SELECT h.service_name as SlurmVersion,h.name,h.start_time as JobStartTime,h.running_time as JobRunTime,t.status as StateofJob from hpc h join task t on t.id = h.task_id and t.status = 'Completed'").Scan(&tasks)
	if tx.Error != nil {
		logx.Error(tx.Error)
		return nil, tx.Error
	}
	for _, task := range tasks {
		// Translate each comma-separated service name to its partner label.
		if task.SlurmVersion != "" {
			var names []string
			for _, name := range strings.Split(task.SlurmVersion, ",") {
				names = append(names, enum.Partner(name).String())
			}
			task.SlurmVersion = strings.Join(names, ",")
		}
		resp.HistoryJobs = append(resp.HistoryJobs, types.HistoryJob{
			SlurmVersion: task.SlurmVersion,
			Name:         task.Name,
			JobStartTime: task.JobStartTime,
			JobRunTime:   task.JobRunTime,
			StateofJob:   task.StateofJob,
		})
	}
	resp.Code = 200
	resp.Msg = "success"
	resp.RecordCount = int32(len(resp.HistoryJobs))
	return resp, nil
}

View File

@ -1,77 +0,0 @@
/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package hpc
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/enum"
"strings"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// ListJobLogic carries the request-scoped context and shared service
// dependencies used to list running (not yet completed) HPC jobs.
type ListJobLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// NewListJobLogic constructs a ListJobLogic bound to the given request
// context and service context.
func NewListJobLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ListJobLogic {
return &ListJobLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
// ListJob returns HPC tasks that are not yet completed.
//
// It joins the hpc and task tables on status != 'Completed', translates
// each comma-separated service name to its partner label via
// enum.Partner, appends an "s" unit suffix to the run time, and sets
// Code/Msg/RecordCount on the response.
//
// Fix: on a query failure this previously logged the named return err,
// which is always nil at that point; it now logs the actual tx.Error.
func (l *ListJobLogic) ListJob(req *types.ListJobReq) (resp *types.ListJobResp, err error) {
	resp = &types.ListJobResp{}
	var tasks []types.Job
	// Fetch tasks that have not completed.
	tx := l.svcCtx.DbEngin.Raw("SELECT h.service_name as SlurmVersion,h.name,h.start_time as JobStartTime,h.running_time as JobRunTime,t.status as StateofJob from hpc h join task t on t.id = h.task_id and t.status != 'Completed'").Scan(&tasks)
	if tx.Error != nil {
		logx.Error(tx.Error)
		return nil, tx.Error
	}
	for _, task := range tasks {
		// Translate each comma-separated service name to its partner label.
		if task.SlurmVersion != "" {
			var names []string
			for _, name := range strings.Split(task.SlurmVersion, ",") {
				names = append(names, enum.Partner(name).String())
			}
			task.SlurmVersion = strings.Join(names, ",")
		}
		resp.Jobs = append(resp.Jobs, types.Job{
			SlurmVersion: task.SlurmVersion,
			Name:         task.Name,
			JobStartTime: task.JobStartTime,
			JobRunTime:   task.JobRunTime + "s",
			StateofJob:   task.StateofJob,
		})
	}
	resp.Code = 200
	resp.Msg = "success"
	resp.RecordCount = int32(len(resp.Jobs))
	return resp, nil
}

View File

@ -0,0 +1,37 @@
package hpc
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// OverViewLogic carries the request-scoped context and shared service
// dependencies used to compute the HPC overview counters.
type OverViewLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// NewOverViewLogic constructs an OverViewLogic bound to the given
// request context and service context.
func NewOverViewLogic(ctx context.Context, svcCtx *svc.ServiceContext) *OverViewLogic {
return &OverViewLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
// OverView aggregates the HPC dashboard counters: the number of HPC
// adapters (type = 2), distinct stacks (cluster labels), clusters, and
// total submitted HPC tasks.
//
// Fixes: removed a leftover debug `println(tx)` (which printed a raw
// pointer to stdout) and propagated the previously ignored query error.
func (l *OverViewLogic) OverView(req *types.HpcOverViewReq) (resp *types.HpcOverViewResp, err error) {
	var overView types.HPCOverView
	tx := l.svcCtx.DbEngin.Raw("SELECT count( DISTINCT a.id ) as adapter_count,count( DISTINCT label ) as stack_count,count( c.id ) as cluster_count,(select count(*) from task_hpc) as task_count FROM t_cluster c JOIN t_adapter a ON c.adapter_id = a.id AND a.type = 2 ").Scan(&overView)
	if tx.Error != nil {
		logx.Error(tx.Error)
		return nil, tx.Error
	}
	resp = &types.HpcOverViewResp{
		Code: 200,
		Msg:  "success",
		Data: overView,
	}
	return resp, nil
}

View File

@ -0,0 +1,30 @@
package schedule
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// ScheduleGetAlgorithmsLogic carries the request-scoped context and
// shared service dependencies for the AI-scheduling getAlgorithms
// endpoint.
type ScheduleGetAlgorithmsLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// NewScheduleGetAlgorithmsLogic constructs a ScheduleGetAlgorithmsLogic
// bound to the given request context and service context.
func NewScheduleGetAlgorithmsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleGetAlgorithmsLogic {
return &ScheduleGetAlgorithmsLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
// ScheduleGetAlgorithms is a generated stub: it currently returns a nil
// response and nil error via the naked return.
// NOTE(review): still a TODO — the algorithm-listing logic (presumably
// delegating to scheduler.GetAlgorithms) has not been implemented yet.
func (l *ScheduleGetAlgorithmsLogic) ScheduleGetAlgorithms(req *types.AiAlgorithmsReq) (resp *types.AiAlgorithmsResp, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -3,6 +3,7 @@ package collector
type AiCollector interface {
GetResourceStats() (*ResourceStats, error)
GetDatasetsSpecs() ([]*DatasetsSpecs, error)
GetAlgorithms() ([]*Algorithm, error)
}
type ResourceStats struct {
@ -33,3 +34,9 @@ type DatasetsSpecs struct {
Name string
Size string
}
type Algorithm struct {
Name string
Platform string
TaskType string
}

View File

@ -157,6 +157,10 @@ func (m *ModelArtsLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
return nil, nil
}
func (m *ModelArtsLink) GetAlgorithms() ([]*collector.Algorithm, error) {
return nil, nil
}
func (m *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) {
err := m.GenerateSubmitParams(option)
if err != nil {

View File

@ -315,6 +315,29 @@ func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
return specs, nil
}
// GetAlgorithms lists the caller's algorithms on the Octopus platform
// via the OctopusRpc client, mapping each entry to a collector.Algorithm
// whose TaskType is the lowercased framework name.
func (o *OctopusLink) GetAlgorithms() ([]*collector.Algorithm, error) {
	listReq := &octopus.GetMyAlgorithmListReq{
		Platform:  o.platform,
		PageIndex: o.pageIndex,
		PageSize:  o.pageSize,
	}
	listResp, err := o.svcCtx.OctopusRpc.GetMyAlgorithmList(o.ctx, listReq)
	if err != nil {
		return nil, err
	}
	if !listResp.Success {
		return nil, errors.New("failed to get algorithms")
	}
	var algorithms []*collector.Algorithm
	for _, item := range listResp.Payload.Algorithms {
		algorithms = append(algorithms, &collector.Algorithm{
			Name:     item.AlgorithmName,
			Platform: OCTOPUS,
			TaskType: strings.ToLower(item.FrameworkName),
		})
	}
	return algorithms, nil
}
func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) {
err := o.GenerateSubmitParams(option)
if err != nil {

View File

@ -267,12 +267,15 @@ func (s *ShuguangAi) QuerySpecs() (interface{}, error) {
}
func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
//balance
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := s.svcCtx.ACRpc.GetUserInfo(s.ctx, userReq)
if err != nil {
return nil, err
}
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
//resource limit
limitReq := &hpcAC.QueueReq{}
limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
if err != nil {
@ -281,20 +284,54 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
totalCpu := limitResp.Data.AccountMaxCpu
totalDcu := limitResp.Data.AccountMaxDcu
//disk
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
if err != nil {
return nil, err
}
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB, 3)
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
generalInfo, err := s.svcCtx.ACRpc.GetGeneralInfo(s.ctx, nil)
memSize := common.RoundFloat(float64(generalInfo.MemoryInGib)*KB*KB, 3)
//memory
nodeResp, err := s.svcCtx.ACRpc.GetNodeResources(s.ctx, nil)
if err != nil {
return nil, err
}
memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES
//resources being occupied
memberJobResp, err := s.svcCtx.ACRpc.GetMemberJobs(s.ctx, nil)
if err != nil {
return nil, err
}
var CpuCoreAvail int64
var MemAvail float64
if len(memberJobResp.Data) != 0 {
CpuCoreAvail = totalCpu
MemAvail = memSize
} else {
var cpuCoreUsed int64
var memUsed float64
for _, datum := range memberJobResp.Data {
cpuCoreUsed += datum.CpuCore
}
memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core
if cpuCoreUsed > totalCpu {
CpuCoreAvail = 0
} else {
CpuCoreAvail = totalCpu - cpuCoreUsed
}
if memUsed > memSize {
MemAvail = 0
} else {
MemAvail = memSize - memUsed
}
}
//usable hours
var cards []*collector.Card
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)
@ -312,11 +349,11 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
Name: s.platform,
Balance: balance,
CpuCoreTotal: totalCpu,
CpuCoreAvail: 0,
CpuCoreAvail: CpuCoreAvail,
DiskTotal: totalDisk,
DiskAvail: availDisk,
MemTotal: memSize,
MemAvail: 0,
MemAvail: MemAvail,
CpuCoreHours: cpuHours,
CardsAvail: cards,
}
@ -341,6 +378,26 @@ func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
return specs, nil
}
// GetAlgorithms discovers ShuguangAi algorithms by listing the files
// under ALGORITHM_DIR/<taskType> for every known task type; each file
// becomes one collector.Algorithm tagged with that task type.
func (s *ShuguangAi) GetAlgorithms() ([]*collector.Algorithm, error) {
	var algorithms []*collector.Algorithm
	for _, taskType := range GetTaskTypes() {
		listReq := &hpcAC.GetFileListReq{
			Limit: 100,
			Path:  ALGORITHM_DIR + FORWARD_SLASH + taskType,
			Start: 0,
		}
		listResp, err := s.svcCtx.ACRpc.GetFileList(s.ctx, listReq)
		if err != nil {
			return nil, err
		}
		// A non-zero code from the AC service indicates failure.
		if listResp.Code != "0" {
			return nil, errors.New(listResp.Msg)
		}
		for _, entry := range listResp.Data.FileList {
			algorithms = append(algorithms, &collector.Algorithm{
				Name:     entry.Name,
				Platform: SHUGUANGAI,
				TaskType: taskType,
			})
		}
	}
	return algorithms, nil
}
func (s *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) {
err := s.GenerateSubmitParams(option)
if err != nil {

View File

@ -27,6 +27,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
"gorm.io/gorm"
"strings"
)
type Linkage interface {
@ -152,6 +153,48 @@ func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string,
return names, nil
}
// GetAlgorithms returns the algorithm names, for the given task type
// and dataset, that are available across the provided collectors.
//
// Algorithm names are assumed to follow "<dataset><sep><algorithm>",
// where the separator is platform-specific (UNDERSCORE for Octopus,
// DASH for ShuguangAi); entries from other platforms are ignored, as
// are names that do not match the requested dataset. Per-collector
// result sets are intersected so only commonly available algorithms
// remain, then deduplicated.
//
// NOTE(review): resourceType is currently unused — kept for interface
// stability; confirm whether filtering by it was intended.
//
// Fix: the two switch cases duplicated identical split/filter bodies;
// they now share one code path that only selects the separator.
func GetAlgorithms(collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
	var names []string
	colMap := *collectorMap
	for _, col := range colMap {
		algorithms, err := col.GetAlgorithms()
		if err != nil {
			return nil, err
		}
		var ns []string
		for _, algorithm := range algorithms {
			if algorithm.TaskType != taskType {
				continue
			}
			// Platform determines the "<dataset><sep><name>" separator.
			var sep string
			switch algorithm.Platform {
			case OCTOPUS:
				sep = UNDERSCORE
			case SHUGUANGAI:
				sep = DASH
			default:
				continue
			}
			parts := strings.Split(algorithm.Name, sep)
			if dataset != parts[0] || len(parts) == 1 {
				continue
			}
			ns = append(ns, parts[1])
		}
		if len(ns) == 0 {
			continue
		}
		if len(names) == 0 {
			names = ns
			continue
		}
		// Keep only algorithms offered by every contributing collector.
		names = common.IntersectString(names, ns)
	}
	names = common.RemoveDuplicates(names)
	return names, nil
}
func GetTaskTypes() []string {
return taskTypes
}

View File

@ -704,6 +704,8 @@ type ClusterRelationInfo struct {
CLabel string `json:"cLabel,omitempty" db:"label"`
COwnerId string `json:"cOwnerId,omitempty" db:"owner_id"`
CAuthType string `json:"cAuthType,omitempty" db:"auth_type"`
CRegionDict string `json:"cRegionDict,omitempty" db:"-"`
CProducerDict string `json:"cProducerDict,omitempty" db:"-"`
CCreateTime string `json:"cCreateTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
}
@ -835,14 +837,6 @@ type PageResult struct {
PageSize int `json:"pageSize,omitempty"`
}
type Job struct {
SlurmVersion string `json:"slurmVersion"`
Name string `json:"name"`
JobStartTime string `json:"JobStartTime"`
JobRunTime string `json:"JobRunTime"`
StateofJob string `json:"StateofJob"`
}
type CommitHpcTaskReq struct {
Name string `json:"name"` // paratera:jobName
Description string `json:"description,optional"`
@ -872,32 +866,76 @@ type CommitHpcTaskResp struct {
Msg string `json:"msg"`
}
type ListJobReq struct {
type HpcOverViewReq struct {
}
type ListJobResp struct {
Code int32 `json:"code"`
Msg string `json:"msg"`
RecordCount int32 `json:"recordCount"`
Jobs []Job `json:"jobInfos"`
type HpcOverViewResp struct {
Code int32 `json:"code"`
Msg string `json:"msg"`
Data HPCOverView `json:"data"`
}
type HistoryJob struct {
SlurmVersion string `json:"slurmVersion"`
Name string `json:"name"`
JobStartTime string `json:"JobStartTime"`
JobRunTime string `json:"JobRunTime"`
StateofJob string `json:"StateofJob"`
type HPCOverView struct {
AdapterCount int32 `json:"adapterCount"`
StackCount int32 `json:"stackCount"`
ClusterCount int32 `json:"clusterCount"`
TaskCount int32 `json:"taskCount"`
}
type ListHistoryJobReq struct {
type HpcAdapterSummaryReq struct {
}
type ListHistoryJobResp struct {
Code int32 `json:"code"`
Msg string `json:"msg"`
RecordCount int32 `json:"recordCount"`
HistoryJobs []HistoryJob `json:"jobInfoDbs"`
type HpcAdapterSummaryResp struct {
Code int32 `json:"code"`
Msg string `json:"msg"`
Data []HPCAdapterSummary `json:"data"`
}
type HPCAdapterSummary struct {
AdapterName string `json:"adapterName"`
StackCount int32 `json:"stackCount"`
ClusterCount int32 `json:"clusterCount"`
TaskCount int32 `json:"taskCount"`
}
type HpcJobReq struct {
}
type HpcJobResp struct {
Code int32 `json:"code"`
Msg string `json:"msg"`
Data []Job `json:"data"`
}
type Job struct {
JobName string `json:"jobName"`
JobDesc string `json:"jobDesc"`
SubmitTime string `json:"submitTime"`
JobStatus string `json:"jobStatus"`
AdapterName string `json:"adapterName"`
ClusterName string `json:"clusterName"`
ClusterType string `json:"clusterType"`
}
type HpcResourceReq struct {
}
type HpcResourceResp struct {
Code int32 `json:"code"`
Msg string `json:"msg"`
HPCResource HPCResource `json:"hpcResource"`
}
type HPCResource struct {
GPUCardsTotal int32 `json:"gpuCoresTotal"`
CPUCoresTotal int32 `json:"cpuCoresTotal"`
RAMTotal int32 `json:"ramTotal"`
GPUCardsUsed int32 `json:"gpuCoresUsed"`
CPUCoresUsed int32 `json:"cpuCoresUsed"`
RAMUsed int32 `json:"ramUsed"`
GPURate float32 `json:"gpuRate"`
CPURate float32 `json:"cpuRate"`
RAMRate float32 `json:"ramRate"`
}
type QueueAssetsResp struct {
@ -5169,6 +5207,16 @@ type AiStrategyResp struct {
Strategies []string `json:"strategies"`
}
type AiAlgorithmsReq struct {
ResourceType string `json:"resourceType"`
TaskType string `json:"taskType"`
Dataset string `json:"dataset"`
}
type AiAlgorithmsResp struct {
Algorithms []string `json:"algorithms"`
}
type PullTaskInfoReq struct {
AdapterId int64 `form:"adapterId"`
}
@ -5184,6 +5232,7 @@ type HpcInfo struct {
Id int64 `json:"id"` // id
TaskId int64 `json:"task_id"` // 任务id
JobId string `json:"job_id"` // 作业id(在第三方系统中的作业id)
AdapterId int64 `json:"adapter_id"` // 执行任务的适配器id
ClusterId int64 `json:"cluster_id"` // 执行任务的集群id
ClusterType string `json:"cluster_type"` // 执行任务的集群类型
Name string `json:"name"` // 名称