From 60359ea095833863ec7b6f0ebb796240804ce7c1 Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 7 May 2024 16:55:35 +0800 Subject: [PATCH] updated getCentertaskList function Former-commit-id: 0c2585ad3301d2a1dc973cc5a14b5fee041dbcc6 --- .../logic/ai/getcentertasklistlogic.go | 19 +++++++----- .../logic/schedule/schedulesubmitlogic.go | 2 +- api/internal/scheduler/database/aiStorage.go | 10 ++++-- .../scheduler/schedulers/aiScheduler.go | 8 ++--- .../scheduler/service/collector/collector.go | 8 +++++ api/internal/storeLink/modelarts.go | 4 +++ api/internal/storeLink/octopus.go | 31 +++++++++++++++++++ api/internal/storeLink/shuguangai.go | 10 ++++++ pkg/constants/task.go | 1 + pkg/constants/time.go | 3 ++ 10 files changed, 80 insertions(+), 16 deletions(-) create mode 100644 pkg/constants/time.go diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go index 0a800630..6fce581a 100644 --- a/api/internal/logic/ai/getcentertasklistlogic.go +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -17,8 +17,6 @@ type GetCenterTaskListLogic struct { svcCtx *svc.ServiceContext } -const layout = "2006-01-02 15:04:05" - func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterTaskListLogic { return &GetCenterTaskListLogic{ Logger: logx.WithContext(ctx), @@ -42,12 +40,17 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList } for _, task := range taskList { var elapsed time.Duration - start, _ := time.Parse(layout, task.CommitTime) - if task.Status != constants.Completed { - elapsed = start.Sub(time.Now()) - } else { - end, _ := time.Parse(layout, task.EndTime) - elapsed = start.Sub(end) + switch task.Status { + case constants.Completed: + end, err := time.ParseInLocation(constants.Layout, task.EndTime, time.Local) + if err != nil { + elapsed = time.Duration(0) + } + elapsed = end.Sub(task.CommitTime) + case constants.Running: + elapsed = time.Now().Sub(task.CommitTime) + default: + elapsed = 0 } t := &types.AiTask{ diff --git a/api/internal/logic/schedule/schedulesubmitlogic.go b/api/internal/logic/schedule/schedulesubmitlogic.go index e46ffe7d..d3b1cd99 100644 --- a/api/internal/logic/schedule/schedulesubmitlogic.go +++ b/api/internal/logic/schedule/schedulesubmitlogic.go @@ -66,7 +66,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type scheResult.Strategy = r.Strategy scheResult.Replica = r.Replica scheResult.Msg = r.Msg - err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Running, r.Msg) + err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Saved, r.Msg) if err != nil { return nil, err } diff --git a/api/internal/scheduler/database/aiStorage.go b/api/internal/scheduler/database/aiStorage.go index 018f58b8..e75401f5 100644 --- a/api/internal/scheduler/database/aiStorage.go +++ b/api/internal/scheduler/database/aiStorage.go @@ -61,8 +61,8 @@ func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo, return list, nil } -func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*types.AiTaskDb, error) { - var resp []*types.AiTaskDb +func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) { + var resp []*models.TaskAi tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp) if tx.Error != nil { logx.Errorf(tx.Error.Error()) @@ -197,6 +197,10 @@ func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, c return nil } -func (s *AiStorage) UpdateTask() error { +func (s *AiStorage) UpdateAiTask(task models.TaskAi) error { + tx := s.DbEngin.Updates(&task) + if tx.Error != nil { + return tx.Error + } return nil } diff --git a/api/internal/scheduler/schedulers/aiScheduler.go b/api/internal/scheduler/schedulers/aiScheduler.go index af50d201..1fa4eb63 100644 --- a/api/internal/scheduler/schedulers/aiScheduler.go +++ b/api/internal/scheduler/schedulers/aiScheduler.go @@ -176,7 +176,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa if len(errs) != 0 { taskId, err := as.AiStorages.SaveTask(as.option.TaskName) if err != nil { - return nil, err + return nil, errors.New("database add failed: " + err.Error()) } var errmsg string for _, err := range errs { @@ -188,7 +188,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa errmsg += msg err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, "", constants.Failed, msg) if err != nil { - return nil, err + return nil, errors.New("database add failed: " + err.Error()) } } for s := range ch { @@ -197,14 +197,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa errmsg += msg err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, "", constants.Failed, msg) if err != nil { - return nil, err + return nil, errors.New("database add failed: " + err.Error()) } } else { msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId) errmsg += msg err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, s.TaskId, constants.Succeeded, msg) if err != nil { - return nil, err + return nil, errors.New("database add failed: " + err.Error()) } } } diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index 99d34b51..01406901 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -7,6 +7,7 @@ type AiCollector interface { GetDatasetsSpecs(ctx context.Context) ([]*DatasetsSpecs, error) GetAlgorithms(ctx context.Context) ([]*Algorithm, error) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) + GetTrainingTask(ctx context.Context, taskId string) (*Task, error) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error } @@ -45,3 +46,10 @@ type Algorithm struct { Platform string TaskType string } + +type Task struct { + Id string + Start string + End string + Status string +} diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index 84bc0dbf..1ae255f2 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -174,6 +174,10 @@ func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, i return "", nil } +func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) { + return nil, nil +} + func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { err := m.GenerateSubmitParams(ctx, option) if err != nil { diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 22781e2f..f72f63ab 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -19,12 +19,14 @@ import ( "errors" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" "math" "strconv" "strings" + "time" ) type OctopusLink struct { @@ -364,6 +366,35 @@ func (o *OctopusLink) GetTrainingTaskLog(ctx context.Context, taskId string, ins return resp.Content, nil } +func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) { + resp, err := o.QueryTask(ctx, taskId) + if err != nil { + return nil, err + } + jobresp := (resp).(*octopus.GetTrainJobResp) + if !jobresp.Success { + return nil, errors.New(jobresp.Error.Message) + } + var task collector.Task + task.Id = jobresp.Payload.TrainJob.Id + task.Start = time.Unix(jobresp.Payload.TrainJob.StartedAt, 0).Format(constants.Layout) + task.End = time.Unix(jobresp.Payload.TrainJob.CompletedAt, 0).Format(constants.Layout) + switch jobresp.Payload.TrainJob.Status { + case "succeeded": + task.Status = constants.Completed + case "failed": + task.Status = constants.Failed + case "running": + task.Status = constants.Running + case "stopped": + task.Status = constants.Stopped + default: + task.Status = "undefined" + } + + return &task, nil +} + func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { err := o.GenerateSubmitParams(ctx, option) if err != nil { diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 4a84cea4..7f3ee370 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -17,6 +17,7 @@ package storeLink import ( "context" "errors" + "fmt" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" @@ -473,6 +474,15 @@ func (s *ShuguangAi) GetTrainingTaskLog(ctx context.Context, taskId string, inst return resp.Data.Content, nil } +func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) { + task, err := s.QueryTask(ctx, taskId) + if err != nil { + return nil, err + } + fmt.Println(task) + return nil, nil +} + func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { err := s.GenerateSubmitParams(ctx, option) if err != nil { diff --git a/pkg/constants/task.go b/pkg/constants/task.go index daf8879f..14e4b5fe 100644 --- a/pkg/constants/task.go +++ b/pkg/constants/task.go @@ -26,4 +26,5 @@ const ( WaitRestart = "WaitRestart" WaitPause = "WaitPause" WaitStart = "WaitStart" + Stopped = "Stopped" ) diff --git a/pkg/constants/time.go b/pkg/constants/time.go new file mode 100644 index 00000000..deecc715 --- /dev/null +++ b/pkg/constants/time.go @@ -0,0 +1,3 @@ +package constants + +const Layout = "2006-01-02 15:04:05"