From cda53ae9160a5eda2e13f8a9112b092ce7ca6c9e Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 15 May 2024 16:13:00 +0800 Subject: [PATCH] updated scheduleResult Former-commit-id: 38db46a22a099b0e54dd59168860503774f144a3 --- api/desc/schedule/pcm-schedule.api | 1 + .../logic/ai/getcentertasklistlogic.go | 13 +++- api/internal/logic/core/pagelisttasklogic.go | 78 +++++++++++++++++-- api/internal/scheduler/database/aiStorage.go | 2 +- .../scheduler/schedulers/aiScheduler.go | 2 + api/internal/storeLink/octopus.go | 6 +- 6 files changed, 90 insertions(+), 12 deletions(-) diff --git a/api/desc/schedule/pcm-schedule.api b/api/desc/schedule/pcm-schedule.api index 2fd6e4f8..81d8d841 100644 --- a/api/desc/schedule/pcm-schedule.api +++ b/api/desc/schedule/pcm-schedule.api @@ -19,6 +19,7 @@ type ( ScheduleResult { ClusterId string `json:"clusterId"` TaskId string `json:"taskId"` + Card string `json:"card"` Strategy string `json:"strategy"` Replica int32 `json:"replica"` Msg string `json:"msg"` diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go index 26a05d33..d56a05a5 100644 --- a/api/internal/logic/ai/getcentertasklistlogic.go +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -2,6 +2,8 @@ package ai import ( "context" + "errors" + "fmt" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "strconv" "sync" @@ -46,6 +48,9 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList if err != nil { continue } + if len(taskList) == 0 { + continue + } for _, task := range taskList { var elapsed time.Duration switch task.Status { @@ -82,7 +87,6 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList case <-time.After(2 * time.Second): return resp, nil } - } func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) { @@ -92,15 +96,20 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- if err != nil { continue } + if len(taskList) == 0 { + continue + } for _, task := range taskList { t := task - if t.Status == constants.Completed || t.JobId == "" { + if t.Status == constants.Completed { continue } wg.Add(1) go func() { trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId) if err != nil { + msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) wg.Done() return } diff --git a/api/internal/logic/core/pagelisttasklogic.go b/api/internal/logic/core/pagelisttasklogic.go index a1f20b43..c0be0b29 100644 --- a/api/internal/logic/core/pagelisttasklogic.go +++ b/api/internal/logic/core/pagelisttasklogic.go @@ -2,12 +2,16 @@ package core import ( "context" + "errors" + "fmt" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" + "strconv" + "sync" "time" "github.com/zeromicro/go-zero/core/logx" @@ -53,8 +57,9 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa } // 更新智算任务状态 - var ch = make(chan struct{}) - go l.updateAitaskStatus(list, ch) + chs := [2]chan struct{}{make(chan struct{}), make(chan struct{})} + go l.updateTaskStatus(list, chs[0]) + go l.updateAiTaskStatus(list, chs[1]) for _, model := range list { if model.StartTime != "" && model.EndTime == "" { @@ -72,15 +77,18 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa resp.PageNum = req.PageNum resp.Total = total - select { - case _ = <-ch: - return resp, nil - case <-time.After(1 * time.Second): - return resp, nil + for _, ch := range chs { + select { + case <-ch: + return + case <-time.After(1 * time.Second): + return + } } + return } -func (l *PageListTaskLogic) updateAitaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { +func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { for _, task := range tasks { if task.AdapterTypeDict != 1 { continue @@ -150,8 +158,62 @@ func (l *PageListTaskLogic) updateAitaskStatus(tasks []*types.TaskModel, ch chan tx = l.svcCtx.DbEngin.Table("task").Updates(task) if tx.Error != nil { + logx.Errorf(tx.Error.Error()) return } } ch <- struct{}{} } + +func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { + var wg sync.WaitGroup + for _, task := range tasks { + if task.AdapterTypeDict != 1 { + continue + } + if task.Status == constants.Succeeded { + continue + } + + var aiTaskList []*models.TaskAi + tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } + + if len(aiTaskList) == 0 { + continue + } + + for _, aitask := range aiTaskList { + t := aitask + if t.Status == constants.Completed { + continue + } + wg.Add(1) + go func() { + trainingTask, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(l.ctx, t.JobId) + if err != nil { + msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + t.Status = trainingTask.Status + t.StartTime = trainingTask.Start + t.EndTime = trainingTask.End + err = l.svcCtx.Scheduler.AiStorages.UpdateAiTask(t) + if err != nil { + msg := fmt.Sprintf("AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) + logx.Errorf(errors.New(msg).Error()) + wg.Done() + return + } + wg.Done() + }() + } + } + wg.Wait() + ch <- struct{}{} +} diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index ede21f28..aa8b683a 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -73,7 +73,7 @@ func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo, func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) { var resp []*models.TaskAi - tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp) + tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Order("commit_time desc").Scan(&resp) if tx.Error != nil { logx.Errorf(tx.Error.Error()) return nil, tx.Error diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index d401f408..e95cdee5 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" @@ -222,6 +223,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa } } } + logx.Errorf(errors.New(errmsg).Error()) return nil, errors.New(errmsg) } diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 5993243f..e82812e3 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -493,7 +493,11 @@ func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*coll } jobresp, ok := (resp).(*octopus.GetTrainJobResp) if !jobresp.Success || !ok { - return nil, errors.New("get training task failed") + if jobresp.Error != nil { + return nil, errors.New(jobresp.Error.Message) + } else { + return nil, errors.New("get training task failed, empty error returned") + } } var task collector.Task task.Id = jobresp.Payload.TrainJob.Id