submit hpc task

Former-commit-id: dbe3f6554b47591a595279e4a60b8742a13da1d1
This commit is contained in:
zhouqunjie 2024-03-25 15:11:40 +08:00
parent 6641c7452f
commit 45b483566b
10 changed files with 238 additions and 89 deletions

View File

@ -11,31 +11,3 @@ type Task interface {
PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error)
PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) error
}
type PullTaskInfoReq struct {
AdapterId int64 `json:"adapterId"`
}
type PullTaskInfoResp struct {
HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty"`
CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty"`
AiInfoList []*AiInfo `json:"AiInfoList,omitempty"`
VmInfoList []*VmInfo `json:"VmInfoList,omitempty"`
}
type PushTaskInfoReq struct {
AdapterId int64 `json:"adapterId"`
HpcInfoList []*HpcInfo
CloudInfoList []*CloudInfo
AiInfoList []*AiInfo
VmInfoList []*VmInfo
}
type PushTaskInfoResp struct {
Code int64
Msg string
}
type PushResourceInfoReq struct {
AdapterId int64 `json:"adapterId"`
}

View File

@ -5,6 +5,34 @@ import (
"time"
)
type PullTaskInfoReq struct {
AdapterId int64 `form:"adapterId"`
}
type PullTaskInfoResp struct {
HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty"`
CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty"`
AiInfoList []*AiInfo `json:"AiInfoList,omitempty"`
VmInfoList []*VmInfo `json:"VmInfoList,omitempty"`
}
type PushTaskInfoReq struct {
AdapterId int64 `json:"adapterId"`
HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty,optional"`
CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty,optional"`
AiInfoList []*AiInfo `json:"AiInfoList,omitempty,optional"`
VmInfoList []*VmInfo `json:"VmInfoList,omitempty,optional"`
}
type PushTaskInfoResp struct {
Code int64
Msg string
}
type PushResourceInfoReq struct {
AdapterId int64 `json:"adapterId"`
}
type HpcInfo struct {
Id int64 `json:"id"` // id
TaskId int64 `json:"task_id"` // 任务id

View File

@ -23,21 +23,21 @@ type (
Description string `json:"description,optional"`
tenantId int64 `json:"tenantId,optional"`
TaskId int64 `json:"taskId,optional"`
participantId int64 `json:"participantId,optional"`
matchLabels map[string]string `json:"matchLabels,optional"`
cardCount int64 `json:"cardCount,optional"`
workDir string `json:"workDir,optional"` //paratera:workingDir
wallTime string `json:"wallTime,optional"`
cmdScript string `json:"cmdScript,optional"` // paratera:bootScript
appType string `json:"appType,optional"`
appName string `json:"appName,optional"` // paratera:jobGroupName ac:appname
queue string `json:"queue,optional"`
nNode string `json:"nNode,optional"`
submitType string `json:"submitType,optional"`
stdOutFile string `json:"stdOutFile,optional"`
stdErrFile string `json:"stdErrFile,optional"`
stdInput string `json:"stdInput,optional"`
environment map[string]string `json:"environment,optional"`
AdapterId int64 `json:"adapterId,optional"`
MatchLabels map[string]string `json:"matchLabels,optional"`
CardCount int64 `json:"cardCount,optional"`
WorkDir string `json:"workDir,optional"` //paratera:workingDir
WallTime string `json:"wallTime,optional"`
CmdScript string `json:"cmdScript,optional"` // paratera:bootScript
AppType string `json:"appType,optional"`
AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname
Queue string `json:"queue,optional"`
NNode string `json:"nNode,optional"`
SubmitType string `json:"submitType,optional"`
StdOutFile string `json:"stdOutFile,optional"`
StdErrFile string `json:"stdErrFile,optional"`
StdInput string `json:"stdInput,optional"`
Environment map[string]string `json:"environment,optional"`
}
)

View File

@ -0,0 +1,125 @@
syntax = "v1"
info(
title: "type title here"
desc: "type desc here"
author: "type author here"
email: "type email here"
version: "type version here"
)
type PullTaskInfoReq {
AdapterId int64 `form:"adapterId"`
}
type PullTaskInfoResp struct {
HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty"`
CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty"`
AiInfoList []*AiInfo `json:"AiInfoList,omitempty"`
VmInfoList []*VmInfo `json:"VmInfoList,omitempty"`
}
type HpcInfo struct {
Id int64 `json:"id"` // id
TaskId int64 `json:"task_id"` // 任务id
JobId string `json:"job_id"` // 作业id(在第三方系统中的作业id)
ClusterId int64 `json:"cluster_id"` // 执行任务的集群id
ClusterType string `json:"cluster_type"` // 执行任务的集群类型
Name string `json:"name"` // 名称
Status string `json:"status"` // 状态
CmdScript string `json:"cmd_script"`
StartTime string `json:"start_time"` // 开始时间
RunningTime int64 `json:"running_time"` // 运行时间
DerivedEs string `json:"derived_es"`
Cluster string `json:"cluster"`
BlockId int64 `json:"block_id"`
AllocNodes int64 `json:"alloc_nodes"`
AllocCpu int64 `json:"alloc_cpu"`
CardCount int64 `json:"card_count"` // 卡数
Version string `json:"version"`
Account string `json:"account"`
WorkDir string `json:"work_dir"` // 工作路径
AssocId int64 `json:"assoc_id"`
ExitCode int64 `json:"exit_code"`
WallTime string `json:"wall_time"` // 最大运行时间
Result string `json:"result"` // 运行结果
DeletedAt string `json:"deleted_at"` // 删除时间
YamlString string `json:"yaml_string"`
AppType string `json:"app_type"` // 应用类型
AppName string `json:"app_name"` // 应用名称
Queue string `json:"queue"` // 队列名称
SubmitType string `json:"submit_type"` // cmd命令行模式
NNode string `json:"n_node"` // 节点个数当指定该参数时GAP_NODE_STRING必须为""
StdOutFile string `json:"std_out_file"` // 工作路径/std.err.%j
StdErrFile string `json:"std_err_file"` // 工作路径/std.err.%j
StdInput string `json:"std_input"`
Environment string `json:"environment"`
DeletedFlag int64 `json:"deleted_flag"` // 是否删除0-否1-是)
CreatedBy int64 `json:"created_by"` // 创建人
CreatedTime string `json:"created_time"` // 创建时间
UpdatedBy int64 `json:"updated_by"` // 更新人
UpdatedTime string `json:"updated_time"` // 更新时间
}
type CloudInfo struct {
Participant int64 `json:"participant,omitempty"`
Id int64 `json:"id,omitempty"`
TaskId int64 `json:"taskId,omitempty"`
ApiVersion string `json:"apiVersion,omitempty"`
Kind string `json:"kind,omitempty"`
Namespace string `json:"namespace,omitempty"`
Name string `json:"name,omitempty"`
Status string `json:"status,omitempty"`
StartTime string `json:"startTime,omitempty"`
RunningTime int64 `json:"runningTime,omitempty"`
Result string `json:"result,omitempty"`
YamlString string `json:"yamlString,omitempty"`
}
type AiInfo struct {
ParticipantId int64 `json:"participantId,omitempty"`
TaskId int64 `json:"taskId,omitempty"`
ProjectId string `json:"project_id,omitempty"`
Name string `json:"name,omitempty"`
Status string `json:"status,omitempty"`
StartTime string `json:"startTime,omitempty"`
RunningTime int64 `json:"runningTime,omitempty"`
Result string `json:"result,omitempty"`
JobId string `json:"jobId,omitempty"`
CreateTime string `json:"createTime,omitempty"`
ImageUrl string `json:"imageUrl,omitempty"`
Command string `json:"command,omitempty"`
FlavorId string `json:"flavorId,omitempty"`
SubscriptionId string `json:"subscriptionId,omitempty"`
ItemVersionId string `json:"itemVersionId,omitempty"`
}
type VmInfo struct {
ParticipantId int64 `json:"participantId,omitempty"`
TaskId int64 `json:"taskId,omitempty"`
Name string `json:"name,omitempty"`
FlavorRef string `json:"flavor_ref,omitempty"`
ImageRef string `json:"image_ref,omitempty"`
NetworkUuid string `json:"network_uuid,omitempty"`
BlockUuid string `json:"block_uuid,omitempty"`
SourceType string `json:"source_type,omitempty"`
DeleteOnTermination bool `json:"delete_on_termination,omitempty"`
State string `json:"state,omitempty"`
}
type PushTaskInfoReq struct {
AdapterId int64 `json:"adapterId"`
HpcInfoList []*HpcInfo
CloudInfoList []*CloudInfo
AiInfoList []*AiInfo
VmInfoList []*VmInfo
}
type PushTaskInfoResp struct {
Code int64
Msg string
}
type PushResourceInfoReq struct {
AdapterId int64 `json:"adapterId"`
}

View File

@ -9,6 +9,7 @@ import (
"cloud/pcm-cloud.api"
"storelink/pcm-storelink.api"
"schedule/pcm-schedule.api"
"participant/pcm-participant.api"
)
info(
@ -112,6 +113,14 @@ service pcm {
@doc "metrics"
@handler metricsHandler
get /core/metrics
@doc "provided to participant to pull task info from core"
@handler pullTaskInfoHandler
get /core/pullTaskInfo (PullTaskInfoReq) returns (PullTaskInfoResp)
@doc "provided to participant to push task info to core"
@handler pushTaskInfoHandler
post /core/pushTaskInfo (PushTaskInfoReq) returns (PushTaskInfoResp)
}
//hpc二级接口
@ -292,9 +301,9 @@ service pcm {
@doc "挂载notebook存储"
@handler mountNotebookStorageHandler
post /ai/mountNotebookStorage (MountNotebookStorageReq) returns (MountNotebookStorageResp)
/******************Notebook Method end*************************/
/******************Notebook Method end*************************/
/******************Visualization Job Method start*************************/
/******************Visualization Job Method start*************************/
@doc "获取虚拟化任务"
@handler getVisualizationJobHandler
get /ai/getVisualizationJob (GetVisualizationJobReq) returns (GetVisualizationJobResp)

View File

@ -134,6 +134,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/core/metrics",
Handler: core.MetricsHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/core/pullTaskInfo",
Handler: core.PullTaskInfoHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/core/pushTaskInfo",
Handler: core.PushTaskInfoHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),
)

View File

@ -27,26 +27,12 @@ func NewPullTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Pull
}
func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clientCore.PullTaskInfoResp, error) {
//opt := clientPCM.Options{
// Url: "http://localhost:8999",
// DataSource: "root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local",
//}
//coreCli, _ := clientPCM.NewClient(opt)
//taskOpt := clientPCM.TaskOptions{}
//coreTask, _ := coreCli.Task(taskOpt)
//adapterId := 1706858330967773111
//// 查询core端分发下来的任务列表
//pullReq := types.PullTaskInfoReq{
// AdapterId: int64(adapterId),
//}
//hpcList, _ := coreTask.PullTaskInfo(pullReq)
//println(hpcList)
// 查询p端类型
resp := clientCore.PullTaskInfoResp{}
// check the kind of adapter
var kind int32
l.svcCtx.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", req.AdapterId).Scan(&kind)
// 查询云智超中的数据列表
// pull task list from database
switch kind {
case 2:
var hpcModelList []models.TaskHpc
@ -85,8 +71,9 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie
return &resp, nil
}
func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error {
tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data)
func findModelList(adapterId int64, dbEngin *gorm.DB, data interface{}) error {
tx := dbEngin.Where("cluster_id in (select id from t_cluster where adapter_id = ?) AND status not in "+
"('Deleted', 'Succeeded', 'COMPLETED', 'Completed', 'Failed','FAIL','statC','statE')", adapterId).Find(data)
if tx.Error != nil {
return tx.Error
}

View File

@ -40,7 +40,7 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie
case 2:
for _, hpcInfo := range req.HpcInfoList {
l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?",
hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, req.AdapterId, hpcInfo.TaskId, hpcInfo.Name)
hpcInfo.Status, hpcInfo.StartTime, hpcInfo.JobId, hpcInfo.ClusterId, hpcInfo.TaskId, hpcInfo.Name)
syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId)
}
case 1:

View File

@ -5,6 +5,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"k8s.io/apimachinery/pkg/util/json"
"math/rand"
"time"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
@ -28,6 +29,7 @@ func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Com
}
func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
// 构建主任务结构体
taskModel := models.Task{
Status: constants.Saved,
@ -41,10 +43,18 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
return nil, tx.Error
}
var clusterIds []int64
l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? and label = ?", req.AdapterId, req.ClusterType).Scan(&clusterIds)
env, _ := json.Marshal(req.Environment)
if len(clusterIds) == 0 || clusterIds == nil {
return nil, nil
}
hpcInfo := models.TaskHpc{
TaskId: taskModel.Id,
ClusterId: 1706858330967773111,
ClusterId: clusterIds[rand.Intn(len(clusterIds))],
Name: taskModel.Name,
Status: "Saved",
CmdScript: req.CmdScript,
@ -61,9 +71,9 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
StdErrFile: req.StdErrFile,
StdInput: req.StdInput,
DeletedFlag: 0,
CreatedBy: req.ParticipantId,
CreatedBy: 0,
CreatedTime: time.Now(),
UpdatedBy: req.ParticipantId,
UpdatedBy: 0,
UpdatedTime: time.Now(),
Environment: string(env),
}
@ -72,7 +82,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
if tx.Error != nil {
return nil, tx.Error
}
// 将任务数据转换成消息体
// todo mq task manage
//reqMessage, err := json.Marshal(mqInfo)
//if err != nil {
// logx.Error(err)
@ -82,5 +92,10 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
//if publish.Err() != nil {
// return nil, publish.Err()
//}
return
resp = &types.CommitHpcTaskResp{
Code: 200,
Msg: "success",
TaskId: taskModel.Id,
}
return resp, nil
}

View File

@ -849,29 +849,32 @@ type Job struct {
}
type CommitHpcTaskReq struct {
Name string `json:"name"` // paratera:jobName
Description string `json:"description,optional"`
TenantId int64 `json:"tenantId,optional"`
TaskId int64 `json:"taskId,optional"`
ParticipantId int64 `json:"participantId,optional"`
MatchLabels map[string]string `json:"matchLabels,optional"`
CardCount int64 `json:"cardCount,optional"`
WorkDir string `json:"workDir,optional"` //paratera:workingDir
WallTime string `json:"wallTime,optional"`
CmdScript string `json:"cmdScript,optional"` // paratera:bootScript
AppType string `json:"appType,optional"`
AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname
Queue string `json:"queue,optional"`
NNode string `json:"nNode,optional"`
SubmitType string `json:"submitType,optional"`
StdOutFile string `json:"stdOutFile,optional"`
StdErrFile string `json:"stdErrFile,optional"`
StdInput string `json:"stdInput,optional"`
Environment map[string]string `json:"environment,optional"`
Name string `json:"name"` // paratera:jobName
Description string `json:"description,optional"`
TenantId int64 `json:"tenantId,optional"`
TaskId int64 `json:"taskId,optional"`
AdapterId int64 `json:"adapterId,optional"`
ClusterType string `json:"clusterType,optional"`
MatchLabels map[string]string `json:"matchLabels,optional"`
CardCount int64 `json:"cardCount,optional"`
WorkDir string `json:"workDir,optional"` //paratera:workingDir
WallTime string `json:"wallTime,optional"`
CmdScript string `json:"cmdScript,optional"` // paratera:bootScript
AppType string `json:"appType,optional"`
AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname
Queue string `json:"queue,optional"`
NNode string `json:"nNode,optional"`
SubmitType string `json:"submitType,optional"`
StdOutFile string `json:"stdOutFile,optional"`
StdErrFile string `json:"stdErrFile,optional"`
StdInput string `json:"stdInput,optional"`
Environment map[string]string `json:"environment,optional"`
}
type CommitHpcTaskResp struct {
TaskId int64 `json:"taskId"`
Code int32 `json:"code"`
Msg string `json:"msg"`
TaskId int64 `json:"taskId"`
}
type ListJobReq struct {