Merge pull request 'updated aicenteroverview logic' (#142) from tzwang/pcm-coordinator:master into master

Former-commit-id: b6b0e39f5178b39741568f3396ed2283ea505016
This commit is contained in:
tzwang 2024-05-08 18:55:24 +08:00
commit f9cdacad6a
4 changed files with 100 additions and 13 deletions

View File

@ -2,10 +2,12 @@ package ai
import (
"context"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"strconv"
"sync"
)
type GetCenterOverviewLogic struct {
@ -24,6 +26,8 @@ func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext)
func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) {
resp = &types.CenterOverviewResp{}
var mu sync.RWMutex
ch := make(chan struct{})
var centerNum int32
var taskNum int32
@ -37,6 +41,8 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
centerNum = int32(len(adapterList))
resp.CenterNum = centerNum
go l.updateClusterResource(&mu, ch, adapterList)
for _, adapter := range adapterList {
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
if err != nil {
@ -52,7 +58,10 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
continue
}
for _, cluster := range clusters.List {
mu.RLock()
clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id)
mu.RUnlock()
if err != nil {
continue
}
@ -60,9 +69,71 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
totalTops += clusterResource.CardTopsTotal
}
}
resp.CardNum = cardNum
resp.PowerInTops = totalTops
<-ch
return resp, nil
}
func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
var wg sync.WaitGroup
for _, adapter := range list {
clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
if err != nil {
continue
}
for _, cluster := range clusters.List {
c := cluster
mu.RLock()
clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
mu.RUnlock()
if err != nil {
continue
}
wg.Add(1)
go func() {
stat, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(l.ctx)
if err != nil {
wg.Done()
return
}
clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
if err != nil {
wg.Done()
return
}
var cardTotal int64
var topsTotal float64
for _, card := range stat.CardsAvail {
cardTotal += int64(card.CardNum)
topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
}
mu.Lock()
if (models.TClusterResource{} == *clusterResource) {
err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
if err != nil {
mu.Unlock()
wg.Done()
return
}
} else {
clusterResource.CardTotal = cardTotal
clusterResource.CardTopsTotal = topsTotal
err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
if err != nil {
mu.Unlock()
wg.Done()
return
}
}
mu.Unlock()
wg.Done()
}()
}
}
wg.Wait()
ch <- struct{}{}
}

View File

@ -30,16 +30,19 @@ func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext)
func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) {
resp = &types.CenterTaskListResp{}
var mu sync.RWMutex
ch := make(chan struct{})
adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1")
if err != nil {
return nil, err
}
l.updateAiTaskStatus(&mu, adapterList)
go l.updateAiTaskStatus(&mu, ch, adapterList)
for _, adapter := range adapterList {
mu.RLock()
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
mu.RUnlock()
if err != nil {
continue
}
@ -70,21 +73,23 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
resp.List = append(resp.List, t)
}
}
<-ch
return resp, nil
}
func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, list []*types.AdapterInfo) {
func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
var wg sync.WaitGroup
for _, adapter := range list {
mu.RLock()
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
mu.RUnlock()
if err != nil {
continue
}
for _, task := range taskList {
t := task
if t.Status == constants.Completed {
continue
}
wg.Add(1)
go func() {
trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId)
@ -107,4 +112,5 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, list []*ty
}
}
wg.Wait()
ch <- struct{}{}
}

View File

@ -74,12 +74,13 @@ func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, e
func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int64) (int64, error) {
// 构建主任务结构体
taskModel := models.Task{
Status: constants.Saved,
Description: "ai task",
Name: name,
SynergyStatus: synergyStatus,
Strategy: strategyCode,
CommitTime: time.Now(),
Status: constants.Saved,
Description: "ai task",
Name: name,
SynergyStatus: synergyStatus,
Strategy: strategyCode,
AdapterTypeDict: 1,
CommitTime: time.Now(),
}
// 保存任务数据到数据库
tx := s.DbEngin.Create(&taskModel)
@ -199,6 +200,14 @@ func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, c
return nil
}
func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResource) error {
tx := s.DbEngin.Updates(clusterResource)
if tx.Error != nil {
return tx.Error
}
return nil
}
func (s *AiStorage) UpdateAiTask(task *models.TaskAi) error {
tx := s.DbEngin.Updates(task)
if tx.Error != nil {

View File

@ -22,6 +22,7 @@ type ResourceStats struct {
DiskAvail float64
DiskTotal float64
GpuAvail int64
GpuTotal int64
CardsAvail []*Card
CpuCoreHours float64
Balance float64