diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api index 2fd6e4f8..ac860431 100644 --- a/api/desc/schedule/pcm-schedule.api +++ b/api/desc/schedule/pcm-schedule.api @@ -19,7 +19,9 @@ type ( ScheduleResult { ClusterId string `json:"clusterId"` TaskId string `json:"taskId"` + Card string `json:"card"` Strategy string `json:"strategy"` + JobId string `json:"jobId"` Replica int32 `json:"replica"` Msg string `json:"msg"` } @@ -32,6 +34,7 @@ type ( AdapterId string `json:"adapterId"` AiClusterIds []string `json:"aiClusterIds"` ResourceType string `json:"resourceType"` + ComputeCard string `json:"card"` Tops float64 `json:"Tops,optional"` TaskType string `json:"taskType"` Datasets string `json:"datasets"` diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go index 26a05d33..d56a05a5 100644 --- a/api/internal/logic/ai/getcentertasklistlogic.go +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -2,6 +2,8 @@ package ai import ( "context" + "errors" + "fmt" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "strconv" "sync" @@ -46,6 +48,9 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList if err != nil { continue } + if len(taskList) == 0 { + continue + } for _, task := range taskList { var elapsed time.Duration switch task.Status { @@ -82,7 +87,6 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList case <-time.After(2 * time.Second): return resp, nil } - } func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) { @@ -92,15 +96,20 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- if err != nil { continue } + if len(taskList) == 0 { + continue + } for _, task := range taskList { t := task - if t.Status == constants.Completed || t.JobId == "" { + if t.Status == constants.Completed { continue } wg.Add(1) go func() { trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId) if err != nil { + msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) wg.Done() return } diff --git a/api/internal/logic/core/pagelisttasklogic.go b/api/internal/logic/core/pagelisttasklogic.go index a1f20b43..c0be0b29 100644 --- a/api/internal/logic/core/pagelisttasklogic.go +++ b/api/internal/logic/core/pagelisttasklogic.go @@ -2,12 +2,16 @@ package core import ( "context" + "errors" + "fmt" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" + "strconv" + "sync" "time" "github.com/zeromicro/go-zero/core/logx" @@ -53,8 +57,9 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa } // 更新智算任务状态 - var ch = make(chan struct{}) - go l.updateAitaskStatus(list, ch) + chs := [2]chan struct{}{make(chan struct{}), make(chan struct{})} + go l.updateTaskStatus(list, chs[0]) + go l.updateAiTaskStatus(list, chs[1]) for _, model := range list { if model.StartTime != "" && model.EndTime == "" { @@ -72,15 +77,18 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa resp.PageNum = req.PageNum resp.Total = total - select { - case _ = <-ch: - return resp, nil - case <-time.After(1 * time.Second): - return resp, nil + for _, ch := range chs { + select { + case <-ch: + return + case <-time.After(1 * time.Second): + return + } } + return } -func (l *PageListTaskLogic) updateAitaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { +func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { for _, task := range tasks { if task.AdapterTypeDict != 1 { continue @@ -150,8 +158,62 @@ func (l *PageListTaskLogic) updateAitaskStatus(tasks []*types.TaskModel, ch chan tx = l.svcCtx.DbEngin.Table("task").Updates(task) if tx.Error != nil { + logx.Errorf(tx.Error.Error()) return } } ch <- struct{}{} } + +func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { + var wg sync.WaitGroup + for _, task := range tasks { + if task.AdapterTypeDict != 1 { + continue + } + if task.Status == constants.Succeeded { + continue + } + + var aiTaskList []*models.TaskAi + tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } + + if len(aiTaskList) == 0 { + continue + } + + for _, aitask := range aiTaskList { + t := aitask + if t.Status == constants.Completed { + continue + } + wg.Add(1) + go func() { + trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId) + if err != nil { + msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + t.Status = trainingTask.Status + t.StartTime = trainingTask.Start + t.EndTime = trainingTask.End + err = l.svcCtx.Scheduler.AiStorages.UpdateAiTask(t) + if err != nil { + msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + wg.Done() + }() + } + } + wg.Wait() + ch <- struct{}{} +} diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index 5caf9d91..545abdd2 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -7,6 +7,8 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "strconv" + "strings" "github.com/zeromicro/go-zero/core/logx" ) @@ -31,6 +33,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type AdapterId: req.AiOption.AdapterId, TaskName: req.AiOption.TaskName, ResourceType: req.AiOption.ResourceType, + ComputeCard: req.AiOption.ComputeCard, Replica: 1, Tops: req.AiOption.Tops, TaskType: req.AiOption.TaskType, @@ -69,19 +72,22 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type for _, r := range rs { scheResult := &types.ScheduleResult{} scheResult.ClusterId = r.ClusterId - scheResult.TaskId = r.TaskId + scheResult.TaskId = strconv.FormatInt(id, 10) + scheResult.JobId = r.JobId scheResult.Strategy = r.Strategy + scheResult.Card = strings.ToUpper(r.Card) scheResult.Replica = r.Replica scheResult.Msg = r.Msg - opt.ComputeCard = r.Card + opt.ComputeCard = strings.ToUpper(r.Card) clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(r.ClusterId) - err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, clusterName, r.TaskId, constants.Saved, r.Msg) + err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg) if err != nil { return nil, err } + resp.Results = append(resp.Results, scheResult) } diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index ede21f28..4de5f6bb 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -73,10 +73,11 @@ func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo, func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) { var resp []*models.TaskAi - tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return nil, tx.Error + db := s.DbEngin.Model(&models.TaskAi{}).Table("task_ai") + db = db.Where("adapter_id = ?", adapterId) + err := db.Order("commit_time desc").Find(&resp).Error + if err != nil { + return nil, err } return resp, nil } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index d401f408..51cdd669 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" @@ -42,7 +43,7 @@ type AiScheduler struct { } type AiResult struct { - TaskId string + JobId string ClusterId string Strategy string Replica int32 @@ -214,14 +215,15 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa return nil, errors.New("database add failed: " + err.Error()) } } else { - msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId) + msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId) errmsg += msg - err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.TaskId, constants.Saved, msg) + err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.JobId, constants.Saved, msg) if err != nil { return nil, errors.New("database add failed: " + err.Error()) } } } + logx.Errorf(errors.New(errmsg).Error()) return nil, errors.New(errmsg) } @@ -296,7 +298,7 @@ func convertType(in interface{}) (*AiResult, error) { case *hpcAC.SubmitTaskAiResp: resp := (in).(*hpcAC.SubmitTaskAiResp) if resp.Code == "0" { - result.TaskId = resp.Data + result.JobId = resp.Data } else { result.Msg = resp.Msg } @@ -305,7 +307,7 @@ func convertType(in interface{}) (*AiResult, error) { resp := (in).(*octopus.CreateTrainJobResp) if resp.Success { - result.TaskId = resp.Payload.JobId + result.JobId = resp.Payload.JobId } else { result.Msg = resp.Error.Message } diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 5993243f..ff4c9f32 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -493,7 +493,11 @@ func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*coll } jobresp, ok := (resp).(*octopus.GetTrainJobResp) if !jobresp.Success || !ok { - return nil, errors.New("get training task failed") + if jobresp.Error != nil { + return nil, errors.New(jobresp.Error.Message) + } else { + return nil, errors.New("get training task failed, empty error returned") + } } var task collector.Task task.Id = jobresp.Payload.TrainJob.Id @@ -587,7 +591,7 @@ func (o *OctopusLink) generateResourceId(ctx context.Context, option *option.AiO } if option.ResourceType == CARD { - err = setResourceIdByCard(option, specResp, option.ComputeCard) + err = setResourceIdByCard(option, specResp, GCU) if err != nil { return err } diff --git a/api/internal/storeLink/storeLink.go b/api/internal/storeLink/storeLink.go index b38cd561..2fbf2c12 100644 --- a/api/internal/storeLink/storeLink.go +++ b/api/internal/storeLink/storeLink.go @@ -82,7 +82,7 @@ var ( "3": SHUGUANGAI, "4": SHUGUANGHPC, } - resourceTypes = []string{CPU, CARD} + resourceTypes = []string{CARD} taskTypes = []string{PYTORCH_TASK} ERROR_RESP_EMPTY = errors.New("resp empty error") diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 3af738ab..a834baf1 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -1164,7 +1164,7 @@ type CommitHpcTaskReq struct { Description string `json:"description,optional"` TenantId int64 `json:"tenantId,optional"` TaskId int64 `json:"taskId,optional"` - AdapterIds []string `json:"adapterId"` + AdapterIds []string `json:"adapterIds"` MatchLabels map[string]string `json:"matchLabels,optional"` CardCount int64 `json:"cardCount,optional"` WorkDir string `json:"workDir,optional"` //paratera:workingDir @@ -5622,7 +5622,9 @@ type ScheduleResp struct { type ScheduleResult struct { ClusterId string `json:"clusterId"` TaskId string `json:"taskId"` + Card string `json:"card"` Strategy string `json:"strategy"` + JobId string `json:"jobId"` Replica int32 `json:"replica"` Msg string `json:"msg"` } @@ -5635,6 +5637,7 @@ type AiOption struct { AdapterId string `json:"adapterId"` AiClusterIds []string `json:"aiClusterIds"` ResourceType string `json:"resourceType"` + ComputeCard string `json:"card"` Tops float64 `json:"Tops,optional"` TaskType string `json:"taskType"` Datasets string `json:"datasets"`