C端状态与P端同步,slurm任务下发

Former-commit-id: 2c56af84c84bc9b82ba5430f3257e983ab486c2b
This commit is contained in:
zhouqunjie 2023-11-07 20:00:04 +08:00
parent 5aabbbdf2a
commit 4e98b1dc08
9 changed files with 49 additions and 56 deletions

View File

@ -19,7 +19,7 @@ func SyncParticipantRpc(svc *svc.ServiceContext) {
// 初始化p端rpc客户端
if len(participant.RpcAddress) != 0 && svc.K8sRpc[participant.Id] == nil {
switch participant.Type {
case constants.Kubernetes:
case constants.CLOUD:
svc.K8sRpc[participant.Id] = kubernetesclient.NewKubernetes(zrpc.MustNewClient(zrpc.RpcClientConf{
Endpoints: []string{participant.RpcAddress},
NonBlock: true,

View File

@ -1,6 +1,15 @@
package constants
// Participant (cluster) type identifiers. The call sites visible next to this
// block compare them against participant.Type and against the `type` column
// read from sc_participant_phy_info.
const (
// Mixed-case names.
// NOTE(review): the visible call sites switch on CLOUD rather than Kubernetes —
// confirm whether these two are still referenced anywhere.
Kubernetes = "Kubernetes"
Slurm = "Slurm"
// Top-level participant categories; the proto comment on ParticipantPhyReq.type
// lists CLOUD, AI and HPC as the accepted values.
HPC = "HPC"
AI = "AI"
CLOUD = "CLOUD"
// NOTE(review): presumably names of concrete platforms/backends; no usage is
// visible from here — confirm the meaning of each against its callers.
SLURM = "SLURM"
AC = "AC"
OCTOPUS = "OCTOPUS"
MODELARTS = "MODELARTS"
KUBERNETES = "KUBERNETES"
TKE = "TKE"
OPENSTACK = "OPENSTACK"
TSTACK = "TSTACK"
)

View File

@ -1,7 +1,7 @@
// Code generated by goctl. DO NOT EDIT.
// Source: pcmCore.proto
package participantservice
package client
import (
"context"

View File

@ -1,7 +1,7 @@
// Code generated by goctl. DO NOT EDIT.
// Source: pcmCore.proto
package pcmcore
package client
import (
"context"

View File

@ -6,8 +6,8 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/pcmcore"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore"
"gorm.io/gorm"
)
@ -26,25 +26,25 @@ func NewInfoListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *InfoList
}
// InfoList returns the task records of one participant (P side), selected by
// in.ParticipantId.
//
// It reads the participant's type from sc_participant_phy_info, fills a model
// slice through findModelList (defined elsewhere) and copies it into a list of
// the response with utils.Convert. A type that matches no case leaves the
// response empty. The returned error is always nil; the outcome of the type
// lookup query is not checked here.
//
// NOTE(review): this listing carries two variants of several lines (pcmcore vs
// pcmCore types, string-literal vs constants case labels, CloudInfoList vs
// HpcInfoList/AiInfoList targets) — it looks like a rendered diff; confirm
// which variant is current before relying on it.
func (l *InfoListLogic) InfoList(in *pcmcore.InfoListReq) (*pcmcore.InfoListResp, error) {
result := pcmcore.InfoListResp{}
func (l *InfoListLogic) InfoList(in *pcmCore.InfoListReq) (*pcmCore.InfoListResp, error) {
result := pcmCore.InfoListResp{}
// Look up the participant (P-side) type.
var kind string
l.svcCtx.DbEngin.Raw("select type as kind from sc_participant_phy_info where id = ?", in.ParticipantId).Scan(&kind)
// Fetch the record list for the matching category (cloud / AI / HPC).
switch kind {
case "hpc":
case constants.HPC:
var hpcModelList []models.Hpc
findModelList(in.ParticipantId, l.svcCtx.DbEngin, &hpcModelList)
utils.Convert(hpcModelList, &result.CloudInfoList)
case constants.Kubernetes:
utils.Convert(hpcModelList, &result.HpcInfoList)
case constants.CLOUD:
var cloudModelList []models.Cloud
findModelList(in.ParticipantId, l.svcCtx.DbEngin, &cloudModelList)
utils.Convert(cloudModelList, &result.CloudInfoList)
case "ai":
case constants.AI:
var aiModelList []models.Ai
findModelList(in.ParticipantId, l.svcCtx.DbEngin, &aiModelList)
utils.Convert(aiModelList, &result.CloudInfoList)
utils.Convert(aiModelList, &result.AiInfoList)
}
return &result, nil
}

View File

@ -38,17 +38,17 @@ func (l *SyncInfoLogic) SyncInfo(in *pcmCore.SyncInfoReq) (*pcmCore.SyncInfoResp
var kind string
l.svcCtx.DbEngin.Raw("select type as kind from sc_participant_phy_info where id = ?", in.ParticipantId).Scan(&kind)
switch kind {
case constants.Kubernetes:
case constants.CLOUD:
for _, cloudInfo := range in.CloudInfoList {
l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,running_time = ?,result = ? where participant_id = ? and task_id = ? and namespace = ? and name = ?",
cloudInfo.Status, cloudInfo.StartTime, cloudInfo.RunningTime, cloudInfo.Result, in.ParticipantId, cloudInfo.TaskId, cloudInfo.Namespace, cloudInfo.Name)
}
case "hpc":
case constants.HPC:
for _, hpcInfo := range in.HpcInfoList {
l.svcCtx.DbEngin.Exec("update hpc set status = ?,start_time = ?,running_time = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, in.ParticipantId, hpcInfo.TaskId, hpcInfo.Name)
}
case "ai":
case constants.AI:
for _, aiInfo := range in.AiInfoList {
l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,running_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?",
aiInfo.Status, aiInfo.StartTime, aiInfo.RunningTime, aiInfo.ProjectId, aiInfo.JobId, in.ParticipantId, aiInfo.TaskId, aiInfo.Name)

View File

@ -144,7 +144,7 @@ message ParticipantPhyReq {
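string storageSpace = 7; // cluster storage space
string storageAvailSpace = 8; // cluster available storage space
string storageBandwidth = 9; // cluster storage bandwidth
string type = 10; // participant type: 0 - data-computing cluster; 1 - intelligent-computing cluster; 2 - supercomputing cluster
string type = 10; // participant type: CLOUD - data-computing cluster; AI - intelligent-computing cluster; HPC - supercomputing cluster
int64 tenantId = 11; // tenant id
string tenantName = 12; // tenant name
repeated NodePhyInfo nodeInfo = 13; // node information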

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.30.0
// protoc-gen-go v1.31.0
// protoc v3.19.4
// source: pcmCore.proto
@ -1162,7 +1162,7 @@ type ParticipantPhyReq struct {
StorageSpace string `protobuf:"bytes,7,opt,name=storageSpace,proto3" json:"storageSpace,omitempty"` // 集群存储空间
StorageAvailSpace string `protobuf:"bytes,8,opt,name=storageAvailSpace,proto3" json:"storageAvailSpace,omitempty"` // 集群存储可用空间
StorageBandwidth string `protobuf:"bytes,9,opt,name=storageBandwidth,proto3" json:"storageBandwidth,omitempty"` // 集群存储带宽
Type string `protobuf:"bytes,10,opt,name=type,proto3" json:"type,omitempty"` // 参与者类型:0-数算集群1-智算集群2-超算集群
Type string `protobuf:"bytes,10,opt,name=type,proto3" json:"type,omitempty"` // 参与者类型:CLOUD-数算集群;AI-智算集群HPC-超算集群
TenantId int64 `protobuf:"varint,11,opt,name=tenantId,proto3" json:"tenantId,omitempty"` // 租户id
TenantName string `protobuf:"bytes,12,opt,name=tenantName,proto3" json:"tenantName,omitempty"` // 租户名称
NodeInfo []*NodePhyInfo `protobuf:"bytes,13,rep,name=nodeInfo,proto3" json:"nodeInfo,omitempty"` // 节点信息

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: pcmCore.proto
@ -18,11 +18,6 @@ import (
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
const (
PcmCore_SyncInfo_FullMethodName = "/pcmCore.pcmCore/SyncInfo"
PcmCore_InfoList_FullMethodName = "/pcmCore.pcmCore/InfoList"
)
// PcmCoreClient is the client API for PcmCore service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
@ -43,7 +38,7 @@ func NewPcmCoreClient(cc grpc.ClientConnInterface) PcmCoreClient {
func (c *pcmCoreClient) SyncInfo(ctx context.Context, in *SyncInfoReq, opts ...grpc.CallOption) (*SyncInfoResp, error) {
out := new(SyncInfoResp)
err := c.cc.Invoke(ctx, PcmCore_SyncInfo_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, "/pcmCore.pcmCore/SyncInfo", in, out, opts...)
if err != nil {
return nil, err
}
@ -52,7 +47,7 @@ func (c *pcmCoreClient) SyncInfo(ctx context.Context, in *SyncInfoReq, opts ...g
func (c *pcmCoreClient) InfoList(ctx context.Context, in *InfoListReq, opts ...grpc.CallOption) (*InfoListResp, error) {
out := new(InfoListResp)
err := c.cc.Invoke(ctx, PcmCore_InfoList_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, "/pcmCore.pcmCore/InfoList", in, out, opts...)
if err != nil {
return nil, err
}
@ -103,7 +98,7 @@ func _PcmCore_SyncInfo_Handler(srv interface{}, ctx context.Context, dec func(in
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: PcmCore_SyncInfo_FullMethodName,
FullMethod: "/pcmCore.pcmCore/SyncInfo",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PcmCoreServer).SyncInfo(ctx, req.(*SyncInfoReq))
@ -121,7 +116,7 @@ func _PcmCore_InfoList_Handler(srv interface{}, ctx context.Context, dec func(in
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: PcmCore_InfoList_FullMethodName,
FullMethod: "/pcmCore.pcmCore/InfoList",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PcmCoreServer).InfoList(ctx, req.(*InfoListReq))
@ -149,17 +144,6 @@ var PcmCore_ServiceDesc = grpc.ServiceDesc{
Metadata: "pcmCore.proto",
}
const (
ParticipantService_RegisterParticipant_FullMethodName = "/pcmCore.participantService/registerParticipant"
ParticipantService_ReportHeartbeat_FullMethodName = "/pcmCore.participantService/reportHeartbeat"
ParticipantService_ReportAvailable_FullMethodName = "/pcmCore.participantService/reportAvailable"
ParticipantService_ListParticipant_FullMethodName = "/pcmCore.participantService/listParticipant"
ParticipantService_ListPhyAvailable_FullMethodName = "/pcmCore.participantService/listPhyAvailable"
ParticipantService_ListPhyInformation_FullMethodName = "/pcmCore.participantService/listPhyInformation"
ParticipantService_RegisterTenant_FullMethodName = "/pcmCore.participantService/registerTenant"
ParticipantService_ListTenant_FullMethodName = "/pcmCore.participantService/listTenant"
)
// ParticipantServiceClient is the client API for ParticipantService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
@ -192,7 +176,7 @@ func NewParticipantServiceClient(cc grpc.ClientConnInterface) ParticipantService
func (c *participantServiceClient) RegisterParticipant(ctx context.Context, in *ParticipantPhyReq, opts ...grpc.CallOption) (*ParticipantPhyResp, error) {
out := new(ParticipantPhyResp)
err := c.cc.Invoke(ctx, ParticipantService_RegisterParticipant_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, "/pcmCore.participantService/registerParticipant", in, out, opts...)
if err != nil {
return nil, err
}
@ -201,7 +185,7 @@ func (c *participantServiceClient) RegisterParticipant(ctx context.Context, in *
func (c *participantServiceClient) ReportHeartbeat(ctx context.Context, in *ParticipantHeartbeatReq, opts ...grpc.CallOption) (*HealthCheckResp, error) {
out := new(HealthCheckResp)
err := c.cc.Invoke(ctx, ParticipantService_ReportHeartbeat_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, "/pcmCore.participantService/reportHeartbeat", in, out, opts...)
if err != nil {
return nil, err
}
@ -210,7 +194,7 @@ func (c *participantServiceClient) ReportHeartbeat(ctx context.Context, in *Part
func (c *participantServiceClient) ReportAvailable(ctx context.Context, in *ParticipantAvailReq, opts ...grpc.CallOption) (*ParticipantResp, error) {
out := new(ParticipantResp)
err := c.cc.Invoke(ctx, ParticipantService_ReportAvailable_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, "/pcmCore.participantService/reportAvailable", in, out, opts...)
if err != nil {
return nil, err
}
@ -219,7 +203,7 @@ func (c *participantServiceClient) ReportAvailable(ctx context.Context, in *Part
func (c *participantServiceClient) ListParticipant(ctx context.Context, in *ParticipantTenant, opts ...grpc.CallOption) (*ParticipantServiceResp, error) {
out := new(ParticipantServiceResp)
err := c.cc.Invoke(ctx, ParticipantService_ListParticipant_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, "/pcmCore.participantService/listParticipant", in, out, opts...)
if err != nil {
return nil, err
}
@ -228,7 +212,7 @@ func (c *participantServiceClient) ListParticipant(ctx context.Context, in *Part
func (c *participantServiceClient) ListPhyAvailable(ctx context.Context, in *ParticipantTenant, opts ...grpc.CallOption) (*ListParticipantAvailResp, error) {
out := new(ListParticipantAvailResp)
err := c.cc.Invoke(ctx, ParticipantService_ListPhyAvailable_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, "/pcmCore.participantService/listPhyAvailable", in, out, opts...)
if err != nil {
return nil, err
}
@ -237,7 +221,7 @@ func (c *participantServiceClient) ListPhyAvailable(ctx context.Context, in *Par
func (c *participantServiceClient) ListPhyInformation(ctx context.Context, in *ParticipantTenant, opts ...grpc.CallOption) (*ListParticipantPhyResp, error) {
out := new(ListParticipantPhyResp)
err := c.cc.Invoke(ctx, ParticipantService_ListPhyInformation_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, "/pcmCore.participantService/listPhyInformation", in, out, opts...)
if err != nil {
return nil, err
}
@ -246,7 +230,7 @@ func (c *participantServiceClient) ListPhyInformation(ctx context.Context, in *P
func (c *participantServiceClient) RegisterTenant(ctx context.Context, in *TenantInfo, opts ...grpc.CallOption) (*TenantResp, error) {
out := new(TenantResp)
err := c.cc.Invoke(ctx, ParticipantService_RegisterTenant_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, "/pcmCore.participantService/registerTenant", in, out, opts...)
if err != nil {
return nil, err
}
@ -255,7 +239,7 @@ func (c *participantServiceClient) RegisterTenant(ctx context.Context, in *Tenan
func (c *participantServiceClient) ListTenant(ctx context.Context, in *TenantInfo, opts ...grpc.CallOption) (*ListTenantResp, error) {
out := new(ListTenantResp)
err := c.cc.Invoke(ctx, ParticipantService_ListTenant_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, "/pcmCore.participantService/listTenant", in, out, opts...)
if err != nil {
return nil, err
}
@ -336,7 +320,7 @@ func _ParticipantService_RegisterParticipant_Handler(srv interface{}, ctx contex
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ParticipantService_RegisterParticipant_FullMethodName,
FullMethod: "/pcmCore.participantService/registerParticipant",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ParticipantServiceServer).RegisterParticipant(ctx, req.(*ParticipantPhyReq))
@ -354,7 +338,7 @@ func _ParticipantService_ReportHeartbeat_Handler(srv interface{}, ctx context.Co
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ParticipantService_ReportHeartbeat_FullMethodName,
FullMethod: "/pcmCore.participantService/reportHeartbeat",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ParticipantServiceServer).ReportHeartbeat(ctx, req.(*ParticipantHeartbeatReq))
@ -372,7 +356,7 @@ func _ParticipantService_ReportAvailable_Handler(srv interface{}, ctx context.Co
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ParticipantService_ReportAvailable_FullMethodName,
FullMethod: "/pcmCore.participantService/reportAvailable",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ParticipantServiceServer).ReportAvailable(ctx, req.(*ParticipantAvailReq))
@ -390,7 +374,7 @@ func _ParticipantService_ListParticipant_Handler(srv interface{}, ctx context.Co
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ParticipantService_ListParticipant_FullMethodName,
FullMethod: "/pcmCore.participantService/listParticipant",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ParticipantServiceServer).ListParticipant(ctx, req.(*ParticipantTenant))
@ -408,7 +392,7 @@ func _ParticipantService_ListPhyAvailable_Handler(srv interface{}, ctx context.C
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ParticipantService_ListPhyAvailable_FullMethodName,
FullMethod: "/pcmCore.participantService/listPhyAvailable",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ParticipantServiceServer).ListPhyAvailable(ctx, req.(*ParticipantTenant))
@ -426,7 +410,7 @@ func _ParticipantService_ListPhyInformation_Handler(srv interface{}, ctx context
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ParticipantService_ListPhyInformation_FullMethodName,
FullMethod: "/pcmCore.participantService/listPhyInformation",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ParticipantServiceServer).ListPhyInformation(ctx, req.(*ParticipantTenant))
@ -444,7 +428,7 @@ func _ParticipantService_RegisterTenant_Handler(srv interface{}, ctx context.Con
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ParticipantService_RegisterTenant_FullMethodName,
FullMethod: "/pcmCore.participantService/registerTenant",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ParticipantServiceServer).RegisterTenant(ctx, req.(*TenantInfo))
@ -462,7 +446,7 @@ func _ParticipantService_ListTenant_Handler(srv interface{}, ctx context.Context
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ParticipantService_ListTenant_FullMethodName,
FullMethod: "/pcmCore.participantService/listTenant",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ParticipantServiceServer).ListTenant(ctx, req.(*TenantInfo))