Merge pull request 'updated aioption struct' (#263) from tzwang/pcm-coordinator:master into master

Former-commit-id: 30cc75a85d3615110ecf7d8fd67a8ef59a8142b4
This commit is contained in:
tzwang 2024-07-12 17:59:37 +08:00
commit 74841c0655
9 changed files with 77 additions and 24 deletions

3
go.mod
View File

@ -2,7 +2,6 @@ module gitlink.org.cn/JointCloud/pcm-coordinator
go 1.22.0 go 1.22.0
require ( require (
github.com/JCCE-nudt/apigw-go-sdk v0.0.0-20230525025609-34159d6f2818 github.com/JCCE-nudt/apigw-go-sdk v0.0.0-20230525025609-34159d6f2818
github.com/Masterminds/squirrel v1.5.4 github.com/Masterminds/squirrel v1.5.4
@ -19,7 +18,7 @@ require (
github.com/prometheus/common v0.54.0 github.com/prometheus/common v0.54.0
github.com/robfig/cron/v3 v3.0.1 github.com/robfig/cron/v3 v3.0.1
github.com/zeromicro/go-zero v1.6.5 github.com/zeromicro/go-zero v1.6.5
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240619113316-c0186ee7b60c gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240712090657-cfba062e68e1
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240620065702-5dcad373c1fe gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240620065702-5dcad373c1fe
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35 gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203

4
go.sum
View File

@ -471,8 +471,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zeromicro/go-zero v1.6.5 h1:JgsBa25/knnEL7+KQksbwktudIkNQvaAin0nisVgnSA= github.com/zeromicro/go-zero v1.6.5 h1:JgsBa25/knnEL7+KQksbwktudIkNQvaAin0nisVgnSA=
github.com/zeromicro/go-zero v1.6.5/go.mod h1:XjbssEVEzFKueAh0Fie5kNf+cRqFlQQk46fY9WgEGaM= github.com/zeromicro/go-zero v1.6.5/go.mod h1:XjbssEVEzFKueAh0Fie5kNf+cRqFlQQk46fY9WgEGaM=
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240619113316-c0186ee7b60c h1:HrU3GPuHWTAajQxbkDFygsAm/HURJFGT2yckUMdOdGM= gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240712090657-cfba062e68e1 h1:Wc9M/vq+9Iw49KZb6mgHj85sysGHjVY+QlHJeZKlx4w=
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240619113316-c0186ee7b60c/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240712090657-cfba062e68e1/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240620065702-5dcad373c1fe h1:teAWL7sJszDb1ZA7uptrzPSwJ1OIV840Q1/nrrDsx7E= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240620065702-5dcad373c1fe h1:teAWL7sJszDb1ZA7uptrzPSwJ1OIV840Q1/nrrDsx7E=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240620065702-5dcad373c1fe/go.mod h1:/eOmBFZKWGoabG3sRVkVvIbLwsd2631k4jkUBR6x1AA= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240620065702-5dcad373c1fe/go.mod h1:/eOmBFZKWGoabG3sRVkVvIbLwsd2631k4jkUBR6x1AA=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35 h1:E2QfpS3Y0FjR8Zyv5l2Ti/2NetQFqHG66c8+T/+J1u0= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240510133934-6a5526289b35 h1:E2QfpS3Y0FjR8Zyv5l2Ti/2NetQFqHG66c8+T/+J1u0=

View File

@ -147,6 +147,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, opt option.Option, adapterName stri
aiOpt.Replica = inferOpt.Replica aiOpt.Replica = inferOpt.Replica
aiOpt.AdapterId = inferOpt.AdapterId aiOpt.AdapterId = inferOpt.AdapterId
aiOpt.TaskType = inferOpt.ModelType aiOpt.TaskType = inferOpt.ModelType
aiOpt.ModelName = inferOpt.ModelName
aiOpt.StrategyName = inferOpt.Strategy aiOpt.StrategyName = inferOpt.Strategy
} }
// 构建主任务结构体 // 构建主任务结构体
@ -169,6 +170,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, opt option.Option, adapterName stri
Replica: int64(aiOpt.Replica), Replica: int64(aiOpt.Replica),
JobId: jobId, JobId: jobId,
TaskType: aiOpt.TaskType, TaskType: aiOpt.TaskType,
ModelName: aiOpt.ModelName,
Strategy: aiOpt.StrategyName, Strategy: aiOpt.StrategyName,
Status: status, Status: status,
Msg: msg, Msg: msg,
@ -244,7 +246,8 @@ func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterR
} }
func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64, func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64) error { memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64, cardHours float64,
balance float64, taskCompleted int64) error {
cId, err := strconv.ParseInt(clusterId, 10, 64) cId, err := strconv.ParseInt(clusterId, 10, 64)
if err != nil { if err != nil {
return err return err
@ -268,6 +271,9 @@ func (s *AiStorage) SaveClusterResources(adapterId string, clusterId string, clu
GpuTotal: gpuTotal, GpuTotal: gpuTotal,
CardTotal: cardTotal, CardTotal: cardTotal,
CardTopsTotal: topsTotal, CardTopsTotal: topsTotal,
CardHours: cardHours,
Balance: balance,
TaskCompleted: taskCompleted,
} }
tx := s.DbEngin.Create(&clusterResource) tx := s.DbEngin.Create(&clusterResource)
if tx.Error != nil { if tx.Error != nil {

View File

@ -15,6 +15,7 @@ type AiOption struct {
ComputeCard string ComputeCard string
CodeType string CodeType string
AlgorithmName string AlgorithmName string
ModelName string
ImageId string ImageId string
SpecId string SpecId string

View File

@ -17,19 +17,20 @@ type AiCollector interface {
} }
type ResourceStats struct { type ResourceStats struct {
ClusterId string ClusterId string
Name string Name string
CpuCoreAvail int64 CpuCoreAvail int64
CpuCoreTotal int64 CpuCoreTotal int64
MemAvail float64 MemAvail float64
MemTotal float64 MemTotal float64
DiskAvail float64 DiskAvail float64
DiskTotal float64 DiskTotal float64
GpuAvail int64 GpuAvail int64
GpuTotal int64 GpuTotal int64
CardsAvail []*Card CardsAvail []*Card
CpuCoreHours float64 CpuCoreHours float64
Balance float64 Balance float64
TaskCompleted int64
} }
type Card struct { type Card struct {

View File

@ -46,14 +46,17 @@ func UpdateClusterResources(svc *svc.ServiceContext, list []*types.AdapterInfo)
} }
var cardTotal int64 var cardTotal int64
var topsTotal float64 var topsTotal float64
var cardHours float64
for _, card := range stat.CardsAvail { for _, card := range stat.CardsAvail {
cardTotal += int64(card.CardNum) cardTotal += int64(card.CardNum)
topsTotal += card.TOpsAtFp16 * float64(card.CardNum) topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
cardHours += card.CardHours
} }
if (models.TClusterResource{} == *clusterResource) { if (models.TClusterResource{} == *clusterResource) {
err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal) stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal, cardHours,
stat.Balance, stat.TaskCompleted)
if err != nil { if err != nil {
wg.Done() wg.Done()
return return
@ -71,6 +74,9 @@ func UpdateClusterResources(svc *svc.ServiceContext, list []*types.AdapterInfo)
clusterResource.MemTotal = stat.MemTotal clusterResource.MemTotal = stat.MemTotal
clusterResource.DiskAvail = stat.DiskAvail clusterResource.DiskAvail = stat.DiskAvail
clusterResource.DiskTotal = stat.DiskTotal clusterResource.DiskTotal = stat.DiskTotal
clusterResource.CardHours = cardHours
clusterResource.Balance = stat.Balance
clusterResource.TaskCompleted = stat.TaskCompleted
err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource) err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
if err != nil { if err != nil {

View File

@ -49,6 +49,7 @@ const (
CPUCOREPRICEPERHOUR = 0.09 CPUCOREPRICEPERHOUR = 0.09
DCUPRICEPERHOUR = 2.0 DCUPRICEPERHOUR = 2.0
KB = 1024 KB = 1024
TIMEOUT = 20
) )
var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{ var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{
@ -270,7 +271,7 @@ func (s *ShuguangAi) QuerySpecs(ctx context.Context) (interface{}, error) {
func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) { func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(4) wg.Add(5)
var cBalance = make(chan float64) var cBalance = make(chan float64)
var cMemTotal = make(chan float64) var cMemTotal = make(chan float64)
var cTotalCpu = make(chan int64) var cTotalCpu = make(chan int64)
@ -287,6 +288,26 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
TOpsAtFp16: DCU_TOPS, TOpsAtFp16: DCU_TOPS,
} }
//history jobs
go func() {
hReq := &hpcAC.ListHistoryJobReq{}
hReq.Start = 0
hReq.Limit = 1
hReq.IsQueryByQueueTime = "false"
hReq.TimeType = "CUSTOM"
hReq.StartTime = "2024-01-01 01:01:01"
endTime := time.Now().Format("2006-01-02 15:04:05")
hReq.EndTime = endTime
hResp, err := s.aCRpc.ListHistoryJob(ctx, hReq)
if err != nil || hResp.Code != "0" {
wg.Done()
return
}
resourceStats.TaskCompleted = int64(hResp.Data.Total)
wg.Done()
}()
//balance //balance
go func() { go func() {
userReq := &hpcAC.GetUserInfoReq{} userReq := &hpcAC.GetUserInfoReq{}
@ -304,7 +325,7 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
go func() { go func() {
limitReq := &hpcAC.QueueReq{} limitReq := &hpcAC.QueueReq{}
limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq) limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq)
if err != nil { if err != nil || limitResp.Code != "0" {
wg.Done() wg.Done()
return return
} }
@ -351,8 +372,22 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
//resources being occupied //resources being occupied
go func() { go func() {
memSize := <-cMemTotal var memSize float64
totalCpu := <-cTotalCpu var totalCpu int64
select {
case v := <-cMemTotal:
memSize = v
case <-time.After(TIMEOUT * time.Second):
wg.Done()
return
}
select {
case v := <-cTotalCpu:
totalCpu = v
case <-time.After(TIMEOUT * time.Second):
wg.Done()
return
}
memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil) memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil)
if err != nil { if err != nil {
wg.Done() wg.Done()
@ -392,7 +427,7 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
select { select {
case v := <-cBalance: case v := <-cBalance:
balance = v balance = v
case <-time.After(2 * time.Second): case <-time.After(TIMEOUT * time.Second):
return nil, errors.New("get balance rpc call failed") return nil, errors.New("get balance rpc call failed")
} }
@ -402,6 +437,7 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
dcu.CardHours = cardHours dcu.CardHours = cardHours
resourceStats.CpuCoreHours = cpuHours resourceStats.CpuCoreHours = cpuHours
resourceStats.Balance = balance
wg.Wait() wg.Wait()

View File

@ -55,6 +55,7 @@ type (
DeletedAt *time.Time `db:"deleted_at"` DeletedAt *time.Time `db:"deleted_at"`
Card string `db:"card"` Card string `db:"card"`
InferUrl string `db:"infer_url"` InferUrl string `db:"infer_url"`
ModelName string `db:"model_name"`
} }
) )

View File

@ -49,6 +49,9 @@ type (
CardTotal int64 `db:"card_total"` // 算力卡数量 CardTotal int64 `db:"card_total"` // 算力卡数量
CardTopsTotal float64 `db:"card_tops_total"` // 算力总量tops CardTopsTotal float64 `db:"card_tops_total"` // 算力总量tops
AdapterId int64 `db:"adapter_id"` AdapterId int64 `db:"adapter_id"`
CardHours float64 `db:"card_hours"`
Balance float64 `db:"balance"`
TaskCompleted int64 `db:"task_completed"`
} }
) )