超算字段修改

Former-commit-id: 14d580a742e7c54076921d32152667a421122271
This commit is contained in:
zhangwei 2023-08-18 16:09:48 +08:00
parent 3121b71085
commit 3891fd2397
14 changed files with 427 additions and 215 deletions

View File

@ -81,14 +81,15 @@ func (l *JobTotalLogic) JobTotal() (resp *types.JobTotalResp, err error) {
return nil, nil
}
for _, task := range tasks {
tx := l.svcCtx.DbEngin.Raw("SELECT name from sc_participant_phy_info where id in (SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.participant_id) ,GROUP_CONCAT(DISTINCT a.participant_id) ,GROUP_CONCAT(DISTINCT c.participant_id))as service_name from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?)", task.Id).Scan(&task.ParticipantName)
var participantName string
tx := l.svcCtx.DbEngin.Raw("SELECT name from sc_participant_phy_info where id in (SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.participant_id) ,GROUP_CONCAT(DISTINCT a.participant_id) ,GROUP_CONCAT(DISTINCT c.participant_id))as service_name from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?)", task.Id).Scan(&participantName)
if tx.Error != nil {
logx.Error(err)
return nil, tx.Error
}
// 承接方转义
resp.TrainJobs = append(resp.TrainJobs, types.TrainJob{
ParticipantName: task.ParticipantName,
ParticipantName: participantName,
Name: task.Name,
Strategy: int(task.Strategy),
SynergyStatus: enum.SynergyStatus(task.SynergyStatus).String(),

View File

@ -38,7 +38,8 @@ func (l *TaskListLogic) TaskList() (resp *types.TaskListResp, err error) {
return nil, nil
}
for _, task := range tasks {
tx := l.svcCtx.DbEngin.Raw("SELECT name from sc_participant_phy_info where id in (SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.participant_id) ,GROUP_CONCAT(DISTINCT a.participant_id) ,GROUP_CONCAT(DISTINCT c.participant_id))as service_name from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?)", task.Id).Scan(&task.ParticipantName)
var participantName string
tx := l.svcCtx.DbEngin.Raw("SELECT name from sc_participant_phy_info where id in (SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.participant_id) ,GROUP_CONCAT(DISTINCT a.participant_id) ,GROUP_CONCAT(DISTINCT c.participant_id))as service_name from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?)", task.Id).Scan(&participantName)
if tx.Error != nil {
logx.Error(err)
return nil, tx.Error
@ -46,7 +47,7 @@ func (l *TaskListLogic) TaskList() (resp *types.TaskListResp, err error) {
// 承接方转义
resp.Tasks = append(resp.Tasks, types.Task{
ParticipantName: task.ParticipantName,
ParticipantName: participantName,
Name: task.Name,
Strategy: int(task.Strategy),
SynergyStatus: enum.SynergyStatus(task.SynergyStatus).String(),

View File

@ -69,6 +69,7 @@ func UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud {
func (l *ScheduleCloudMq) Consume(_, val string) error {
var task *types.TaskInfo
json.Unmarshal([]byte(val), &task)
// 根据标签筛选过滤
participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task)
if err != nil {
return err

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/pkg/scheduler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
@ -31,15 +30,18 @@ func (l *ScheduleHpcMq) Consume(_, val string) error {
// 接受消息
var task *types.TaskInfo
json.Unmarshal([]byte(val), &task)
participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task)
if err != nil {
return err
}
//if len(task.MatchLabels) != 0 {
// participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task)
// if err != nil {
// return err
// }
//}
hpc := model.Hpc{
TaskId: task.TaskId,
Status: "Saved",
ParticipantId: participantId[0],
YamlString: val,
TaskId: task.TaskId,
Status: "Saved",
//ParticipantId: participantId[0],
YamlString: val,
}
tool.Convert(task.Metadata, &hpc)
// 存储数据

2
go.mod
View File

@ -29,6 +29,8 @@ require (
k8s.io/apimachinery v0.27.3
)
replace github.com/zeromicro/go-zero => github.com/zeromicro/go-zero v1.5.3
require (
github.com/JCCE-nudt/zero-contrib/zrpc/registry/nacos v0.0.0-20230419021610-13bbc83fbc3c // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect

421
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -55,15 +55,22 @@ type (
Result string `db:"result"`
YamlString string `db:"yaml_string"`
CmdScript string `db:"cmd_script"`
derivedEs string `db:"derived_es"`
cluster string `db:"cluster"`
blockId string `db:"block_id"`
allocNodes uint32 `db:"alloc_nodes"`
allocCpu uint32 `db:"alloc_cpu"`
version string `db:"version"`
account string `db:"account"`
exitCode uint32 `db:"exit_code"`
assocId uint32 `db:"assoc_id"`
//DerivedEs string `db:"derived_es"`
//Cluster string `db:"cluster"`
//BlockId string `db:"block_id"`
//AllocNodes uint32 `db:"alloc_nodes"`
//AllocCpu uint32 `db:"alloc_cpu"`
//Version string `db:"version"`
//Account string `db:"account"`
//ExitCode uint32 `db:"exit_code"`
//AssocId uint32 `db:"assoc_id"`
AppType string `db:"app_type"`
AppName string `db:"app_name"`
Queue string `db:"queue"`
SubmitType string `db:"submit_type"`
NNode string `db:"n_node"`
StdOutFile string `db:"std_out_file"`
StdErrFile string `db:"std_err_file"`
}
)

View File

@ -1,27 +1,9 @@
package model
import (
"github.com/zeromicro/go-zero/core/stores/cache"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
var _ ScParticipantPhyInfoModel = (*customScParticipantPhyInfoModel)(nil)
type (
// ScParticipantPhyInfoModel is an interface to be customized, add more methods here,
// and implement the added methods in customScParticipantPhyInfoModel.
ScParticipantPhyInfoModel interface {
scParticipantPhyInfoModel
}
customScParticipantPhyInfoModel struct {
*defaultScParticipantPhyInfoModel
}
)
// NewScParticipantPhyInfoModel returns a model for the database table.
func NewScParticipantPhyInfoModel(conn sqlx.SqlConn, c cache.CacheConf, opts ...cache.Option) ScParticipantPhyInfoModel {
return &customScParticipantPhyInfoModel{
defaultScParticipantPhyInfoModel: newScParticipantPhyInfoModel(conn, c, opts...),
}
}

View File

@ -41,8 +41,7 @@ type (
ScParticipantPhyInfo struct {
Id int64 `db:"id"` // id
Name string `db:"name"` // 名称
Host string `db:"host"` // 集群p端host
Port string `db:"port"` // 集群p端端口
Address string `db:"address"` // 集群地址
NetworkType string `db:"network_type"` // 集群网络类型
NetworkBandwidth string `db:"network_bandwidth"` // 集群网络带宽
StorageType string `db:"storage_type"` // 集群存储类型
@ -99,24 +98,6 @@ func (m *defaultScParticipantPhyInfoModel) FindOne(ctx context.Context, id int64
}
}
func (m *defaultScParticipantPhyInfoModel) Insert(ctx context.Context, data *ScParticipantPhyInfo) (sql.Result, error) {
pcmScParticipantPhyInfoIdKey := fmt.Sprintf("%s%v", cachePcmScParticipantPhyInfoIdPrefix, data.Id)
ret, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, scParticipantPhyInfoRowsExpectAutoSet)
return conn.ExecCtx(ctx, query, data.Id, data.Name, data.Host, data.Port, data.NetworkType, data.NetworkBandwidth, data.StorageType, data.StorageSpace, data.StorageAvailSpace, data.StorageBandwidth, data.TenantId, data.Type, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime)
}, pcmScParticipantPhyInfoIdKey)
return ret, err
}
func (m *defaultScParticipantPhyInfoModel) Update(ctx context.Context, data *ScParticipantPhyInfo) error {
pcmScParticipantPhyInfoIdKey := fmt.Sprintf("%s%v", cachePcmScParticipantPhyInfoIdPrefix, data.Id)
_, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (result sql.Result, err error) {
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, scParticipantPhyInfoRowsWithPlaceHolder)
return conn.ExecCtx(ctx, query, data.Name, data.Host, data.Port, data.NetworkType, data.NetworkBandwidth, data.StorageType, data.StorageSpace, data.StorageAvailSpace, data.StorageBandwidth, data.TenantId, data.Type, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.Id)
}, pcmScParticipantPhyInfoIdKey)
return err
}
func (m *defaultScParticipantPhyInfoModel) formatPrimary(primary any) string {
return fmt.Sprintf("%s%v", cachePcmScParticipantPhyInfoIdPrefix, primary)
}

View File

@ -34,23 +34,22 @@ type (
}
Task struct {
Id int64 `db:"id"` // id
Name string `db:"name"` // 作业名称
Description string `db:"description"` // 作业描述
Status string `db:"status"` // 作业状态
Strategy int64 `db:"strategy"` // 策略
SynergyStatus int64 `db:"synergy_status"` // 协同状态0-未协同、1-已协同)
StartTime time.Time `db:"start_time"` // 开始运行时间
EndTime string `db:"end_time"` // 结束运行时间
RunningTime int64 `db:"running_time"` // 已运行时间(单位秒)
YamlString string `db:"yaml_string"`
Result string `db:"result"` // 作业结果
CreatedBy int64 `db:"created_by"` // 创建人
CreatedTime time.Time `db:"created_time"` // 创建时间
UpdatedBy int64 `db:"updated_by"` // 更新人
UpdatedTime time.Time `db:"updated_time"` // 更新时间
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
ParticipantName string `db:"participant_name"`
Id int64 `db:"id"` // id
Name string `db:"name"` // 作业名称
Description string `db:"description"` // 作业描述
Status string `db:"status"` // 作业状态
Strategy int64 `db:"strategy"` // 策略
SynergyStatus int64 `db:"synergy_status"` // 协同状态0-未协同、1-已协同)
StartTime time.Time `db:"start_time"` // 开始运行时间
EndTime string `db:"end_time"` // 结束运行时间
RunningTime int64 `db:"running_time"` // 已运行时间(单位秒)
YamlString string `db:"yaml_string"`
Result string `db:"result"` // 作业结果
CreatedBy int64 `db:"created_by"` // 创建人
CreatedTime time.Time `db:"created_time"` // 创建时间
UpdatedBy int64 `db:"updated_by"` // 更新人
UpdatedTime time.Time `db:"updated_time"` // 更新时间
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
}
)

View File

@ -11,6 +11,7 @@ import (
func InitCron(svc *svc.ServiceContext) {
svc.Cron.Start()
svc.Cron.AddFunc("*/5 * * * * ?", func() {
var tasks []model.Task
svc.DbEngin.Not("status = ?", "Completed").Find(&tasks)
for _, task := range tasks {

View File

@ -47,7 +47,6 @@ message HpcInfo {
int64 participantId = 1;
int64 taskId = 2;
string jobId = 3;
string name = 4;
string status = 5;
string startTime = 6;
@ -65,6 +64,13 @@ message HpcInfo {
string account = 18;
uint32 exitCode = 19;
uint32 assocId = 20;
string appType = 21;
string appName = 22;
string queue = 23;
string submitType = 24;
string nNode = 25;
string stdOutFile = 26;
string stdErrFile = 27;
}
message SyncInfoResp {

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.30.0
// protoc v4.23.4
// protoc v3.19.4
// source: pb/pcmCore.proto
package pcmCore
@ -459,6 +459,13 @@ type HpcInfo struct {
Account string `protobuf:"bytes,18,opt,name=account,proto3" json:"account,omitempty"`
ExitCode uint32 `protobuf:"varint,19,opt,name=exitCode,proto3" json:"exitCode,omitempty"`
AssocId uint32 `protobuf:"varint,20,opt,name=assocId,proto3" json:"assocId,omitempty"`
AppType string `protobuf:"bytes,21,opt,name=appType,proto3" json:"appType,omitempty"`
AppName string `protobuf:"bytes,22,opt,name=appName,proto3" json:"appName,omitempty"`
Queue string `protobuf:"bytes,23,opt,name=queue,proto3" json:"queue,omitempty"`
SubmitType string `protobuf:"bytes,24,opt,name=submitType,proto3" json:"submitType,omitempty"`
NNode string `protobuf:"bytes,25,opt,name=nNode,proto3" json:"nNode,omitempty"`
StdOutFile string `protobuf:"bytes,26,opt,name=stdOutFile,proto3" json:"stdOutFile,omitempty"`
StdErrFile string `protobuf:"bytes,27,opt,name=stdErrFile,proto3" json:"stdErrFile,omitempty"`
}
func (x *HpcInfo) Reset() {
@ -633,6 +640,55 @@ func (x *HpcInfo) GetAssocId() uint32 {
return 0
}
func (x *HpcInfo) GetAppType() string {
if x != nil {
return x.AppType
}
return ""
}
func (x *HpcInfo) GetAppName() string {
if x != nil {
return x.AppName
}
return ""
}
func (x *HpcInfo) GetQueue() string {
if x != nil {
return x.Queue
}
return ""
}
func (x *HpcInfo) GetSubmitType() string {
if x != nil {
return x.SubmitType
}
return ""
}
func (x *HpcInfo) GetNNode() string {
if x != nil {
return x.NNode
}
return ""
}
func (x *HpcInfo) GetStdOutFile() string {
if x != nil {
return x.StdOutFile
}
return ""
}
func (x *HpcInfo) GetStdErrFile() string {
if x != nil {
return x.StdErrFile
}
return ""
}
type SyncInfoResp struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -2015,9 +2071,9 @@ type ClientInfo struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty" redis:"host"`
Port string `protobuf:"bytes,2,opt,name=port,proto3" json:"port,omitempty" redis:"port"`
ParticipantId int64 `protobuf:"varint,3,opt,name=participantId,proto3" json:"participantId,omitempty" redis:"participantId"`
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` // @gotags: redis:"host"
Port string `protobuf:"bytes,2,opt,name=port,proto3" json:"port,omitempty"` // @gotags: redis:"port"
ParticipantId int64 `protobuf:"varint,3,opt,name=participantId,proto3" json:"participantId,omitempty"` // @gotags: redis:"participantId"
}
func (x *ClientInfo) Reset() {
@ -2140,7 +2196,7 @@ var file_pb_pcmCore_proto_rawDesc = []byte{
0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65,
0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x79, 0x61, 0x6d, 0x6c, 0x53, 0x74, 0x72, 0x69,
0x6e, 0x67, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x79, 0x61, 0x6d, 0x6c, 0x53, 0x74,
0x72, 0x69, 0x6e, 0x67, 0x22, 0xad, 0x04, 0x0a, 0x07, 0x48, 0x70, 0x63, 0x49, 0x6e, 0x66, 0x6f,
0x72, 0x69, 0x6e, 0x67, 0x22, 0xed, 0x05, 0x0a, 0x07, 0x48, 0x70, 0x63, 0x49, 0x6e, 0x66, 0x6f,
0x12, 0x24, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49,
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69,
0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64,
@ -2175,7 +2231,19 @@ var file_pb_pcmCore_proto_rawDesc = []byte{
0x0a, 0x08, 0x65, 0x78, 0x69, 0x74, 0x43, 0x6f, 0x64, 0x65, 0x18, 0x13, 0x20, 0x01, 0x28, 0x0d,
0x52, 0x08, 0x65, 0x78, 0x69, 0x74, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x73,
0x73, 0x6f, 0x63, 0x49, 0x64, 0x18, 0x14, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, 0x61, 0x73, 0x73,
0x6f, 0x63, 0x49, 0x64, 0x22, 0x34, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f,
0x6f, 0x63, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x70, 0x70, 0x54, 0x79, 0x70, 0x65, 0x18,
0x15, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x70, 0x70, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18,
0x0a, 0x07, 0x61, 0x70, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x16, 0x20, 0x01, 0x28, 0x09, 0x52,
0x07, 0x61, 0x70, 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75,
0x65, 0x18, 0x17, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x1e,
0x0a, 0x0a, 0x73, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x54, 0x79, 0x70, 0x65, 0x18, 0x18, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x14,
0x0a, 0x05, 0x6e, 0x4e, 0x6f, 0x64, 0x65, 0x18, 0x19, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x6e,
0x4e, 0x6f, 0x64, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x64, 0x4f, 0x75, 0x74, 0x46, 0x69,
0x6c, 0x65, 0x18, 0x1a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x64, 0x4f, 0x75, 0x74,
0x46, 0x69, 0x6c, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x64, 0x45, 0x72, 0x72, 0x46, 0x69,
0x6c, 0x65, 0x18, 0x1b, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x64, 0x45, 0x72, 0x72,
0x46, 0x69, 0x6c, 0x65, 0x22, 0x34, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x49, 0x6e, 0x66, 0x6f,
0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x03, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x22, 0x47, 0x0a, 0x0b, 0x49, 0x6e,

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.23.4
// - protoc v3.19.4
// source: pb/pcmCore.proto
package pcmCore