Updated the GetCenterTaskList function

Former-commit-id: 0c2585ad3301d2a1dc973cc5a14b5fee041dbcc6
This commit is contained in:
tzwang 2024-05-07 16:55:35 +08:00
parent 95c43cf271
commit 60359ea095
10 changed files with 80 additions and 16 deletions

View File

@ -17,8 +17,6 @@ type GetCenterTaskListLogic struct {
svcCtx *svc.ServiceContext
}
const layout = "2006-01-02 15:04:05"
func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterTaskListLogic {
return &GetCenterTaskListLogic{
Logger: logx.WithContext(ctx),
@ -42,12 +40,17 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
}
for _, task := range taskList {
var elapsed time.Duration
start, _ := time.Parse(layout, task.CommitTime)
if task.Status != constants.Completed {
elapsed = start.Sub(time.Now())
} else {
end, _ := time.Parse(layout, task.EndTime)
elapsed = start.Sub(end)
switch task.Status {
case constants.Completed:
end, err := time.ParseInLocation(constants.Layout, task.EndTime, time.Local)
if err != nil {
elapsed = time.Duration(0)
}
elapsed = end.Sub(task.CommitTime)
case constants.Running:
elapsed = time.Now().Sub(task.CommitTime)
default:
elapsed = 0
}
t := &types.AiTask{

View File

@ -66,7 +66,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
scheResult.Strategy = r.Strategy
scheResult.Replica = r.Replica
scheResult.Msg = r.Msg
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Running, r.Msg)
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Saved, r.Msg)
if err != nil {
return nil, err
}

View File

@ -61,8 +61,8 @@ func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo,
return list, nil
}
func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*types.AiTaskDb, error) {
var resp []*types.AiTaskDb
func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) {
var resp []*models.TaskAi
tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
@ -197,6 +197,10 @@ func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, c
return nil
}
func (s *AiStorage) UpdateTask() error {
// UpdateAiTask hands task to DbEngin.Updates and returns the error carried by
// the resulting handle, or nil when there is none.
func (s *AiStorage) UpdateAiTask(task models.TaskAi) error {
	if err := s.DbEngin.Updates(&task).Error; err != nil {
		return err
	}
	return nil
}

View File

@ -176,7 +176,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
if len(errs) != 0 {
taskId, err := as.AiStorages.SaveTask(as.option.TaskName)
if err != nil {
return nil, err
return nil, errors.New("database add failed: " + err.Error())
}
var errmsg string
for _, err := range errs {
@ -188,7 +188,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, "", constants.Failed, msg)
if err != nil {
return nil, err
return nil, errors.New("database add failed: " + err.Error())
}
}
for s := range ch {
@ -197,14 +197,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, "", constants.Failed, msg)
if err != nil {
return nil, err
return nil, errors.New("database add failed: " + err.Error())
}
} else {
msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, s.TaskId, constants.Succeeded, msg)
if err != nil {
return nil, err
return nil, errors.New("database add failed: " + err.Error())
}
}
}

View File

@ -7,6 +7,7 @@ type AiCollector interface {
GetDatasetsSpecs(ctx context.Context) ([]*DatasetsSpecs, error)
GetAlgorithms(ctx context.Context) ([]*Algorithm, error)
GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error)
GetTrainingTask(ctx context.Context, taskId string) (*Task, error)
DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error)
UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error
}
@ -45,3 +46,10 @@ type Algorithm struct {
Platform string
TaskType string
}
// Task is the value returned by AiCollector.GetTrainingTask.
type Task struct {
	// Id identifies the task, Start and End hold its start and end times as
	// text, and Status holds its state.
	Id, Start, End, Status string
}

View File

@ -174,6 +174,10 @@ func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, i
return "", nil
}
// GetTrainingTask is a placeholder for ModelArts: it ignores ctx and taskId
// and always returns a nil task together with a nil error, so callers must
// nil-check the task before using it.
func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
	var task *collector.Task // stays nil; no lookup is performed here
	return task, nil
}
func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
err := m.GenerateSubmitParams(ctx, option)
if err != nil {

View File

@ -19,12 +19,14 @@ import (
"errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"math"
"strconv"
"strings"
"time"
)
type OctopusLink struct {
@ -364,6 +366,35 @@ func (o *OctopusLink) GetTrainingTaskLog(ctx context.Context, taskId string, ins
return resp.Content, nil
}
// GetTrainingTask queries the Octopus train job identified by taskId and
// converts it into a collector.Task.
//
// Start and End are the job's StartedAt and CompletedAt values, interpreted as
// Unix seconds and formatted with constants.Layout. The Octopus status strings
// "succeeded", "failed", "running" and "stopped" map to constants.Completed,
// constants.Failed, constants.Running and constants.Stopped; any other status
// is reported as "undefined".
//
// An error is returned when QueryTask fails, when its result is not a usable
// *octopus.GetTrainJobResp, or when the response is flagged as unsuccessful.
func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
	resp, err := o.QueryTask(ctx, taskId)
	if err != nil {
		return nil, err
	}

	// QueryTask hands back an untyped value; the comma-ok form turns an
	// unexpected or nil result into an error instead of a panic.
	jobresp, ok := resp.(*octopus.GetTrainJobResp)
	if !ok || jobresp == nil {
		return nil, errors.New("octopus: QueryTask returned no usable train job response")
	}
	if !jobresp.Success {
		return nil, errors.New(jobresp.Error.Message)
	}

	job := jobresp.Payload.TrainJob

	var task collector.Task
	task.Id = job.Id
	// NOTE(review): a zero StartedAt/CompletedAt formats as the Unix epoch;
	// confirm whether consumers expect that for jobs that have not finished.
	task.Start = time.Unix(job.StartedAt, 0).Format(constants.Layout)
	task.End = time.Unix(job.CompletedAt, 0).Format(constants.Layout)

	switch job.Status {
	case "succeeded":
		task.Status = constants.Completed
	case "failed":
		task.Status = constants.Failed
	case "running":
		task.Status = constants.Running
	case "stopped":
		task.Status = constants.Stopped
	default:
		task.Status = "undefined"
	}

	return &task, nil
}
func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
err := o.GenerateSubmitParams(ctx, option)
if err != nil {

View File

@ -17,6 +17,7 @@ package storeLink
import (
"context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
@ -473,6 +474,15 @@ func (s *ShuguangAi) GetTrainingTaskLog(ctx context.Context, taskId string, inst
return resp.Data.Content, nil
}
// GetTrainingTask looks up taskId through QueryTask and prints the raw result
// to standard output. No collector.Task is built from it: on success the
// method returns (nil, nil), so callers must nil-check the task. A QueryTask
// failure is returned unchanged.
func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
	queried, err := s.QueryTask(ctx, taskId)
	if err == nil {
		fmt.Println(queried)
		return nil, nil
	}
	return nil, err
}
func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
err := s.GenerateSubmitParams(ctx, option)
if err != nil {

View File

@ -26,4 +26,5 @@ const (
WaitRestart = "WaitRestart"
WaitPause = "WaitPause"
WaitStart = "WaitStart"
Stopped = "Stopped"
)

3
pkg/constants/time.go Normal file
View File

@ -0,0 +1,3 @@
package constants
const (
	// Layout is the Go reference-time layout for timestamps written as
	// "year-month-day hour:minute:second".
	Layout = "2006-01-02 15:04:05"
)