From 7d2ac609e154e3bbf872401999b79b2163e868ff Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 12 Jul 2024 17:38:49 +0800 Subject: [PATCH 1/3] updated clusterResource stats Former-commit-id: 8ecbfaedf342ca6f4a0bc08ebc1b76f97186941e --- go.mod | 3 +- internal/scheduler/database/aiStorage.go | 6 ++- .../scheduler/service/collector/collector.go | 27 +++++------ .../service/updater/clusterResources.go | 8 +++- internal/storeLink/shuguangai.go | 46 +++++++++++++++++-- pkg/models/tclusterresourcemodel_gen.go | 3 ++ 6 files changed, 71 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 41adddbc..888d5ae4 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,6 @@ module gitlink.org.cn/JointCloud/pcm-coordinator go 1.22.0 - require ( github.com/JCCE-nudt/apigw-go-sdk v0.0.0-20230525025609-34159d6f2818 github.com/Masterminds/squirrel v1.5.4 @@ -19,7 +18,7 @@ require ( github.com/prometheus/common v0.54.0 github.com/robfig/cron/v3 v3.0.1 github.com/zeromicro/go-zero v1.6.5 - gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240619113316-c0186ee7b60c + gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240712090657-cfba062e68e1 gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240620065702-5dcad373c1fe gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35 gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 diff --git a/internal/scheduler/database/aiStorage.go b/internal/scheduler/database/aiStorage.go index a482e5a7..ab188766 100644 --- a/internal/scheduler/database/aiStorage.go +++ b/internal/scheduler/database/aiStorage.go @@ -244,7 +244,8 @@ func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterR } func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64, - memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64) error { + memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64, cardHours float64, + balance float64, taskCompleted int64) error { cId, err := strconv.ParseInt(clusterId, 10, 64) if err != nil { return err @@ -268,6 +269,9 @@ func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clu GpuTotal: gpuTotal, CardTotal: cardTotal, CardTopsTotal: topsTotal, + CardHours: cardHours, + Balance: balance, + TaskCompleted: taskCompleted, } tx := s.DbEngin.Create(&clusterResource) if tx.Error != nil { diff --git a/internal/scheduler/service/collector/collector.go b/internal/scheduler/service/collector/collector.go index c0bb35f1..24b5f618 100644 --- a/internal/scheduler/service/collector/collector.go +++ b/internal/scheduler/service/collector/collector.go @@ -17,19 +17,20 @@ type AiCollector interface { } type ResourceStats struct { - ClusterId string - Name string - CpuCoreAvail int64 - CpuCoreTotal int64 - MemAvail float64 - MemTotal float64 - DiskAvail float64 - DiskTotal float64 - GpuAvail int64 - GpuTotal int64 - CardsAvail []*Card - CpuCoreHours float64 - Balance float64 + ClusterId string + Name string + CpuCoreAvail int64 + CpuCoreTotal int64 + MemAvail float64 + MemTotal float64 + DiskAvail float64 + DiskTotal float64 + GpuAvail int64 + GpuTotal int64 + CardsAvail []*Card + CpuCoreHours float64 + Balance float64 + TaskCompleted int64 } type Card struct { diff --git a/internal/scheduler/service/updater/clusterResources.go b/internal/scheduler/service/updater/clusterResources.go index 939492c6..b4c3389f 100644 --- a/internal/scheduler/service/updater/clusterResources.go +++ b/internal/scheduler/service/updater/clusterResources.go @@ -46,14 +46,17 @@ func UpdateClusterResources(svc *svc.ServiceContext, list []*types.AdapterInfo) } var cardTotal int64 var topsTotal float64 + var cardHours float64 for _, card := range stat.CardsAvail { cardTotal += int64(card.CardNum) topsTotal += card.TOpsAtFp16 * float64(card.CardNum) + cardHours += card.CardHours } if (models.TClusterResource{} == *clusterResource) { err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), - stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal) + stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal, cardHours, + stat.Balance, stat.TaskCompleted) if err != nil { wg.Done() return @@ -71,6 +74,9 @@ func UpdateClusterResources(svc *svc.ServiceContext, list []*types.AdapterInfo) clusterResource.MemTotal = stat.MemTotal clusterResource.DiskAvail = stat.DiskAvail clusterResource.DiskTotal = stat.DiskTotal + clusterResource.CardHours = cardHours + clusterResource.Balance = stat.Balance + clusterResource.TaskCompleted = stat.TaskCompleted err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource) if err != nil { diff --git a/internal/storeLink/shuguangai.go b/internal/storeLink/shuguangai.go index cb41ba11..e2e405d2 100644 --- a/internal/storeLink/shuguangai.go +++ b/internal/storeLink/shuguangai.go @@ -49,6 +49,7 @@ const ( CPUCOREPRICEPERHOUR = 0.09 DCUPRICEPERHOUR = 2.0 KB = 1024 + TIMEOUT = 20 ) var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{ @@ -270,7 +271,7 @@ func (s *ShuguangAi) QuerySpecs(ctx context.Context) (interface{}, error) { func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) { var wg sync.WaitGroup - wg.Add(4) + wg.Add(5) var cBalance = make(chan float64) var cMemTotal = make(chan float64) var cTotalCpu = make(chan int64) @@ -287,6 +288,26 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS TOpsAtFp16: DCU_TOPS, } + //history jobs + go func() { + hReq := &hpcAC.ListHistoryJobReq{} + hReq.Start = 0 + hReq.Limit = 1 + hReq.IsQueryByQueueTime = "false" + hReq.TimeType = "CUSTOM" + hReq.StartTime = "2024-01-01 01:01:01" + endTime := time.Now().Format("2006-01-02 15:04:05") + hReq.EndTime = endTime + hResp, err := s.aCRpc.ListHistoryJob(ctx, hReq) + if err != nil || hResp.Code != "0" { + wg.Done() + return + } + resourceStats.TaskCompleted = int64(hResp.Data.Total) + + wg.Done() + }() + //balance go func() { userReq := &hpcAC.GetUserInfoReq{} @@ -304,7 +325,7 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS go func() { limitReq := &hpcAC.QueueReq{} limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq) - if err != nil { + if err != nil || limitResp.Code != "0" { wg.Done() return } @@ -351,8 +372,22 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS //resources being occupied go func() { - memSize := <-cMemTotal - totalCpu := <-cTotalCpu + var memSize float64 + var totalCpu int64 + select { + case v := <-cMemTotal: + memSize = v + case <-time.After(TIMEOUT * time.Second): + wg.Done() + return + } + select { + case v := <-cTotalCpu: + totalCpu = v + case <-time.After(TIMEOUT * time.Second): + wg.Done() + return + } memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil) if err != nil { wg.Done() @@ -392,7 +427,7 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS select { case v := <-cBalance: balance = v - case <-time.After(2 * time.Second): + case <-time.After(TIMEOUT * time.Second): return nil, errors.New("get balance rpc call failed") } @@ -402,6 +437,7 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS dcu.CardHours = cardHours resourceStats.CpuCoreHours = cpuHours + resourceStats.Balance = balance wg.Wait() diff --git a/pkg/models/tclusterresourcemodel_gen.go b/pkg/models/tclusterresourcemodel_gen.go index f02722ff..a769e2dc 100644 --- a/pkg/models/tclusterresourcemodel_gen.go +++ b/pkg/models/tclusterresourcemodel_gen.go @@ -49,6 +49,9 @@ type ( CardTotal int64 `db:"card_total"` // 算力卡数量 CardTopsTotal float64 `db:"card_tops_total"` // 算力总量tops AdapterId int64 `db:"adapter_id"` + CardHours float64 `db:"card_hours"` + Balance float64 `db:"balance"` + TaskCompleted int64 `db:"task_completed"` } ) From a5e6313974bd9912e048f7f1d95756e9ee15d76e Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 12 Jul 2024 17:39:17 +0800 Subject: [PATCH 2/3] updated clusterResource stats Former-commit-id: 165801769d76c7128918b8661d0dbeed31fcc734 --- go.sum | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go.sum b/go.sum index 56a21131..5f9165ae 100644 --- a/go.sum +++ b/go.sum @@ -471,8 +471,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/zeromicro/go-zero v1.6.5 h1:JgsBa25/knnEL7+KQksbwktudIkNQvaAin0nisVgnSA= github.com/zeromicro/go-zero v1.6.5/go.mod h1:XjbssEVEzFKueAh0Fie5kNf+cRqFlQQk46fY9WgEGaM= -gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240619113316-c0186ee7b60c h1:HrU3GPuHWTAajQxbkDFygsAm/HURJFGT2yckUMdOdGM= -gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240619113316-c0186ee7b60c/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= +gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240712090657-cfba062e68e1 h1:Wc9M/vq+9Iw49KZb6mgHj85sysGHjVY+QlHJeZKlx4w= +gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240712090657-cfba062e68e1/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240620065702-5dcad373c1fe h1:teAWL7sJszDb1ZA7uptrzPSwJ1OIV840Q1/nrrDsx7E= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240620065702-5dcad373c1fe/go.mod h1:/eOmBFZKWGoabG3sRVkVvIbLwsd2631k4jkUBR6x1AA= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35 h1:E2QfpS3Y0FjR8Zyv5l2Ti/2NetQFqHG66c8+T/+J1u0= From 4120d605114c1f99b578351f440224b83c0c1989 Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 12 Jul 2024 17:58:43 +0800 Subject: [PATCH 3/3] updated aioption Former-commit-id: ba671ba0ca3d1c1ae66c229aedccdb7e5ef280d3 --- internal/scheduler/database/aiStorage.go | 2 ++ internal/scheduler/schedulers/option/aiOption.go | 1 + pkg/models/taskaimodel_gen.go | 1 + 3 files changed, 4 insertions(+) diff --git a/internal/scheduler/database/aiStorage.go b/internal/scheduler/database/aiStorage.go index ab188766..54b02c20 100644 --- a/internal/scheduler/database/aiStorage.go +++ b/internal/scheduler/database/aiStorage.go @@ -147,6 +147,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, opt option.Option, adapterName stri aiOpt.Replica = inferOpt.Replica aiOpt.AdapterId = inferOpt.AdapterId aiOpt.TaskType = inferOpt.ModelType + aiOpt.ModelName = inferOpt.ModelName aiOpt.StrategyName = inferOpt.Strategy } // 构建主任务结构体 @@ -169,6 +170,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, opt option.Option, adapterName stri Replica: int64(aiOpt.Replica), JobId: jobId, TaskType: aiOpt.TaskType, + ModelName: aiOpt.ModelName, Strategy: aiOpt.StrategyName, Status: status, Msg: msg, diff --git a/internal/scheduler/schedulers/option/aiOption.go b/internal/scheduler/schedulers/option/aiOption.go index b3cc4de0..e6b7a838 100644 --- a/internal/scheduler/schedulers/option/aiOption.go +++ b/internal/scheduler/schedulers/option/aiOption.go @@ -15,6 +15,7 @@ type AiOption struct { ComputeCard string CodeType string AlgorithmName string + ModelName string ImageId string SpecId string diff --git a/pkg/models/taskaimodel_gen.go b/pkg/models/taskaimodel_gen.go index 24473016..94a56845 100644 --- a/pkg/models/taskaimodel_gen.go +++ b/pkg/models/taskaimodel_gen.go @@ -55,6 +55,7 @@ type ( DeletedAt *time.Time `db:"deleted_at"` Card string `db:"card"` InferUrl string `db:"infer_url"` + ModelName string `db:"model_name"` } )