From 45b483566bd30a4c368bd9b9a43b7f59f8400d59 Mon Sep 17 00:00:00 2001 From: zhouqunjie Date: Mon, 25 Mar 2024 15:11:40 +0800 Subject: [PATCH] submit hpc task Former-commit-id: dbe3f6554b47591a595279e4a60b8742a13da1d1 --- api/client/task.go | 28 ----- api/client/types.go | 28 +++++ api/desc/hpc/pcm-hpc.api | 30 ++--- api/desc/participant/pcm-participant.api | 125 +++++++++++++++++++ api/desc/pcm.api | 13 +- api/internal/handler/routes.go | 10 ++ api/internal/logic/core/pulltaskinfologic.go | 23 +--- api/internal/logic/core/pushtaskinfologic.go | 2 +- api/internal/logic/hpc/commithpctasklogic.go | 25 +++- api/internal/types/types.go | 43 ++++--- 10 files changed, 238 insertions(+), 89 deletions(-) create mode 100644 api/desc/participant/pcm-participant.api diff --git a/api/client/task.go b/api/client/task.go index 086249f7..3367ab95 100644 --- a/api/client/task.go +++ b/api/client/task.go @@ -11,31 +11,3 @@ type Task interface { PushTaskInfo(pushTaskInfoReq PushTaskInfoReq) (*PushTaskInfoResp, error) PushResourceInfo(pushResourceInfoReq PushResourceInfoReq) error } - -type PullTaskInfoReq struct { - AdapterId int64 `json:"adapterId"` -} - -type PullTaskInfoResp struct { - HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty"` - CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty"` - AiInfoList []*AiInfo `json:"AiInfoList,omitempty"` - VmInfoList []*VmInfo `json:"VmInfoList,omitempty"` -} - -type PushTaskInfoReq struct { - AdapterId int64 `json:"adapterId"` - HpcInfoList []*HpcInfo - CloudInfoList []*CloudInfo - AiInfoList []*AiInfo - VmInfoList []*VmInfo -} - -type PushTaskInfoResp struct { - Code int64 - Msg string -} - -type PushResourceInfoReq struct { - AdapterId int64 `json:"adapterId"` -} diff --git a/api/client/types.go b/api/client/types.go index 155ad628..758f6bcb 100644 --- a/api/client/types.go +++ b/api/client/types.go @@ -5,6 +5,34 @@ import ( "time" ) +type PullTaskInfoReq struct { + AdapterId int64 `form:"adapterId"` +} + +type PullTaskInfoResp struct { + HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty"` + CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty"` + AiInfoList []*AiInfo `json:"AiInfoList,omitempty"` + VmInfoList []*VmInfo `json:"VmInfoList,omitempty"` +} + +type PushTaskInfoReq struct { + AdapterId int64 `json:"adapterId"` + HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty,optional"` + CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty,optional"` + AiInfoList []*AiInfo `json:"AiInfoList,omitempty,optional"` + VmInfoList []*VmInfo `json:"VmInfoList,omitempty,optional"` +} + +type PushTaskInfoResp struct { + Code int64 + Msg string +} + +type PushResourceInfoReq struct { + AdapterId int64 `json:"adapterId"` +} + type HpcInfo struct { Id int64 `json:"id"` // id TaskId int64 `json:"task_id"` // 任务id diff --git a/api/desc/hpc/pcm-hpc.api b/api/desc/hpc/pcm-hpc.api index 38f0ed08..23bb516c 100644 --- a/api/desc/hpc/pcm-hpc.api +++ b/api/desc/hpc/pcm-hpc.api @@ -23,21 +23,21 @@ type ( Description string `json:"description,optional"` tenantId int64 `json:"tenantId,optional"` TaskId int64 `json:"taskId,optional"` - participantId int64 `json:"participantId,optional"` - matchLabels map[string]string `json:"matchLabels,optional"` - cardCount int64 `json:"cardCount,optional"` - workDir string `json:"workDir,optional"` //paratera:workingDir - wallTime string `json:"wallTime,optional"` - cmdScript string `json:"cmdScript,optional"` // paratera:bootScript - appType string `json:"appType,optional"` - appName string `json:"appName,optional"` // paratera:jobGroupName ac:appname - queue string `json:"queue,optional"` - nNode string `json:"nNode,optional"` - submitType string `json:"submitType,optional"` - stdOutFile string `json:"stdOutFile,optional"` - stdErrFile string `json:"stdErrFile,optional"` - stdInput string `json:"stdInput,optional"` - environment map[string]string `json:"environment,optional"` + AdapterId int64 `json:"adapterId,optional"` + MatchLabels map[string]string `json:"matchLabels,optional"` + CardCount int64 `json:"cardCount,optional"` + WorkDir string `json:"workDir,optional"` //paratera:workingDir + WallTime string `json:"wallTime,optional"` + CmdScript string `json:"cmdScript,optional"` // paratera:bootScript + AppType string `json:"appType,optional"` + AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname + Queue string `json:"queue,optional"` + NNode string `json:"nNode,optional"` + SubmitType string `json:"submitType,optional"` + StdOutFile string `json:"stdOutFile,optional"` + StdErrFile string `json:"stdErrFile,optional"` + StdInput string `json:"stdInput,optional"` + Environment map[string]string `json:"environment,optional"` } ) diff --git a/api/desc/participant/pcm-participant.api b/api/desc/participant/pcm-participant.api new file mode 100644 index 00000000..f76fc604 --- /dev/null +++ b/api/desc/participant/pcm-participant.api @@ -0,0 +1,125 @@ +syntax = "v1" + +info( + title: "type title here" + desc: "type desc here" + author: "type author here" + email: "type email here" + version: "type version here" +) + +type PullTaskInfoReq { + AdapterId int64 `form:"adapterId"` +} + +type PullTaskInfoResp struct { + HpcInfoList []*HpcInfo `json:"HpcInfoList,omitempty"` + CloudInfoList []*CloudInfo `json:"CloudInfoList,omitempty"` + AiInfoList []*AiInfo `json:"AiInfoList,omitempty"` + VmInfoList []*VmInfo `json:"VmInfoList,omitempty"` +} + +type HpcInfo struct { + Id int64 `json:"id"` // id + TaskId int64 `json:"task_id"` // 任务id + JobId string `json:"job_id"` // 作业id(在第三方系统中的作业id) + ClusterId int64 `json:"cluster_id"` // 执行任务的集群id + ClusterType string `json:"cluster_type"` // 执行任务的集群类型 + Name string `json:"name"` // 名称 + Status string `json:"status"` // 状态 + CmdScript string `json:"cmd_script"` + StartTime string `json:"start_time"` // 开始时间 + RunningTime int64 `json:"running_time"` // 运行时间 + DerivedEs string `json:"derived_es"` + Cluster string `json:"cluster"` + BlockId int64 `json:"block_id"` + AllocNodes int64 `json:"alloc_nodes"` + AllocCpu int64 `json:"alloc_cpu"` + CardCount int64 `json:"card_count"` // 卡数 + Version string `json:"version"` + Account string `json:"account"` + WorkDir string `json:"work_dir"` // 工作路径 + AssocId int64 `json:"assoc_id"` + ExitCode int64 `json:"exit_code"` + WallTime string `json:"wall_time"` // 最大运行时间 + Result string `json:"result"` // 运行结果 + DeletedAt string `json:"deleted_at"` // 删除时间 + YamlString string `json:"yaml_string"` + AppType string `json:"app_type"` // 应用类型 + AppName string `json:"app_name"` // 应用名称 + Queue string `json:"queue"` // 队列名称 + SubmitType string `json:"submit_type"` // cmd(命令行模式) + NNode string `json:"n_node"` // 节点个数(当指定该参数时,GAP_NODE_STRING必须为"") + StdOutFile string `json:"std_out_file"` // 工作路径/std.err.%j + StdErrFile string `json:"std_err_file"` // 工作路径/std.err.%j + StdInput string `json:"std_input"` + Environment string `json:"environment"` + DeletedFlag int64 `json:"deleted_flag"` // 是否删除(0-否,1-是) + CreatedBy int64 `json:"created_by"` // 创建人 + CreatedTime string `json:"created_time"` // 创建时间 + UpdatedBy int64 `json:"updated_by"` // 更新人 + UpdatedTime string `json:"updated_time"` // 更新时间 +} + +type CloudInfo struct { + Participant int64 `json:"participant,omitempty"` + Id int64 `json:"id,omitempty"` + TaskId int64 `json:"taskId,omitempty"` + ApiVersion string `json:"apiVersion,omitempty"` + Kind string `json:"kind,omitempty"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + Status string `json:"status,omitempty"` + StartTime string `json:"startTime,omitempty"` + RunningTime int64 `json:"runningTime,omitempty"` + Result string `json:"result,omitempty"` + YamlString string `json:"yamlString,omitempty"` +} + +type AiInfo struct { + ParticipantId int64 `json:"participantId,omitempty"` + TaskId int64 `json:"taskId,omitempty"` + ProjectId string `json:"project_id,omitempty"` + Name string `json:"name,omitempty"` + Status string `json:"status,omitempty"` + StartTime string `json:"startTime,omitempty"` + RunningTime int64 `json:"runningTime,omitempty"` + Result string `json:"result,omitempty"` + JobId string `json:"jobId,omitempty"` + CreateTime string `json:"createTime,omitempty"` + ImageUrl string `json:"imageUrl,omitempty"` + Command string `json:"command,omitempty"` + FlavorId string `json:"flavorId,omitempty"` + SubscriptionId string `json:"subscriptionId,omitempty"` + ItemVersionId string `json:"itemVersionId,omitempty"` +} + +type VmInfo struct { + ParticipantId int64 `json:"participantId,omitempty"` + TaskId int64 `json:"taskId,omitempty"` + Name string `json:"name,omitempty"` + FlavorRef string `json:"flavor_ref,omitempty"` + ImageRef string `json:"image_ref,omitempty"` + NetworkUuid string `json:"network_uuid,omitempty"` + BlockUuid string `json:"block_uuid,omitempty"` + SourceType string `json:"source_type,omitempty"` + DeleteOnTermination bool `json:"delete_on_termination,omitempty"` + State string `json:"state,omitempty"` +} + +type PushTaskInfoReq struct { + AdapterId int64 `json:"adapterId"` + HpcInfoList []*HpcInfo + CloudInfoList []*CloudInfo + AiInfoList []*AiInfo + VmInfoList []*VmInfo +} + +type PushTaskInfoResp struct { + Code int64 + Msg string +} + +type PushResourceInfoReq struct { + AdapterId int64 `json:"adapterId"` +} \ No newline at end of file diff --git a/api/desc/pcm.api b/api/desc/pcm.api index dca9d708..df0d8643 100644 --- a/api/desc/pcm.api +++ b/api/desc/pcm.api @@ -9,6 +9,7 @@ import ( "cloud/pcm-cloud.api" "storelink/pcm-storelink.api" "schedule/pcm-schedule.api" + "participant/pcm-participant.api" ) info( @@ -112,6 +113,14 @@ service pcm { @doc "metrics" @handler metricsHandler get /core/metrics + + @doc "provided to participant to pull task info from core" + @handler pullTaskInfoHandler + get /core/pullTaskInfo (PullTaskInfoReq) returns (PullTaskInfoResp) + + @doc "provided to participant to push task info to core" + @handler pushTaskInfoHandler + post /core/pushTaskInfo (PushTaskInfoReq) returns (PushTaskInfoResp) } //hpc二级接口 @@ -292,9 +301,9 @@ service pcm { @doc "挂载notebook存储" @handler mountNotebookStorageHandler post /ai/mountNotebookStorage (MountNotebookStorageReq) returns (MountNotebookStorageResp) - /******************Notebook Method end*************************/ +/******************Notebook Method end*************************/ - /******************Visualization Job Method start*************************/ +/******************Visualization Job Method start*************************/ @doc "获取虚拟化任务" @handler getVisualizationJobHandler get /ai/getVisualizationJob (GetVisualizationJobReq) returns (GetVisualizationJobResp) diff --git a/api/internal/handler/routes.go b/api/internal/handler/routes.go index 6822d544..50f31d6c 100644 --- a/api/internal/handler/routes.go +++ b/api/internal/handler/routes.go @@ -134,6 +134,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { Path: "/core/metrics", Handler: core.MetricsHandler(serverCtx), }, + { + Method: http.MethodGet, + Path: "/core/pullTaskInfo", + Handler: core.PullTaskInfoHandler(serverCtx), + }, + { + Method: http.MethodPost, + Path: "/core/pushTaskInfo", + Handler: core.PushTaskInfoHandler(serverCtx), + }, }, rest.WithPrefix("/pcm/v1"), ) diff --git a/api/internal/logic/core/pulltaskinfologic.go b/api/internal/logic/core/pulltaskinfologic.go index 438475d4..bc152ddc 100644 --- a/api/internal/logic/core/pulltaskinfologic.go +++ b/api/internal/logic/core/pulltaskinfologic.go @@ -27,26 +27,12 @@ func NewPullTaskInfoLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Pull } func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clientCore.PullTaskInfoResp, error) { - //opt := clientPCM.Options{ - // Url: "http://localhost:8999", - // DataSource: "root:uJpLd6u-J?HC1@(47.92.88.143:3306)/pcm?parseTime=true&loc=Local", - //} - //coreCli, _ := clientPCM.NewClient(opt) - //taskOpt := clientPCM.TaskOptions{} - //coreTask, _ := coreCli.Task(taskOpt) - //adapterId := 1706858330967773111 - //// 查询core端分发下来的任务列表 - //pullReq := types.PullTaskInfoReq{ - // AdapterId: int64(adapterId), - //} - //hpcList, _ := coreTask.PullTaskInfo(pullReq) - //println(hpcList) - // 查询p端类型 resp := clientCore.PullTaskInfoResp{} + // check the kind of adapter var kind int32 l.svcCtx.DbEngin.Raw("select type as kind from `t_adapter` where id = ?", req.AdapterId).Scan(&kind) - // 查询云智超中的数据列表 + // pull task list from database switch kind { case 2: var hpcModelList []models.TaskHpc @@ -85,8 +71,9 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie return &resp, nil } -func findModelList(participantId int64, dbEngin *gorm.DB, data interface{}) error { - tx := dbEngin.Where("cluster_id = (select id from t_cluster where adapter_id = ?) AND status NOT IN ?", participantId, []string{"Deleted", "Succeeded", "Completed", "Failed"}).Find(data) +func findModelList(adapterId int64, dbEngin *gorm.DB, data interface{}) error { + tx := dbEngin.Where("cluster_id in (select id from t_cluster where adapter_id = ?) AND status not in "+ + "('Deleted', 'Succeeded', 'COMPLETED', 'Completed', 'Failed','FAIL','statC','statE')", adapterId).Find(data) if tx.Error != nil { return tx.Error } diff --git a/api/internal/logic/core/pushtaskinfologic.go b/api/internal/logic/core/pushtaskinfologic.go index 6a882aee..2944711d 100644 --- a/api/internal/logic/core/pushtaskinfologic.go +++ b/api/internal/logic/core/pushtaskinfologic.go @@ -40,7 +40,7 @@ func (l *PushTaskInfoLogic) PushTaskInfo(req *clientCore.PushTaskInfoReq) (*clie case 2: for _, hpcInfo := range req.HpcInfoList { l.svcCtx.DbEngin.Exec("update task_hpc set status = ?,start_time = ?,job_id = ? where cluster_id = ? and task_id = ? and name = ?", - hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, req.AdapterId, hpcInfo.TaskId, hpcInfo.Name) + hpcInfo.Status, hpcInfo.StartTime, hpcInfo.JobId, hpcInfo.ClusterId, hpcInfo.TaskId, hpcInfo.Name) syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId) } case 1: diff --git a/api/internal/logic/hpc/commithpctasklogic.go b/api/internal/logic/hpc/commithpctasklogic.go index d8827ce9..6211a13d 100644 --- a/api/internal/logic/hpc/commithpctasklogic.go +++ b/api/internal/logic/hpc/commithpctasklogic.go @@ -5,6 +5,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "k8s.io/apimachinery/pkg/util/json" + "math/rand" "time" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" @@ -28,6 +29,7 @@ func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Com } func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) { + // 构建主任务结构体 taskModel := models.Task{ Status: constants.Saved, @@ -41,10 +43,18 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t return nil, tx.Error } + var clusterIds []int64 + l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id = ? and label = ?", req.AdapterId, req.ClusterType).Scan(&clusterIds) + env, _ := json.Marshal(req.Environment) + + if len(clusterIds) == 0 || clusterIds == nil { + return nil, nil + } + hpcInfo := models.TaskHpc{ TaskId: taskModel.Id, - ClusterId: 1706858330967773111, + ClusterId: clusterIds[rand.Intn(len(clusterIds))], Name: taskModel.Name, Status: "Saved", CmdScript: req.CmdScript, @@ -61,9 +71,9 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t StdErrFile: req.StdErrFile, StdInput: req.StdInput, DeletedFlag: 0, - CreatedBy: req.ParticipantId, + CreatedBy: 0, CreatedTime: time.Now(), - UpdatedBy: req.ParticipantId, + UpdatedBy: 0, UpdatedTime: time.Now(), Environment: string(env), } @@ -72,7 +82,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t if tx.Error != nil { return nil, tx.Error } - // 将任务数据转换成消息体 + // todo mq task manage //reqMessage, err := json.Marshal(mqInfo) //if err != nil { // logx.Error(err) @@ -82,5 +92,10 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t //if publish.Err() != nil { // return nil, publish.Err() //} - return + resp = &types.CommitHpcTaskResp{ + Code: 200, + Msg: "success", + TaskId: taskModel.Id, + } + return resp, nil } diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 17307edd..6f165681 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -849,29 +849,32 @@ type Job struct { } type CommitHpcTaskReq struct { - Name string `json:"name"` // paratera:jobName - Description string `json:"description,optional"` - TenantId int64 `json:"tenantId,optional"` - TaskId int64 `json:"taskId,optional"` - ParticipantId int64 `json:"participantId,optional"` - MatchLabels map[string]string `json:"matchLabels,optional"` - CardCount int64 `json:"cardCount,optional"` - WorkDir string `json:"workDir,optional"` //paratera:workingDir - WallTime string `json:"wallTime,optional"` - CmdScript string `json:"cmdScript,optional"` // paratera:bootScript - AppType string `json:"appType,optional"` - AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname - Queue string `json:"queue,optional"` - NNode string `json:"nNode,optional"` - SubmitType string `json:"submitType,optional"` - StdOutFile string `json:"stdOutFile,optional"` - StdErrFile string `json:"stdErrFile,optional"` - StdInput string `json:"stdInput,optional"` - Environment map[string]string `json:"environment,optional"` + Name string `json:"name"` // paratera:jobName + Description string `json:"description,optional"` + TenantId int64 `json:"tenantId,optional"` + TaskId int64 `json:"taskId,optional"` + AdapterId int64 `json:"adapterId,optional"` + ClusterType string `json:"clusterType,optional"` + MatchLabels map[string]string `json:"matchLabels,optional"` + CardCount int64 `json:"cardCount,optional"` + WorkDir string `json:"workDir,optional"` //paratera:workingDir + WallTime string `json:"wallTime,optional"` + CmdScript string `json:"cmdScript,optional"` // paratera:bootScript + AppType string `json:"appType,optional"` + AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname + Queue string `json:"queue,optional"` + NNode string `json:"nNode,optional"` + SubmitType string `json:"submitType,optional"` + StdOutFile string `json:"stdOutFile,optional"` + StdErrFile string `json:"stdErrFile,optional"` + StdInput string `json:"stdInput,optional"` + Environment map[string]string `json:"environment,optional"` } type CommitHpcTaskResp struct { - TaskId int64 `json:"taskId"` + Code int32 `json:"code"` + Msg string `json:"msg"` + TaskId int64 `json:"taskId"` } type ListJobReq struct {