From 9c151e7d80d090f5fe4536a1e663db4c86669ad1 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Wed, 26 Jul 2023 15:47:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=BB=E5=8A=A1=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: b346a12f842d2f706712d5e2389db49b4f845ed0 --- api/desc/core/pcm-core.api | 3 ++ api/internal/types/types.go | 10 ++-- model/cloudmodel.go | 14 ------ model/cloudmodel_gen.go | 49 ------------------- .../participantservice/heartbeatcheck.go | 4 +- rpc/internal/logic/pcmcore/infolistlogic.go | 6 +-- rpc/internal/logic/pcmcore/syncinfologic.go | 12 ++--- rpc/pb/pcmCore.proto | 2 +- rpc/pcmCore/pcmCore.pb.go | 8 +-- 9 files changed, 25 insertions(+), 83 deletions(-) diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index f19abef2..659ccf5a 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -182,6 +182,7 @@ type ( TaskYaml { TaskId int64 `yaml:"taskId"` taskType string `yaml:"taskType"` + matchLabels map[string]string `yaml:"matchLabels"` metadata interface{} `yaml:"metadata"` } ) @@ -196,6 +197,8 @@ type ( } TaskInfo { TaskId int64 `json:"taskId,optional"` + TaskType string `json:"taskType,optional"` + matchLabels map[string]string `json:"matchLabels"` serviceName string `json:"serviceName"` metadata interface{} `json:"metadata"` } diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 3c8f4998..92e2f6f4 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -161,9 +161,10 @@ type ScheduleTaskByYamlReq struct { } type TaskYaml struct { - TaskId int64 `yaml:"taskId"` - TaskType string `yaml:"taskType"` - Metadata interface{} `yaml:"metadata"` + TaskId int64 `yaml:"taskId"` + TaskType string `yaml:"taskType"` + MatchLabels map[string]string `yaml:"matchLabels"` + Metadata interface{} `yaml:"metadata"` } type ScheduleTaskReq struct { @@ -176,8 +177,9 @@ type ScheduleTaskReq struct { type TaskInfo struct { TaskId int64 `json:"taskId,optional"` - TaskType string `json:"taskType"` + TaskType string `json:"taskType,optional"` MatchLabels map[string]string `json:"matchLabels"` + ServiceName string `json:"serviceName"` Metadata interface{} `json:"metadata"` } diff --git a/model/cloudmodel.go b/model/cloudmodel.go index deaf7b1e..887479ce 100644 --- a/model/cloudmodel.go +++ b/model/cloudmodel.go @@ -1,12 +1,5 @@ package model -import ( - "github.com/zeromicro/go-zero/core/stores/cache" - "github.com/zeromicro/go-zero/core/stores/sqlx" -) - -var _ CloudModel = (*customCloudModel)(nil) - type ( // CloudModel is an interface to be customized, add more methods here, // and implement the added methods in customCloudModel. @@ -18,10 +11,3 @@ type ( *defaultCloudModel } ) - -// NewCloudModel returns a model for the database table. -func NewCloudModel(conn sqlx.SqlConn, c cache.CacheConf) CloudModel { - return &customCloudModel{ - defaultCloudModel: newCloudModel(conn), - } -} diff --git a/model/cloudmodel_gen.go b/model/cloudmodel_gen.go index f963355a..fa63395e 100644 --- a/model/cloudmodel_gen.go +++ b/model/cloudmodel_gen.go @@ -5,11 +5,9 @@ package model import ( "context" "database/sql" - "fmt" "strings" "github.com/zeromicro/go-zero/core/stores/builder" - "github.com/zeromicro/go-zero/core/stores/sqlc" "github.com/zeromicro/go-zero/core/stores/sqlx" "github.com/zeromicro/go-zero/core/stringx" ) @@ -51,7 +49,6 @@ type ( UpdatedBy int64 `db:"updated_by"` // 更新人 UpdatedTime sql.NullTime `db:"updated_time"` // 更新时间 DeletedFlag int64 `db:"deleted_flag"` // 是否删除(0-否,1-是) - ServiceName string `db:"service_name"` YamlString string `db:"yaml_string"` Result string `db:"result"` } @@ -64,52 +61,6 @@ func newCloudModel(conn sqlx.SqlConn) *defaultCloudModel { } } -func (m *defaultCloudModel) Delete(ctx context.Context, id int64) error { - query := fmt.Sprintf("delete from %s where `id` = ?", m.table) - _, err := m.conn.ExecCtx(ctx, query, id) - return err -} - -func (m *defaultCloudModel) FindOne(ctx context.Context, id int64) (*Cloud, error) { - query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", cloudRows, m.table) - var resp Cloud - err := m.conn.QueryRowCtx(ctx, &resp, query, id) - switch err { - case nil: - return &resp, nil - case sqlc.ErrNotFound: - return nil, ErrNotFound - default: - return nil, err - } -} - -func (m *defaultCloudModel) FindOneByNamespaceNameServiceName(ctx context.Context, namespace sql.NullString, name sql.NullString, serviceName sql.NullString) (*Cloud, error) { - var resp Cloud - query := fmt.Sprintf("select %s from %s where `namespace` = ? and `name` = ? and `service_name` = ? limit 1", cloudRows, m.table) - err := m.conn.QueryRowCtx(ctx, &resp, query, namespace, name, serviceName) - switch err { - case nil: - return &resp, nil - case sqlc.ErrNotFound: - return nil, ErrNotFound - default: - return nil, err - } -} - -func (m *defaultCloudModel) Insert(ctx context.Context, data *Cloud) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, cloudRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.ApiVersion, data.Name, data.Namespace, data.Kind, data.Status, data.StartTime, data.RunningTime, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.DeletedFlag, data.ServiceName, data.YamlString, data.Result) - return ret, err -} - -func (m *defaultCloudModel) Update(ctx context.Context, newData *Cloud) error { - query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, cloudRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, newData.TaskId, newData.ApiVersion, newData.Name, newData.Namespace, newData.Kind, newData.Status, newData.StartTime, newData.RunningTime, newData.CreatedBy, newData.CreatedTime, newData.UpdatedBy, newData.UpdatedTime, newData.DeletedFlag, newData.ServiceName, newData.YamlString, newData.Result, newData.Id) - return err -} - func (m *defaultCloudModel) tableName() string { return m.table } diff --git a/rpc/internal/logic/participantservice/heartbeatcheck.go b/rpc/internal/logic/participantservice/heartbeatcheck.go index 4303ad2e..8e2d59e1 100644 --- a/rpc/internal/logic/participantservice/heartbeatcheck.go +++ b/rpc/internal/logic/participantservice/heartbeatcheck.go @@ -9,7 +9,7 @@ import ( type Client struct { Host string - Port int32 + Port string ParticipantID int64 LastHeartbeat time.Time ClientState string @@ -23,7 +23,7 @@ var ( removeTime = 30 * time.Second ) -func SendHeartbeat(host string, port int32, participantID int64) { +func SendHeartbeat(host string, port string, participantID int64) { key := fmt.Sprintf("%s:%d-%d", host, port, participantID) clientsMutex.Lock() diff --git a/rpc/internal/logic/pcmcore/infolistlogic.go b/rpc/internal/logic/pcmcore/infolistlogic.go index 0f8b2008..205c50e5 100644 --- a/rpc/internal/logic/pcmcore/infolistlogic.go +++ b/rpc/internal/logic/pcmcore/infolistlogic.go @@ -31,7 +31,7 @@ func (l *InfoListLogic) InfoList(in *pcmCore.InfoListReq) (*pcmCore.InfoListResp switch in.Kind { case "hpc": var hpcModelList []model.Hpc - tx := l.svcCtx.DbEngin.Where("service_name = ? AND status NOT IN ?", in.ServiceName, []string{"Succeed", "Completed"}).Find(&hpcModelList) + tx := l.svcCtx.DbEngin.Where("service_name = ? AND status NOT IN ?", in.ParticipantId, []string{"Succeed", "Completed"}).Find(&hpcModelList) if tx.Error != nil { return nil, tx.Error } @@ -40,7 +40,7 @@ func (l *InfoListLogic) InfoList(in *pcmCore.InfoListReq) (*pcmCore.InfoListResp result.HpcInfoList = hpcInfoList case "cloud": var cloudModelList []model.Cloud - tx := l.svcCtx.DbEngin.Where("service_name = ? AND status NOT IN ?", in.ServiceName, []string{"Succeed", "Completed"}).Find(&cloudModelList) + tx := l.svcCtx.DbEngin.Where("service_name = ? AND status NOT IN ?", in.ParticipantId, []string{"Succeed", "Completed"}).Find(&cloudModelList) if tx.Error != nil { return nil, tx.Error } @@ -49,7 +49,7 @@ func (l *InfoListLogic) InfoList(in *pcmCore.InfoListReq) (*pcmCore.InfoListResp result.CloudInfoList = cloudInfoList case "ai": var aiModelList []model.Ai - tx := l.svcCtx.DbEngin.Where("service_name = ? AND status NOT IN ?", in.ServiceName, []string{"Succeed", "Completed"}).Find(&aiModelList) + tx := l.svcCtx.DbEngin.Where("service_name = ? AND status NOT IN ?", in.ParticipantId, []string{"Succeed", "Completed"}).Find(&aiModelList) if tx.Error != nil { return nil, tx.Error } diff --git a/rpc/internal/logic/pcmcore/syncinfologic.go b/rpc/internal/logic/pcmcore/syncinfologic.go index 213e1655..6d18d4dc 100644 --- a/rpc/internal/logic/pcmcore/syncinfologic.go +++ b/rpc/internal/logic/pcmcore/syncinfologic.go @@ -40,19 +40,19 @@ func (l *SyncInfoLogic) SyncInfo(in *pcmCore.SyncInfoReq) (*pcmCore.SyncInfoResp switch in.Kind { case "cloud": for _, cloudInfo := range in.CloudInfoList { - db.Exec("update cloud set status = ?,start_time = ?,running_time = ? where service_name = ? and task_id = ? and namespace = ? and name = ?", - cloudInfo.Status, cloudInfo.StartTime, cloudInfo.RunningTime, in.ServiceName, cloudInfo.TaskId, cloudInfo.Namespace, cloudInfo.Name) + db.Exec("update cloud set status = ?,start_time = ?,running_time = ? where participant_id = ? and task_id = ? and namespace = ? and name = ?", + cloudInfo.Status, cloudInfo.StartTime, cloudInfo.RunningTime, in.ParticipantId, cloudInfo.TaskId, cloudInfo.Namespace, cloudInfo.Name) } case "hpc": for _, hpcInfo := range in.HpcInfoList { - tx := db.Exec("update hpc set status = ?,derived_es = ?,assoc_id = ?,exit_code = ?,version = ?,alloc_cpu = ?,alloc_nodes = ?,cluster = ?,block_id = ?,start_time = ?,running_time = ?,job_id = ? where service_name = ? and task_id = ? and name = ?", - hpcInfo.Status, hpcInfo.DerivedEs, hpcInfo.AssocId, hpcInfo.ExitCode, hpcInfo.Version, hpcInfo.AllocCpu, hpcInfo.AllocNodes, hpcInfo.Cluster, hpcInfo.BlockId, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, in.ServiceName, hpcInfo.TaskId, hpcInfo.Name) + tx := db.Exec("update hpc set status = ?,derived_es = ?,assoc_id = ?,exit_code = ?,version = ?,alloc_cpu = ?,alloc_nodes = ?,cluster = ?,block_id = ?,start_time = ?,running_time = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", + hpcInfo.Status, hpcInfo.DerivedEs, hpcInfo.AssocId, hpcInfo.ExitCode, hpcInfo.Version, hpcInfo.AllocCpu, hpcInfo.AllocNodes, hpcInfo.Cluster, hpcInfo.BlockId, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, in.ParticipantId, hpcInfo.TaskId, hpcInfo.Name) print(tx.Error) } case "ai": for _, aiInfo := range in.AiInfoList { - db.Exec("update ai set status = ?,start_time = ?,running_time = ?,project_id = ?,job_id = ? where service_name = ? and task_id = ? and name = ?", - aiInfo.Status, aiInfo.StartTime, aiInfo.RunningTime, aiInfo.ProjectId, aiInfo.JobId, in.ServiceName, aiInfo.TaskId, aiInfo.Name) + db.Exec("update ai set status = ?,start_time = ?,running_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", + aiInfo.Status, aiInfo.StartTime, aiInfo.RunningTime, aiInfo.ProjectId, aiInfo.JobId, in.ParticipantId, aiInfo.TaskId, aiInfo.Name) } } diff --git a/rpc/pb/pcmCore.proto b/rpc/pb/pcmCore.proto index 10121580..43d29a47 100644 --- a/rpc/pb/pcmCore.proto +++ b/rpc/pb/pcmCore.proto @@ -74,7 +74,7 @@ message SyncInfoResp{ message InfoListReq{ string kind = 1; - string participantId = 2; + int64 participantId = 2; } message InfoListResp{ diff --git a/rpc/pcmCore/pcmCore.pb.go b/rpc/pcmCore/pcmCore.pb.go index 3dff42ce..f3677740 100644 --- a/rpc/pcmCore/pcmCore.pb.go +++ b/rpc/pcmCore/pcmCore.pb.go @@ -694,7 +694,7 @@ type InfoListReq struct { unknownFields protoimpl.UnknownFields Kind string `protobuf:"bytes,1,opt,name=kind,proto3" json:"kind,omitempty"` - ParticipantId string `protobuf:"bytes,2,opt,name=participantId,proto3" json:"participantId,omitempty"` + ParticipantId int64 `protobuf:"varint,2,opt,name=participantId,proto3" json:"participantId,omitempty"` } func (x *InfoListReq) Reset() { @@ -736,11 +736,11 @@ func (x *InfoListReq) GetKind() string { return "" } -func (x *InfoListReq) GetParticipantId() string { +func (x *InfoListReq) GetParticipantId() int64 { if x != nil { return x.ParticipantId } - return "" + return 0 } type InfoListResp struct { @@ -1473,7 +1473,7 @@ var file_pcmCore_proto_rawDesc = []byte{ 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, + 0x03, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x22, 0xad, 0x01, 0x0a, 0x0c, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x12, 0x32, 0x0a, 0x0b, 0x48, 0x70, 0x63, 0x49, 0x6e, 0x66, 0x6f, 0x4c, 0x69, 0x73, 0x74, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x70, 0x63, 0x6d, 0x43, 0x6f, 0x72, 0x65,