diff --git a/adaptor/PCM-HPC/PCM-AC/rpc/internal/logic/cronlogic.go b/adaptor/PCM-HPC/PCM-AC/rpc/internal/logic/cronlogic.go index a517d699..01e5f279 100644 --- a/adaptor/PCM-HPC/PCM-AC/rpc/internal/logic/cronlogic.go +++ b/adaptor/PCM-HPC/PCM-AC/rpc/internal/logic/cronlogic.go @@ -4,6 +4,7 @@ import ( "PCM/adaptor/PCM-CORE/rpc/pcmcoreclient" "PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcAC" "PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/svc" + "PCM/common/enum" "PCM/common/tool" "context" "github.com/zeromicro/go-zero/core/logx" @@ -38,12 +39,7 @@ func InitCron(svc *svc.ServiceContext) { infoList.HpcInfoList[index].JobId = job.JobId infoList.HpcInfoList[index].StartTime = job.JobStartTime infoList.HpcInfoList[index].RunningTime = int64(tool.RunTimeToSeconds(job.JobRunTime)) - if job.JobStatus == "statR" { - infoList.HpcInfoList[index].Status = "Running" - } - if job.JobStatus == "statC" { - infoList.HpcInfoList[index].Status = "Completed" - } + infoList.HpcInfoList[index].Status = enum.AcStatus(job.JobStatus).String() } } } diff --git a/adaptor/PCM-HPC/PCM-TH/rpc/hpcth.go b/adaptor/PCM-HPC/PCM-TH/rpc/hpcth.go index e6fc1193..74d0959a 100644 --- a/adaptor/PCM-HPC/PCM-TH/rpc/hpcth.go +++ b/adaptor/PCM-HPC/PCM-TH/rpc/hpcth.go @@ -3,6 +3,7 @@ package main import ( "PCM/adaptor/PCM-HPC/PCM-TH/rpc/hpcTH" "PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/config" + "PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/logic" "PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/server" "PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/svc" commonConfig "PCM/common/config" @@ -19,29 +20,6 @@ import ( var configFile = flag.String("f", "adaptor/PCM-HPC/PCM-TH/rpc/etc/hpcth.yaml", "the config file") func main() { - //flag.Parse() - // - //var c config.Config - //conf.MustLoad(*configFile, &c) - //// start log component - //logx.MustSetup(c.LogConf) - //ctx := svc.NewServiceContext(c) - // - //s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) { - // hpcTH.RegisterHpcTHServer(grpcServer, server.NewHpcTHServer(ctx)) - // - // if c.Mode == service.DevMode || c.Mode == service.TestMode { - // reflection.Register(grpcServer) - // } - //}) - // - ////rpc log - //s.AddUnaryInterceptors(rpcserver.LoggerInterceptor) - // - //defer s.Stop() - // - //logx.Infof("Starting rpc server at %s...\n", c.ListenOn) - //s.Start() //-------------------- flag.Parse() @@ -82,7 +60,7 @@ func main() { s.AddUnaryInterceptors(rpcserver.LoggerInterceptor) defer s.Stop() - + logic.InitCron(ctx) logx.Infof("Starting rpc server at %s...\n", c.ListenOn) s.Start() } diff --git a/adaptor/PCM-HPC/PCM-TH/rpc/internal/config/config.go b/adaptor/PCM-HPC/PCM-TH/rpc/internal/config/config.go index 54996416..a9b2ecbb 100644 --- a/adaptor/PCM-HPC/PCM-TH/rpc/internal/config/config.go +++ b/adaptor/PCM-HPC/PCM-TH/rpc/internal/config/config.go @@ -7,5 +7,6 @@ import ( type Config struct { zrpc.RpcServerConf - LogConf logx.LogConf + LogConf logx.LogConf + PcmCoreRpcConf zrpc.RpcClientConf } diff --git a/adaptor/PCM-HPC/PCM-TH/rpc/internal/logic/cronlogic.go b/adaptor/PCM-HPC/PCM-TH/rpc/internal/logic/cronlogic.go new file mode 100644 index 00000000..678ce5e7 --- /dev/null +++ b/adaptor/PCM-HPC/PCM-TH/rpc/internal/logic/cronlogic.go @@ -0,0 +1,81 @@ +package logic + +import ( + "PCM/adaptor/PCM-CORE/rpc/pcmcoreclient" + "PCM/adaptor/PCM-HPC/PCM-TH/rpc/hpcTH" + "PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/svc" + "PCM/common/enum" + "context" + "github.com/zeromicro/go-zero/core/logx" + "time" +) + +func InitCron(svc *svc.ServiceContext) { + submitJobLogic := NewSubmitJobLogic(context.Background(), svc) + listLogic := NewListJobLogic(context.Background(), svc) + svc.Cron.AddFunc("*/5 * * * * ?", func() { + syncInfoReq := pcmcoreclient.SyncInfoReq{ + Kind: "hpc", + ServiceName: "th", + } + // 查询core端分发下来的任务列表 + infoList, err := queryCoreInfoList(svc) + if err != nil { + logx.Error(err) + return + } + // 提交任务 + submitJob(infoList, submitJobLogic) + // 查询运行中的任务列表同步信息 + listReq := hpcTH.ListJobReq{} + listJob, err := listLogic.ListJob(&listReq) + if err != nil { + logx.Error(err) + return + } + for index, _ := range infoList.HpcInfoList { + for _, job := range listJob.Jobs { + if job.Name == infoList.HpcInfoList[index].Name { + infoList.HpcInfoList[index].JobId = string(job.JobId) + infoList.HpcInfoList[index].StartTime = time.Unix(job.StartTime, 0).String() + infoList.HpcInfoList[index].RunningTime = time.Now().Sub(time.Unix(job.StartTime, 0)).Milliseconds() + infoList.HpcInfoList[index].Status = enum.State(job.JobState).String() + + } + } + } + // 同步信息到core端 + if len(infoList.HpcInfoList) != 0 { + syncInfoReq.HpcInfoList = infoList.HpcInfoList + svc.PcmCoreRpc.SyncInfo(context.Background(), &syncInfoReq) + } + }) +} + +func submitJob(infoList *pcmcoreclient.InfoListResp, submitJobLogic *SubmitJobLogic) { + for index, _ := range infoList.HpcInfoList { + if infoList.HpcInfoList[index].Status == "Saved" { + submitReq := hpcTH.SubmitJobReq{ + Account: "root", + Name: infoList.HpcInfoList[index].Name, + Script: "#! /bin/bash\\n hostname \\n env | grep SLURM", + UserId: 123, + } + jobResult, _ := submitJobLogic.SubmitJob(&submitReq) + infoList.HpcInfoList[index].Status = "Pending" + infoList.HpcInfoList[index].JobId = string(jobResult.SubmitResponseMsg[0].JobId) + } + } +} + +func queryCoreInfoList(svc *svc.ServiceContext) (*pcmcoreclient.InfoListResp, error) { + infoReq := pcmcoreclient.InfoListReq{ + Kind: "hpc", + ServiceName: "th", + } + infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq) + if err != nil { + return nil, err + } + return infoList, nil +} diff --git a/adaptor/PCM-HPC/PCM-TH/rpc/internal/svc/servicecontext.go b/adaptor/PCM-HPC/PCM-TH/rpc/internal/svc/servicecontext.go index 1bc428c8..81fcd86e 100644 --- a/adaptor/PCM-HPC/PCM-TH/rpc/internal/svc/servicecontext.go +++ b/adaptor/PCM-HPC/PCM-TH/rpc/internal/svc/servicecontext.go @@ -1,15 +1,22 @@ package svc import ( + "PCM/adaptor/PCM-CORE/rpc/pcmcoreclient" "PCM/adaptor/PCM-HPC/PCM-TH/rpc/internal/config" + "github.com/robfig/cron/v3" + "github.com/zeromicro/go-zero/zrpc" ) type ServiceContext struct { - Config config.Config + Config config.Config + Cron *cron.Cron + PcmCoreRpc pcmcoreclient.PcmCore } func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ - Config: c, + Cron: cron.New(cron.WithSeconds()), + Config: c, + PcmCoreRpc: pcmcoreclient.NewPcmCore(zrpc.MustNewClient(c.PcmCoreRpcConf)), } } diff --git a/common/enum/acEnum.go b/common/enum/acEnum.go new file mode 100644 index 00000000..51744256 --- /dev/null +++ b/common/enum/acEnum.go @@ -0,0 +1,23 @@ +package enum + +type AcStatus string + +const ( + statR AcStatus = "statR" + statC AcStatus = "statC" + statQ AcStatus = "statQ" + statW AcStatus = "statW" +) + +func (s AcStatus) String() string { + switch s { + case statR: + return "Running" + case statC: + return "Completed" + case statQ, statW: + return "Pending" + default: + return "" + } +} diff --git a/common/enum/synergyStatusEnum.go b/common/enum/synergyStatusEnum.go index d7b097f6..ffa7eea3 100644 --- a/common/enum/synergyStatusEnum.go +++ b/common/enum/synergyStatusEnum.go @@ -1,13 +1,13 @@ package enum -type Status int64 +type synergyStatus int64 const ( - SYNERGIZED Status = 0 - NOT_SYNERGIZED Status = 1 + SYNERGIZED synergyStatus = 0 + NOT_SYNERGIZED synergyStatus = 1 ) -func (s Status) String() string { +func (s synergyStatus) String() string { switch s { case SYNERGIZED: return "已协同" diff --git a/common/enum/tianheEnum.go b/common/enum/tianheEnum.go new file mode 100644 index 00000000..832990c1 --- /dev/null +++ b/common/enum/tianheEnum.go @@ -0,0 +1,25 @@ +package enum + +type State uint32 + +const ( + pending State = 0 + running State = 1 + suspended State = 2 + completed State = 3 +) + +func (s State) String() string { + switch s { + case pending: + return "Pending" + case running: + return "Running" + case suspended: + return "Suspended" + case completed: + return "Completed" + default: + return "" + } +}