From a833ccf4044b5abc42fb30010bfb7982ef3668e7 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Sat, 27 May 2023 16:53:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=A9=E6=B2=B3=E5=A2=9E=E5=8A=A0=E8=BF=94?= =?UTF-8?q?=E5=9B=9E=E4=BF=A1=E6=81=AF=E5=AD=97=E6=AE=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 1ccf70a342f2760f2725e6418c88042841d60805 --- adaptor/PCM-CORE/model/hpcmodel_gen.go | 2 +- adaptor/PCM-CORE/rpc/pb/pcmCore.proto | 8 ++--- adaptor/PCM-CORE/rpc/pcmCore/pcmCore.pb.go | 36 +++++++++---------- .../PCM-CORE/rpc/pcmCore/pcmCore_grpc.pb.go | 25 +++++++------ .../PCM-TH/rpc/internal/logic/cronlogic.go | 22 ++++++++++-- 5 files changed, 58 insertions(+), 35 deletions(-) diff --git a/adaptor/PCM-CORE/model/hpcmodel_gen.go b/adaptor/PCM-CORE/model/hpcmodel_gen.go index e88baf5f..12d68e0b 100644 --- a/adaptor/PCM-CORE/model/hpcmodel_gen.go +++ b/adaptor/PCM-CORE/model/hpcmodel_gen.go @@ -62,7 +62,7 @@ type ( allocCpu uint32 `db:"alloc_cpu"` version string `db:"version"` account string `db:"account"` - exitCode string `db:"exit_code"` + exitCode uint32 `db:"exit_code"` assocId uint32 `db:"assoc_id"` } ) diff --git a/adaptor/PCM-CORE/rpc/pb/pcmCore.proto b/adaptor/PCM-CORE/rpc/pb/pcmCore.proto index 07477905..5bdb0e8f 100644 --- a/adaptor/PCM-CORE/rpc/pb/pcmCore.proto +++ b/adaptor/PCM-CORE/rpc/pb/pcmCore.proto @@ -57,12 +57,12 @@ message HpcInfo { string derivedEs = 12; string cluster =13; string blockId = 14; - string allocNodes = 15; - string allocCpu =16; + uint32 allocNodes = 15; + uint32 allocCpu =16; string version = 17; string account =18; - string exitCode =19; - string assocId = 20; + uint32 exitCode =19; + uint32 assocId = 20; } message SyncInfoResp{ diff --git a/adaptor/PCM-CORE/rpc/pcmCore/pcmCore.pb.go b/adaptor/PCM-CORE/rpc/pcmCore/pcmCore.pb.go index 1986dfcb..7368f793 100644 --- a/adaptor/PCM-CORE/rpc/pcmCore/pcmCore.pb.go +++ b/adaptor/PCM-CORE/rpc/pcmCore/pcmCore.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.10 +// protoc-gen-go v1.30.0 +// protoc v3.19.4 // source: pcmCore.proto package pcmCore @@ -388,12 +388,12 @@ type HpcInfo struct { DerivedEs string `protobuf:"bytes,12,opt,name=derivedEs,proto3" json:"derivedEs,omitempty"` Cluster string `protobuf:"bytes,13,opt,name=cluster,proto3" json:"cluster,omitempty"` BlockId string `protobuf:"bytes,14,opt,name=blockId,proto3" json:"blockId,omitempty"` - AllocNodes string `protobuf:"bytes,15,opt,name=allocNodes,proto3" json:"allocNodes,omitempty"` - AllocCpu string `protobuf:"bytes,16,opt,name=allocCpu,proto3" json:"allocCpu,omitempty"` + AllocNodes uint32 `protobuf:"varint,15,opt,name=allocNodes,proto3" json:"allocNodes,omitempty"` + AllocCpu uint32 `protobuf:"varint,16,opt,name=allocCpu,proto3" json:"allocCpu,omitempty"` Version string `protobuf:"bytes,17,opt,name=version,proto3" json:"version,omitempty"` Account string `protobuf:"bytes,18,opt,name=account,proto3" json:"account,omitempty"` - ExitCode string `protobuf:"bytes,19,opt,name=exitCode,proto3" json:"exitCode,omitempty"` - AssocId string `protobuf:"bytes,20,opt,name=assocId,proto3" json:"assocId,omitempty"` + ExitCode uint32 `protobuf:"varint,19,opt,name=exitCode,proto3" json:"exitCode,omitempty"` + AssocId uint32 `protobuf:"varint,20,opt,name=assocId,proto3" json:"assocId,omitempty"` } func (x *HpcInfo) Reset() { @@ -526,18 +526,18 @@ func (x *HpcInfo) GetBlockId() string { return "" } -func (x *HpcInfo) GetAllocNodes() string { +func (x *HpcInfo) GetAllocNodes() uint32 { if x != nil { return x.AllocNodes } - return "" + return 0 } -func (x *HpcInfo) GetAllocCpu() string { +func (x *HpcInfo) GetAllocCpu() uint32 { if x != nil { return x.AllocCpu } - return "" + return 0 } func (x *HpcInfo) GetVersion() string { @@ -554,18 +554,18 @@ func (x *HpcInfo) GetAccount() string { return "" } -func (x *HpcInfo) GetExitCode() string { +func (x *HpcInfo) GetExitCode() uint32 { if x != nil { return x.ExitCode } - return "" + return 0 } -func (x *HpcInfo) GetAssocId() string { +func (x *HpcInfo) GetAssocId() uint32 { if x != nil { return x.AssocId } - return "" + return 0 } type SyncInfoResp struct { @@ -828,15 +828,15 @@ var file_pcmCore_proto_rawDesc = []byte{ 0x52, 0x07, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x18, 0x0a, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x4e, 0x6f, 0x64, 0x65, - 0x73, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x4e, 0x6f, + 0x73, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x4e, 0x6f, 0x64, 0x65, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x43, 0x70, 0x75, 0x18, - 0x10, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x43, 0x70, 0x75, 0x12, + 0x10, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x43, 0x70, 0x75, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x11, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x12, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x78, 0x69, 0x74, 0x43, 0x6f, 0x64, 0x65, 0x18, - 0x13, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x78, 0x69, 0x74, 0x43, 0x6f, 0x64, 0x65, 0x12, - 0x18, 0x0a, 0x07, 0x61, 0x73, 0x73, 0x6f, 0x63, 0x49, 0x64, 0x18, 0x14, 0x20, 0x01, 0x28, 0x09, + 0x13, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x65, 0x78, 0x69, 0x74, 0x43, 0x6f, 0x64, 0x65, 0x12, + 0x18, 0x0a, 0x07, 0x61, 0x73, 0x73, 0x6f, 0x63, 0x49, 0x64, 0x18, 0x14, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x61, 0x73, 0x73, 0x6f, 0x63, 0x49, 0x64, 0x22, 0x34, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, diff --git a/adaptor/PCM-CORE/rpc/pcmCore/pcmCore_grpc.pb.go b/adaptor/PCM-CORE/rpc/pcmCore/pcmCore_grpc.pb.go index e8d9a687..db9fcae8 100644 --- a/adaptor/PCM-CORE/rpc/pcmCore/pcmCore_grpc.pb.go +++ b/adaptor/PCM-CORE/rpc/pcmCore/pcmCore_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.10 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v3.19.4 // source: pcmCore.proto package pcmCore @@ -18,13 +18,18 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + PcmCore_SyncInfo_FullMethodName = "/pcmCore.pcmCore/SyncInfo" + PcmCore_InfoList_FullMethodName = "/pcmCore.pcmCore/InfoList" +) + // PcmCoreClient is the client API for PcmCore service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type PcmCoreClient interface { - // SyncInfo Synchronous data information + //SyncInfo Synchronous data information SyncInfo(ctx context.Context, in *SyncInfoReq, opts ...grpc.CallOption) (*SyncInfoResp, error) - // InfoList + //InfoList InfoList(ctx context.Context, in *InfoListReq, opts ...grpc.CallOption) (*InfoListResp, error) } @@ -38,7 +43,7 @@ func NewPcmCoreClient(cc grpc.ClientConnInterface) PcmCoreClient { func (c *pcmCoreClient) SyncInfo(ctx context.Context, in *SyncInfoReq, opts ...grpc.CallOption) (*SyncInfoResp, error) { out := new(SyncInfoResp) - err := c.cc.Invoke(ctx, "/pcmCore.pcmCore/SyncInfo", in, out, opts...) + err := c.cc.Invoke(ctx, PcmCore_SyncInfo_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -47,7 +52,7 @@ func (c *pcmCoreClient) SyncInfo(ctx context.Context, in *SyncInfoReq, opts ...g func (c *pcmCoreClient) InfoList(ctx context.Context, in *InfoListReq, opts ...grpc.CallOption) (*InfoListResp, error) { out := new(InfoListResp) - err := c.cc.Invoke(ctx, "/pcmCore.pcmCore/InfoList", in, out, opts...) + err := c.cc.Invoke(ctx, PcmCore_InfoList_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -58,9 +63,9 @@ func (c *pcmCoreClient) InfoList(ctx context.Context, in *InfoListReq, opts ...g // All implementations must embed UnimplementedPcmCoreServer // for forward compatibility type PcmCoreServer interface { - // SyncInfo Synchronous data information + //SyncInfo Synchronous data information SyncInfo(context.Context, *SyncInfoReq) (*SyncInfoResp, error) - // InfoList + //InfoList InfoList(context.Context, *InfoListReq) (*InfoListResp, error) mustEmbedUnimplementedPcmCoreServer() } @@ -98,7 +103,7 @@ func _PcmCore_SyncInfo_Handler(srv interface{}, ctx context.Context, dec func(in } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/pcmCore.pcmCore/SyncInfo", + FullMethod: PcmCore_SyncInfo_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(PcmCoreServer).SyncInfo(ctx, req.(*SyncInfoReq)) @@ -116,7 +121,7 @@ func _PcmCore_InfoList_Handler(srv interface{}, ctx context.Context, dec func(in } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/pcmCore.pcmCore/InfoList", + FullMethod: PcmCore_InfoList_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(PcmCoreServer).InfoList(ctx, req.(*InfoListReq)) diff --git a/adaptor/PCM-HPC/PCM-TH/rpc/internal/logic/cronlogic.go b/adaptor/PCM-HPC/PCM-TH/rpc/internal/logic/cronlogic.go index 917e5b63..3da27087 100644 --- a/adaptor/PCM-HPC/PCM-TH/rpc/internal/logic/cronlogic.go +++ b/adaptor/PCM-HPC/PCM-TH/rpc/internal/logic/cronlogic.go @@ -16,7 +16,9 @@ func InitCron(svc *svc.ServiceContext) { svc.Cron.Start() submitJobLogic := NewSubmitJobLogic(context.Background(), svc) listLogic := NewListJobLogic(context.Background(), svc) + historyListLogic := NewListHistoryJobLogic(context.Background(), svc) svc.Cron.AddFunc("*/5 * * * * ?", func() { + // 查询core端分发下来的任务列表 infoReq := pcmcoreclient.InfoListReq{ Kind: "hpc", @@ -28,7 +30,7 @@ func InitCron(svc *svc.ServiceContext) { return } // 提交任务 - submitJob(infoList, submitJobLogic) + submitJob(infoList, submitJobLogic, historyListLogic) // 查询运行中的任务列表同步信息 listReq := hpcTH.ListJobReq{} listJob, err := listLogic.ListJob(&listReq) @@ -59,7 +61,7 @@ func InitCron(svc *svc.ServiceContext) { }) } -func submitJob(infoList *pcmcoreclient.InfoListResp, submitJobLogic *SubmitJobLogic) { +func submitJob(infoList *pcmcoreclient.InfoListResp, submitJobLogic *SubmitJobLogic, historyListLogic *ListHistoryJobLogic) { for index, _ := range infoList.HpcInfoList { if infoList.HpcInfoList[index].Status == "Saved" { submitReq := hpcTH.SubmitJobReq{ @@ -69,6 +71,22 @@ func submitJob(infoList *pcmcoreclient.InfoListResp, submitJobLogic *SubmitJobLo UserId: 123, } jobResult, _ := submitJobLogic.SubmitJob(&submitReq) + // 任务失败 + if string(jobResult.SubmitResponseMsg[0].ErrorCode) != "" { + infoList.HpcInfoList[index].Status = "Failed" + infoList.HpcInfoList[index].ExitCode = jobResult.SubmitResponseMsg[0].ErrorCode + // 查询失败的任务信息同步到core端 + historyResult, err := historyListLogic.ListHistoryJob(&hpcTH.ListHistoryJobReq{}) + if err != nil { + return + } + for _, historyJob := range historyResult.HistoryJobs { + if infoList.HpcInfoList[index].Name == historyJob.Jobname { + tool.Convert(historyJob, infoList.HpcInfoList[index]) + } + } + } + // 任务提交成功 infoList.HpcInfoList[index].Status = "Pending" infoList.HpcInfoList[index].JobId = strconv.Itoa(int(jobResult.SubmitResponseMsg[0].JobId)) }