diff --git a/api/internal/logic/ai/getcenteroverviewlogic.go b/api/internal/logic/ai/getcenteroverviewlogic.go index eb93684b..94d44cea 100644 --- a/api/internal/logic/ai/getcenteroverviewlogic.go +++ b/api/internal/logic/ai/getcenteroverviewlogic.go @@ -2,10 +2,12 @@ package ai import ( "context" + "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" - - "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "strconv" + "sync" ) type GetCenterOverviewLogic struct { @@ -24,6 +26,8 @@ func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) { resp = &types.CenterOverviewResp{} + var mu sync.RWMutex + ch := make(chan struct{}) var centerNum int32 var taskNum int32 @@ -37,6 +41,8 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview centerNum = int32(len(adapterList)) resp.CenterNum = centerNum + go l.updateClusterResource(&mu, ch, adapterList) + for _, adapter := range adapterList { taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) if err != nil { @@ -52,7 +58,10 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview continue } for _, cluster := range clusters.List { + mu.RLock() clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id) + mu.RUnlock() + if err != nil { continue } @@ -60,9 +69,71 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview totalTops += clusterResource.CardTopsTotal } } - resp.CardNum = cardNum resp.PowerInTops = totalTops + <-ch return resp, nil } + +func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) { + var wg sync.WaitGroup + for _, adapter := range list { + clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, cluster := range clusters.List { + c := cluster + mu.RLock() + clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(c.Id) + mu.RUnlock() + if err != nil { + continue + } + wg.Add(1) + go func() { + stat, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(l.ctx) + if err != nil { + wg.Done() + return + } + clusterType, err := strconv.ParseInt(adapter.Type, 10, 64) + if err != nil { + wg.Done() + return + } + var cardTotal int64 + var topsTotal float64 + for _, card := range stat.CardsAvail { + cardTotal += int64(card.CardNum) + topsTotal += card.TOpsAtFp16 * float64(card.CardNum) + } + + mu.Lock() + if (models.TClusterResource{} == *clusterResource) { + err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), + stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal) + if err != nil { + mu.Unlock() + wg.Done() + return + } + } else { + clusterResource.CardTotal = cardTotal + clusterResource.CardTopsTotal = topsTotal + err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource) + if err != nil { + mu.Unlock() + wg.Done() + return + } + } + mu.Unlock() + wg.Done() + }() + } + } + wg.Wait() + ch <- struct{}{} +} diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go index a0d8c9fe..ebca4dc4 100644 --- a/api/internal/logic/ai/getcentertasklistlogic.go +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -30,16 +30,19 @@ func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) { resp = &types.CenterTaskListResp{} var mu sync.RWMutex + ch := make(chan struct{}) adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") if err != nil { return nil, err } - l.updateAiTaskStatus(&mu, adapterList) + go l.updateAiTaskStatus(&mu, ch, adapterList) for _, adapter := range adapterList { + mu.RLock() taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) + mu.RUnlock() if err != nil { continue } @@ -70,21 +73,23 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList resp.List = append(resp.List, t) } } + <-ch return resp, nil } -func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, list []*types.AdapterInfo) { +func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) { var wg sync.WaitGroup for _, adapter := range list { - mu.RLock() taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) - mu.RUnlock() if err != nil { continue } for _, task := range taskList { t := task + if t.Status == constants.Completed { + continue + } wg.Add(1) go func() { trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId) @@ -107,4 +112,5 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, list []*ty } } wg.Wait() + ch <- struct{}{} } diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index 89dbbaa3..f25cd5c2 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -74,12 +74,13 @@ func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, e func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int64) (int64, error) { // 构建主任务结构体 taskModel := models.Task{ - Status: constants.Saved, - Description: "ai task", - Name: name, - SynergyStatus: synergyStatus, - Strategy: strategyCode, - CommitTime: time.Now(), + Status: constants.Saved, + Description: "ai task", + Name: name, + SynergyStatus: synergyStatus, + Strategy: strategyCode, + AdapterTypeDict: 1, + CommitTime: time.Now(), } // 保存任务数据到数据库 tx := s.DbEngin.Create(&taskModel) @@ -199,6 +200,14 @@ func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, c return nil } +func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResource) error { + tx := s.DbEngin.Updates(clusterResource) + if tx.Error != nil { + return tx.Error + } + return nil +} + func (s *AiStorage) UpdateAiTask(task *models.TaskAi) error { tx := s.DbEngin.Updates(task) if tx.Error != nil { diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index 01406901..96ddc815 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -22,6 +22,7 @@ type ResourceStats struct { DiskAvail float64 DiskTotal float64 GpuAvail int64 + GpuTotal int64 CardsAvail []*Card CpuCoreHours float64 Balance float64