diff --git a/api/internal/logic/ai/getcenteroverviewlogic.go b/api/internal/logic/ai/getcenteroverviewlogic.go index a4ecf111..e6a45106 100644 --- a/api/internal/logic/ai/getcenteroverviewlogic.go +++ b/api/internal/logic/ai/getcenteroverviewlogic.go @@ -99,11 +99,20 @@ func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan } wg.Add(1) go func() { + _, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id] + if !ok { + wg.Done() + return + } stat, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(l.ctx) if err != nil { wg.Done() return } + if stat == nil { + wg.Done() + return + } clusterType, err := strconv.ParseInt(adapter.Type, 10, 64) if err != nil { wg.Done() diff --git a/api/internal/logic/ai/getcenterqueueinglogic.go b/api/internal/logic/ai/getcenterqueueinglogic.go index bd5e5e2b..e7405298 100644 --- a/api/internal/logic/ai/getcenterqueueinglogic.go +++ b/api/internal/logic/ai/getcenterqueueinglogic.go @@ -42,6 +42,9 @@ func (l *GetCenterQueueingLogic) GetCenterQueueing() (resp *types.CenterQueueing if err != nil { continue } + if queues == nil { + continue + } //todo sync current task queues current := &types.CenterQueue{ Name: cluster.Name, diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go index bbae384b..b7fa034f 100644 --- a/api/internal/logic/ai/getcentertasklistlogic.go +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -113,6 +113,10 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- wg.Done() return } + if trainingTask == nil { + wg.Done() + return + } t.Status = trainingTask.Status t.StartTime = trainingTask.Start t.EndTime = trainingTask.End diff --git a/api/internal/logic/core/pagelisttasklogic.go b/api/internal/logic/core/pagelisttasklogic.go index f1858a4a..97dad450 100644 --- a/api/internal/logic/core/pagelisttasklogic.go +++ b/api/internal/logic/core/pagelisttasklogic.go @@ -85,24 +85,25 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa return } -func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { - for i := len(tasks) - 1; i >= 0; i-- { - if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { - tasks = append(tasks[:i], tasks[i+1:]...) +func (l *PageListTaskLogic) updateTaskStatus(tasklist []*types.TaskModel, ch chan<- struct{}) { + list := tasklist + for i := len(list) - 1; i >= 0; i-- { + if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { + list = append(list[:i], list[i+1:]...) } } - if len(tasks) == 0 { + if len(list) == 0 { ch <- struct{}{} return } - task := tasks[0] - for i, _ := range tasks { + task := list[0] + for i, _ := range list { earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) - latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime) + latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime) if latest.Before(earliest) { - task = tasks[i] + task = list[i] } } @@ -206,24 +207,25 @@ func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- ch <- struct{}{} } -func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { - for i := len(tasks) - 1; i >= 0; i-- { - if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { - tasks = append(tasks[:i], tasks[i+1:]...) +func (l *PageListTaskLogic) updateAiTaskStatus(tasklist []*types.TaskModel, ch chan<- struct{}) { + list := tasklist + for i := len(list) - 1; i >= 0; i-- { + if list[i].AdapterTypeDict != 1 || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed { + list = append(list[:i], list[i+1:]...) } } - if len(tasks) == 0 { + if len(list) == 0 { ch <- struct{}{} return } - task := tasks[0] - for i, _ := range tasks { + task := list[0] + for i, _ := range list { earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) - latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime) + latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime) if latest.Before(earliest) { - task = tasks[i] + task = list[i] } } @@ -255,6 +257,10 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan wg.Done() return } + if trainingTask == nil { + wg.Done() + return + } t.Status = trainingTask.Status t.StartTime = trainingTask.Start t.EndTime = trainingTask.End diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index 5a98a8d6..01187c5e 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -30,6 +30,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" "sync" ) @@ -342,6 +343,17 @@ func convertType(in interface{}) (*AiResult, error) { result.Msg = resp.Error.Message } + return &result, nil + case *modelartsservice.CreateTrainingJobResp: + resp := (in).(*modelartsservice.CreateTrainingJobResp) + + if resp.ErrorMsg != "" { + result.Msg = resp.ErrorMsg + } else { + + result.JobId = resp.Metadata.Id + } + return &result, nil default: return nil, errors.New("ai task response failed") diff --git a/api/internal/scheduler/service/aiService.go b/api/internal/scheduler/service/aiService.go index 45b6da6d..0567e4a3 100644 --- a/api/internal/scheduler/service/aiService.go +++ b/api/internal/scheduler/service/aiService.go @@ -64,7 +64,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st id, _ := strconv.ParseInt(c.Id, 10, 64) modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf)) modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf)) - modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Nickname, id) + modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname) collectorMap[c.Id] = modelarts executorMap[c.Id] = modelarts case SHUGUANGAI: diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index ee8df706..d19b0b82 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -19,12 +19,18 @@ import ( "github.com/pkg/errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" "strconv" "strings" + "time" +) + +const ( + Ascend = "Ascend" ) type ModelArtsLink struct { @@ -36,8 +42,8 @@ type ModelArtsLink struct { pageSize int32 } -func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64) *ModelArtsLink { - return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100} +func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64, nickname string) *ModelArtsLink { + return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: nickname, participantId: id, pageIndex: 0, pageSize: 50} } func (m *ModelArtsLink) UploadImage(ctx context.Context, path string) (interface{}, error) { @@ -87,6 +93,7 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri WorkspaceId: "0", }, Algorithm: &modelarts.Algorithms{ + Id: algorithmId, Engine: &modelarts.EngineCreateTraining{ ImageUrl: imageId, }, @@ -184,7 +191,9 @@ func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorit } func (m *ModelArtsLink) GetComputeCards(ctx context.Context) ([]string, error) { - return nil, nil + var cards []string + cards = append(cards, Ascend) + return cards, nil } func (m *ModelArtsLink) GetUserBalance(ctx context.Context) (float64, error) { @@ -200,11 +209,61 @@ func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType st } func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) { - return "", nil + req := &modelartsservice.GetTrainingJobLogsPreviewReq{ + Platform: m.platform, + TaskId: "worker-0", + TrainingJobId: taskId, + } + resp, err := m.modelArtsRpc.GetTrainingJobLogsPreview(ctx, req) + if err != nil { + return "", err + } + + if strings.Contains(resp.Content, "404 Not Found") { + resp.Content = "waiting for logs..." + } + return resp.Content, nil } func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) { - return nil, nil + resp, err := m.QueryTask(ctx, taskId) + if err != nil { + return nil, err + } + jobresp, ok := (resp).(*modelartsservice.JobResponse) + if jobresp.ErrorMsg != "" || !ok { + if jobresp.ErrorMsg != "" { + return nil, errors.New(jobresp.ErrorMsg) + } else { + return nil, errors.New("get training task failed, empty error returned") + } + } + var task collector.Task + task.Id = jobresp.Metadata.Id + + switch strings.ToLower(jobresp.Status.Phase) { + case "completed": + task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout) + duration := jobresp.Status.Duration + task.End = time.Unix(int64(jobresp.Status.StartTime)/1000+int64(duration/1000), 0).Format(constants.Layout) + task.Status = constants.Completed + case "failed": + task.Status = constants.Failed + case "running": + task.Start = time.Unix(int64(jobresp.Status.StartTime)/1000, 0).Format(constants.Layout) + task.Status = constants.Running + case "stopped": + task.Status = constants.Stopped + case "pending": + task.Status = constants.Pending + case "terminated": + //TODO Failed + task.Status = constants.Failed + default: + task.Status = "undefined" + } + + return &task, nil } func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { @@ -224,6 +283,10 @@ func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option if err != nil { return err } + err = m.generateAlgorithmId(ctx, option) + if err != nil { + return err + } err = m.generateImageId(option) if err != nil { return err @@ -244,10 +307,7 @@ func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option } func (m *ModelArtsLink) generateResourceId(ctx context.Context, option *option.AiOption) error { - _, err := m.QuerySpecs(ctx) - if err != nil { - return err - } + option.ResourceId = "modelarts.kat1.xlarge" return nil } @@ -270,3 +330,42 @@ func (m *ModelArtsLink) generateParams(option *option.AiOption) error { return nil } + +func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error { + req := &modelarts.ListAlgorithmsReq{ + Platform: m.platform, + Offset: m.pageIndex, + Limit: m.pageSize, + } + resp, err := m.modelArtsRpc.ListAlgorithms(ctx, req) + if err != nil { + return err + } + if resp.ErrorMsg != "" { + return errors.New("failed to get algorithmId") + } + + for _, algorithm := range resp.Items { + engVersion := algorithm.JobConfig.Engine.EngineVersion + if strings.Contains(engVersion, option.TaskType) { + ns := strings.Split(algorithm.Metadata.Name, DASH) + if ns[0] != option.TaskType { + continue + } + if ns[1] != option.DatasetsName { + continue + } + if ns[2] != option.AlgorithmName { + continue + } + option.AlgorithmId = algorithm.Metadata.Id + return nil + } + } + + if option.AlgorithmId == "" { + return errors.New("Algorithm does not exist") + } + + return errors.New("failed to get AlgorithmId") +} diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index e3e86a46..24958177 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -99,7 +99,7 @@ func NewStoreLink(octopusRpc octopusclient.Octopus, modelArtsRpc modelartsservic linkStruct := NewOctopusLink(octopusRpc, participant.Name, participant.Id) return &StoreLink{ILinkage: linkStruct} case TYPE_MODELARTS: - linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id) + linkStruct := NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, participant.Name, participant.Id, "") return &StoreLink{ILinkage: linkStruct} case TYPE_SHUGUANGAI: linkStruct := NewShuguangAi(aCRpc, participant.Name, participant.Id)