diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go
index 80709f48..7026f7e5 100644
--- a/api/internal/scheduler/schedulers/aiScheduler.go
+++ b/api/internal/scheduler/schedulers/aiScheduler.go
@@ -182,6 +182,13 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
 		})
 		msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
 	}
+	for s := range ch {
+		if s.Msg != "" {
+			msg += fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
+		} else {
+			msg += fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
+		}
+	}
 	return nil, errors.New(msg)
 }
 
@@ -196,18 +203,26 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
 func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
 	var wg sync.WaitGroup
 	var ch = make(chan *collector.ResourceStats, len(*as.ResourceCollector))
-	var errCh = make(chan error, len(*as.ResourceCollector))
+	var errCh = make(chan interface{}, len(*as.ResourceCollector))
 	var resourceSpecs []*collector.ResourceStats
-	var errs []error
+	var errs []interface{}
 
-	for _, resourceCollector := range *as.ResourceCollector {
+	for s, resourceCollector := range *as.ResourceCollector {
 		wg.Add(1)
 		rc := resourceCollector
+		id := s
 		go func() {
 			spec, err := rc.GetResourceStats(as.ctx)
 			if err != nil {
-				errCh <- err
+				e := struct {
+					err error
+					clusterId string
+				}{
+					err: err,
+					clusterId: id,
+				}
+				errCh <- e
 				wg.Done()
 				return
 			}
 
 		errs = append(errs, e)
 	}
 
-	if len(errs) != 0 {
+	if len(errs) == len(*as.ResourceCollector) {
 		return nil, errors.New("get resources failed")
 	}
 
-	if len(resourceSpecs) == 0 {
-		return nil, errors.New("no resource found")
+	if len(errs) != 0 {
+		var msg string
+		for _, err := range errs {
+			e := (err).(struct {
+				err error
+				clusterId string
+			})
+			msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
+		}
+		return nil, errors.New(msg)
 	}
+
 	return resourceSpecs, nil
 }
diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go
index db89cf72..53f9bf52 100644
--- a/api/internal/storeLink/shuguangai.go
+++ b/api/internal/storeLink/shuguangai.go
@@ -26,6 +26,8 @@ import (
 	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
 	"strconv"
 	"strings"
+	"sync"
+	"time"
 )
 
 const (
@@ -266,96 +268,146 @@ func (s *ShuguangAi) QuerySpecs(ctx context.Context) (interface{}, error) {
 }
 
 func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
-	//balance
-	userReq := &hpcAC.GetUserInfoReq{}
-	userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
-	if err != nil {
-		return nil, err
-	}
-	balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
+	var wg sync.WaitGroup
+	wg.Add(4) // the balance goroutine signals through cBalance instead of the WaitGroup
+	var cBalance = make(chan float64, 1) // buffered so the sender cannot block once the 2s timeout below gives up
+	var cMemTotal = make(chan float64, 1) // buffered hand-offs between the stats goroutines
+	var cTotalCpu = make(chan int64, 1)
 
-	//resource limit
-	limitReq := &hpcAC.QueueReq{}
-	limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq)
-	if err != nil {
-		return nil, err
+	resourceStats := &collector.ResourceStats{
+		ClusterId: strconv.FormatInt(s.participantId, 10),
+		Name: s.platform,
 	}
-	totalCpu := limitResp.Data.AccountMaxCpu
-	totalDcu := limitResp.Data.AccountMaxDcu
-
-	//disk
-	diskReq := &hpcAC.ParaStorQuotaReq{}
-	diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
-	if err != nil {
-		return nil, err
-	}
-
-	totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
-	availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
-
-	//memory
-	nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES
-
-	//resources being occupied
-	memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	var CpuCoreAvail int64
-	var MemAvail float64
-	if len(memberJobResp.Data) != 0 {
-		CpuCoreAvail = totalCpu
-		MemAvail = memSize
-	} else {
-		var cpuCoreUsed int64
-		var memUsed float64
-		for _, datum := range memberJobResp.Data {
-			cpuCoreUsed += datum.CpuCore
-		}
-		memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core
-		if cpuCoreUsed > totalCpu {
-			CpuCoreAvail = 0
-		} else {
-			CpuCoreAvail = totalCpu - cpuCoreUsed
-		}
-		if memUsed > memSize {
-			MemAvail = 0
-		} else {
-			MemAvail = memSize - memUsed
-		}
-	}
-
-	//usable hours
-	var cards []*collector.Card
-	cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
-	cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)
 	dcu := &collector.Card{
 		Platform:   SHUGUANGAI,
 		Type:       CARD,
 		Name:       DCU,
 		TOpsAtFp16: DCU_TOPS,
-		CardHours:  cardHours,
-		CardNum:    int32(totalDcu),
 	}
+
+	//balance
+	go func() {
+		userReq := &hpcAC.GetUserInfoReq{}
+		userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
+		if err != nil {
+			return // the 2s timeout on cBalance below reports this failure
+		}
+		balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
+		resourceStats.Balance = balance
+
+		cBalance <- balance
+	}()
+
+	//resource limit
+	go func() {
+		limitReq := &hpcAC.QueueReq{}
+		limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq)
+		if err != nil {
+			cTotalCpu <- 0 // still send, so the consumer goroutine below cannot block forever
+			wg.Done()
+			return
+		}
+		totalCpu := limitResp.Data.AccountMaxCpu
+		totalDcu := limitResp.Data.AccountMaxDcu
+
+		dcu.CardNum = int32(totalDcu)
+		resourceStats.CpuCoreTotal = totalCpu
+
+		cTotalCpu <- totalCpu
+		wg.Done()
+	}()
+
+	//disk
+	go func() {
+		diskReq := &hpcAC.ParaStorQuotaReq{}
+		diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
+		if err != nil {
+			wg.Done()
+			return
+		}
+
+		totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
+		availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
+
+		resourceStats.DiskTotal = totalDisk
+		resourceStats.DiskAvail = availDisk
+		wg.Done()
+	}()
+
+	//memory
+	go func() {
+		nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
+		if err != nil {
+			cMemTotal <- 0 // still send, so the consumer goroutine below cannot block forever
+			wg.Done()
+			return
+		}
+		memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES
+
+		resourceStats.MemTotal = memSize
+		cMemTotal <- memSize
+		wg.Done()
+	}()
+
+	//resources being occupied
+	go func() {
+		memSize := <-cMemTotal
+		totalCpu := <-cTotalCpu
+		memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil)
+		if err != nil {
+			wg.Done()
+			return
+		}
+		var cpuCoreAvail int64
+		var memAvail float64
+		if len(memberJobResp.Data) == 0 { // no running jobs: the full quota is available
+			cpuCoreAvail = totalCpu
+			memAvail = memSize
+		} else {
+			var cpuCoreUsed int64
+			var memUsed float64
+			for _, datum := range memberJobResp.Data {
+				cpuCoreUsed += datum.CpuCore
+			}
+			memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core
+			if cpuCoreUsed > totalCpu {
+				cpuCoreAvail = 0
+			} else {
+				cpuCoreAvail = totalCpu - cpuCoreUsed
+			}
+			if memUsed > memSize {
+				memAvail = 0
+			} else {
+				memAvail = memSize - memUsed
+			}
+		}
+		resourceStats.CpuCoreAvail = cpuCoreAvail
+		resourceStats.MemAvail = memAvail
+		wg.Done()
+	}()
+
+	//usable hours
+	var balance float64
+
+	select {
+	case v := <-cBalance:
+		balance = v
+	case <-time.After(2 * time.Second):
+		return nil, errors.New("get balance rpc call failed or timed out")
+	}
+
+	var cards []*collector.Card
+	cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
+	cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)
+
+	dcu.CardHours = cardHours
+	resourceStats.CpuCoreHours = cpuHours
+
+	wg.Wait()
+
 	cards = append(cards, dcu)
 
-	resourceStats := &collector.ResourceStats{
-		ClusterId:    strconv.FormatInt(s.participantId, 10),
-		Name:         s.platform,
-		Balance:      balance,
-		CpuCoreTotal: totalCpu,
-		CpuCoreAvail: CpuCoreAvail,
-		DiskTotal:    totalDisk,
-		DiskAvail:    availDisk,
-		MemTotal:     memSize,
-		MemAvail:     MemAvail,
-		CpuCoreHours: cpuHours,
-		CardsAvail:   cards,
-	}
+	resourceStats.CardsAvail = cards
 
 	return resourceStats, nil
 }
diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go
index 87ca2ff8..2cda06f6 100644
--- a/api/internal/storeLink/storeLink.go
+++ b/api/internal/storeLink/storeLink.go
@@ -16,6 +16,7 @@ package storeLink
 
 import (
 	"context"
+	"fmt"
 	"github.com/pkg/errors"
 	"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
 	"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
@@ -129,19 +130,27 @@ func GetResourceTypes() []string {
 
 func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.AiCollector) ([]string, error) {
 	var wg sync.WaitGroup
-	var errCh = make(chan error, len(*collectorMap))
-	var errs []error
+	var errCh = make(chan interface{}, len(*collectorMap))
+	var errs []interface{}
 	var names []string
 	var mu sync.Mutex
 	colMap := *collectorMap
-	for _, col := range colMap {
+	for s, col := range colMap {
 		wg.Add(1)
 		c := col
+		id := s
 		go func() {
 			var ns []string
 			specs, err := c.GetDatasetsSpecs(ctx)
 			if err != nil {
-				errCh <- err
+				e := struct {
+					err error
+					clusterId string
+				}{
+					err: err,
+					clusterId: id,
+				}
+				errCh <- e
 				wg.Done()
 				return
 			}
@@ -167,12 +176,24 @@
 	wg.Wait()
 	close(errCh)
 
 	for e := range errCh {
 		errs = append(errs, e)
 	}
 
+	if len(errs) == len(colMap) {
+		return nil, errors.New("get DatasetsNames failed")
+	}
+
 	if len(errs) != 0 {
-		return nil, errors.New("get DatasetsNames failed")
+		var msg string
+		for _, err := range errs {
+			e := (err).(struct {
+				err error
+				clusterId string
+			})
+			msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
+		}
+		return nil, errors.New(msg)
 	}
 
 	names = common.RemoveDuplicates(names)
@@ -182,19 +203,27 @@ func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.Ai
 func GetAlgorithms(ctx context.Context, collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
 	var names []string
 	var wg sync.WaitGroup
-	var errCh = make(chan error, len(*collectorMap))
-	var errs []error
+	var errCh = make(chan interface{}, len(*collectorMap))
+	var errs []interface{}
 	var mu sync.Mutex
 	colMap := *collectorMap
-	for _, col := range colMap {
+	for s, col := range colMap {
 		wg.Add(1)
 		c := col
+		id := s
 		go func() {
 			var ns []string
 			algorithms, err := c.GetAlgorithms(ctx)
 			if err != nil {
-				errCh <- err
+				e := struct {
+					err error
+					clusterId string
+				}{
+					err: err,
+					clusterId: id,
+				}
+				errCh <- e
 				wg.Done()
 				return
 			}
@@ -240,10 +269,22 @@
 		errs = append(errs, e)
 	}
 
-	if len(errs) != 0 {
+	if len(errs) == len(colMap) {
 		return nil, errors.New("get Algorithms failed")
 	}
 
+	if len(errs) != 0 {
+		var msg string
+		for _, err := range errs {
+			e := (err).(struct {
+				err error
+				clusterId string
+			})
+			msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
+		}
+		return nil, errors.New(msg)
+	}
+
 	names = common.RemoveDuplicates(names)
 	return names, nil
 }
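
Note for reviewers (not part of the patch): the anonymous struct { err error; clusterId string } is declared in four places and round-tripped through chan interface{} with a type assertion on the way out. Below is a minimal standalone sketch of the same fan-out/aggregate scheme using a named type that implements error; clusterError and collectAll are hypothetical names for illustration, not identifiers from this repo.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// clusterError tags a failure with the cluster it came from.
type clusterError struct {
	clusterId string
	err       error
}

func (e clusterError) Error() string {
	return fmt.Sprintf("clusterId: %v , error: %v", e.clusterId, e.err)
}

// collectAll fans a call out to every cluster and aggregates failures the
// way the patch does: all failed -> generic error; some failed -> detailed
// per-cluster message. (As in the patch, an empty input map counts as
// "all failed".)
func collectAll(clusters map[string]func() (string, error)) ([]string, error) {
	var wg sync.WaitGroup
	resCh := make(chan string, len(clusters))
	errCh := make(chan clusterError, len(clusters))

	for id, call := range clusters {
		wg.Add(1)
		id, call := id, call // capture loop variables (pre-Go 1.22)
		go func() {
			defer wg.Done()
			v, err := call()
			if err != nil {
				errCh <- clusterError{clusterId: id, err: err}
				return
			}
			resCh <- v
		}()
	}

	wg.Wait()
	close(resCh)
	close(errCh)

	var results []string
	for v := range resCh {
		results = append(results, v)
	}

	var msg string
	n := 0
	for e := range errCh {
		n++
		msg += e.Error() + "\n"
	}
	if n == len(clusters) {
		return nil, errors.New("all clusters failed")
	}
	if n != 0 {
		return nil, errors.New(msg)
	}
	return results, nil
}

func main() {
	out, err := collectAll(map[string]func() (string, error){
		"c1": func() (string, error) { return "ok", nil },
		"c2": func() (string, error) { return "", errors.New("rpc timeout") },
	})
	fmt.Println(out, err)
}

A named type keeps the channels typed (chan clusterError), drops the interface{} assertions, and preserves the all-failed vs. some-failed distinction the patch introduces.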
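Likewise, a sketch of the buffered-channel hand-off with a select timeout that GetResourceStats relies on for the balance value; fetchBalance here is a hypothetical stand-in for the GetUserInfo RPC, under the assumptions stated in the comments.

package main

import (
	"errors"
	"fmt"
	"time"
)

// fetchBalance mirrors the cBalance hand-off: a worker goroutine sends one
// value, the caller waits with a deadline. Capacity 1 is what makes the
// timeout safe: the worker's send succeeds even after the caller has
// stopped listening, so neither side can block forever.
func fetchBalance(rpc func() (float64, error)) (float64, error) {
	ch := make(chan float64, 1) // buffered: the send never blocks
	go func() {
		v, err := rpc()
		if err != nil {
			return // the caller's timeout below reports the failure
		}
		ch <- v
	}()

	select {
	case v := <-ch:
		return v, nil
	case <-time.After(2 * time.Second):
		return 0, errors.New("get balance rpc call failed or timed out")
	}
}

func main() {
	slow := func() (float64, error) {
		time.Sleep(3 * time.Second) // simulated slow RPC
		return 42.0, nil
	}
	_, err := fetchBalance(slow)
	fmt.Println(err) // prints the timeout error; the buffered send cannot block the worker
}

The same reasoning applies to cTotalCpu and cMemTotal above: because the consumer goroutine receives from them unconditionally, every producer path, including the error paths, must perform its send, or wg.Wait() would hang.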