Merge pull request 'added ai scheduler getAlgorithms by tasktype and dataset' (#74) from tzwang/pcm-coordinator:master into master
Former-commit-id: fc2264d7ab23ddd51210e6f49e0039a9222c6a3e
This commit is contained in:
commit
e6dca0708e
|
@ -891,6 +891,9 @@ service pcm {
|
||||||
@handler ScheduleGetStrategyHandler
|
@handler ScheduleGetStrategyHandler
|
||||||
get /schedule/ai/getStrategies returns (AiStrategyResp)
|
get /schedule/ai/getStrategies returns (AiStrategyResp)
|
||||||
|
|
||||||
|
@handler ScheduleGetAlgorithmsHandler
|
||||||
|
get /schedule/ai/getAlgorithms (AiAlgorithmsReq) returns (AiAlgorithmsResp)
|
||||||
|
|
||||||
@handler ScheduleSubmitHandler
|
@handler ScheduleSubmitHandler
|
||||||
post /schedule/submit (ScheduleReq) returns (ScheduleResp)
|
post /schedule/submit (ScheduleReq) returns (ScheduleResp)
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,4 +41,14 @@ type (
|
||||||
AiStrategyResp {
|
AiStrategyResp {
|
||||||
Strategies []string `json:"strategies"`
|
Strategies []string `json:"strategies"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AiAlgorithmsReq {
|
||||||
|
ResourceType string `json:"resourceType"`
|
||||||
|
TaskType string `json:"taskType"`
|
||||||
|
Dataset string `json:"dataset"`
|
||||||
|
}
|
||||||
|
|
||||||
|
AiAlgorithmsResp {
|
||||||
|
Algorithms []string `json:"algorithms"`
|
||||||
|
}
|
||||||
)
|
)
|
|
@ -3,6 +3,7 @@ package collector
|
||||||
type AiCollector interface {
|
type AiCollector interface {
|
||||||
GetResourceStats() (*ResourceStats, error)
|
GetResourceStats() (*ResourceStats, error)
|
||||||
GetDatasetsSpecs() ([]*DatasetsSpecs, error)
|
GetDatasetsSpecs() ([]*DatasetsSpecs, error)
|
||||||
|
GetAlgorithms() ([]*Algorithm, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type ResourceStats struct {
|
type ResourceStats struct {
|
||||||
|
@ -33,3 +34,9 @@ type DatasetsSpecs struct {
|
||||||
Name string
|
Name string
|
||||||
Size string
|
Size string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Algorithm struct {
|
||||||
|
Name string
|
||||||
|
Platform string
|
||||||
|
TaskType string
|
||||||
|
}
|
||||||
|
|
|
@ -157,6 +157,10 @@ func (m *ModelArtsLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *ModelArtsLink) GetAlgorithms() ([]*collector.Algorithm, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) {
|
func (m *ModelArtsLink) Execute(option *option.AiOption) (interface{}, error) {
|
||||||
err := m.GenerateSubmitParams(option)
|
err := m.GenerateSubmitParams(option)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -315,6 +315,29 @@ func (o *OctopusLink) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
|
||||||
return specs, nil
|
return specs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (o *OctopusLink) GetAlgorithms() ([]*collector.Algorithm, error) {
|
||||||
|
var algorithms []*collector.Algorithm
|
||||||
|
|
||||||
|
req := &octopus.GetMyAlgorithmListReq{
|
||||||
|
Platform: o.platform,
|
||||||
|
PageIndex: o.pageIndex,
|
||||||
|
PageSize: o.pageSize,
|
||||||
|
}
|
||||||
|
resp, err := o.svcCtx.OctopusRpc.GetMyAlgorithmList(o.ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !resp.Success {
|
||||||
|
return nil, errors.New("failed to get algorithms")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, a := range resp.Payload.Algorithms {
|
||||||
|
algorithm := &collector.Algorithm{Name: a.AlgorithmName, Platform: OCTOPUS, TaskType: strings.ToLower(a.FrameworkName)}
|
||||||
|
algorithms = append(algorithms, algorithm)
|
||||||
|
}
|
||||||
|
return algorithms, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) {
|
func (o *OctopusLink) Execute(option *option.AiOption) (interface{}, error) {
|
||||||
err := o.GenerateSubmitParams(option)
|
err := o.GenerateSubmitParams(option)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -267,12 +267,15 @@ func (s *ShuguangAi) QuerySpecs() (interface{}, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
|
func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
|
||||||
|
//balance
|
||||||
userReq := &hpcAC.GetUserInfoReq{}
|
userReq := &hpcAC.GetUserInfoReq{}
|
||||||
userinfo, err := s.svcCtx.ACRpc.GetUserInfo(s.ctx, userReq)
|
userinfo, err := s.svcCtx.ACRpc.GetUserInfo(s.ctx, userReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
|
||||||
|
|
||||||
|
//resource limit
|
||||||
limitReq := &hpcAC.QueueReq{}
|
limitReq := &hpcAC.QueueReq{}
|
||||||
limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
|
limitResp, err := s.svcCtx.ACRpc.QueryUserQuotasLimit(s.ctx, limitReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -281,20 +284,54 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
|
||||||
totalCpu := limitResp.Data.AccountMaxCpu
|
totalCpu := limitResp.Data.AccountMaxCpu
|
||||||
totalDcu := limitResp.Data.AccountMaxDcu
|
totalDcu := limitResp.Data.AccountMaxDcu
|
||||||
|
|
||||||
|
//disk
|
||||||
diskReq := &hpcAC.ParaStorQuotaReq{}
|
diskReq := &hpcAC.ParaStorQuotaReq{}
|
||||||
diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
|
diskResp, err := s.svcCtx.ACRpc.ParaStorQuota(s.ctx, diskReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB, 3)
|
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
|
||||||
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB, 3)
|
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
|
||||||
|
|
||||||
generalInfo, err := s.svcCtx.ACRpc.GetGeneralInfo(s.ctx, nil)
|
//memory
|
||||||
memSize := common.RoundFloat(float64(generalInfo.MemoryInGib)*KB*KB, 3)
|
nodeResp, err := s.svcCtx.ACRpc.GetNodeResources(s.ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES
|
||||||
|
|
||||||
|
//resources being occupied
|
||||||
|
memberJobResp, err := s.svcCtx.ACRpc.GetMemberJobs(s.ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var CpuCoreAvail int64
|
||||||
|
var MemAvail float64
|
||||||
|
if len(memberJobResp.Data) != 0 {
|
||||||
|
CpuCoreAvail = totalCpu
|
||||||
|
MemAvail = memSize
|
||||||
|
} else {
|
||||||
|
var cpuCoreUsed int64
|
||||||
|
var memUsed float64
|
||||||
|
for _, datum := range memberJobResp.Data {
|
||||||
|
cpuCoreUsed += datum.CpuCore
|
||||||
|
}
|
||||||
|
memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core
|
||||||
|
if cpuCoreUsed > totalCpu {
|
||||||
|
CpuCoreAvail = 0
|
||||||
|
} else {
|
||||||
|
CpuCoreAvail = totalCpu - cpuCoreUsed
|
||||||
|
}
|
||||||
|
if memUsed > memSize {
|
||||||
|
MemAvail = 0
|
||||||
|
} else {
|
||||||
|
MemAvail = memSize - memUsed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//usable hours
|
||||||
var cards []*collector.Card
|
var cards []*collector.Card
|
||||||
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
|
|
||||||
cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
|
cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
|
||||||
cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)
|
cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)
|
||||||
|
|
||||||
|
@ -312,11 +349,11 @@ func (s *ShuguangAi) GetResourceStats() (*collector.ResourceStats, error) {
|
||||||
Name: s.platform,
|
Name: s.platform,
|
||||||
Balance: balance,
|
Balance: balance,
|
||||||
CpuCoreTotal: totalCpu,
|
CpuCoreTotal: totalCpu,
|
||||||
CpuCoreAvail: 0,
|
CpuCoreAvail: CpuCoreAvail,
|
||||||
DiskTotal: totalDisk,
|
DiskTotal: totalDisk,
|
||||||
DiskAvail: availDisk,
|
DiskAvail: availDisk,
|
||||||
MemTotal: memSize,
|
MemTotal: memSize,
|
||||||
MemAvail: 0,
|
MemAvail: MemAvail,
|
||||||
CpuCoreHours: cpuHours,
|
CpuCoreHours: cpuHours,
|
||||||
CardsAvail: cards,
|
CardsAvail: cards,
|
||||||
}
|
}
|
||||||
|
@ -341,6 +378,26 @@ func (s *ShuguangAi) GetDatasetsSpecs() ([]*collector.DatasetsSpecs, error) {
|
||||||
return specs, nil
|
return specs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ShuguangAi) GetAlgorithms() ([]*collector.Algorithm, error) {
|
||||||
|
var algorithms []*collector.Algorithm
|
||||||
|
for _, t := range GetTaskTypes() {
|
||||||
|
taskType := t
|
||||||
|
req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0}
|
||||||
|
list, err := s.svcCtx.ACRpc.GetFileList(s.ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if list.Code != "0" {
|
||||||
|
return nil, errors.New(list.Msg)
|
||||||
|
}
|
||||||
|
for _, file := range list.Data.FileList {
|
||||||
|
algorithm := &collector.Algorithm{Name: file.Name, Platform: SHUGUANGAI, TaskType: taskType}
|
||||||
|
algorithms = append(algorithms, algorithm)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return algorithms, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) {
|
func (s *ShuguangAi) Execute(option *option.AiOption) (interface{}, error) {
|
||||||
err := s.GenerateSubmitParams(option)
|
err := s.GenerateSubmitParams(option)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
|
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelarts"
|
||||||
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
|
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Linkage interface {
|
type Linkage interface {
|
||||||
|
@ -152,6 +153,48 @@ func GetDatasetsNames(collectorMap *map[string]collector.AiCollector) ([]string,
|
||||||
return names, nil
|
return names, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetAlgorithms(collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
|
||||||
|
var names []string
|
||||||
|
colMap := *collectorMap
|
||||||
|
for _, col := range colMap {
|
||||||
|
var ns []string
|
||||||
|
algorithms, err := col.GetAlgorithms()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, algorithm := range algorithms {
|
||||||
|
if algorithm.TaskType != taskType {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch algorithm.Platform {
|
||||||
|
case OCTOPUS:
|
||||||
|
splitns := strings.Split(algorithm.Name, UNDERSCORE)
|
||||||
|
if dataset != splitns[0] || len(splitns) == 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ns = append(ns, splitns[1])
|
||||||
|
case SHUGUANGAI:
|
||||||
|
splitns := strings.Split(algorithm.Name, DASH)
|
||||||
|
if dataset != splitns[0] || len(splitns) == 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ns = append(ns, splitns[1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(ns) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(names) == 0 {
|
||||||
|
names = ns
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
names = common.IntersectString(names, ns)
|
||||||
|
}
|
||||||
|
names = common.RemoveDuplicates(names)
|
||||||
|
return names, nil
|
||||||
|
}
|
||||||
|
|
||||||
func GetTaskTypes() []string {
|
func GetTaskTypes() []string {
|
||||||
return taskTypes
|
return taskTypes
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue