✨ feat reportAvailable impl
Signed-off-by: devad <cossjie@foxmail.com> Former-commit-id: 82285d1dba06aca04d60858ee28b1a0cf6d21896
This commit is contained in:
parent
9c151e7d80
commit
c6b9b99081
|
@ -39,22 +39,22 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
ScNodeAvailInfo struct {
|
ScNodeAvailInfo struct {
|
||||||
Id int64 `db:"id"` // id
|
Id int64 `db:"id"` // id
|
||||||
NodeName string `db:"node_name"` // 节点名称
|
NodeName string `db:"node_name"` // 节点名称
|
||||||
CpuTotal int64 `db:"cpu_total"` // cpu核数
|
CpuTotal int64 `db:"cpu_total"` // cpu核数
|
||||||
CpuUsable float64 `db:"cpu_usable"` // cpu可用率
|
CpuUsable float64 `db:"cpu_usable"` // cpu可用率
|
||||||
DiskTotal int64 `db:"disk_total"` // 磁盘空间
|
DiskTotal int64 `db:"disk_total"` // 磁盘空间
|
||||||
DiskAvail int64 `db:"disk_avail"` // 磁盘可用空间
|
DiskAvail int64 `db:"disk_avail"` // 磁盘可用空间
|
||||||
MemTotal int64 `db:"mem_total"` // 内存总数
|
MemTotal int64 `db:"mem_total"` // 内存总数
|
||||||
MemAvail int64 `db:"mem_avail"` // 内存可用数
|
MemAvail int64 `db:"mem_avail"` // 内存可用数
|
||||||
GpuTotal int64 `db:"gpu_total"` // gpu总数
|
GpuTotal int64 `db:"gpu_total"` // gpu总数
|
||||||
GpuAvail int64 `db:"gpu_avail"` // gpu可用数
|
GpuAvail int64 `db:"gpu_avail"` // gpu可用数
|
||||||
ParticipantId int64 `db:"participant_id"` // 集群静态信息id
|
ParticipantAvailId int64 `db:"participant_avail_id"` // 集群动态信息id
|
||||||
DeletedFlag int64 `db:"deleted_flag"` // 是否删除
|
DeletedFlag int64 `db:"deleted_flag"` // 是否删除
|
||||||
CreatedBy sql.NullInt64 `db:"created_by"` // 创建人
|
CreatedBy sql.NullInt64 `db:"created_by"` // 创建人
|
||||||
CreatedTime time.Time `db:"created_time"` // 创建时间
|
CreatedTime time.Time `db:"created_time"` // 创建时间
|
||||||
UpdatedBy sql.NullInt64 `db:"updated_by"` // 更新人
|
UpdatedBy sql.NullInt64 `db:"updated_by"` // 更新人
|
||||||
UpdatedTime sql.NullTime `db:"updated_time"` // 更新时间
|
UpdatedTime sql.NullTime `db:"updated_time"` // 更新时间
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ func (m *defaultScNodeAvailInfoModel) Insert(ctx context.Context, data *ScNodeAv
|
||||||
pcmScNodeAvailInfoIdKey := fmt.Sprintf("%s%v", cachePcmScNodeAvailInfoIdPrefix, data.Id)
|
pcmScNodeAvailInfoIdKey := fmt.Sprintf("%s%v", cachePcmScNodeAvailInfoIdPrefix, data.Id)
|
||||||
ret, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
|
ret, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
|
||||||
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, scNodeAvailInfoRowsExpectAutoSet)
|
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, scNodeAvailInfoRowsExpectAutoSet)
|
||||||
return conn.ExecCtx(ctx, query, data.Id, data.NodeName, data.CpuTotal, data.CpuUsable, data.DiskTotal, data.DiskAvail, data.MemTotal, data.MemAvail, data.GpuTotal, data.GpuAvail, data.ParticipantId, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime)
|
return conn.ExecCtx(ctx, query, data.Id, data.NodeName, data.CpuTotal, data.CpuUsable, data.DiskTotal, data.DiskAvail, data.MemTotal, data.MemAvail, data.GpuTotal, data.GpuAvail, data.ParticipantAvailId, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime)
|
||||||
}, pcmScNodeAvailInfoIdKey)
|
}, pcmScNodeAvailInfoIdKey)
|
||||||
return ret, err
|
return ret, err
|
||||||
}
|
}
|
||||||
|
@ -111,7 +111,7 @@ func (m *defaultScNodeAvailInfoModel) Update(ctx context.Context, data *ScNodeAv
|
||||||
pcmScNodeAvailInfoIdKey := fmt.Sprintf("%s%v", cachePcmScNodeAvailInfoIdPrefix, data.Id)
|
pcmScNodeAvailInfoIdKey := fmt.Sprintf("%s%v", cachePcmScNodeAvailInfoIdPrefix, data.Id)
|
||||||
_, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
|
_, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
|
||||||
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, scNodeAvailInfoRowsWithPlaceHolder)
|
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, scNodeAvailInfoRowsWithPlaceHolder)
|
||||||
return conn.ExecCtx(ctx, query, data.NodeName, data.CpuTotal, data.CpuUsable, data.DiskTotal, data.DiskAvail, data.MemTotal, data.MemAvail, data.GpuTotal, data.GpuAvail, data.ParticipantId, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.Id)
|
return conn.ExecCtx(ctx, query, data.NodeName, data.CpuTotal, data.CpuUsable, data.DiskTotal, data.DiskAvail, data.MemTotal, data.MemAvail, data.GpuTotal, data.GpuAvail, data.ParticipantAvailId, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.Id)
|
||||||
}, pcmScNodeAvailInfoIdKey)
|
}, pcmScNodeAvailInfoIdKey)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,19 +39,17 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
ScParticipantAvailInfo struct {
|
ScParticipantAvailInfo struct {
|
||||||
Id int64 `db:"id"` // id
|
Id int64 `db:"id"` // id
|
||||||
Host sql.NullString `db:"host"` // 集群p端host
|
AvailStorageSpace int64 `db:"avail_storage_space"` // 集群存储可用空间
|
||||||
Port sql.NullString `db:"port"` // 集群p端端口
|
UserNum int64 `db:"user_num"` // 用户数量
|
||||||
AvailStorageSpace sql.NullInt64 `db:"avail_storage_space"` // 集群存储可用空间
|
PendingJobNum int64 `db:"pending_job_num"` // 待处理作业数量
|
||||||
UserNum sql.NullInt64 `db:"user_num"` // 用户数量
|
RunningJobNum int64 `db:"running_job_num"` // 运行作业数量
|
||||||
PendingJobNum sql.NullInt64 `db:"pending_job_num"` // 待处理作业数量
|
ParticipantId int64 `db:"participant_id"` // 集群静态信息id
|
||||||
RunningJobNum int64 `db:"running_job_num"` // 运行作业数量
|
DeletedFlag int64 `db:"deleted_flag"` // 是否删除
|
||||||
ParticipantId int64 `db:"participant_id"` // 集群静态信息id
|
CreatedBy sql.NullInt64 `db:"created_by"` // 创建人
|
||||||
DeletedFlag int64 `db:"deleted_flag"` // 是否删除
|
CreatedTime time.Time `db:"created_time"` // 创建时间
|
||||||
CreatedBy sql.NullInt64 `db:"created_by"` // 创建人
|
UpdatedBy sql.NullInt64 `db:"updated_by"` // 更新人
|
||||||
CreatedTime time.Time `db:"created_time"` // 创建时间
|
UpdatedTime sql.NullTime `db:"updated_time"` // 更新时间
|
||||||
UpdatedBy sql.NullInt64 `db:"updated_by"` // 更新人
|
|
||||||
UpdatedTime sql.NullTime `db:"updated_time"` // 更新时间
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -98,8 +96,8 @@ func (m *defaultScParticipantAvailInfoModel) FindOne(ctx context.Context, id int
|
||||||
func (m *defaultScParticipantAvailInfoModel) Insert(ctx context.Context, data *ScParticipantAvailInfo) (sql.Result, error) {
|
func (m *defaultScParticipantAvailInfoModel) Insert(ctx context.Context, data *ScParticipantAvailInfo) (sql.Result, error) {
|
||||||
pcmScParticipantAvailInfoIdKey := fmt.Sprintf("%s%v", cachePcmScParticipantAvailInfoIdPrefix, data.Id)
|
pcmScParticipantAvailInfoIdKey := fmt.Sprintf("%s%v", cachePcmScParticipantAvailInfoIdPrefix, data.Id)
|
||||||
ret, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
|
ret, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
|
||||||
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, scParticipantAvailInfoRowsExpectAutoSet)
|
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, scParticipantAvailInfoRowsExpectAutoSet)
|
||||||
return conn.ExecCtx(ctx, query, data.Id, data.Host, data.Port, data.AvailStorageSpace, data.UserNum, data.PendingJobNum, data.RunningJobNum, data.ParticipantId, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime)
|
return conn.ExecCtx(ctx, query, data.Id, data.AvailStorageSpace, data.UserNum, data.PendingJobNum, data.RunningJobNum, data.ParticipantId, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime)
|
||||||
}, pcmScParticipantAvailInfoIdKey)
|
}, pcmScParticipantAvailInfoIdKey)
|
||||||
return ret, err
|
return ret, err
|
||||||
}
|
}
|
||||||
|
@ -108,7 +106,7 @@ func (m *defaultScParticipantAvailInfoModel) Update(ctx context.Context, data *S
|
||||||
pcmScParticipantAvailInfoIdKey := fmt.Sprintf("%s%v", cachePcmScParticipantAvailInfoIdPrefix, data.Id)
|
pcmScParticipantAvailInfoIdKey := fmt.Sprintf("%s%v", cachePcmScParticipantAvailInfoIdPrefix, data.Id)
|
||||||
_, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
|
_, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
|
||||||
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, scParticipantAvailInfoRowsWithPlaceHolder)
|
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, scParticipantAvailInfoRowsWithPlaceHolder)
|
||||||
return conn.ExecCtx(ctx, query, data.Host, data.Port, data.AvailStorageSpace, data.UserNum, data.PendingJobNum, data.RunningJobNum, data.ParticipantId, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.Id)
|
return conn.ExecCtx(ctx, query, data.AvailStorageSpace, data.UserNum, data.PendingJobNum, data.RunningJobNum, data.ParticipantId, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.Id)
|
||||||
}, pcmScParticipantAvailInfoIdKey)
|
}, pcmScParticipantAvailInfoIdKey)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,11 +19,14 @@ type (
|
||||||
HpcInfo = pcmCore.HpcInfo
|
HpcInfo = pcmCore.HpcInfo
|
||||||
InfoListReq = pcmCore.InfoListReq
|
InfoListReq = pcmCore.InfoListReq
|
||||||
InfoListResp = pcmCore.InfoListResp
|
InfoListResp = pcmCore.InfoListResp
|
||||||
|
NodeAvailInfo = pcmCore.NodeAvailInfo
|
||||||
NodePhyInfo = pcmCore.NodePhyInfo
|
NodePhyInfo = pcmCore.NodePhyInfo
|
||||||
|
ParticipantAvailReq = pcmCore.ParticipantAvailReq
|
||||||
ParticipantHeartbeatReq = pcmCore.ParticipantHeartbeatReq
|
ParticipantHeartbeatReq = pcmCore.ParticipantHeartbeatReq
|
||||||
ParticipantLabel = pcmCore.ParticipantLabel
|
ParticipantLabel = pcmCore.ParticipantLabel
|
||||||
ParticipantPhyReq = pcmCore.ParticipantPhyReq
|
ParticipantPhyReq = pcmCore.ParticipantPhyReq
|
||||||
ParticipantPhyResp = pcmCore.ParticipantPhyResp
|
ParticipantPhyResp = pcmCore.ParticipantPhyResp
|
||||||
|
ParticipantResp = pcmCore.ParticipantResp
|
||||||
ParticipantTenant = pcmCore.ParticipantTenant
|
ParticipantTenant = pcmCore.ParticipantTenant
|
||||||
SyncInfoReq = pcmCore.SyncInfoReq
|
SyncInfoReq = pcmCore.SyncInfoReq
|
||||||
SyncInfoResp = pcmCore.SyncInfoResp
|
SyncInfoResp = pcmCore.SyncInfoResp
|
||||||
|
@ -31,8 +34,10 @@ type (
|
||||||
ParticipantService interface {
|
ParticipantService interface {
|
||||||
// registerParticipant Participant注册接口
|
// registerParticipant Participant注册接口
|
||||||
RegisterParticipant(ctx context.Context, in *ParticipantPhyReq, opts ...grpc.CallOption) (*ParticipantPhyResp, error)
|
RegisterParticipant(ctx context.Context, in *ParticipantPhyReq, opts ...grpc.CallOption) (*ParticipantPhyResp, error)
|
||||||
// 心跳
|
// reportHeartbeat 心跳请求
|
||||||
ReportHeartbeat(ctx context.Context, in *ParticipantHeartbeatReq, opts ...grpc.CallOption) (*HealthCheckResp, error)
|
ReportHeartbeat(ctx context.Context, in *ParticipantHeartbeatReq, opts ...grpc.CallOption) (*HealthCheckResp, error)
|
||||||
|
// reportAvailable 监控数据上报
|
||||||
|
ReportAvailable(ctx context.Context, in *ParticipantAvailReq, opts ...grpc.CallOption) (*ParticipantResp, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
defaultParticipantService struct {
|
defaultParticipantService struct {
|
||||||
|
@ -52,8 +57,14 @@ func (m *defaultParticipantService) RegisterParticipant(ctx context.Context, in
|
||||||
return client.RegisterParticipant(ctx, in, opts...)
|
return client.RegisterParticipant(ctx, in, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 心跳
|
// reportHeartbeat 心跳请求
|
||||||
func (m *defaultParticipantService) ReportHeartbeat(ctx context.Context, in *ParticipantHeartbeatReq, opts ...grpc.CallOption) (*HealthCheckResp, error) {
|
func (m *defaultParticipantService) ReportHeartbeat(ctx context.Context, in *ParticipantHeartbeatReq, opts ...grpc.CallOption) (*HealthCheckResp, error) {
|
||||||
client := pcmCore.NewParticipantServiceClient(m.cli.Conn())
|
client := pcmCore.NewParticipantServiceClient(m.cli.Conn())
|
||||||
return client.ReportHeartbeat(ctx, in, opts...)
|
return client.ReportHeartbeat(ctx, in, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reportAvailable 监控数据上报
|
||||||
|
func (m *defaultParticipantService) ReportAvailable(ctx context.Context, in *ParticipantAvailReq, opts ...grpc.CallOption) (*ParticipantResp, error) {
|
||||||
|
client := pcmCore.NewParticipantServiceClient(m.cli.Conn())
|
||||||
|
return client.ReportAvailable(ctx, in, opts...)
|
||||||
|
}
|
||||||
|
|
|
@ -19,11 +19,14 @@ type (
|
||||||
HpcInfo = pcmCore.HpcInfo
|
HpcInfo = pcmCore.HpcInfo
|
||||||
InfoListReq = pcmCore.InfoListReq
|
InfoListReq = pcmCore.InfoListReq
|
||||||
InfoListResp = pcmCore.InfoListResp
|
InfoListResp = pcmCore.InfoListResp
|
||||||
|
NodeAvailInfo = pcmCore.NodeAvailInfo
|
||||||
NodePhyInfo = pcmCore.NodePhyInfo
|
NodePhyInfo = pcmCore.NodePhyInfo
|
||||||
|
ParticipantAvailReq = pcmCore.ParticipantAvailReq
|
||||||
ParticipantHeartbeatReq = pcmCore.ParticipantHeartbeatReq
|
ParticipantHeartbeatReq = pcmCore.ParticipantHeartbeatReq
|
||||||
ParticipantLabel = pcmCore.ParticipantLabel
|
ParticipantLabel = pcmCore.ParticipantLabel
|
||||||
ParticipantPhyReq = pcmCore.ParticipantPhyReq
|
ParticipantPhyReq = pcmCore.ParticipantPhyReq
|
||||||
ParticipantPhyResp = pcmCore.ParticipantPhyResp
|
ParticipantPhyResp = pcmCore.ParticipantPhyResp
|
||||||
|
ParticipantResp = pcmCore.ParticipantResp
|
||||||
ParticipantTenant = pcmCore.ParticipantTenant
|
ParticipantTenant = pcmCore.ParticipantTenant
|
||||||
SyncInfoReq = pcmCore.SyncInfoReq
|
SyncInfoReq = pcmCore.SyncInfoReq
|
||||||
SyncInfoResp = pcmCore.SyncInfoResp
|
SyncInfoResp = pcmCore.SyncInfoResp
|
||||||
|
|
|
@ -46,11 +46,7 @@ func (l *RegisterParticipantLogic) RegisterParticipant(in *pcmCore.ParticipantPh
|
||||||
participantInfo := &model.ScParticipantPhyInfo{}
|
participantInfo := &model.ScParticipantPhyInfo{}
|
||||||
tool.Convert(in, participantInfo)
|
tool.Convert(in, participantInfo)
|
||||||
if in.ParticipantId == 0 {
|
if in.ParticipantId == 0 {
|
||||||
var err error
|
|
||||||
participantInfo.Id = tool.GenSnowflakeID()
|
participantInfo.Id = tool.GenSnowflakeID()
|
||||||
if err != nil {
|
|
||||||
logx.Errorf("生成id错误 err:", err)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
participantInfo.Id = in.ParticipantId
|
participantInfo.Id = in.ParticipantId
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,92 @@
|
||||||
|
package participantservicelogic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
|
||||||
|
"gitlink.org.cn/jcce-pcm/utils/tool"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc"
|
||||||
|
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore"
|
||||||
|
|
||||||
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ReportAvailableLogic struct {
|
||||||
|
ctx context.Context
|
||||||
|
svcCtx *svc.ServiceContext
|
||||||
|
logx.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewReportAvailableLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ReportAvailableLogic {
|
||||||
|
return &ReportAvailableLogic{
|
||||||
|
ctx: ctx,
|
||||||
|
svcCtx: svcCtx,
|
||||||
|
Logger: logx.WithContext(ctx),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReportAvailable 监控数据上报
|
||||||
|
func (l *ReportAvailableLogic) ReportAvailable(in *pcmCore.ParticipantAvailReq) (*pcmCore.ParticipantResp, error) {
|
||||||
|
db := l.svcCtx.DbEngin.Begin()
|
||||||
|
// 执行回滚或者提交操作
|
||||||
|
defer func() {
|
||||||
|
if p := recover(); p != nil {
|
||||||
|
db.Rollback()
|
||||||
|
logx.Error(p)
|
||||||
|
} else if db.Error != nil {
|
||||||
|
logx.Info("rollback")
|
||||||
|
db.Rollback()
|
||||||
|
} else {
|
||||||
|
db = db.Commit()
|
||||||
|
logx.Info("commit success")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
//判断Participant静态信息是否存在
|
||||||
|
participantPhyInfo := &model.ScParticipantPhyInfo{}
|
||||||
|
participantPhyInfo.Id = in.ParticipantId
|
||||||
|
if errors.Is(db.Take(&participantPhyInfo).Error, gorm.ErrRecordNotFound) {
|
||||||
|
fmt.Println("sdsdfsdf ")
|
||||||
|
return &pcmCore.ParticipantResp{
|
||||||
|
Code: 500,
|
||||||
|
Msg: fmt.Sprintf("ParticipantInfo Does not exist, please check participantPhyId: %d", in.ParticipantId),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
participantAvailInfo := &model.ScParticipantAvailInfo{}
|
||||||
|
tool.Convert(in, participantAvailInfo)
|
||||||
|
if in.Id == 0 {
|
||||||
|
participantAvailInfo.Id = tool.GenSnowflakeID()
|
||||||
|
}
|
||||||
|
participantAvailInfo.CreatedTime = time.Now()
|
||||||
|
//保存participant动态信息
|
||||||
|
result := db.Save(&participantAvailInfo)
|
||||||
|
//保存节点信息
|
||||||
|
nodeList := make([]*model.ScNodeAvailInfo, 0)
|
||||||
|
for _, info := range in.NodeAvailInfo {
|
||||||
|
nodeInfo := &model.ScNodeAvailInfo{}
|
||||||
|
tool.Convert(info, nodeInfo)
|
||||||
|
nodeInfo.CreatedTime = time.Now()
|
||||||
|
nodeInfo.ParticipantAvailId = participantAvailInfo.Id
|
||||||
|
if nodeInfo.Id == 0 {
|
||||||
|
nodeInfo.Id = tool.GenSnowflakeID()
|
||||||
|
}
|
||||||
|
nodeList = append(nodeList, nodeInfo)
|
||||||
|
}
|
||||||
|
result = db.Save(&nodeList)
|
||||||
|
if result.Error != nil {
|
||||||
|
logx.Errorf("orm err:", result.Error)
|
||||||
|
return &pcmCore.ParticipantResp{
|
||||||
|
Code: 500,
|
||||||
|
Msg: fmt.Sprintf("Save participantAvailInfo error %s", result.Error),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pcmCore.ParticipantResp{
|
||||||
|
Code: 200,
|
||||||
|
Msg: "ok",
|
||||||
|
}, nil
|
||||||
|
}
|
|
@ -2,7 +2,6 @@ package participantservicelogic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"github.com/zeromicro/go-zero/core/logx"
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc"
|
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc"
|
||||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore"
|
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore"
|
||||||
|
@ -26,7 +25,7 @@ func NewReportHeartbeatLogic(ctx context.Context, svcCtx *svc.ServiceContext) *R
|
||||||
func (l *ReportHeartbeatLogic) ReportHeartbeat(in *pcmCore.ParticipantHeartbeatReq) (*pcmCore.HealthCheckResp, error) {
|
func (l *ReportHeartbeatLogic) ReportHeartbeat(in *pcmCore.ParticipantHeartbeatReq) (*pcmCore.HealthCheckResp, error) {
|
||||||
SendHeartbeat(in.Host, in.Port, in.ParticipantId)
|
SendHeartbeat(in.Host, in.Port, in.ParticipantId)
|
||||||
for _, client := range ParticipantClients {
|
for _, client := range ParticipantClients {
|
||||||
fmt.Println("客户端id:", client.ParticipantID, " 客户端状态: ", client.ClientState, "客户端最后发送心跳检测时间: ", client.LastHeartbeat)
|
logx.Infof("客户端id:%d; 客户端状态:%s ;客户端最后发送心跳检测时间:%s .", client.ParticipantID, client.ClientState, client.LastHeartbeat)
|
||||||
}
|
}
|
||||||
return &pcmCore.HealthCheckResp{
|
return &pcmCore.HealthCheckResp{
|
||||||
Code: 200,
|
Code: 200,
|
||||||
|
|
|
@ -28,8 +28,14 @@ func (s *ParticipantServiceServer) RegisterParticipant(ctx context.Context, in *
|
||||||
return l.RegisterParticipant(in)
|
return l.RegisterParticipant(in)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 心跳
|
// reportHeartbeat 心跳请求
|
||||||
func (s *ParticipantServiceServer) ReportHeartbeat(ctx context.Context, in *pcmCore.ParticipantHeartbeatReq) (*pcmCore.HealthCheckResp, error) {
|
func (s *ParticipantServiceServer) ReportHeartbeat(ctx context.Context, in *pcmCore.ParticipantHeartbeatReq) (*pcmCore.HealthCheckResp, error) {
|
||||||
l := participantservicelogic.NewReportHeartbeatLogic(ctx, s.svcCtx)
|
l := participantservicelogic.NewReportHeartbeatLogic(ctx, s.svcCtx)
|
||||||
return l.ReportHeartbeat(in)
|
return l.ReportHeartbeat(in)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reportAvailable 监控数据上报
|
||||||
|
func (s *ParticipantServiceServer) ReportAvailable(ctx context.Context, in *pcmCore.ParticipantAvailReq) (*pcmCore.ParticipantResp, error) {
|
||||||
|
l := participantservicelogic.NewReportAvailableLogic(ctx, s.svcCtx)
|
||||||
|
return l.ReportAvailable(in)
|
||||||
|
}
|
||||||
|
|
|
@ -161,12 +161,47 @@ message ParticipantHeartbeatReq{
|
||||||
string port = 3; //port
|
string port = 3; //port
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParticipantAvailInfo Participant可用信息
|
||||||
|
message ParticipantAvailReq{
|
||||||
|
int64 id = 1; //id
|
||||||
|
int64 availStorageSpace = 2; //集群存储可用空间
|
||||||
|
int32 userNum = 3; //用户数量
|
||||||
|
int32 pendingJobNum = 4; //待处理作业数量
|
||||||
|
int32 runningJobNum = 5; //运行作业数量
|
||||||
|
int64 participantId = 6; //集群静态信息id
|
||||||
|
repeated NodeAvailInfo nodeAvailInfo = 7; //节点可用信息
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodeAvailInfo 节点可用信息
|
||||||
|
message NodeAvailInfo{
|
||||||
|
int64 id = 1; //id
|
||||||
|
string nodeName = 2; //节点名称
|
||||||
|
int32 cpuTotal = 3; //cpu核数
|
||||||
|
double cpuUsable = 4; //cpu可用率
|
||||||
|
int32 diskTotal = 5; //磁盘空间
|
||||||
|
int32 diskAvail = 6; //磁盘可用空间
|
||||||
|
int32 memTotal = 7; //内存总数
|
||||||
|
int32 memAvail = 8; //内存可用数
|
||||||
|
int32 gpuTotal = 9; //gpu总数
|
||||||
|
int32 gpuAvail = 10; //gpu可用数
|
||||||
|
int64 participantAvailId = 11; //集群动态信息id
|
||||||
|
}
|
||||||
|
|
||||||
|
message ParticipantResp{
|
||||||
|
int64 code = 1;
|
||||||
|
string msg = 2;
|
||||||
|
}
|
||||||
|
|
||||||
// participant 参与者
|
// participant 参与者
|
||||||
service participantService {
|
service participantService {
|
||||||
|
|
||||||
// registerParticipant Participant注册接口
|
// registerParticipant Participant注册接口
|
||||||
rpc registerParticipant (ParticipantPhyReq) returns (ParticipantPhyResp) {};
|
rpc registerParticipant (ParticipantPhyReq) returns (ParticipantPhyResp) {};
|
||||||
|
|
||||||
//心跳
|
// reportHeartbeat 心跳请求
|
||||||
rpc reportHeartbeat (ParticipantHeartbeatReq) returns (HealthCheckResp) {};
|
rpc reportHeartbeat (ParticipantHeartbeatReq) returns (HealthCheckResp) {};
|
||||||
|
|
||||||
|
// reportAvailable 监控数据上报
|
||||||
|
rpc reportAvailable (ParticipantAvailReq) returns(ParticipantResp){}
|
||||||
|
|
||||||
}
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -1,8 +1,8 @@
|
||||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||||
// versions:
|
// versions:
|
||||||
// - protoc-gen-go-grpc v1.3.0
|
// - protoc-gen-go-grpc v1.3.0
|
||||||
// - protoc v3.19.4
|
// - protoc v4.23.4
|
||||||
// source: pcmCore.proto
|
// source: pb/pcmCore.proto
|
||||||
|
|
||||||
package pcmCore
|
package pcmCore
|
||||||
|
|
||||||
|
@ -27,9 +27,9 @@ const (
|
||||||
//
|
//
|
||||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||||
type PcmCoreClient interface {
|
type PcmCoreClient interface {
|
||||||
//SyncInfo Synchronous data information
|
// SyncInfo Synchronous data information
|
||||||
SyncInfo(ctx context.Context, in *SyncInfoReq, opts ...grpc.CallOption) (*SyncInfoResp, error)
|
SyncInfo(ctx context.Context, in *SyncInfoReq, opts ...grpc.CallOption) (*SyncInfoResp, error)
|
||||||
//InfoList
|
// InfoList
|
||||||
InfoList(ctx context.Context, in *InfoListReq, opts ...grpc.CallOption) (*InfoListResp, error)
|
InfoList(ctx context.Context, in *InfoListReq, opts ...grpc.CallOption) (*InfoListResp, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,9 +63,9 @@ func (c *pcmCoreClient) InfoList(ctx context.Context, in *InfoListReq, opts ...g
|
||||||
// All implementations must embed UnimplementedPcmCoreServer
|
// All implementations must embed UnimplementedPcmCoreServer
|
||||||
// for forward compatibility
|
// for forward compatibility
|
||||||
type PcmCoreServer interface {
|
type PcmCoreServer interface {
|
||||||
//SyncInfo Synchronous data information
|
// SyncInfo Synchronous data information
|
||||||
SyncInfo(context.Context, *SyncInfoReq) (*SyncInfoResp, error)
|
SyncInfo(context.Context, *SyncInfoReq) (*SyncInfoResp, error)
|
||||||
//InfoList
|
// InfoList
|
||||||
InfoList(context.Context, *InfoListReq) (*InfoListResp, error)
|
InfoList(context.Context, *InfoListReq) (*InfoListResp, error)
|
||||||
mustEmbedUnimplementedPcmCoreServer()
|
mustEmbedUnimplementedPcmCoreServer()
|
||||||
}
|
}
|
||||||
|
@ -146,12 +146,13 @@ var PcmCore_ServiceDesc = grpc.ServiceDesc{
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Streams: []grpc.StreamDesc{},
|
Streams: []grpc.StreamDesc{},
|
||||||
Metadata: "pcmCore.proto",
|
Metadata: "pb/pcmCore.proto",
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ParticipantService_RegisterParticipant_FullMethodName = "/pcmCore.participantService/registerParticipant"
|
ParticipantService_RegisterParticipant_FullMethodName = "/pcmCore.participantService/registerParticipant"
|
||||||
ParticipantService_ReportHeartbeat_FullMethodName = "/pcmCore.participantService/reportHeartbeat"
|
ParticipantService_ReportHeartbeat_FullMethodName = "/pcmCore.participantService/reportHeartbeat"
|
||||||
|
ParticipantService_ReportAvailable_FullMethodName = "/pcmCore.participantService/reportAvailable"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ParticipantServiceClient is the client API for ParticipantService service.
|
// ParticipantServiceClient is the client API for ParticipantService service.
|
||||||
|
@ -160,8 +161,10 @@ const (
|
||||||
type ParticipantServiceClient interface {
|
type ParticipantServiceClient interface {
|
||||||
// registerParticipant Participant注册接口
|
// registerParticipant Participant注册接口
|
||||||
RegisterParticipant(ctx context.Context, in *ParticipantPhyReq, opts ...grpc.CallOption) (*ParticipantPhyResp, error)
|
RegisterParticipant(ctx context.Context, in *ParticipantPhyReq, opts ...grpc.CallOption) (*ParticipantPhyResp, error)
|
||||||
//心跳
|
// reportHeartbeat 心跳请求
|
||||||
ReportHeartbeat(ctx context.Context, in *ParticipantHeartbeatReq, opts ...grpc.CallOption) (*HealthCheckResp, error)
|
ReportHeartbeat(ctx context.Context, in *ParticipantHeartbeatReq, opts ...grpc.CallOption) (*HealthCheckResp, error)
|
||||||
|
// reportAvailable 监控数据上报
|
||||||
|
ReportAvailable(ctx context.Context, in *ParticipantAvailReq, opts ...grpc.CallOption) (*ParticipantResp, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type participantServiceClient struct {
|
type participantServiceClient struct {
|
||||||
|
@ -190,14 +193,25 @@ func (c *participantServiceClient) ReportHeartbeat(ctx context.Context, in *Part
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *participantServiceClient) ReportAvailable(ctx context.Context, in *ParticipantAvailReq, opts ...grpc.CallOption) (*ParticipantResp, error) {
|
||||||
|
out := new(ParticipantResp)
|
||||||
|
err := c.cc.Invoke(ctx, ParticipantService_ReportAvailable_FullMethodName, in, out, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ParticipantServiceServer is the server API for ParticipantService service.
|
// ParticipantServiceServer is the server API for ParticipantService service.
|
||||||
// All implementations must embed UnimplementedParticipantServiceServer
|
// All implementations must embed UnimplementedParticipantServiceServer
|
||||||
// for forward compatibility
|
// for forward compatibility
|
||||||
type ParticipantServiceServer interface {
|
type ParticipantServiceServer interface {
|
||||||
// registerParticipant Participant注册接口
|
// registerParticipant Participant注册接口
|
||||||
RegisterParticipant(context.Context, *ParticipantPhyReq) (*ParticipantPhyResp, error)
|
RegisterParticipant(context.Context, *ParticipantPhyReq) (*ParticipantPhyResp, error)
|
||||||
//心跳
|
// reportHeartbeat 心跳请求
|
||||||
ReportHeartbeat(context.Context, *ParticipantHeartbeatReq) (*HealthCheckResp, error)
|
ReportHeartbeat(context.Context, *ParticipantHeartbeatReq) (*HealthCheckResp, error)
|
||||||
|
// reportAvailable 监控数据上报
|
||||||
|
ReportAvailable(context.Context, *ParticipantAvailReq) (*ParticipantResp, error)
|
||||||
mustEmbedUnimplementedParticipantServiceServer()
|
mustEmbedUnimplementedParticipantServiceServer()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -211,6 +225,9 @@ func (UnimplementedParticipantServiceServer) RegisterParticipant(context.Context
|
||||||
func (UnimplementedParticipantServiceServer) ReportHeartbeat(context.Context, *ParticipantHeartbeatReq) (*HealthCheckResp, error) {
|
func (UnimplementedParticipantServiceServer) ReportHeartbeat(context.Context, *ParticipantHeartbeatReq) (*HealthCheckResp, error) {
|
||||||
return nil, status.Errorf(codes.Unimplemented, "method ReportHeartbeat not implemented")
|
return nil, status.Errorf(codes.Unimplemented, "method ReportHeartbeat not implemented")
|
||||||
}
|
}
|
||||||
|
func (UnimplementedParticipantServiceServer) ReportAvailable(context.Context, *ParticipantAvailReq) (*ParticipantResp, error) {
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method ReportAvailable not implemented")
|
||||||
|
}
|
||||||
func (UnimplementedParticipantServiceServer) mustEmbedUnimplementedParticipantServiceServer() {}
|
func (UnimplementedParticipantServiceServer) mustEmbedUnimplementedParticipantServiceServer() {}
|
||||||
|
|
||||||
// UnsafeParticipantServiceServer may be embedded to opt out of forward compatibility for this service.
|
// UnsafeParticipantServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||||
|
@ -260,6 +277,24 @@ func _ParticipantService_ReportHeartbeat_Handler(srv interface{}, ctx context.Co
|
||||||
return interceptor(ctx, in, info, handler)
|
return interceptor(ctx, in, info, handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func _ParticipantService_ReportAvailable_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(ParticipantAvailReq)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(ParticipantServiceServer).ReportAvailable(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: ParticipantService_ReportAvailable_FullMethodName,
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(ParticipantServiceServer).ReportAvailable(ctx, req.(*ParticipantAvailReq))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
// ParticipantService_ServiceDesc is the grpc.ServiceDesc for ParticipantService service.
|
// ParticipantService_ServiceDesc is the grpc.ServiceDesc for ParticipantService service.
|
||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
|
@ -275,7 +310,11 @@ var ParticipantService_ServiceDesc = grpc.ServiceDesc{
|
||||||
MethodName: "reportHeartbeat",
|
MethodName: "reportHeartbeat",
|
||||||
Handler: _ParticipantService_ReportHeartbeat_Handler,
|
Handler: _ParticipantService_ReportHeartbeat_Handler,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
MethodName: "reportAvailable",
|
||||||
|
Handler: _ParticipantService_ReportAvailable_Handler,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Streams: []grpc.StreamDesc{},
|
Streams: []grpc.StreamDesc{},
|
||||||
Metadata: "pcmCore.proto",
|
Metadata: "pb/pcmCore.proto",
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue