Merge pull request 'updated ai functions' (#136) from tzwang/pcm-coordinator:master into master

Former-commit-id: c44592900406c69131cbca341ab68e6c864e2ed9
This commit is contained in:
tzwang 2024-05-07 17:05:13 +08:00
commit 8f381d5030
10 changed files with 108 additions and 17 deletions

View File

@ -61,7 +61,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
} }
} }
resp.CardNum = centerNum resp.CardNum = cardNum
resp.PowerInTops = totalTops resp.PowerInTops = totalTops
return resp, nil return resp, nil

View File

@ -17,8 +17,6 @@ type GetCenterTaskListLogic struct {
svcCtx *svc.ServiceContext svcCtx *svc.ServiceContext
} }
const layout = "2006-01-02 15:04:05"
func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterTaskListLogic { func NewGetCenterTaskListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterTaskListLogic {
return &GetCenterTaskListLogic{ return &GetCenterTaskListLogic{
Logger: logx.WithContext(ctx), Logger: logx.WithContext(ctx),
@ -42,12 +40,17 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
} }
for _, task := range taskList { for _, task := range taskList {
var elapsed time.Duration var elapsed time.Duration
start, _ := time.Parse(layout, task.CommitTime) switch task.Status {
if task.Status != constants.Completed { case constants.Completed:
elapsed = start.Sub(time.Now()) end, err := time.ParseInLocation(constants.Layout, task.EndTime, time.Local)
} else { if err != nil {
end, _ := time.Parse(layout, task.EndTime) elapsed = time.Duration(0)
elapsed = start.Sub(end) }
elapsed = end.Sub(task.CommitTime)
case constants.Running:
elapsed = time.Now().Sub(task.CommitTime)
default:
elapsed = 0
} }
t := &types.AiTask{ t := &types.AiTask{

View File

@ -66,7 +66,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
scheResult.Strategy = r.Strategy scheResult.Strategy = r.Strategy
scheResult.Replica = r.Replica scheResult.Replica = r.Replica
scheResult.Msg = r.Msg scheResult.Msg = r.Msg
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Running, r.Msg) err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Saved, r.Msg)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -61,8 +61,8 @@ func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo,
return list, nil return list, nil
} }
func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*types.AiTaskDb, error) { func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) {
var resp []*types.AiTaskDb var resp []*models.TaskAi
tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp) tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp)
if tx.Error != nil { if tx.Error != nil {
logx.Errorf(tx.Error.Error()) logx.Errorf(tx.Error.Error())
@ -169,6 +169,38 @@ func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterR
return &clusterResource, nil return &clusterResource, nil
} }
func (s *AiStorage) UpdateTask() error { func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64) error {
cId, err := strconv.ParseInt(clusterId, 10, 64)
if err != nil {
return err
}
clusterResource := models.TClusterResource{
ClusterId: cId,
ClusterName: clusterName,
ClusterType: clusterType,
CpuAvail: cpuAvail,
CpuTotal: cpuTotal,
MemAvail: memAvail,
MemTotal: memTotal,
DiskAvail: diskAvail,
DiskTotal: diskTotal,
GpuAvail: gpuAvail,
GpuTotal: gpuTotal,
CardTotal: cardTotal,
CardTopsTotal: topsTotal,
}
tx := s.DbEngin.Create(&clusterResource)
if tx.Error != nil {
return tx.Error
}
return nil
}
func (s *AiStorage) UpdateAiTask(task models.TaskAi) error {
tx := s.DbEngin.Updates(&task)
if tx.Error != nil {
return tx.Error
}
return nil return nil
} }

View File

@ -176,7 +176,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
if len(errs) != 0 { if len(errs) != 0 {
taskId, err := as.AiStorages.SaveTask(as.option.TaskName) taskId, err := as.AiStorages.SaveTask(as.option.TaskName)
if err != nil { if err != nil {
return nil, err return nil, errors.New("database add failed: " + err.Error())
} }
var errmsg string var errmsg string
for _, err := range errs { for _, err := range errs {
@ -188,7 +188,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errmsg += msg errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, "", constants.Failed, msg) err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, "", constants.Failed, msg)
if err != nil { if err != nil {
return nil, err return nil, errors.New("database add failed: " + err.Error())
} }
} }
for s := range ch { for s := range ch {
@ -197,14 +197,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errmsg += msg errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, "", constants.Failed, msg) err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, "", constants.Failed, msg)
if err != nil { if err != nil {
return nil, err return nil, errors.New("database add failed: " + err.Error())
} }
} else { } else {
msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId) msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
errmsg += msg errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, s.TaskId, constants.Succeeded, msg) err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, s.TaskId, constants.Succeeded, msg)
if err != nil { if err != nil {
return nil, err return nil, errors.New("database add failed: " + err.Error())
} }
} }
} }

View File

@ -7,6 +7,7 @@ type AiCollector interface {
GetDatasetsSpecs(ctx context.Context) ([]*DatasetsSpecs, error) GetDatasetsSpecs(ctx context.Context) ([]*DatasetsSpecs, error)
GetAlgorithms(ctx context.Context) ([]*Algorithm, error) GetAlgorithms(ctx context.Context) ([]*Algorithm, error)
GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error)
GetTrainingTask(ctx context.Context, taskId string) (*Task, error)
DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error) DownloadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string) (string, error)
UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error UploadAlgorithmCode(ctx context.Context, resourceType string, taskType string, dataset string, algorithm string, code string) error
} }
@ -45,3 +46,10 @@ type Algorithm struct {
Platform string Platform string
TaskType string TaskType string
} }
type Task struct {
Id string
Start string
End string
Status string
}

View File

@ -174,6 +174,10 @@ func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, i
return "", nil return "", nil
} }
func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
return nil, nil
}
func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
err := m.GenerateSubmitParams(ctx, option) err := m.GenerateSubmitParams(ctx, option)
if err != nil { if err != nil {

View File

@ -19,12 +19,14 @@ import (
"errors" "errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"math" "math"
"strconv" "strconv"
"strings" "strings"
"time"
) )
type OctopusLink struct { type OctopusLink struct {
@ -364,6 +366,35 @@ func (o *OctopusLink) GetTrainingTaskLog(ctx context.Context, taskId string, ins
return resp.Content, nil return resp.Content, nil
} }
func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
resp, err := o.QueryTask(ctx, taskId)
if err != nil {
return nil, err
}
jobresp := (resp).(*octopus.GetTrainJobResp)
if !jobresp.Success {
return nil, errors.New(jobresp.Error.Message)
}
var task collector.Task
task.Id = jobresp.Payload.TrainJob.Id
task.Start = time.Unix(jobresp.Payload.TrainJob.StartedAt, 0).Format(constants.Layout)
task.End = time.Unix(jobresp.Payload.TrainJob.CompletedAt, 0).Format(constants.Layout)
switch jobresp.Payload.TrainJob.Status {
case "succeeded":
task.Status = constants.Completed
case "failed":
task.Status = constants.Failed
case "running":
task.Status = constants.Running
case "stopped":
task.Status = constants.Stopped
default:
task.Status = "undefined"
}
return &task, nil
}
func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
err := o.GenerateSubmitParams(ctx, option) err := o.GenerateSubmitParams(ctx, option)
if err != nil { if err != nil {

View File

@ -17,6 +17,7 @@ package storeLink
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/common"
@ -473,6 +474,15 @@ func (s *ShuguangAi) GetTrainingTaskLog(ctx context.Context, taskId string, inst
return resp.Data.Content, nil return resp.Data.Content, nil
} }
func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
task, err := s.QueryTask(ctx, taskId)
if err != nil {
return nil, err
}
fmt.Println(task)
return nil, nil
}
func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
err := s.GenerateSubmitParams(ctx, option) err := s.GenerateSubmitParams(ctx, option)
if err != nil { if err != nil {

3
pkg/constants/time.go Normal file
View File

@ -0,0 +1,3 @@
package constants
const Layout = "2006-01-02 15:04:05"