pcm-coordinator/internal/storeLink/shuguangHpc.go

269 lines
7.1 KiB
Go

package storeLink
import (
"context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"strconv"
"strings"
)
// ShuguangHpc is the storeLink adapter for the Shuguang (Sugon) AC HPC
// platform. Its methods forward job submission, query and deletion to the
// hpcAC RPC client and expose the predefined resource specs.
type ShuguangHpc struct {
// aCRpc is the RPC client used for SubmitJob, GetJobDetail, HistoryJobDetail and DeleteJob.
aCRpc hpcacclient.HpcAC
// platform is reported as ParticipantName by QuerySpecs.
platform string
// participantId is reported as ParticipantId by QuerySpecs.
participantId int64
}
// Fixed job-submission parameters for the Shuguang AC HPC platform.
// The GAP_* values populate hpcAC.MapAppJobInfo in SubmitTask; the remaining
// values are used for the job-manager, history-query and delete requests.
//
// NOTE(review): queue name, account name and work/output paths are hard-coded
// for a single account; consider moving them to configuration.
const (
// GAP_WALL_TIME_24H is sent as GAP_WALL_TIME for every job
// (presumably HH:MM:SS, i.e. a 24-hour limit — confirm against the AC API).
GAP_WALL_TIME_24H = "24:00:00"
// TASK_SHUGUANG_PREFIX prefixes the randomly generated job name.
TASK_SHUGUANG_PREFIX = "ShuguangHPC"
NEWLINE = "\n"
// JOBNAME is the placeholder replaced by the generated job name in
// GAP_WORK_DIR, GAP_STD_OUT_FILE and GAP_STD_ERR_FILE.
JOBNAME = "JOBNAME"
// GAP_CMD_FILE is sent as the GAP_SUBMIT_TYPE value.
GAP_CMD_FILE = "cmd"
GAP_NNODE = "1" // number of nodes (default; replaced by the selected resource spec)
GAP_NODE_STRING = ""
// GAP_APPNAME is sent both as Appname and as GAP_APPNAME.
GAP_APPNAME = "BASE"
// GAP_QUEUE is the scheduler queue every job is submitted to.
// NOTE(review): looks like a test queue name — confirm for production.
GAP_QUEUE = "wzhdtest"
// Work directory and stdout/stderr paths; JOBNAME is substituted per job.
// NOTE(review): "%j" presumably expands to the scheduler job ID — confirm.
GAP_WORK_DIR = "/work/home/acgnnmfbwo/BASE/JOBNAME"
GAP_STD_OUT_FILE = "/work/home/acgnnmfbwo/BASE/JOBNAME/std.out.%j"
GAP_STD_ERR_FILE = "/work/home/acgnnmfbwo/BASE/JOBNAME/std.err.%j"
// StrJobManagerID identifies the job manager in submit, history-query and
// delete requests.
StrJobManagerID = 1637920656
Apptype = "BASIC"
// EXPORT is the shell keyword prefixed to each environment-variable line.
EXPORT = "export"
// Default process (CPU) and DCU counts; replaced by the selected resource spec.
GAP_NPROC = "1"
GAP_NDCU = "1"
// The following options are always sent empty.
GAP_EXCLUSIVE = ""
GAP_PPN = ""
GAP_NGPU = ""
GAP_MULTI_SUB = ""
// StrJobInfoMap formats the DeleteJob payload as "<jobManagerID>,<username>:<jobId>:".
StrJobInfoMap = "%d,%s:%s:"
// Username is the AC account name used when deleting jobs.
Username = "acgnnmfbwo"
)
// RESOURCEMAP maps a resource spec ID to the node count, process (CPU) count
// and DCU count that updateSGHpcRequestByResourceId writes into a submitted job.
//
// Its keys must stay in sync with RESOURCESPECSHPC: SubmitTask validates the
// resource ID against RESOURCESPECSHPC but reads the counts from this map.
var RESOURCEMAP = map[string]ResourceSpecHpc{
"FPOqD5Cx8iNYqawEgDrAxLdrszp4Tmhl": {
GAP_NNODE: "1",
GAP_NPROC: "1",
GAP_NDCU: "1",
},
"Nd99eGNoBFC2ZTycKDlqD37heWTOmrMS": {
GAP_NNODE: "1",
GAP_NPROC: "2",
GAP_NDCU: "1",
},
"uAmLkz6jgSZkC6o8JywG7Yo2aiFPPOBO": {
GAP_NNODE: "1",
GAP_NPROC: "4",
GAP_NDCU: "2",
},
"D71OZQYrRabJc2nfL2GDWOdLEfbiMzYH": {
GAP_NNODE: "1",
GAP_NPROC: "8",
GAP_NDCU: "4",
},
"sXUMrGmgMDFJaLi6dPiB9LkHjFb3lvL5": {
GAP_NNODE: "1",
GAP_NPROC: "16",
GAP_NDCU: "4",
},
"ZfCKQKbNbQl9RPwlSyWLah1Gf7Ti7uJA": {
GAP_NNODE: "1",
GAP_NPROC: "32",
GAP_NDCU: "4",
},
"cfEI4ulTNo2gYUozzdG59URByUjwLl3x": {
GAP_NNODE: "2",
GAP_NPROC: "4",
GAP_NDCU: "2",
},
"vtbkaks8bErhpLRkUDiPDUHq6ssotFpD": {
GAP_NNODE: "2",
GAP_NPROC: "8",
GAP_NDCU: "4",
},
"QJXZFJSReVWWQfkvQjGyEq1JpDHN55Oh": {
GAP_NNODE: "2",
GAP_NPROC: "16",
GAP_NDCU: "4",
},
"79xSdy48yLbVLl9DqEV6tQ2J6jaHe5KO": {
GAP_NNODE: "2",
GAP_NPROC: "32",
GAP_NDCU: "8",
},
}
// RESOURCESPECSHPC maps a resource spec ID to its human-readable description.
// QuerySpecs returns these entries, and SubmitTask rejects any resource ID
// that is not a key here. Each description mirrors the counts stored under the
// same key in RESOURCEMAP; keep the two maps in sync.
var RESOURCESPECSHPC = map[string]string{
"FPOqD5Cx8iNYqawEgDrAxLdrszp4Tmhl": "1*NODE, CPU:1, 1*DCU",
"Nd99eGNoBFC2ZTycKDlqD37heWTOmrMS": "1*NODE, CPU:2, 1*DCU",
"uAmLkz6jgSZkC6o8JywG7Yo2aiFPPOBO": "1*NODE, CPU:4, 2*DCU",
"D71OZQYrRabJc2nfL2GDWOdLEfbiMzYH": "1*NODE, CPU:8, 4*DCU",
"sXUMrGmgMDFJaLi6dPiB9LkHjFb3lvL5": "1*NODE, CPU:16, 4*DCU",
"ZfCKQKbNbQl9RPwlSyWLah1Gf7Ti7uJA": "1*NODE, CPU:32, 4*DCU",
"cfEI4ulTNo2gYUozzdG59URByUjwLl3x": "2*NODE, CPU:4, 2*DCU",
"vtbkaks8bErhpLRkUDiPDUHq6ssotFpD": "2*NODE, CPU:8, 4*DCU",
"QJXZFJSReVWWQfkvQjGyEq1JpDHN55Oh": "2*NODE, CPU:16, 4*DCU",
"79xSdy48yLbVLl9DqEV6tQ2J6jaHe5KO": "2*NODE, CPU:32, 8*DCU",
}
// AcStatus maps AC job status codes (e.g. "statR") to the coordinator's
// normalized status names ("Pending", "Running", "Completed", "Other").
// It is not referenced in this section of the file; presumably it is used
// elsewhere in the package when reporting task status — verify.
var AcStatus = map[string]string{
"statQ": "Pending",
"statR": "Running",
"statE": "Pending",
"statC": "Completed",
"statH": "Pending",
"statS": "Pending",
"statW": "Pending",
"statX": "Other",
}
// ResourceSpecHpc holds the per-spec resource counts, as strings, that
// updateSGHpcRequestByResourceId copies into a job's hpcAC.MapAppJobInfo.
type ResourceSpecHpc struct {
GAP_NNODE string // node count
GAP_NPROC string // process (CPU) count
GAP_NDCU string // DCU count
}
// NewShuguangHpc builds a Shuguang HPC adapter that talks to the cluster
// through aCRpc. name is reported as the participant (platform) name and id as
// the participant ID in QuerySpecs results.
func NewShuguangHpc(aCRpc hpcacclient.HpcAC, name string, id int64) *ShuguangHpc {
	adapter := new(ShuguangHpc)
	adapter.aCRpc = aCRpc
	adapter.platform = name
	adapter.participantId = id
	return adapter
}
// UploadImage is not supported by the Shuguang HPC adapter. It does nothing
// and always reports a nil result with a nil error.
//
// NOTE(review): callers cannot distinguish this no-op from a real success.
func (s ShuguangHpc) UploadImage(ctx context.Context, path string) (interface{}, error) {
	var noResult interface{}
	return noResult, nil
}
// DeleteImage is not supported by the Shuguang HPC adapter. It does nothing
// and always reports a nil result with a nil error.
func (s ShuguangHpc) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
	var noResult interface{}
	return noResult, nil
}
// QueryImageList is not supported by the Shuguang HPC adapter. It does
// nothing and always reports a nil result with a nil error.
func (s ShuguangHpc) QueryImageList(ctx context.Context) (interface{}, error) {
	var noResult interface{}
	return noResult, nil
}
// SubmitTask submits a command job to the Shuguang (Sugon) AC HPC cluster and
// returns the raw SubmitJob response.
//
// Only cmd, envs and resourceId influence the request. imageId, params,
// datasetsId, algorithmId and aiType are accepted to satisfy the shared adapter
// signature and are ignored here.
//
//   - resourceId must be a key of RESOURCESPECSHPC. The node / process / DCU
//     counts of that spec are applied via updateSGHpcRequestByResourceId.
//   - every envs entry must have the form KEY + COMMA + VALUE. Each becomes an
//     "export KEY=VALUE" line passed as GAP_SCHEDULER_OPT_WEB. Only the first
//     COMMA separates key from value, so a value may itself contain COMMA.
//
// An error is returned for an unknown resourceId, a malformed envs entry, or a
// failed RPC.
func (s ShuguangHpc) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
	// Only the predefined resource specs are accepted.
	if _, ok := RESOURCESPECSHPC[resourceId]; !ok {
		return nil, errors.New("shuguangHpc资源Id不存在")
	}

	// Build the environment-variable export lines. An entry without a
	// separator is reported as an error instead of indexing past the split
	// result (which previously panicked).
	// NOTE(review): keys/values are written unquoted into the scheduler
	// options; if envs can come from untrusted users, validate or shell-quote.
	var envBuilder strings.Builder
	for _, e := range envs {
		kv := strings.SplitN(e, COMMA, 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("shuguangHpc invalid env %q: expected KEY%sVALUE", e, COMMA)
		}
		envBuilder.WriteString(EXPORT + SPACE + kv[0] + EQUAL + kv[1] + NEWLINE)
	}
	env := envBuilder.String()

	// Each job gets a unique name, which is also substituted into its working
	// directory and stdout/stderr paths.
	taskName := TASK_SHUGUANG_PREFIX + UNDERSCORE + utils.RandomString(10)
	workDir := strings.ReplaceAll(GAP_WORK_DIR, JOBNAME, taskName)
	stdOutFile := strings.ReplaceAll(GAP_STD_OUT_FILE, JOBNAME, taskName)
	stdErrFile := strings.ReplaceAll(GAP_STD_ERR_FILE, JOBNAME, taskName)

	req := &hpcAC.SubmitJobReq{
		Apptype:         Apptype,
		Appname:         GAP_APPNAME,
		StrJobManagerID: StrJobManagerID,
		MapAppJobInfo: &hpcAC.MapAppJobInfo{
			GAP_CMD_FILE:          cmd,
			GAP_NNODE:             GAP_NNODE,
			GAP_NODE_STRING:       GAP_NODE_STRING,
			GAP_SUBMIT_TYPE:       GAP_CMD_FILE,
			GAP_JOB_NAME:          taskName,
			GAP_WORK_DIR:          workDir,
			GAP_QUEUE:             GAP_QUEUE,
			GAP_NPROC:             GAP_NPROC,
			GAP_PPN:               GAP_PPN,
			GAP_NGPU:              GAP_NGPU,
			GAP_NDCU:              GAP_NDCU,
			GAP_WALL_TIME:         GAP_WALL_TIME_24H,
			GAP_EXCLUSIVE:         GAP_EXCLUSIVE,
			GAP_APPNAME:           GAP_APPNAME,
			GAP_MULTI_SUB:         GAP_MULTI_SUB,
			GAP_STD_OUT_FILE:      stdOutFile,
			GAP_STD_ERR_FILE:      stdErrFile,
			GAP_SCHEDULER_OPT_WEB: env,
		},
	}
	// Replace the default node/process/DCU counts with the selected spec's.
	updateSGHpcRequestByResourceId(resourceId, req)

	resp, err := s.aCRpc.SubmitJob(ctx, req)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
// QueryTask returns the details of the AC job identified by taskId.
//
// It first calls GetJobDetail (the real-time job query). If that response has
// non-nil Data with a non-empty JobEndTime, it is returned as-is. Otherwise the
// job is looked up through HistoryJobDetail, using StrJobManagerID as the job
// manager, and that response is returned instead. RPC errors from either call
// are returned unchanged.
//
// NOTE(review): a job that is still running (no JobEndTime yet) is therefore
// looked up in the history API — confirm this fallback condition is intended.
func (s ShuguangHpc) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
	// Real-time job detail.
	realtimeResp, err := s.aCRpc.GetJobDetail(ctx, &hpcAC.JobDetailReq{JobId: taskId})
	if err != nil {
		return nil, err
	}
	if realtimeResp.Data != nil && realtimeResp.Data.JobEndTime != "" {
		return realtimeResp, nil
	}

	// Fall back to the history-job detail.
	historyResp, err := s.aCRpc.HistoryJobDetail(ctx, &hpcAC.HistoryJobDetailReq{
		JobId:        taskId,
		JobmanagerId: strconv.Itoa(StrJobManagerID),
	})
	if err != nil {
		return nil, err
	}
	return historyResp, nil
}
// QuerySpecs reports every predefined Shuguang HPC resource specification.
//
// One types.ResourceSpecSl is produced per RESOURCESPECSHPC entry. It holds
// the spec ID (map key) and description (map value) and is tagged with this
// adapter's participant ID and platform name. The response is marked
// successful and the error is always nil.
//
// NOTE(review): the order of ResourceSpecs follows Go map iteration and so
// varies between calls; sort the IDs if callers need a stable order.
func (s ShuguangHpc) QuerySpecs(ctx context.Context) (interface{}, error) {
	out := new(types.GetResourceSpecsResp)
	for specID, specName := range RESOURCESPECSHPC {
		spec := new(types.ResourceSpecSl)
		spec.SpecId = specID
		spec.SpecName = specName
		spec.ParticipantId = s.participantId
		spec.ParticipantName = s.platform
		out.ResourceSpecs = append(out.ResourceSpecs, spec)
	}
	out.Success = true
	return out, nil
}
// DeleteTask deletes the AC job identified by taskId.
//
// The job is addressed through a job-info string built from StrJobInfoMap
// ("<jobManagerID>,<username>:<jobId>:") with StrJobManagerID and Username.
// It returns the raw DeleteJob response, or nil and the RPC error on failure.
func (s ShuguangHpc) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
	jobInfo := fmt.Sprintf(StrJobInfoMap, StrJobManagerID, Username, taskId)
	resp, err := s.aCRpc.DeleteJob(ctx, &hpcAC.DeleteJobReq{StrJobInfoMap: jobInfo})
	if err == nil {
		return resp, nil
	}
	// Return an untyped nil so callers see a nil interface on failure.
	return nil, err
}
// updateSGHpcRequestByResourceId overwrites the node, process (CPU) and DCU
// counts in req.MapAppJobInfo with the RESOURCEMAP spec keyed by resourceId.
//
// If resourceId is not in RESOURCEMAP, req is left unchanged, so the defaults
// already in the request are kept instead of being blanked with empty strings.
// The same applies if req or req.MapAppJobInfo is nil, which avoids a nil
// dereference.
func updateSGHpcRequestByResourceId(resourceId string, req *hpcAC.SubmitJobReq) {
	spec, found := RESOURCEMAP[resourceId]
	if !found || req == nil || req.MapAppJobInfo == nil {
		return
	}
	info := req.MapAppJobInfo
	info.GAP_NNODE = spec.GAP_NNODE
	info.GAP_NPROC = spec.GAP_NPROC
	info.GAP_NDCU = spec.GAP_NDCU
}