From 1f19eda9fb27b84de2644f87d093f254be110d1e Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 8 Jul 2024 17:08:36 +0800 Subject: [PATCH] fix sync status bugs Former-commit-id: cc45b032778fce8f19318fde3d0d712c09efb568 --- internal/cron/aiCronTask.go | 115 +++--------------- internal/cron/cron.go | 13 +- internal/logic/ai/getcenteroverviewlogic.go | 100 +-------------- internal/logic/ai/getcentertasklistlogic.go | 72 +---------- internal/logic/core/pagelisttasklogic.go | 6 +- .../service/updater/clusterResources.go | 87 +++++++++++++ .../{status => updater}/taskStatusSync.go | 47 ++++++- 7 files changed, 170 insertions(+), 270 deletions(-) create mode 100644 internal/scheduler/service/updater/clusterResources.go rename internal/scheduler/service/{status => updater}/taskStatusSync.go (86%) diff --git a/internal/cron/aiCronTask.go b/internal/cron/aiCronTask.go index 60511835..fd3edc62 100644 --- a/internal/cron/aiCronTask.go +++ b/internal/cron/aiCronTask.go @@ -12,13 +12,10 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" - "net/http" "strconv" - "sync" ) const ( @@ -119,26 +116,6 @@ func UpdateClusterMaps(svc *svc.ServiceContext, adapterId string, clusters []typ } -func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool { - emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] - cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] - if ok && ok2 { - if len(emap) == clusterNum && len(cmap) == clusterNum { - return true - } - } - return false -} - -func isAdapterEmpty(svc *svc.ServiceContext, id string) bool { - _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] - _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] - if !ok && !ok2 { - return true - } - return false -} - func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) { executorMap := make(map[string]executor.AiExecutor) collectorMap := make(map[string]collector.AiCollector) @@ -169,82 +146,22 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st return executorMap, collectorMap } -func UpdateClusterResource(svc *svc.ServiceContext) { - list, err := svc.Scheduler.AiStorages.GetAdaptersByType("1") - if err != nil { - return - } - var wg sync.WaitGroup - for _, adapter := range list { - clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) - if err != nil { - continue - } - for _, cluster := range clusters.List { - c := cluster - clusterResource, err := svc.Scheduler.AiStorages.GetClusterResourcesById(c.Id) - if err != nil { - continue - } - wg.Add(1) - go func() { - _, ok := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id] - if !ok { - wg.Done() - return - } - h := http.Request{} - stat, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(h.Context()) - if err != nil { - wg.Done() - return - } - if stat == nil { - wg.Done() - return - } - clusterType, err := strconv.ParseInt(adapter.Type, 10, 64) - if err != nil { - wg.Done() - return - } - var cardTotal int64 - var topsTotal float64 - for _, card := range stat.CardsAvail { - cardTotal += int64(card.CardNum) - topsTotal += card.TOpsAtFp16 * float64(card.CardNum) - } - - if (models.TClusterResource{} == *clusterResource) { - err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), - stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal) - if err != nil { - wg.Done() - return - } - } else { - if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 { - wg.Done() - return - } - clusterResource.CardTotal = cardTotal - clusterResource.CardTopsTotal = topsTotal - clusterResource.CpuAvail = float64(stat.CpuCoreAvail) - clusterResource.CpuTotal = float64(stat.CpuCoreTotal) - clusterResource.MemAvail = stat.MemAvail - clusterResource.MemTotal = stat.MemTotal - clusterResource.DiskAvail = stat.DiskAvail - clusterResource.DiskTotal = stat.DiskTotal - - err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource) - if err != nil { - wg.Done() - return - } - } - wg.Done() - }() +func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool { + emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] + cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] + if ok && ok2 { + if len(emap) == clusterNum && len(cmap) == clusterNum { + return true } } - wg.Wait() + return false +} + +func isAdapterEmpty(svc *svc.ServiceContext, id string) bool { + _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] + _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] + if !ok && !ok2 { + return true + } + return false } diff --git a/internal/cron/cron.go b/internal/cron/cron.go index 342b9b34..09394d29 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -16,7 +16,7 @@ package cron import ( "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/status" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" ) @@ -28,8 +28,8 @@ func AddCronGroup(svc *svc.ServiceContext) { logx.Errorf(err.Error()) return } - status.UpdateTaskStatus(svc, list) - status.UpdateAiTaskStatus(svc, list) + updater.UpdateTaskStatus(svc, list) + updater.UpdateAiTaskStatus(svc, list) }) svc.Cron.AddFunc("*/5 * * * * ?", func() { @@ -37,6 +37,11 @@ func AddCronGroup(svc *svc.ServiceContext) { }) svc.Cron.AddFunc("*/59 * * * * ?", func() { - UpdateClusterResource(svc) + adapterList, err := svc.Scheduler.AiStorages.GetAdaptersByType("1") + if err != nil { + logx.Errorf(err.Error()) + return + } + updater.UpdateClusterResources(svc, adapterList) }) } diff --git a/internal/logic/ai/getcenteroverviewlogic.go b/internal/logic/ai/getcenteroverviewlogic.go index d4aecdb9..b52c8718 100644 --- a/internal/logic/ai/getcenteroverviewlogic.go +++ b/internal/logic/ai/getcenteroverviewlogic.go @@ -3,12 +3,9 @@ package ai import ( "context" "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" - "strconv" - "sync" - "time" ) type GetCenterOverviewLogic struct { @@ -27,8 +24,6 @@ func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) { resp = &types.CenterOverviewResp{} - var mu sync.RWMutex - ch := make(chan struct{}) var centerNum int32 var taskNum int32 @@ -42,7 +37,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview centerNum = int32(len(adapterList)) resp.CenterNum = centerNum - go l.updateClusterResource(&mu, ch, adapterList) + go updater.UpdateClusterResources(l.svcCtx, adapterList) for _, adapter := range adapterList { taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) @@ -59,9 +54,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview continue } for _, cluster := range clusters.List { - mu.RLock() clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id) - mu.RUnlock() if err != nil { continue @@ -73,92 +66,5 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview resp.CardNum = cardNum resp.PowerInTops = totalTops - select { - case _ = <-ch: - return resp, nil - case <-time.After(1 * time.Second): - return resp, nil - } -} - -func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) { - var wg sync.WaitGroup - for _, adapter := range list { - clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) - if err != nil { - continue - } - for _, cluster := range clusters.List { - c := cluster - mu.RLock() - clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(c.Id) - mu.RUnlock() - if err != nil { - continue - } - wg.Add(1) - go func() { - _, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id] - if !ok { - wg.Done() - return - } - stat, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(l.ctx) - if err != nil { - wg.Done() - return - } - if stat == nil { - wg.Done() - return - } - clusterType, err := strconv.ParseInt(adapter.Type, 10, 64) - if err != nil { - wg.Done() - return - } - var cardTotal int64 - var topsTotal float64 - for _, card := range stat.CardsAvail { - cardTotal += int64(card.CardNum) - topsTotal += card.TOpsAtFp16 * float64(card.CardNum) - } - - mu.Lock() - if (models.TClusterResource{} == *clusterResource) { - err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), - stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal) - if err != nil { - mu.Unlock() - wg.Done() - return - } - } else { - if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 { - wg.Done() - return - } - clusterResource.CardTotal = cardTotal - clusterResource.CardTopsTotal = topsTotal - clusterResource.CpuAvail = float64(stat.CpuCoreAvail) - clusterResource.CpuTotal = float64(stat.CpuCoreTotal) - clusterResource.MemAvail = stat.MemAvail - clusterResource.MemTotal = stat.MemTotal - clusterResource.DiskAvail = stat.DiskAvail - clusterResource.DiskTotal = stat.DiskTotal - - err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource) - if err != nil { - mu.Unlock() - wg.Done() - return - } - } - mu.Unlock() - wg.Done() - }() - } - } - wg.Wait() - ch <- struct{}{} + return resp, nil } diff --git a/internal/logic/ai/getcentertasklistlogic.go b/internal/logic/ai/getcentertasklistlogic.go index 90e3e497..0b55aa21 100644 --- a/internal/logic/ai/getcentertasklistlogic.go +++ b/internal/logic/ai/getcentertasklistlogic.go @@ -2,17 +2,12 @@ package ai import ( "context" - "errors" - "fmt" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" - "strconv" - "sync" - "time" - + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" - - "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "time" ) type GetCenterTaskListLogic struct { @@ -31,20 +26,16 @@ func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) { resp = &types.CenterTaskListResp{} - var mu sync.RWMutex - ch := make(chan struct{}) adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1") if err != nil { return nil, err } - go l.updateAiTaskStatus(&mu, ch, adapterList) + go updater.UpdateTrainingTaskStatus(l.svcCtx, adapterList) for _, adapter := range adapterList { - mu.RLock() taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) - mu.RUnlock() if err != nil { continue } @@ -81,56 +72,5 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList } } - select { - case _ = <-ch: - return resp, nil - case <-time.After(2 * time.Second): - return resp, nil - } -} - -func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) { - var wg sync.WaitGroup - for _, adapter := range list { - taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) - if err != nil { - continue - } - if len(taskList) == 0 { - continue - } - for _, task := range taskList { - t := task - if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped { - continue - } - wg.Add(1) - go func() { - trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId) - if err != nil { - msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) - logx.Errorf(errors.New(msg).Error()) - wg.Done() - return - } - if trainingTask == nil { - wg.Done() - return - } - t.Status = trainingTask.Status - t.StartTime = trainingTask.Start - t.EndTime = trainingTask.End - mu.Lock() - err = l.svcCtx.Scheduler.AiStorages.UpdateAiTask(t) - mu.Unlock() - if err != nil { - wg.Done() - return - } - wg.Done() - }() - } - } - wg.Wait() - ch <- struct{}{} + return resp, nil } diff --git a/internal/logic/core/pagelisttasklogic.go b/internal/logic/core/pagelisttasklogic.go index 6b5bfbc8..f5a44bc1 100644 --- a/internal/logic/core/pagelisttasklogic.go +++ b/internal/logic/core/pagelisttasklogic.go @@ -2,7 +2,7 @@ package core import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/status" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" @@ -52,8 +52,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa } // 更新智算任务状态 - go status.UpdateTaskStatus(l.svcCtx, list) - go status.UpdateAiTaskStatus(l.svcCtx, list) + go updater.UpdateTaskStatus(l.svcCtx, list) + go updater.UpdateAiTaskStatus(l.svcCtx, list) for _, model := range list { if model.StartTime != "" && model.EndTime == "" { diff --git a/internal/scheduler/service/updater/clusterResources.go b/internal/scheduler/service/updater/clusterResources.go new file mode 100644 index 00000000..939492c6 --- /dev/null +++ b/internal/scheduler/service/updater/clusterResources.go @@ -0,0 +1,87 @@ +package updater + +import ( + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "net/http" + "strconv" + "sync" +) + +func UpdateClusterResources(svc *svc.ServiceContext, list []*types.AdapterInfo) { + var wg sync.WaitGroup + for _, adapter := range list { + clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id) + if err != nil { + continue + } + for _, cluster := range clusters.List { + c := cluster + clusterResource, err := svc.Scheduler.AiStorages.GetClusterResourcesById(c.Id) + if err != nil { + continue + } + wg.Add(1) + go func() { + _, ok := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id] + if !ok { + wg.Done() + return + } + h := http.Request{} + stat, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(h.Context()) + if err != nil { + wg.Done() + return + } + if stat == nil { + wg.Done() + return + } + clusterType, err := strconv.ParseInt(adapter.Type, 10, 64) + if err != nil { + wg.Done() + return + } + var cardTotal int64 + var topsTotal float64 + for _, card := range stat.CardsAvail { + cardTotal += int64(card.CardNum) + topsTotal += card.TOpsAtFp16 * float64(card.CardNum) + } + + if (models.TClusterResource{} == *clusterResource) { + err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), + stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal) + if err != nil { + wg.Done() + return + } + } else { + if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 { + wg.Done() + return + } + clusterResource.CardTotal = cardTotal + clusterResource.CardTopsTotal = topsTotal + clusterResource.CpuAvail = float64(stat.CpuCoreAvail) + clusterResource.CpuTotal = float64(stat.CpuCoreTotal) + clusterResource.MemAvail = stat.MemAvail + clusterResource.MemTotal = stat.MemTotal + clusterResource.DiskAvail = stat.DiskAvail + clusterResource.DiskTotal = stat.DiskTotal + + err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource) + if err != nil { + wg.Done() + return + } + } + wg.Done() + }() + } + } + wg.Wait() + return +} diff --git a/internal/scheduler/service/status/taskStatusSync.go b/internal/scheduler/service/updater/taskStatusSync.go similarity index 86% rename from internal/scheduler/service/status/taskStatusSync.go rename to internal/scheduler/service/updater/taskStatusSync.go index 7a48d9f9..3923e1eb 100644 --- a/internal/scheduler/service/status/taskStatusSync.go +++ b/internal/scheduler/service/updater/taskStatusSync.go @@ -1,4 +1,4 @@ -package status +package updater import ( "errors" @@ -335,3 +335,48 @@ func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { } wg.Wait() } + +func UpdateTrainingTaskStatus(svc *svc.ServiceContext, list []*types.AdapterInfo) { + var wg sync.WaitGroup + for _, adapter := range list { + taskList, err := svc.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) + if err != nil { + continue + } + if len(taskList) == 0 { + continue + } + for _, task := range taskList { + t := task + if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped || task.TaskType != "pytorch" { + continue + } + wg.Add(1) + go func() { + h := http.Request{} + trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId) + if err != nil { + msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + if trainingTask == nil { + wg.Done() + return + } + t.Status = trainingTask.Status + t.StartTime = trainingTask.Start + t.EndTime = trainingTask.End + err = svc.Scheduler.AiStorages.UpdateAiTask(t) + if err != nil { + wg.Done() + return + } + wg.Done() + }() + } + } + wg.Wait() + return +}