Merge pull request 'fix getresources bugs' (#94) from tzwang/pcm-coordinator:master into master

Former-commit-id: 875557407a32ba5387b1d5f9fdfd71f54415b2ad
This commit is contained in:
tzwang 2024-04-02 16:56:50 +08:00
commit 5b4b7f88eb
5 changed files with 49 additions and 29 deletions

View File

@ -28,11 +28,11 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc
func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) { func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) {
resp = &types.ScheduleResp{} resp = &types.ScheduleResp{}
opt := &option.AiOption{ opt := &option.AiOption{
ResourceType: req.AiOption.ResourceType, ResourceType: req.AiOption.ResourceType,
Tops: 0, Tops: 0,
TaskType: req.AiOption.TaskType, TaskType: req.AiOption.TaskType,
DatasetsName: req.AiOption.Datasets, DatasetsName: req.AiOption.Datasets,
AlgorithmName: "cnn", //AlgorithmName: "cnn",
StrategyName: req.AiOption.Strategy, StrategyName: req.AiOption.Strategy,
ClusterToStaticWeight: nil, ClusterToStaticWeight: nil,
Params: []string{ Params: []string{

View File

@ -2,7 +2,6 @@ package database
import ( import (
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@ -24,12 +23,12 @@ func (s *AiStorage) GetParticipants() (*types.ClusterListResp, error) {
return &resp, nil return &resp, nil
} }
func (s *AiStorage) SaveTask(cluster strategy.AssignedCluster) error { func (s *AiStorage) SaveTask(name string) error {
// 构建主任务结构体 // 构建主任务结构体
taskModel := models.Task{ taskModel := models.Task{
Status: constants.Saved, Status: constants.Saved,
Description: "ai task", Description: "ai task",
Name: "testAi", Name: name,
CommitTime: time.Now(), CommitTime: time.Now(),
} }
// 保存任务数据到数据库 // 保存任务数据到数据库

View File

@ -100,6 +100,8 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
return nil, errors.New("clusters is nil") return nil, errors.New("clusters is nil")
} }
//res := struct {
//}{}
var wg sync.WaitGroup var wg sync.WaitGroup
var result []interface{} var result []interface{}
var errs []error var errs []error
@ -115,6 +117,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
wg.Add(1) wg.Add(1)
go func() { go func() {
resp, err := executorMap[c.Name].Execute(as.ctx, as.option) resp, err := executorMap[c.Name].Execute(as.ctx, as.option)
if err != nil { if err != nil {
// TODO: database operation // TODO: database operation
errCh <- err errCh <- err
@ -122,15 +125,20 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
return return
} }
// TODO: database operation // TODO: database operation
ch <- resp data := struct {
Resp interface{}
ClusterId int64
}{
Resp: resp,
ClusterId: c.ParticipantId,
}
ch <- data
wg.Done() wg.Done()
}() }()
} }
wg.Wait() wg.Wait()
close(ch)
for s := range ch { close(errCh)
result = append(result, s)
}
for e := range errCh { for e := range errCh {
errs = append(errs, e) errs = append(errs, e)
@ -140,6 +148,19 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
return nil, errors.New("submit task failed") return nil, errors.New("submit task failed")
} }
for s := range ch {
data := (s).(struct {
Resp interface{}
ClusterId int64
})
result = append(result, data.Resp)
}
err := as.AiStorages.SaveTask(as.option.TaskName)
if err != nil {
return nil, err
}
return result, nil return result, nil
} }

View File

@ -49,7 +49,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
if opt.ResourceType == "computeCard" { if opt.ResourceType == "computeCard" {
var maxCurrentCardHours float64 var maxCurrentCardHours float64
for _, card := range res.CardsAvail { for _, card := range res.CardsAvail {
cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3) cardHours := common.RoundFloat( /*card.TOpsAtFp16**/ card.CardHours, 3)
if cardHours > maxCurrentCardHours { if cardHours > maxCurrentCardHours {
maxCurrentCardHours = cardHours maxCurrentCardHours = cardHours
} }

View File

@ -284,14 +284,14 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
totalDcu := limitResp.Data.AccountMaxDcu totalDcu := limitResp.Data.AccountMaxDcu
//disk //disk
diskReq := &hpcAC.ParaStorQuotaReq{} //diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) //diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
if err != nil { //if err != nil {
return nil, err // return nil, err
} //}
//
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3) //totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3) //availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
//memory //memory
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil) nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
@ -349,12 +349,12 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
Balance: balance, Balance: balance,
CpuCoreTotal: totalCpu, CpuCoreTotal: totalCpu,
CpuCoreAvail: CpuCoreAvail, CpuCoreAvail: CpuCoreAvail,
DiskTotal: totalDisk, //DiskTotal: totalDisk,
DiskAvail: availDisk, //DiskAvail: availDisk,
MemTotal: memSize, MemTotal: memSize,
MemAvail: MemAvail, MemAvail: MemAvail,
CpuCoreHours: cpuHours, CpuCoreHours: cpuHours,
CardsAvail: cards, CardsAvail: cards,
} }
return resourceStats, nil return resourceStats, nil
@ -381,7 +381,7 @@ func (s *ShuguangAi) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm,
var algorithms []*collector.Algorithm var algorithms []*collector.Algorithm
for _, t := range GetTaskTypes() { for _, t := range GetTaskTypes() {
taskType := t taskType := t
req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0} req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0, Order: "asc", OrderBy: "name", KeyWord: ""}
list, err := s.aCRpc.GetFileList(ctx, req) list, err := s.aCRpc.GetFileList(ctx, req)
if err != nil { if err != nil {
return nil, err return nil, err