Merge pull request 'updated ai scheduler getdatabases and getalgorithms funcs' (#113) from tzwang/pcm-coordinator:master into master

Former-commit-id: 29a22371cdf945647f9633fbc44f41fa5ceb24d0
tzwang 2024-04-16 16:55:40 +08:00
commit a440f41fbf
3 changed files with 212 additions and 97 deletions

View File

@@ -182,6 +182,13 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
for s := range ch {
if s.Msg != "" {
msg += fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
} else {
msg += fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
}
}
return nil, errors.New(msg)
}
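The hunk above drains a results channel and folds every cluster's outcome into one error message. A minimal standalone sketch of that aggregation, assuming a small per-cluster result type; clusterResult and its fields are illustrative stand-ins, not the scheduler's own types.

package sketch

import (
	"errors"
	"fmt"
)

// clusterResult mirrors the shape read from ch above: Msg carries a
// cluster-side failure, otherwise TaskId identifies the submitted task.
type clusterResult struct {
	ClusterId string
	TaskId    string
	Msg       string
}

// aggregateResults drains a closed results channel and folds every outcome
// into a single error message, as AssignTask does after a partial failure.
func aggregateResults(ch <-chan clusterResult) error {
	var msg string
	for s := range ch {
		if s.Msg != "" {
			msg += fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
		} else {
			msg += fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
		}
	}
	if msg == "" {
		return nil
	}
	return errors.New(msg)
}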
@@ -196,18 +203,26 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
var wg sync.WaitGroup
var ch = make(chan *collector.ResourceStats, len(*as.ResourceCollector))
var errCh = make(chan error, len(*as.ResourceCollector))
var errCh = make(chan interface{}, len(*as.ResourceCollector))
var resourceSpecs []*collector.ResourceStats
var errs []error
var errs []interface{}
for _, resourceCollector := range *as.ResourceCollector {
for s, resourceCollector := range *as.ResourceCollector {
wg.Add(1)
rc := resourceCollector
id := s
go func() {
spec, err := rc.GetResourceStats(as.ctx)
if err != nil {
errCh <- err
e := struct {
err error
clusterId string
}{
err: err,
clusterId: id,
}
errCh <- e
wg.Done()
return
}
@@ -227,13 +242,22 @@ func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats,
errs = append(errs, e)
}
if len(errs) != 0 {
if len(errs) == len(*as.ResourceCollector) {
return nil, errors.New("get resources failed")
}
if len(resourceSpecs) == 0 {
return nil, errors.New("no resource found")
if len(errs) != 0 {
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
return nil, errors.New(msg)
}
return resourceSpecs, nil
}
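findClustersWithResources now sends an anonymous struct{err error; clusterId string} over a chan interface{} and type-asserts it back when draining. A sketch of the same fan-out written with a small named type instead, assuming a named type is acceptable in this package; every identifier below is illustrative, not the repository's. A named type keeps the channel strongly typed and drops the runtime type assertion in the drain loop.

package sketch

import (
	"errors"
	"fmt"
	"sync"
)

type clusterError struct {
	clusterId string
	err       error
}

// collectErrors fans out one probe per cluster and gathers any failures
// together with their cluster ids, mirroring findClustersWithResources.
func collectErrors(clusterIds []string, probe func(id string) error) error {
	var wg sync.WaitGroup
	errCh := make(chan clusterError, len(clusterIds)) // buffered: senders never block
	for _, id := range clusterIds {
		wg.Add(1)
		id := id
		go func() {
			defer wg.Done()
			if err := probe(id); err != nil {
				errCh <- clusterError{clusterId: id, err: err}
			}
		}()
	}
	wg.Wait()
	close(errCh)

	var msg string
	for e := range errCh {
		msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
	}
	if msg == "" {
		return nil
	}
	return errors.New(msg)
}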

View File

@@ -26,6 +26,8 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"strconv"
"strings"
"sync"
"time"
)
const (
@@ -266,96 +268,144 @@ func (s *ShuguangAi) QuerySpecs(ctx context.Context) (interface{}, error) {
}
func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
//balance
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
if err != nil {
return nil, err
}
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
var wg sync.WaitGroup
wg.Add(4)
var cBalance = make(chan float64)
var cMemTotal = make(chan float64)
var cTotalCpu = make(chan int64)
//resource limit
limitReq := &hpcAC.QueueReq{}
limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq)
if err != nil {
return nil, err
resourceStats := &collector.ResourceStats{
ClusterId: strconv.FormatInt(s.participantId, 10),
Name: s.platform,
}
totalCpu := limitResp.Data.AccountMaxCpu
totalDcu := limitResp.Data.AccountMaxDcu
//disk
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
if err != nil {
return nil, err
}
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
//memory
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
if err != nil {
return nil, err
}
memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES
//resources being occupied
memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil)
if err != nil {
return nil, err
}
var CpuCoreAvail int64
var MemAvail float64
if len(memberJobResp.Data) != 0 {
CpuCoreAvail = totalCpu
MemAvail = memSize
} else {
var cpuCoreUsed int64
var memUsed float64
for _, datum := range memberJobResp.Data {
cpuCoreUsed += datum.CpuCore
}
memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core
if cpuCoreUsed > totalCpu {
CpuCoreAvail = 0
} else {
CpuCoreAvail = totalCpu - cpuCoreUsed
}
if memUsed > memSize {
MemAvail = 0
} else {
MemAvail = memSize - memUsed
}
}
//usable hours
var cards []*collector.Card
cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)
dcu := &collector.Card{
Platform: SHUGUANGAI,
Type: CARD,
Name: DCU,
TOpsAtFp16: DCU_TOPS,
CardHours: cardHours,
CardNum: int32(totalDcu),
}
//balance
go func() {
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
if err != nil {
return
}
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
resourceStats.Balance = balance
cBalance <- balance
}()
//resource limit
go func() {
limitReq := &hpcAC.QueueReq{}
limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq)
if err != nil {
wg.Done()
return
}
totalCpu := limitResp.Data.AccountMaxCpu
totalDcu := limitResp.Data.AccountMaxDcu
dcu.CardNum = int32(totalDcu)
resourceStats.CpuCoreTotal = totalCpu
cTotalCpu <- totalCpu
wg.Done()
}()
//disk
go func() {
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
if err != nil {
wg.Done()
return
}
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
resourceStats.DiskTotal = totalDisk
resourceStats.DiskAvail = availDisk
wg.Done()
}()
//memory
go func() {
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
if err != nil {
wg.Done()
return
}
memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES
resourceStats.MemTotal = memSize
cMemTotal <- memSize
wg.Done()
}()
//resources being occupied
go func() {
memSize := <-cMemTotal
totalCpu := <-cTotalCpu
memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil)
if err != nil {
wg.Done()
return
}
var cpuCoreAvail int64
var memAvail float64
if len(memberJobResp.Data) == 0 {
cpuCoreAvail = totalCpu
memAvail = memSize
} else {
var cpuCoreUsed int64
var memUsed float64
for _, datum := range memberJobResp.Data {
cpuCoreUsed += datum.CpuCore
}
memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core
if cpuCoreUsed > totalCpu {
cpuCoreAvail = 0
} else {
cpuCoreAvail = totalCpu - cpuCoreUsed
}
if memUsed > memSize {
memAvail = 0
} else {
memAvail = memSize - memUsed
}
}
resourceStats.CpuCoreAvail = cpuCoreAvail
resourceStats.MemAvail = memAvail
wg.Done()
}()
//usable hours
var balance float64
select {
case v := <-cBalance:
balance = v
case <-time.After(2 * time.Second):
return nil, errors.New("get balance rpc call failed")
}
var cards []*collector.Card
cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)
dcu.CardHours = cardHours
resourceStats.CpuCoreHours = cpuHours
wg.Wait()
cards = append(cards, dcu)
resourceStats := &collector.ResourceStats{
ClusterId: strconv.FormatInt(s.participantId, 10),
Name: s.platform,
Balance: balance,
CpuCoreTotal: totalCpu,
CpuCoreAvail: CpuCoreAvail,
DiskTotal: totalDisk,
DiskAvail: availDisk,
MemTotal: memSize,
MemAvail: MemAvail,
CpuCoreHours: cpuHours,
CardsAvail: cards,
}
resourceStats.CardsAvail = cards
return resourceStats, nil
}
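GetResourceStats above fans the hpcAC RPC calls out across goroutines, joins most of them with a sync.WaitGroup, and guards the balance lookup with a two-second select timeout. The coordination skeleton, reduced to two placeholder fetch functions (fetchBalance and fetchQuota are assumptions standing in for the pcm-ac client calls, not its real API), looks roughly like this; note the balance channel is buffered so its sender cannot leak if the timeout fires first.

package sketch

import (
	"context"
	"errors"
	"sync"
	"time"
)

type stats struct {
	Balance      float64
	CpuCoreTotal int64
}

func getResourceStats(ctx context.Context,
	fetchBalance func(context.Context) (float64, error),
	fetchQuota func(context.Context) (int64, error)) (*stats, error) {

	var wg sync.WaitGroup
	out := &stats{}

	// Buffered so the sending goroutine cannot block forever if the
	// timeout below wins the select.
	cBalance := make(chan float64, 1)

	go func() {
		if balance, err := fetchBalance(ctx); err == nil {
			cBalance <- balance
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		if totalCpu, err := fetchQuota(ctx); err == nil {
			out.CpuCoreTotal = totalCpu
		}
	}()

	// The balance is needed up front (card/cpu hours are derived from it),
	// so it is waited on separately with a deadline.
	select {
	case v := <-cBalance:
		out.Balance = v
	case <-time.After(2 * time.Second):
		return nil, errors.New("get balance rpc call failed")
	}

	wg.Wait()
	return out, nil
}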

View File

@@ -16,6 +16,7 @@ package storeLink
import (
"context"
"fmt"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
@@ -129,19 +130,27 @@ func GetResourceTypes() []string {
func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.AiCollector) ([]string, error) {
var wg sync.WaitGroup
var errCh = make(chan error, len(*collectorMap))
var errs []error
var errCh = make(chan interface{}, len(*collectorMap))
var errs []interface{}
var names []string
var mu sync.Mutex
colMap := *collectorMap
for _, col := range colMap {
for s, col := range colMap {
wg.Add(1)
c := col
id := s
go func() {
var ns []string
specs, err := c.GetDatasetsSpecs(ctx)
if err != nil {
errCh <- err
e := struct {
err error
clusterId string
}{
err: err,
clusterId: id,
}
errCh <- e
wg.Done()
return
}
@@ -167,12 +176,24 @@ func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.Ai
wg.Wait()
close(errCh)
if len(errs) == len(colMap) {
return nil, errors.New("get DatasetsNames failed")
}
for e := range errCh {
errs = append(errs, e)
}
if len(errs) != 0 {
return nil, errors.New("get DatasetsNames failed")
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
return nil, errors.New(msg)
}
names = common.RemoveDuplicates(names)
@@ -182,19 +203,27 @@ func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.Ai
func GetAlgorithms(ctx context.Context, collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
var names []string
var wg sync.WaitGroup
var errCh = make(chan error, len(*collectorMap))
var errs []error
var errCh = make(chan interface{}, len(*collectorMap))
var errs []interface{}
var mu sync.Mutex
colMap := *collectorMap
for _, col := range colMap {
for s, col := range colMap {
wg.Add(1)
c := col
id := s
go func() {
var ns []string
algorithms, err := c.GetAlgorithms(ctx)
if err != nil {
errCh <- err
e := struct {
err error
clusterId string
}{
err: err,
clusterId: id,
}
errCh <- e
wg.Done()
return
}
@@ -240,10 +269,22 @@ func GetAlgorithms(ctx context.Context, collectorMap *map[string]collector.AiCol
errs = append(errs, e)
}
if len(errs) != 0 {
if len(errs) == len(colMap) {
return nil, errors.New("get Algorithms failed")
}
if len(errs) != 0 {
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
return nil, errors.New(msg)
}
names = common.RemoveDuplicates(names)
return names, nil
}
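GetDatasetsNames and GetAlgorithms now share the same shape: fan out one call per collector, tag each failure with its cluster id, and merge either the names or the error messages. A possible consolidation, sketched with illustrative names only (gatherNames and its closure parameter are not part of storeLink), could take the per-collector call as an argument.

package sketch

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type clusterErr struct {
	clusterId string
	err       error
}

// gatherNames runs fetch once per cluster, collects the results under a
// mutex, and folds any per-cluster failures into a single error.
func gatherNames(ctx context.Context, clusterIds []string,
	fetch func(ctx context.Context, clusterId string) ([]string, error)) ([]string, error) {

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		names []string
	)
	errCh := make(chan clusterErr, len(clusterIds))

	for _, id := range clusterIds {
		wg.Add(1)
		id := id
		go func() {
			defer wg.Done()
			ns, err := fetch(ctx, id)
			if err != nil {
				errCh <- clusterErr{clusterId: id, err: err}
				return
			}
			mu.Lock()
			names = append(names, ns...)
			mu.Unlock()
		}()
	}
	wg.Wait()
	close(errCh)

	var msg string
	for e := range errCh {
		msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
	}
	if msg != "" {
		return nil, errors.New(msg)
	}
	// Callers would still deduplicate, e.g. with common.RemoveDuplicates.
	return names, nil
}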