对接天河

This commit is contained in:
zhangwei 2023-04-27 14:39:22 +08:00
parent 5af20bdc0f
commit 74931f6c48
8 changed files with 148 additions and 37 deletions

View File

@ -4,6 +4,7 @@ import (
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient" "PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcAC" "PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcAC"
"PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/svc" "PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/svc"
"PCM/common/enum"
"PCM/common/tool" "PCM/common/tool"
"context" "context"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
@ -38,12 +39,7 @@ func InitCron(svc *svc.ServiceContext) {
infoList.HpcInfoList[index].JobId = job.JobId infoList.HpcInfoList[index].JobId = job.JobId
infoList.HpcInfoList[index].StartTime = job.JobStartTime infoList.HpcInfoList[index].StartTime = job.JobStartTime
infoList.HpcInfoList[index].RunningTime = int64(tool.RunTimeToSeconds(job.JobRunTime)) infoList.HpcInfoList[index].RunningTime = int64(tool.RunTimeToSeconds(job.JobRunTime))
if job.JobStatus == "statR" { infoList.HpcInfoList[index].Status = enum.AcStatus(job.JobStatus).String()
infoList.HpcInfoList[index].Status = "Running"
}
if job.JobStatus == "statC" {
infoList.HpcInfoList[index].Status = "Completed"
}
} }
} }
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"PCM/adaptor/PCM-HPC/PCM-TH/rpc/hpcTH" "PCM/adaptor/PCM-HPC/PCM-TH/rpc/hpcTH"
"PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/config" "PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/config"
"PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/logic"
"PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/server" "PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/server"
"PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/svc" "PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/svc"
commonConfig "PCM/common/config" commonConfig "PCM/common/config"
@ -19,29 +20,6 @@ import (
var configFile = flag.String("f", "adaptor/PCM-HPC/PCM-TH/rpc/etc/hpcth.yaml", "the config file") var configFile = flag.String("f", "adaptor/PCM-HPC/PCM-TH/rpc/etc/hpcth.yaml", "the config file")
func main() { func main() {
//flag.Parse()
//
//var c config.Config
//conf.MustLoad(*configFile, &c)
//// start log component
//logx.MustSetup(c.LogConf)
//ctx := svc.NewServiceContext(c)
//
//s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
// hpcTH.RegisterHpcTHServer(grpcServer, server.NewHpcTHServer(ctx))
//
// if c.Mode == service.DevMode || c.Mode == service.TestMode {
// reflection.Register(grpcServer)
// }
//})
//
////rpc log
//s.AddUnaryInterceptors(rpcserver.LoggerInterceptor)
//
//defer s.Stop()
//
//logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
//s.Start()
//-------------------- //--------------------
flag.Parse() flag.Parse()
@ -82,7 +60,7 @@ func main() {
s.AddUnaryInterceptors(rpcserver.LoggerInterceptor) s.AddUnaryInterceptors(rpcserver.LoggerInterceptor)
defer s.Stop() defer s.Stop()
logic.InitCron(ctx)
logx.Infof("Starting rpc server at %s...\n", c.ListenOn) logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
s.Start() s.Start()
} }

View File

@ -7,5 +7,6 @@ import (
type Config struct { type Config struct {
zrpc.RpcServerConf zrpc.RpcServerConf
LogConf logx.LogConf LogConf logx.LogConf
PcmCoreRpcConf zrpc.RpcClientConf
} }

View File

@ -0,0 +1,81 @@
package logic
import (
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
"PCM/adaptor/PCM-HPC/PCM-TH/rpc/hpcTH"
"PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/svc"
"PCM/common/enum"
"context"
"github.com/zeromicro/go-zero/core/logx"
"time"
)
func InitCron(svc *svc.ServiceContext) {
submitJobLogic := NewSubmitJobLogic(context.Background(), svc)
listLogic := NewListJobLogic(context.Background(), svc)
svc.Cron.AddFunc("*/5 * * * * ?", func() {
syncInfoReq := pcmcoreclient.SyncInfoReq{
Kind: "hpc",
ServiceName: "th",
}
// 查询core端分发下来的任务列表
infoList, err := queryCoreInfoList(svc)
if err != nil {
logx.Error(err)
return
}
// 提交任务
submitJob(infoList, submitJobLogic)
// 查询运行中的任务列表同步信息
listReq := hpcTH.ListJobReq{}
listJob, err := listLogic.ListJob(&listReq)
if err != nil {
logx.Error(err)
return
}
for index, _ := range infoList.HpcInfoList {
for _, job := range listJob.Jobs {
if job.Name == infoList.HpcInfoList[index].Name {
infoList.HpcInfoList[index].JobId = string(job.JobId)
infoList.HpcInfoList[index].StartTime = time.Unix(job.StartTime, 0).String()
infoList.HpcInfoList[index].RunningTime = time.Now().Sub(time.Unix(job.StartTime, 0)).Milliseconds()
infoList.HpcInfoList[index].Status = enum.State(job.JobState).String()
}
}
}
// 同步信息到core端
if len(infoList.HpcInfoList) != 0 {
syncInfoReq.HpcInfoList = infoList.HpcInfoList
svc.PcmCoreRpc.SyncInfo(context.Background(), &syncInfoReq)
}
})
}
func submitJob(infoList *pcmcoreclient.InfoListResp, submitJobLogic *SubmitJobLogic) {
for index, _ := range infoList.HpcInfoList {
if infoList.HpcInfoList[index].Status == "Saved" {
submitReq := hpcTH.SubmitJobReq{
Account: "root",
Name: infoList.HpcInfoList[index].Name,
Script: "#! /bin/bash\\n hostname \\n env | grep SLURM",
UserId: 123,
}
jobResult, _ := submitJobLogic.SubmitJob(&submitReq)
infoList.HpcInfoList[index].Status = "Pending"
infoList.HpcInfoList[index].JobId = string(jobResult.SubmitResponseMsg[0].JobId)
}
}
}
func queryCoreInfoList(svc *svc.ServiceContext) (*pcmcoreclient.InfoListResp, error) {
infoReq := pcmcoreclient.InfoListReq{
Kind: "hpc",
ServiceName: "th",
}
infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
if err != nil {
return nil, err
}
return infoList, nil
}

View File

@ -1,15 +1,22 @@
package svc package svc
import ( import (
"PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
"PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/config" "PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/config"
"github.com/robfig/cron/v3"
"github.com/zeromicro/go-zero/zrpc"
) )
type ServiceContext struct { type ServiceContext struct {
Config config.Config Config config.Config
Cron *cron.Cron
PcmCoreRpc pcmcoreclient.PcmCore
} }
func NewServiceContext(c config.Config) *ServiceContext { func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{ return &ServiceContext{
Config: c, Cron: cron.New(cron.WithSeconds()),
Config: c,
PcmCoreRpc: pcmcoreclient.NewPcmCore(zrpc.MustNewClient(c.PcmCoreRpcConf)),
} }
} }

23
common/enum/acEnum.go Normal file
View File

@ -0,0 +1,23 @@
package enum
type AcStatus string
const (
statR AcStatus = "statR"
statC AcStatus = "statC"
statQ AcStatus = "statQ"
statW AcStatus = "statW"
)
func (s AcStatus) String() string {
switch s {
case statR:
return "Running"
case statC:
return "Completed"
case statQ, statW:
return "Pending"
default:
return ""
}
}

View File

@ -1,13 +1,13 @@
package enum package enum
type Status int64 type synergyStatus int64
const ( const (
SYNERGIZED Status = 0 SYNERGIZED synergyStatus = 0
NOT_SYNERGIZED Status = 1 NOT_SYNERGIZED synergyStatus = 1
) )
func (s Status) String() string { func (s synergyStatus) String() string {
switch s { switch s {
case SYNERGIZED: case SYNERGIZED:
return "已协同" return "已协同"

25
common/enum/tianheEnum.go Normal file
View File

@ -0,0 +1,25 @@
package enum
type State uint32
const (
pending State = 0
running State = 1
suspended State = 2
completed State = 3
)
func (s State) String() string {
switch s {
case pending:
return "Pending"
case running:
return "Running"
case suspended:
return "Suspended"
case completed:
return "Completed"
default:
return ""
}
}