Merge pull request 'updated scheduleResult' (#176) from tzwang/pcm-coordinator:master into master

Former-commit-id: 7398d71826b76f7c5971992e6b8ffe862fd8b953
This commit is contained in:
tzwang 2024-05-15 18:34:54 +08:00
commit ab187a3579
9 changed files with 116 additions and 26 deletions

View File

@ -19,7 +19,9 @@ type (
// ScheduleResult is a single per-cluster entry of a schedule response.
// Field notes below describe how the schedule-submit logic shown in this
// change fills the entry from one scheduler result.
ScheduleResult {
// ClusterId is copied from the scheduler result's cluster id.
ClusterId string `json:"clusterId"`
// TaskId is the decimal string form of the id that is also used when the
// AI task record is saved.
TaskId string `json:"taskId"`
// Card is the scheduler result's card name converted to upper case.
Card string `json:"card"`
// Strategy is copied from the scheduler result.
Strategy string `json:"strategy"`
// JobId is copied from the scheduler result's job id.
// NOTE(review): presumably the id returned by the cluster on submission — confirm.
JobId string `json:"jobId"`
// Replica is copied from the scheduler result.
Replica int32 `json:"replica"`
// Msg is copied from the scheduler result's message.
Msg string `json:"msg"`
}
@ -32,6 +34,7 @@ type (
AdapterId string `json:"adapterId"`
AiClusterIds []string `json:"aiClusterIds"`
ResourceType string `json:"resourceType"`
ComputeCard string `json:"card"`
Tops float64 `json:"Tops,optional"`
TaskType string `json:"taskType"`
Datasets string `json:"datasets"`

View File

@ -2,6 +2,8 @@ package ai
import (
"context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"strconv"
"sync"
@ -46,6 +48,9 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
if err != nil {
continue
}
if len(taskList) == 0 {
continue
}
for _, task := range taskList {
var elapsed time.Duration
switch task.Status {
@ -82,7 +87,6 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
case <-time.After(2 * time.Second):
return resp, nil
}
}
func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
@ -92,15 +96,20 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<-
if err != nil {
continue
}
if len(taskList) == 0 {
continue
}
for _, task := range taskList {
t := task
if t.Status == constants.Completed || t.JobId == "" {
if t.Status == constants.Completed {
continue
}
wg.Add(1)
go func() {
trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId)
if err != nil {
msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
logx.Errorf(errors.New(msg).Error())
wg.Done()
return
}

View File

@ -2,12 +2,16 @@ package core
import (
"context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils"
"strconv"
"sync"
"time"
"github.com/zeromicro/go-zero/core/logx"
@ -53,8 +57,9 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
}
// Update the status of intelligent-computing (AI) tasks
var ch = make(chan struct{})
go l.updateAitaskStatus(list, ch)
chs := [2]chan struct{}{make(chan struct{}), make(chan struct{})}
go l.updateTaskStatus(list, chs[0])
go l.updateAiTaskStatus(list, chs[1])
for _, model := range list {
if model.StartTime != "" && model.EndTime == "" {
@ -72,15 +77,18 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
resp.PageNum = req.PageNum
resp.Total = total
select {
case _ = <-ch:
return resp, nil
case <-time.After(1 * time.Second):
return resp, nil
for _, ch := range chs {
select {
case <-ch:
return
case <-time.After(1 * time.Second):
return
}
}
return
}
func (l *PageListTaskLogic) updateAitaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
for _, task := range tasks {
if task.AdapterTypeDict != 1 {
continue
@ -150,8 +158,62 @@ func (l *PageListTaskLogic) updateAitaskStatus(tasks []*types.TaskModel, ch chan
tx = l.svcCtx.DbEngin.Table("task").Updates(task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return
}
}
ch <- struct{}{}
}
// updateAiTaskStatus refreshes the stored state of the AI sub-tasks that
// belong to the given tasks and then signals completion on ch.
//
// Only tasks whose AdapterTypeDict is 1 and whose Status is not
// constants.Succeeded are considered. For each of them the task_ai rows with
// the matching task_id are loaded, and every row that is not
// constants.Completed is refreshed in its own goroutine: the collector
// registered for the row's adapter and cluster is asked for the training task
// identified by JobId, and the returned status, start and end values are
// persisted through AiStorages.UpdateAiTask.
//
// The method is best effort. Failures are logged and never reported to the
// caller. A database error while loading task_ai rows stops the scan of the
// remaining tasks, but workers that were already started are still awaited.
//
// After all workers have finished, one value is sent on ch. The send is
// abandoned when l.ctx is done, so this goroutine does not block forever on a
// channel that nobody receives from any more.
// NOTE(review): assumes l.ctx is the request context and is cancelled once the
// request has been served — confirm.
func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
	// logFailure reports a failed refresh of a single AI task. The text is
	// logged as a value rather than as a format string, so a '%' inside the
	// error text cannot be misinterpreted as a formatting verb.
	logFailure := func(t *models.TaskAi, err error) {
		msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
		logx.Error(msg)
	}

	var wg sync.WaitGroup

	for _, task := range tasks {
		if task.AdapterTypeDict != 1 || task.Status == constants.Succeeded {
			continue
		}

		var aiTaskList []*models.TaskAi
		tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
		if tx.Error != nil {
			logx.Error(tx.Error.Error())
			// Stop scanning, but fall through to wg.Wait and the completion
			// signal instead of returning with workers still running.
			break
		}

		for _, aitask := range aiTaskList {
			t := aitask // per-iteration copy captured by the goroutine below
			if t.Status == constants.Completed {
				continue
			}

			// Resolve the collector before starting the worker. Indexing a
			// missing adapter yields a nil inner map, which is safe to read,
			// whereas calling a method on a missing collector would panic
			// inside the goroutine.
			adapterId := strconv.FormatInt(t.AdapterId, 10)
			clusterId := strconv.FormatInt(t.ClusterId, 10)
			aiCollector, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapterId][clusterId]
			if !ok || aiCollector == nil {
				logFailure(t, errors.New("no ai collector registered for adapter "+adapterId))
				continue
			}

			wg.Add(1)
			go func() {
				defer wg.Done()

				trainingTask, err := aiCollector.GetTrainingTask(l.ctx, t.JobId)
				if err != nil {
					logFailure(t, err)
					return
				}

				t.Status = trainingTask.Status
				t.StartTime = trainingTask.Start
				t.EndTime = trainingTask.End
				if err := l.svcCtx.Scheduler.AiStorages.UpdateAiTask(t); err != nil {
					logFailure(t, err)
				}
			}()
		}
	}

	wg.Wait()

	// ch is unbuffered; give up on the signal once the context is done.
	select {
	case ch <- struct{}{}:
	case <-l.ctx.Done():
	}
}

View File

@ -7,6 +7,8 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"strconv"
"strings"
"github.com/zeromicro/go-zero/core/logx"
)
@ -31,6 +33,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
AdapterId: req.AiOption.AdapterId,
TaskName: req.AiOption.TaskName,
ResourceType: req.AiOption.ResourceType,
ComputeCard: req.AiOption.ComputeCard,
Replica: 1,
Tops: req.AiOption.Tops,
TaskType: req.AiOption.TaskType,
@ -69,19 +72,22 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
for _, r := range rs {
scheResult := &types.ScheduleResult{}
scheResult.ClusterId = r.ClusterId
scheResult.TaskId = r.TaskId
scheResult.TaskId = strconv.FormatInt(id, 10)
scheResult.JobId = r.JobId
scheResult.Strategy = r.Strategy
scheResult.Card = strings.ToUpper(r.Card)
scheResult.Replica = r.Replica
scheResult.Msg = r.Msg
opt.ComputeCard = r.Card
opt.ComputeCard = strings.ToUpper(r.Card)
clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(r.ClusterId)
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, clusterName, r.TaskId, constants.Saved, r.Msg)
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg)
if err != nil {
return nil, err
}
resp.Results = append(resp.Results, scheResult)
}

View File

@ -73,10 +73,11 @@ func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo,
func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) {
var resp []*models.TaskAi
tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, tx.Error
db := s.DbEngin.Model(&models.TaskAi{}).Table("task_ai")
db = db.Where("adapter_id = ?", adapterId)
err := db.Order("commit_time desc").Find(&resp).Error
if err != nil {
return nil, err
}
return resp, nil
}

View File

@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
@ -42,7 +43,7 @@ type AiScheduler struct {
}
type AiResult struct {
TaskId string
JobId string
ClusterId string
Strategy string
Replica int32
@ -214,14 +215,15 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
return nil, errors.New("database add failed: " + err.Error())
}
} else {
msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.TaskId, constants.Saved, msg)
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
if err != nil {
return nil, errors.New("database add failed: " + err.Error())
}
}
}
logx.Errorf(errors.New(errmsg).Error())
return nil, errors.New(errmsg)
}
@ -296,7 +298,7 @@ func convertType(in interface{}) (*AiResult, error) {
case *hpcAC.SubmitTaskAiResp:
resp := (in).(*hpcAC.SubmitTaskAiResp)
if resp.Code == "0" {
result.TaskId = resp.Data
result.JobId = resp.Data
} else {
result.Msg = resp.Msg
}
@ -305,7 +307,7 @@ func convertType(in interface{}) (*AiResult, error) {
resp := (in).(*octopus.CreateTrainJobResp)
if resp.Success {
result.TaskId = resp.Payload.JobId
result.JobId = resp.Payload.JobId
} else {
result.Msg = resp.Error.Message
}

View File

@ -493,7 +493,11 @@ func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*coll
}
jobresp, ok := (resp).(*octopus.GetTrainJobResp)
if !jobresp.Success || !ok {
return nil, errors.New("get training task failed")
if jobresp.Error != nil {
return nil, errors.New(jobresp.Error.Message)
} else {
return nil, errors.New("get training task failed, empty error returned")
}
}
var task collector.Task
task.Id = jobresp.Payload.TrainJob.Id
@ -587,7 +591,7 @@ func (o *OctopusLink) generateResourceId(ctx context.Context, option *option.AiO
}
if option.ResourceType == CARD {
err = setResourceIdByCard(option, specResp, option.ComputeCard)
err = setResourceIdByCard(option, specResp, GCU)
if err != nil {
return err
}

View File

@ -82,7 +82,7 @@ var (
"3": SHUGUANGAI,
"4": SHUGUANGHPC,
}
resourceTypes = []string{CPU, CARD}
resourceTypes = []string{CARD}
taskTypes = []string{PYTORCH_TASK}
ERROR_RESP_EMPTY = errors.New("resp empty error")

View File

@ -1164,7 +1164,7 @@ type CommitHpcTaskReq struct {
Description string `json:"description,optional"`
TenantId int64 `json:"tenantId,optional"`
TaskId int64 `json:"taskId,optional"`
AdapterIds []string `json:"adapterId"`
AdapterIds []string `json:"adapterIds"`
MatchLabels map[string]string `json:"matchLabels,optional"`
CardCount int64 `json:"cardCount,optional"`
WorkDir string `json:"workDir,optional"` //paratera:workingDir
@ -5622,7 +5622,9 @@ type ScheduleResp struct {
// ScheduleResult is a single per-cluster entry of a schedule response.
// Field notes below describe how the schedule-submit logic shown in this
// change fills the entry from one scheduler result.
type ScheduleResult struct {
// ClusterId is copied from the scheduler result's cluster id.
ClusterId string `json:"clusterId"`
// TaskId is the decimal string form of the id that is also used when the
// AI task record is saved.
TaskId string `json:"taskId"`
// Card is the scheduler result's card name converted to upper case.
Card string `json:"card"`
// Strategy is copied from the scheduler result.
Strategy string `json:"strategy"`
// JobId is copied from the scheduler result's job id.
// NOTE(review): presumably the id returned by the cluster on submission — confirm.
JobId string `json:"jobId"`
// Replica is copied from the scheduler result.
Replica int32 `json:"replica"`
// Msg is copied from the scheduler result's message.
Msg string `json:"msg"`
}
@ -5635,6 +5637,7 @@ type AiOption struct {
AdapterId string `json:"adapterId"`
AiClusterIds []string `json:"aiClusterIds"`
ResourceType string `json:"resourceType"`
ComputeCard string `json:"card"`
Tops float64 `json:"Tops,optional"`
TaskType string `json:"taskType"`
Datasets string `json:"datasets"`