Merge remote-tracking branch 'origin/master' into master-wq

Former-commit-id: f59ee459bf0670a2176c1f6a13f0b6737fb2012c
This commit is contained in:
qiwang 2024-05-21 18:56:46 +08:00
commit ed059e3863
8 changed files with 162 additions and 29 deletions

View File

@ -99,11 +99,20 @@ func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan
} }
wg.Add(1) wg.Add(1)
go func() { go func() {
_, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id]
if !ok {
wg.Done()
return
}
stat, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(l.ctx) stat, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(l.ctx)
if err != nil { if err != nil {
wg.Done() wg.Done()
return return
} }
if stat == nil {
wg.Done()
return
}
clusterType, err := strconv.ParseInt(adapter.Type, 10, 64) clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
if err != nil { if err != nil {
wg.Done() wg.Done()

View File

@ -42,6 +42,9 @@ func (l *GetCenterQueueingLogic) GetCenterQueueing() (resp *types.CenterQueueing
if err != nil { if err != nil {
continue continue
} }
if queues == nil {
continue
}
//todo sync current task queues //todo sync current task queues
current := &types.CenterQueue{ current := &types.CenterQueue{
Name: cluster.Name, Name: cluster.Name,

View File

@ -113,6 +113,10 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<-
wg.Done() wg.Done()
return return
} }
if trainingTask == nil {
wg.Done()
return
}
t.Status = trainingTask.Status t.Status = trainingTask.Status
t.StartTime = trainingTask.Start t.StartTime = trainingTask.Start
t.EndTime = trainingTask.End t.EndTime = trainingTask.End

View File

@ -85,24 +85,25 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
return return
} }
func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch chan<- struct{}) {
for i := len(tasks) - 1; i >= 0; i-- { list := tasklist
if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { for i := len(list) - 1; i >= 0; i-- {
tasks = append(tasks[:i], tasks[i+1:]...) if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
list = append(list[:i], list[i+1:]...)
} }
} }
if len(tasks) == 0 { if len(list) == 0 {
ch <- struct{}{} ch <- struct{}{}
return return
} }
task := tasks[0] task := list[0]
for i, _ := range tasks { for i, _ := range list {
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime) latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
if latest.Before(earliest) { if latest.Before(earliest) {
task = tasks[i] task = list[i]
} }
} }
@ -206,24 +207,25 @@ func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<-
ch <- struct{}{} ch <- struct{}{}
} }
func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { func (l *PageListTaskLogic) updateAiTaskStatus(tasklist []*types.TaskModel, ch chan<- struct{}) {
for i := len(tasks) - 1; i >= 0; i-- { list := tasklist
if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { for i := len(list) - 1; i >= 0; i-- {
tasks = append(tasks[:i], tasks[i+1:]...) if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
list = append(list[:i], list[i+1:]...)
} }
} }
if len(tasks) == 0 { if len(list) == 0 {
ch <- struct{}{} ch <- struct{}{}
return return
} }
task := tasks[0] task := list[0]
for i, _ := range tasks { for i, _ := range list {
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime) latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
if latest.Before(earliest) { if latest.Before(earliest) {
task = tasks[i] task = list[i]
} }
} }
@ -255,6 +257,10 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan
wg.Done() wg.Done()
return return
} }
if trainingTask == nil {
wg.Done()
return
}
t.Status = trainingTask.Status t.Status = trainingTask.Status
t.StartTime = trainingTask.Start t.StartTime = trainingTask.Start
t.EndTime = trainingTask.End t.EndTime = trainingTask.End

View File

@ -30,6 +30,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"sync" "sync"
) )
@ -342,6 +343,17 @@ func convertType(in interface{}) (*AiResult, error) {
result.Msg = resp.Error.Message result.Msg = resp.Error.Message
} }
return &result, nil
case *modelartsservice.CreateTrainingJobResp:
resp := (in).(*modelartsservice.CreateTrainingJobResp)
if resp.ErrorMsg != "" {
result.Msg = resp.ErrorMsg
} else {
result.JobId = resp.Metadata.Id
}
return &result, nil return &result, nil
default: default:
return nil, errors.New("ai task response failed") return nil, errors.New("ai task response failed")

View File

@ -64,7 +64,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
id, _ := strconv.ParseInt(c.Id, 10, 64) id, _ := strconv.ParseInt(c.Id, 10, 64)
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf)) modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf)) modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Nickname, id) modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
collectorMap[c.Id] = modelarts collectorMap[c.Id] = modelarts
executorMap[c.Id] = modelarts executorMap[c.Id] = modelarts
case SHUGUANGAI: case SHUGUANGAI:

View File

@ -19,12 +19,18 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"strconv" "strconv"
"strings" "strings"
"time"
)
const (
Ascend = "Ascend"
) )
type ModelArtsLink struct { type ModelArtsLink struct {
@ -36,8 +42,8 @@ type ModelArtsLink struct {
pageSize int32 pageSize int32
} }
func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64) *ModelArtsLink { func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64, nickname string) *ModelArtsLink {
return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100} return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: nickname, participantId: id, pageIndex: 0, pageSize: 50}
} }
func (m *ModelArtsLink) UploadImage(ctx context.Context, path string) (interface{}, error) { func (m *ModelArtsLink) UploadImage(ctx context.Context, path string) (interface{}, error) {
@ -87,6 +93,7 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri
WorkspaceId: "0", WorkspaceId: "0",
}, },
Algorithm: &modelarts.Algorithms{ Algorithm: &modelarts.Algorithms{
Id: algorithmId,
Engine: &modelarts.EngineCreateTraining{ Engine: &modelarts.EngineCreateTraining{
ImageUrl: imageId, ImageUrl: imageId,
}, },
@ -184,7 +191,9 @@ func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorit
} }
func (m *ModelArtsLink) GetComputeCards(ctx context.Context) ([]string, error) { func (m *ModelArtsLink) GetComputeCards(ctx context.Context) ([]string, error) {
return nil, nil var cards []string
cards = append(cards, Ascend)
return cards, nil
} }
func (m *ModelArtsLink) GetUserBalance(ctx context.Context) (float64, error) { func (m *ModelArtsLink) GetUserBalance(ctx context.Context) (float64, error) {
@ -200,11 +209,61 @@ func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType st
} }
func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) { func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
return "", nil req := &modelartsservice.GetTrainingJobLogsPreviewReq{
Platform: m.platform,
TaskId: "worker-0",
TrainingJobId: taskId,
}
resp, err := m.modelArtsRpc.GetTrainingJobLogsPreview(ctx, req)
if err != nil {
return "", err
}
if strings.Contains(resp.Content, "404 Not Found") {
resp.Content = "waiting for logs..."
}
return resp.Content, nil
} }
func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) { func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
return nil, nil resp, err := m.QueryTask(ctx, taskId)
if err != nil {
return nil, err
}
jobresp, ok := (resp).(*modelartsservice.JobResponse)
if jobresp.ErrorMsg != "" || !ok {
if jobresp.ErrorMsg != "" {
return nil, errors.New(jobresp.ErrorMsg)
} else {
return nil, errors.New("get training task failed, empty error returned")
}
}
var task collector.Task
task.Id = jobresp.Metadata.Id
switch strings.ToLower(jobresp.Status.Phase) {
case "completed":
task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout)
duration := jobresp.Status.Duration
task.End = time.Unix(int64(jobresp.Status.StartTime)/1000+int64(duration/1000), 0).Format(constants.Layout)
task.Status = constants.Completed
case "failed":
task.Status = constants.Failed
case "running":
task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout)
task.Status = constants.Running
case "stopped":
task.Status = constants.Stopped
case "pending":
task.Status = constants.Pending
case "terminated":
//TODO Failed
task.Status = constants.Failed
default:
task.Status = "undefined"
}
return &task, nil
} }
func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
@ -224,6 +283,10 @@ func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option
if err != nil { if err != nil {
return err return err
} }
err = m.generateAlgorithmId(ctx, option)
if err != nil {
return err
}
err = m.generateImageId(option) err = m.generateImageId(option)
if err != nil { if err != nil {
return err return err
@ -244,10 +307,7 @@ func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option
} }
func (m *ModelArtsLink) generateResourceId(ctx context.Context, option *option.AiOption) error { func (m *ModelArtsLink) generateResourceId(ctx context.Context, option *option.AiOption) error {
_, err := m.QuerySpecs(ctx) option.ResourceId = "modelarts.kat1.xlarge"
if err != nil {
return err
}
return nil return nil
} }
@ -270,3 +330,42 @@ func (m *ModelArtsLink) generateParams(option *option.AiOption) error {
return nil return nil
} }
func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
req := &modelarts.ListAlgorithmsReq{
Platform: m.platform,
Offset: m.pageIndex,
Limit: m.pageSize,
}
resp, err := m.modelArtsRpc.ListAlgorithms(ctx, req)
if err != nil {
return err
}
if resp.ErrorMsg != "" {
return errors.New("failed to get algorithmId")
}
for _, algorithm := range resp.Items {
engVersion := algorithm.JobConfig.Engine.EngineVersion
if strings.Contains(engVersion, option.TaskType) {
ns := strings.Split(algorithm.Metadata.Name, DASH)
if ns[0] != option.TaskType {
continue
}
if ns[1] != option.DatasetsName {
continue
}
if ns[2] != option.AlgorithmName {
continue
}
option.AlgorithmId = algorithm.Metadata.Id
return nil
}
}
if option.AlgorithmId == "" {
return errors.New("Algorithm does not exist")
}
return errors.New("failed to get AlgorithmId")
}

View File

@ -99,7 +99,7 @@ func NewStoreLink(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservic
linkStruct := NewOctopusLink(octopusRpc, participant.Name, participant.Id) linkStruct := NewOctopusLink(octopusRpc, participant.Name, participant.Id)
return &StoreLink{ILinkage: linkStruct} return &StoreLink{ILinkage: linkStruct}
case TYPE_MODELARTS: case TYPE_MODELARTS:
linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id) linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id, "")
return &StoreLink{ILinkage: linkStruct} return &StoreLink{ILinkage: linkStruct}
case TYPE_SHUGUANGAI: case TYPE_SHUGUANGAI:
linkStruct := NewShuguangAi(aCRpc, participant.Name, participant.Id) linkStruct := NewShuguangAi(aCRpc, participant.Name, participant.Id)