fix sync status bugs

Former-commit-id: cc45b032778fce8f19318fde3d0d712c09efb568
tzwang 2024-07-08 17:08:36 +08:00
parent ad7efcc226
commit 1f19eda9fb
7 changed files with 170 additions and 270 deletions

View File

@@ -12,13 +12,10 @@ import (
     "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
     "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
     "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
-    "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
     "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
     "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
     "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
-    "net/http"
     "strconv"
-    "sync"
 )

 const (
@@ -119,26 +116,6 @@ func UpdateClusterMaps(svc *svc.ServiceContext, adapterId string, clusters []typ
 }

-func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
-    emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
-    cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
-    if ok && ok2 {
-        if len(emap) == clusterNum && len(cmap) == clusterNum {
-            return true
-        }
-    }
-    return false
-}
-
-func isAdapterEmpty(svc *svc.ServiceContext, id string) bool {
-    _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
-    _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
-    if !ok && !ok2 {
-        return true
-    }
-    return false
-}
 func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) {
     executorMap := make(map[string]executor.AiExecutor)
     collectorMap := make(map[string]collector.AiCollector)
@@ -169,82 +146,22 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
     return executorMap, collectorMap
 }
-func UpdateClusterResource(svc *svc.ServiceContext) {
-    list, err := svc.Scheduler.AiStorages.GetAdaptersByType("1")
-    if err != nil {
-        return
-    }
-    var wg sync.WaitGroup
-    for _, adapter := range list {
-        clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
-        if err != nil {
-            continue
-        }
-        for _, cluster := range clusters.List {
-            c := cluster
-            clusterResource, err := svc.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
-            if err != nil {
-                continue
-            }
-            wg.Add(1)
-            go func() {
-                _, ok := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id]
-                if !ok {
-                    wg.Done()
-                    return
-                }
-                h := http.Request{}
-                stat, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(h.Context())
-                if err != nil {
-                    wg.Done()
-                    return
-                }
-                if stat == nil {
-                    wg.Done()
-                    return
-                }
-                clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
-                if err != nil {
-                    wg.Done()
-                    return
-                }
-                var cardTotal int64
-                var topsTotal float64
-                for _, card := range stat.CardsAvail {
-                    cardTotal += int64(card.CardNum)
-                    topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
-                }
-                if (models.TClusterResource{} == *clusterResource) {
-                    err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
-                        stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
-                    if err != nil {
-                        wg.Done()
-                        return
-                    }
-                } else {
-                    if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 {
-                        wg.Done()
-                        return
-                    }
-                    clusterResource.CardTotal = cardTotal
-                    clusterResource.CardTopsTotal = topsTotal
-                    clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
-                    clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
-                    clusterResource.MemAvail = stat.MemAvail
-                    clusterResource.MemTotal = stat.MemTotal
-                    clusterResource.DiskAvail = stat.DiskAvail
-                    clusterResource.DiskTotal = stat.DiskTotal
-                    err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
-                    if err != nil {
-                        wg.Done()
-                        return
-                    }
-                }
-                wg.Done()
-            }()
-        }
-    }
-    wg.Wait()
-}
+func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
+    emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
+    cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
+    if ok && ok2 {
+        if len(emap) == clusterNum && len(cmap) == clusterNum {
+            return true
+        }
+    }
+    return false
+}
+
+func isAdapterEmpty(svc *svc.ServiceContext, id string) bool {
+    _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
+    _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
+    if !ok && !ok2 {
+        return true
+    }
+    return false
+}

View File

@@ -16,7 +16,7 @@ package cron

 import (
     "github.com/zeromicro/go-zero/core/logx"
-    "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/status"
+    "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
     "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
 )

@@ -28,8 +28,8 @@ func AddCronGroup(svc *svc.ServiceContext) {
             logx.Errorf(err.Error())
             return
         }
-        status.UpdateTaskStatus(svc, list)
-        status.UpdateAiTaskStatus(svc, list)
+        updater.UpdateTaskStatus(svc, list)
+        updater.UpdateAiTaskStatus(svc, list)
     })

     svc.Cron.AddFunc("*/5 * * * * ?", func() {
@@ -37,6 +37,11 @@ func AddCronGroup(svc *svc.ServiceContext) {
     })

     svc.Cron.AddFunc("*/59 * * * * ?", func() {
-        UpdateClusterResource(svc)
+        adapterList, err := svc.Scheduler.AiStorages.GetAdaptersByType("1")
+        if err != nil {
+            logx.Errorf(err.Error())
+            return
+        }
+        updater.UpdateClusterResources(svc, adapterList)
     })
 }

View File

@@ -3,12 +3,9 @@ package ai
 import (
     "context"
     "github.com/zeromicro/go-zero/core/logx"
+    "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
     "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
     "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
-    "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
-    "strconv"
-    "sync"
-    "time"
 )

 type GetCenterOverviewLogic struct {
@@ -27,8 +24,6 @@ func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext)
 func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) {
     resp = &types.CenterOverviewResp{}
-    var mu sync.RWMutex
-    ch := make(chan struct{})

     var centerNum int32
     var taskNum int32
@@ -42,7 +37,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
     centerNum = int32(len(adapterList))
     resp.CenterNum = centerNum

-    go l.updateClusterResource(&mu, ch, adapterList)
+    go updater.UpdateClusterResources(l.svcCtx, adapterList)

     for _, adapter := range adapterList {
         taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
@@ -59,9 +54,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
             continue
         }
         for _, cluster := range clusters.List {
-            mu.RLock()
             clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id)
-            mu.RUnlock()
             if err != nil {
                 continue
@@ -73,92 +66,5 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
     resp.CardNum = cardNum
     resp.PowerInTops = totalTops

-    select {
-    case _ = <-ch:
-        return resp, nil
-    case <-time.After(1 * time.Second):
-        return resp, nil
-    }
-}
-
-func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
-    var wg sync.WaitGroup
-    for _, adapter := range list {
-        clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
-        if err != nil {
-            continue
-        }
-        for _, cluster := range clusters.List {
-            c := cluster
-            mu.RLock()
-            clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
-            mu.RUnlock()
-            if err != nil {
-                continue
-            }
-            wg.Add(1)
-            go func() {
-                _, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id]
-                if !ok {
-                    wg.Done()
-                    return
-                }
-                stat, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(l.ctx)
-                if err != nil {
-                    wg.Done()
-                    return
-                }
-                if stat == nil {
-                    wg.Done()
-                    return
-                }
-                clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
-                if err != nil {
-                    wg.Done()
-                    return
-                }
-                var cardTotal int64
-                var topsTotal float64
-                for _, card := range stat.CardsAvail {
-                    cardTotal += int64(card.CardNum)
-                    topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
-                }
-                mu.Lock()
-                if (models.TClusterResource{} == *clusterResource) {
-                    err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
-                        stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
-                    if err != nil {
-                        mu.Unlock()
-                        wg.Done()
-                        return
-                    }
-                } else {
-                    if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 {
-                        wg.Done()
-                        return
-                    }
-                    clusterResource.CardTotal = cardTotal
-                    clusterResource.CardTopsTotal = topsTotal
-                    clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
-                    clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
-                    clusterResource.MemAvail = stat.MemAvail
-                    clusterResource.MemTotal = stat.MemTotal
-                    clusterResource.DiskAvail = stat.DiskAvail
-                    clusterResource.DiskTotal = stat.DiskTotal
-                    err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
-                    if err != nil {
-                        mu.Unlock()
-                        wg.Done()
-                        return
-                    }
-                }
-                mu.Unlock()
-                wg.Done()
-            }()
-        }
-    }
-    wg.Wait()
-    ch <- struct{}{}
-}
+    return resp, nil
+}

View File

@@ -2,17 +2,12 @@ package ai

 import (
     "context"
-    "errors"
-    "fmt"
-    "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
-    "strconv"
-    "sync"
-    "time"
+    "github.com/zeromicro/go-zero/core/logx"
+    "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
     "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
     "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
-
-    "github.com/zeromicro/go-zero/core/logx"
+    "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
+    "time"
 )

 type GetCenterTaskListLogic struct {
@@ -31,20 +26,16 @@ func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext)
 func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskListResp, err error) {
     resp = &types.CenterTaskListResp{}
-    var mu sync.RWMutex
-    ch := make(chan struct{})

     adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1")
     if err != nil {
         return nil, err
     }

-    go l.updateAiTaskStatus(&mu, ch, adapterList)
+    go updater.UpdateTrainingTaskStatus(l.svcCtx, adapterList)

     for _, adapter := range adapterList {
-        mu.RLock()
         taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
-        mu.RUnlock()
         if err != nil {
             continue
         }
@@ -81,56 +72,5 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
         }
     }

-    select {
-    case _ = <-ch:
-        return resp, nil
-    case <-time.After(2 * time.Second):
-        return resp, nil
-    }
-}
-
-func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
-    var wg sync.WaitGroup
-    for _, adapter := range list {
-        taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
-        if err != nil {
-            continue
-        }
-        if len(taskList) == 0 {
-            continue
-        }
-        for _, task := range taskList {
-            t := task
-            if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped {
-                continue
-            }
-            wg.Add(1)
-            go func() {
-                trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId)
-                if err != nil {
-                    msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
-                    logx.Errorf(errors.New(msg).Error())
-                    wg.Done()
-                    return
-                }
-                if trainingTask == nil {
-                    wg.Done()
-                    return
-                }
-                t.Status = trainingTask.Status
-                t.StartTime = trainingTask.Start
-                t.EndTime = trainingTask.End
-                mu.Lock()
-                err = l.svcCtx.Scheduler.AiStorages.UpdateAiTask(t)
-                mu.Unlock()
-                if err != nil {
-                    wg.Done()
-                    return
-                }
-                wg.Done()
-            }()
-        }
-    }
-    wg.Wait()
-    ch <- struct{}{}
-}
+    return resp, nil
+}

View File

@@ -2,7 +2,7 @@ package core

 import (
     "context"
-    "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/status"
+    "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
     "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
     "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
     "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
@@ -52,8 +52,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
     }

     // 更新智算任务状态 (update AI task status)
-    go status.UpdateTaskStatus(l.svcCtx, list)
-    go status.UpdateAiTaskStatus(l.svcCtx, list)
+    go updater.UpdateTaskStatus(l.svcCtx, list)
+    go updater.UpdateAiTaskStatus(l.svcCtx, list)
     for _, model := range list {
         if model.StartTime != "" && model.EndTime == "" {

View File

@@ -0,0 +1,87 @@
+package updater
+
+import (
+    "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
+    "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
+    "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
+    "net/http"
+    "strconv"
+    "sync"
+)
+
+func UpdateClusterResources(svc *svc.ServiceContext, list []*types.AdapterInfo) {
+    var wg sync.WaitGroup
+    for _, adapter := range list {
+        clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
+        if err != nil {
+            continue
+        }
+        for _, cluster := range clusters.List {
+            c := cluster
+            clusterResource, err := svc.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
+            if err != nil {
+                continue
+            }
+            wg.Add(1)
+            go func() {
+                _, ok := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id]
+                if !ok {
+                    wg.Done()
+                    return
+                }
+                h := http.Request{}
+                stat, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(h.Context())
+                if err != nil {
+                    wg.Done()
+                    return
+                }
+                if stat == nil {
+                    wg.Done()
+                    return
+                }
+                clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
+                if err != nil {
+                    wg.Done()
+                    return
+                }
+                var cardTotal int64
+                var topsTotal float64
+                for _, card := range stat.CardsAvail {
+                    cardTotal += int64(card.CardNum)
+                    topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
+                }
+                if (models.TClusterResource{} == *clusterResource) {
+                    err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
+                        stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
+                    if err != nil {
+                        wg.Done()
+                        return
+                    }
+                } else {
+                    if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 {
+                        wg.Done()
+                        return
+                    }
+                    clusterResource.CardTotal = cardTotal
+                    clusterResource.CardTopsTotal = topsTotal
+                    clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
+                    clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
+                    clusterResource.MemAvail = stat.MemAvail
+                    clusterResource.MemTotal = stat.MemTotal
+                    clusterResource.DiskAvail = stat.DiskAvail
+                    clusterResource.DiskTotal = stat.DiskTotal
+                    err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
+                    if err != nil {
+                        wg.Done()
+                        return
+                    }
+                }
+                wg.Done()
+            }()
+        }
+    }
+    wg.Wait()
+    return
+}
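
For orientation, the callers added elsewhere in this commit (the cron job and GetCenterOverview) all consume this function the same way: fetch the AI-type adapters ("1") and fire the refresh asynchronously. A minimal caller sketch follows; the wrapper name refreshClusterResources is hypothetical and the svc, logx, and updater imports are assumed from the pcm-coordinator module:

    // refreshClusterResources is a hypothetical wrapper mirroring the cron job above.
    func refreshClusterResources(svcCtx *svc.ServiceContext) {
        // "1" selects AI-type adapters, matching the callers in this commit.
        adapterList, err := svcCtx.Scheduler.AiStorages.GetAdaptersByType("1")
        if err != nil {
            logx.Errorf(err.Error())
            return
        }
        // Fire-and-forget: UpdateClusterResources waits on its own WaitGroup internally.
        go updater.UpdateClusterResources(svcCtx, adapterList)
    }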

View File

@@ -1,4 +1,4 @@
-package status
+package updater

 import (
     "errors"
@@ -335,3 +335,48 @@ func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
     }
     wg.Wait()
 }
+
+func UpdateTrainingTaskStatus(svc *svc.ServiceContext, list []*types.AdapterInfo) {
+    var wg sync.WaitGroup
+    for _, adapter := range list {
+        taskList, err := svc.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
+        if err != nil {
+            continue
+        }
+        if len(taskList) == 0 {
+            continue
+        }
+        for _, task := range taskList {
+            t := task
+            if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped || task.TaskType != "pytorch" {
+                continue
+            }
+            wg.Add(1)
+            go func() {
+                h := http.Request{}
+                trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
+                if err != nil {
+                    msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
+                    logx.Errorf(errors.New(msg).Error())
+                    wg.Done()
+                    return
+                }
+                if trainingTask == nil {
+                    wg.Done()
+                    return
+                }
+                t.Status = trainingTask.Status
+                t.StartTime = trainingTask.Start
+                t.EndTime = trainingTask.End
+                err = svc.Scheduler.AiStorages.UpdateAiTask(t)
+                if err != nil {
+                    wg.Done()
+                    return
+                }
+                wg.Done()
+            }()
+        }
+    }
+    wg.Wait()
+    return
+}
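
UpdateTrainingTaskStatus is consumed the same way by GetCenterTaskList in this commit: fetch the adapters, start the refresh in the background, and read the stored task rows afterwards. A minimal sketch of that call pattern; pollTrainingStatus is a hypothetical name and the svc and updater imports are assumed:

    // pollTrainingStatus is a hypothetical wrapper mirroring GetCenterTaskList above.
    func pollTrainingStatus(svcCtx *svc.ServiceContext) error {
        adapterList, err := svcCtx.Scheduler.AiStorages.GetAdaptersByType("1")
        if err != nil {
            return err
        }
        // The updater writes status via AiStorages.UpdateAiTask; readers then query
        // GetAiTasksByAdapterId, so this call does not need to be awaited.
        go updater.UpdateTrainingTaskStatus(svcCtx, adapterList)
        return nil
    }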