updated schedule submit func

Former-commit-id: 8bfeec069c549374b47fa80c0dd4b2293937b04a
This commit is contained in:
tzwang 2024-04-29 17:51:25 +08:00
parent 239ca4fc17
commit cee3ae17bd
5 changed files with 96 additions and 18 deletions

View File

@ -26,7 +26,11 @@ func NewScheduleGetAiJobLogLogLogic(ctx context.Context, svcCtx *svc.ServiceCont
func (l *ScheduleGetAiJobLogLogLogic) ScheduleGetAiJobLogLog(req *types.AiJobLogReq) (resp *types.AiJobLogResp, err error) { func (l *ScheduleGetAiJobLogLogLogic) ScheduleGetAiJobLogLog(req *types.AiJobLogReq) (resp *types.AiJobLogResp, err error) {
resp = &types.AiJobLogResp{} resp = &types.AiJobLogResp{}
log, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].GetTrainingTaskLog(l.ctx, req.TaskId, req.InstanceNum) id, err := l.svcCtx.Scheduler.AiStorages.GetAiTaskIdByClusterIdAndTaskId(req.ClusterId, req.TaskId)
if err != nil {
return nil, err
}
log, err := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][req.ClusterId].GetTrainingTaskLog(l.ctx, id, req.InstanceNum)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -6,6 +6,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
) )
@ -51,6 +52,10 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
switch opt.GetOptionType() { switch opt.GetOptionType() {
case option.AI: case option.AI:
id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName)
if err != nil {
return nil, err
}
rs := (results).([]*schedulers.AiResult) rs := (results).([]*schedulers.AiResult)
for _, r := range rs { for _, r := range rs {
scheResult := &types.ScheduleResult{} scheResult := &types.ScheduleResult{}
@ -59,12 +64,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
scheResult.Strategy = r.Strategy scheResult.Strategy = r.Strategy
scheResult.Replica = r.Replica scheResult.Replica = r.Replica
scheResult.Msg = r.Msg scheResult.Msg = r.Msg
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, r.TaskId, constants.Running, r.Msg)
if err != nil {
return nil, err
}
resp.Results = append(resp.Results, scheResult) resp.Results = append(resp.Results, scheResult)
} }
err = l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName)
if err != nil {
return nil, err
}
} }
return resp, nil return resp, nil

View File

@ -2,10 +2,12 @@ package database
import ( import (
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gorm.io/gorm" "gorm.io/gorm"
"strconv"
"time" "time"
) )
@ -48,7 +50,17 @@ func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) {
return ids, nil return ids, nil
} }
func (s *AiStorage) SaveTask(name string) error { func (s *AiStorage) GetAiTasks() ([]*types.AiTaskDb, error) {
var resp []*types.AiTaskDb
tx := s.DbEngin.Raw("select * from task_ai").Scan(&resp)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, tx.Error
}
return resp, nil
}
func (s *AiStorage) SaveTask(name string) (int64, error) {
// 构建主任务结构体 // 构建主任务结构体
taskModel := models.Task{ taskModel := models.Task{
Status: constants.Saved, Status: constants.Saved,
@ -58,12 +70,52 @@ func (s *AiStorage) SaveTask(name string) error {
} }
// 保存任务数据到数据库 // 保存任务数据到数据库
tx := s.DbEngin.Create(&taskModel) tx := s.DbEngin.Create(&taskModel)
if tx.Error != nil {
return 0, tx.Error
}
return taskModel.Id, nil
}
func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId string, jobId string, status string, msg string) error {
// 构建主任务结构体
aId, err := strconv.ParseInt(option.AdapterId, 10, 64)
if err != nil {
return err
}
cId, err := strconv.ParseInt(clusterId, 10, 64)
if err != nil {
return err
}
aiTaskModel := models.TaskAi{
TaskId: taskId,
AdapterId: aId,
ClusterId: cId,
Name: option.TaskName,
Replica: option.Replica,
JobId: jobId,
Strategy: option.StrategyName,
Status: status,
Msg: msg,
CommitTime: time.Now(),
}
// 保存任务数据到数据库
tx := s.DbEngin.Create(&aiTaskModel)
if tx.Error != nil { if tx.Error != nil {
return tx.Error return tx.Error
} }
return nil return nil
} }
func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId string) (string, error) {
var aiTask models.TaskAi
tx := s.DbEngin.Raw("select * from task_ai where `cluster_id` = ? and `task_id` = ?", clusterId, taskId).Scan(&aiTask)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return "", tx.Error
}
return aiTask.JobId, nil
}
func (s *AiStorage) UpdateTask() error { func (s *AiStorage) UpdateTask() error {
return nil return nil
} }

View File

@ -26,6 +26,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
@ -168,32 +169,46 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
errs = append(errs, e) errs = append(errs, e)
} }
if len(errs) == len(clusters) { for s := range ch {
return nil, errors.New("submit task failed") results = append(results, s)
} }
if len(errs) != 0 { if len(errs) != 0 {
var msg string taskId, err := as.AiStorages.SaveTask(as.option.TaskName)
if err != nil {
return nil, err
}
var errmsg string
for _, err := range errs { for _, err := range errs {
e := (err).(struct { e := (err).(struct {
err error err error
clusterId string clusterId string
}) })
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error()) msg := fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, "", constants.Failed, msg)
if err != nil {
return nil, err
}
} }
for s := range ch { for s := range ch {
if s.Msg != "" { if s.Msg != "" {
msg += fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg) msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, "", constants.Failed, msg)
if err != nil {
return nil, err
}
} else { } else {
msg += fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId) msg := fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, s.TaskId, constants.Succeeded, msg)
if err != nil {
return nil, err
}
} }
} }
return nil, errors.New(msg) return nil, errors.New(errmsg)
}
for s := range ch {
// TODO: database operation
results = append(results, s)
} }
return results, nil return results, nil

View File

@ -4,6 +4,7 @@ type AiOption struct {
AdapterId string AdapterId string
ClusterIds []string ClusterIds []string
TaskName string TaskName string
Replica int64
ResourceType string // cpu/gpu/compute card ResourceType string // cpu/gpu/compute card
CpuCoreNum int64 CpuCoreNum int64
TaskType string // pytorch/tensorflow/mindspore TaskType string // pytorch/tensorflow/mindspore