updated aicenteroverview logic

Former-commit-id: 497d6e912196aedfcc36e392b006cc1c26a17138
This commit is contained in:
tzwang 2024-05-08 18:52:02 +08:00
parent 0220945d67
commit 9b8c7d36e6
4 changed files with 100 additions and 13 deletions

View File

@ -2,10 +2,12 @@ package ai
import ( import (
"context" "context"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"github.com/zeromicro/go-zero/core/logx" "strconv"
"sync"
) )
type GetCenterOverviewLogic struct { type GetCenterOverviewLogic struct {
@ -24,6 +26,8 @@ func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext)
func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) { func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) {
resp = &types.CenterOverviewResp{} resp = &types.CenterOverviewResp{}
var mu sync.RWMutex
ch := make(chan struct{})
var centerNum int32 var centerNum int32
var taskNum int32 var taskNum int32
@ -37,6 +41,8 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
centerNum = int32(len(adapterList)) centerNum = int32(len(adapterList))
resp.CenterNum = centerNum resp.CenterNum = centerNum
go l.updateClusterResource(&mu, ch, adapterList)
for _, adapter := range adapterList { for _, adapter := range adapterList {
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
if err != nil { if err != nil {
@ -52,7 +58,10 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
continue continue
} }
for _, cluster := range clusters.List { for _, cluster := range clusters.List {
mu.RLock()
clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id) clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id)
mu.RUnlock()
if err != nil { if err != nil {
continue continue
} }
@ -60,9 +69,71 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
totalTops += clusterResource.CardTopsTotal totalTops += clusterResource.CardTopsTotal
} }
} }
resp.CardNum = cardNum resp.CardNum = cardNum
resp.PowerInTops = totalTops resp.PowerInTops = totalTops
<-ch
return resp, nil return resp, nil
} }
// updateClusterResource refreshes the stored resource record of every cluster
// belonging to the adapters in list, using statistics fetched from the
// per-cluster collector, and then sends one value on ch.
//
// For each cluster the stored record is read under mu's read lock. A worker
// goroutine then fetches the live statistics and, under mu's write lock,
// either inserts a new record (when the stored one is empty) or updates the
// card count and total TOPS of the existing one.
//
// The refresh is best-effort: an adapter or cluster whose lookup, statistics
// call or write fails is skipped and no error is reported to the caller.
//
// The final send on ch blocks until the caller receives from it, so the
// caller must always drain ch after starting this method in a goroutine.
func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
	var wg sync.WaitGroup
	for _, adapter := range list {
		// Copy everything the workers need from the range variable. On
		// toolchains before Go 1.22 that variable is shared by all iterations,
		// so a goroutine reading adapter.Id later could see another adapter.
		adapterID := adapter.Id

		// The adapter type is the same for all of its clusters; parse it once
		// here instead of once per worker, and skip the adapter early when it
		// is not numeric (no record could be saved for it anyway).
		clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
		if err != nil {
			continue
		}

		clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapterID)
		if err != nil {
			continue
		}
		for _, cluster := range clusters.List {
			c := cluster
			mu.RLock()
			clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
			mu.RUnlock()
			if err != nil {
				continue
			}
			wg.Add(1)
			go func() {
				defer wg.Done()

				// A cluster without a registered collector cannot be refreshed;
				// calling through a missing map entry would panic.
				collector, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapterID][c.Id]
				if !ok {
					return
				}
				stat, err := collector.GetResourceStats(l.ctx)
				if err != nil {
					return
				}

				var cardTotal int64
				var topsTotal float64
				for _, card := range stat.CardsAvail {
					if card == nil {
						continue
					}
					cardTotal += int64(card.CardNum)
					topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
				}

				mu.Lock()
				defer mu.Unlock()
				if clusterResource == nil || *clusterResource == (models.TClusterResource{}) {
					// No record stored yet for this cluster: create one.
					// Best-effort: a failed save is dropped, matching the
					// rest of this refresh.
					_ = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
						stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
					return
				}
				// Existing record: only the card count and total TOPS are refreshed.
				clusterResource.CardTotal = cardTotal
				clusterResource.CardTopsTotal = topsTotal
				// Best-effort: a failed update is dropped as well.
				_ = l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
			}()
		}
	}
	wg.Wait()
	ch <- struct{}{}
}

View File

@ -30,16 +30,19 @@ func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext)
func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) { func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) {
resp = &types.CenterTaskListResp{} resp = &types.CenterTaskListResp{}
var mu sync.RWMutex var mu sync.RWMutex
ch := make(chan struct{})
adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1")
if err != nil { if err != nil {
return nil, err return nil, err
} }
l.updateAiTaskStatus(&mu, adapterList) go l.updateAiTaskStatus(&mu, ch, adapterList)
for _, adapter := range adapterList { for _, adapter := range adapterList {
mu.RLock()
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
mu.RUnlock()
if err != nil { if err != nil {
continue continue
} }
@ -70,21 +73,23 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
resp.List = append(resp.List, t) resp.List = append(resp.List, t)
} }
} }
<-ch
return resp, nil return resp, nil
} }
func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, list []*types.AdapterInfo) { func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
var wg sync.WaitGroup var wg sync.WaitGroup
for _, adapter := range list { for _, adapter := range list {
mu.RLock()
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
mu.RUnlock()
if err != nil { if err != nil {
continue continue
} }
for _, task := range taskList { for _, task := range taskList {
t := task t := task
if t.Status == constants.Completed {
continue
}
wg.Add(1) wg.Add(1)
go func() { go func() {
trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId) trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId)
@ -107,4 +112,5 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, list []*ty
} }
} }
wg.Wait() wg.Wait()
ch <- struct{}{}
} }

View File

@ -74,12 +74,13 @@ func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, e
func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int64) (int64, error) { func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int64) (int64, error) {
// 构建主任务结构体 // 构建主任务结构体
taskModel := models.Task{ taskModel := models.Task{
Status: constants.Saved, Status: constants.Saved,
Description: "ai task", Description: "ai task",
Name: name, Name: name,
SynergyStatus: synergyStatus, SynergyStatus: synergyStatus,
Strategy: strategyCode, Strategy: strategyCode,
CommitTime: time.Now(), AdapterTypeDict: 1,
CommitTime: time.Now(),
} }
// 保存任务数据到数据库 // 保存任务数据到数据库
tx := s.DbEngin.Create(&taskModel) tx := s.DbEngin.Create(&taskModel)
@ -199,6 +200,14 @@ func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, c
return nil return nil
} }
// UpdateClusterResources passes clusterResource to s.DbEngin.Updates and
// returns the error recorded on the resulting handle, or nil when the call
// reported none.
func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResource) error {
	return s.DbEngin.Updates(clusterResource).Error
}
func (s *AiStorage) UpdateAiTask(task *models.TaskAi) error { func (s *AiStorage) UpdateAiTask(task *models.TaskAi) error {
tx := s.DbEngin.Updates(task) tx := s.DbEngin.Updates(task)
if tx.Error != nil { if tx.Error != nil {

View File

@ -22,6 +22,7 @@ type ResourceStats struct {
DiskAvail float64 DiskAvail float64
DiskTotal float64 DiskTotal float64
GpuAvail int64 GpuAvail int64
GpuTotal int64
CardsAvail []*Card CardsAvail []*Card
CpuCoreHours float64 CpuCoreHours float64
Balance float64 Balance float64