rpc client初始化

Former-commit-id: 593a85afefc63ce7274ad6ea1867098dc2bddf8f
This commit is contained in:
zhangwei 2023-09-27 14:48:28 +08:00
parent 11bfa2cbc5
commit d3d353096b
23 changed files with 748 additions and 550 deletions

View File

@ -181,6 +181,7 @@ type (
synergy string `yaml:"synergy"`
Description string `yaml:"description"`
strategy string `yaml:"strategy"`
tenantId int64 `yaml:"tenantId"`
tasks []TaskYaml `yaml:"tasks"`
}
TaskYaml {

13
api/internal/cron/cron.go Normal file
View File

@ -0,0 +1,13 @@
package cron
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
)
func AddCronGroup(svc *svc.ServiceContext) {
// 同步任务信息到core端
svc.Cron.AddFunc("*/5 * * * * ?", func() {
SyncParticipantRpc(svc)
})
}

View File

@ -0,0 +1,34 @@
package cron
import (
"github.com/zeromicro/go-zero/zrpc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient"
)
type client struct {
}
func SyncParticipantRpc(svc *svc.ServiceContext) {
// 查询出所有p端信息
var participants []*models.ScParticipantPhyInfo
tx := svc.DbEngin.Find(&participants)
if tx.Error != nil {
}
for _, participant := range participants {
// 初始化p端rpc客户端
if len(participant.RpcAddress) != 0 && svc.K8sRpc[participant.Id] == nil {
switch participant.Type {
case constants.Kubernetes:
svc.K8sRpc[participant.Id] = kubernetesclient.NewKubernetes(zrpc.MustNewClient(zrpc.RpcClientConf{
Endpoints: []string{participant.RpcAddress},
NonBlock: true,
}))
}
}
}
}

View File

@ -497,6 +497,16 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/storelink/deleteTask",
Handler: storelink.DeleteLinkTaskHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/storelink/getParticipants",
Handler: storelink.GetParticipantsHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/storelink/getResourceSpecs",
Handler: storelink.GetAISpecsHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),
)

View File

@ -0,0 +1,28 @@
package storelink
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/storelink"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
)
func GetAISpecsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.GetResourceSpecsReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := storelink.NewGetAISpecsLogic(r.Context(), svcCtx)
resp, err := l.GetAISpecs(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -0,0 +1,28 @@
package storelink
import (
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/storelink"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
)
func GetParticipantsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.GetParticipantsReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := storelink.NewGetParticipantsLogic(r.Context(), svcCtx)
resp, err := l.GetParticipants(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
}
}

View File

@ -2,6 +2,7 @@ package core
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"k8s.io/apimachinery/pkg/util/json"
"time"
@ -32,13 +33,13 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa
if err != nil {
return err
}
// 构建任务结构体
// 构建任务结构体
taskModel := models.Task{
Status: "Saved",
Status: constants.TASK_STATUS_SAVED,
Description: req.Description,
Name: req.Name,
YamlString: string(bytes),
StartTime: time.Now(),
CommitTime: time.Now(),
}
// 保存任务数据到数据库
tx := l.svcCtx.DbEngin.Create(&taskModel)
@ -46,7 +47,7 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa
return tx.Error
}
// push message into topic
// 遍历子任务放入任务队列中
for _, task := range req.Tasks {
task.TaskId = taskModel.Id
// 将任务数据转换成消息体

View File

@ -43,7 +43,7 @@ func (l *ScheduleTaskLogic) ScheduleTask(req *types.ScheduleTaskReq) (err error)
Description: req.Description,
Name: req.Name,
YamlString: string(bytes),
StartTime: time.Now(),
CommitTime: time.Now(),
}
// save the task in mysql and return id
tx := l.svcCtx.DbEngin.Create(&taskModel)

View File

@ -0,0 +1,30 @@
package storelink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetAISpecsLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetAISpecsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetAISpecsLogic {
return &GetAISpecsLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetAISpecsLogic) GetAISpecs(req *types.GetResourceSpecsReq) (resp *types.GetResourceSpecsResp, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -0,0 +1,30 @@
package storelink
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type GetParticipantsLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewGetParticipantsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetParticipantsLogic {
return &GetParticipantsLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetParticipantsLogic) GetParticipants(req *types.GetParticipantsReq) (resp *types.GetParticipantsResp, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -15,6 +15,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelartsclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-openstack/openstackclient"
@ -41,6 +42,7 @@ type ServiceContext struct {
DockerClient *client.Client
Downloader *s3manager.Downloader
Uploader *s3manager.Uploader
K8sRpc map[int64]kubernetesclient.Kubernetes
}
func NewServiceContext(c config.Config) *ServiceContext {
@ -87,6 +89,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)),
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
K8sRpc: make(map[int64]kubernetesclient.Kubernetes),
DockerClient: dockerClient,
Downloader: downloader,
Uploader: uploader,

View File

@ -161,6 +161,7 @@ type ScheduleTaskByYamlReq struct {
Synergy string `yaml:"synergy"`
Description string `yaml:"description"`
Strategy string `yaml:"strategy"`
TenantId int64 `yaml:"tenantId"`
Tasks []TaskYaml `yaml:"tasks"`
}
@ -2809,3 +2810,15 @@ type TaskSl struct {
StartedAt int64 `json:"startedAt"`
CompletedAt int64 `json:"completedAt"`
}
type GetParticipantsReq struct {
}
type GetParticipantsResp struct {
}
type GetResourceSpecsReq struct {
}
type GetResourceSpecsResp struct {
}

View File

@ -1,16 +0,0 @@
package validate
type cloud struct {
participantId int64
namespace string
name string
kind string
}
type IValidate interface {
Validate()
}
func (c *cloud) Validate() {
}

View File

@ -9,6 +9,7 @@ import (
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/rest"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/cron"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler"
kq3 "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/kq"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
@ -50,6 +51,7 @@ func main() {
// start log component
logx.MustSetup(c.LogConf)
ctx.Cron.Start()
cron.AddCronGroup(ctx)
handler.RegisterHandlers(server, ctx)
serviceGroup.Add(server)

View File

@ -1,9 +1,19 @@
package response
import "fmt"
type TaskInfo struct {
TaskId int64 `json:"taskId,optional"`
TaskType string `json:"taskType,optional"`
MatchLabels map[string]string `json:"matchLabels"`
ParticipantId int64 `json:"participantId"`
TenantId int64 `json:"tenantId"`
Metadata interface{} `json:"metadata"`
}
func (t *TaskInfo) Validate() error {
if t.TenantId == 0 {
return fmt.Errorf("tenantId is nil")
}
return nil
}

1
go.mod
View File

@ -22,6 +22,7 @@ require (
github.com/zeromicro/go-zero v1.5.5
gitlink.org.cn/jcce-pcm/pcm-participant-ac v0.0.0-20230814074259-99e24e1194d1
gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230817103341-2459e5bfc835
gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes v0.0.0-20230830120334-bf6d99c715ef
gitlink.org.cn/jcce-pcm/pcm-participant-modelarts v0.0.0-20230719015658-08a29549d86a
gitlink.org.cn/jcce-pcm/pcm-participant-octopus v0.0.0-20230714030856-601935bc30e2
gitlink.org.cn/jcce-pcm/pcm-participant-openstack v0.0.0-20230904093908-860f0b2b4eb4

2
go.sum
View File

@ -1033,6 +1033,8 @@ gitlink.org.cn/jcce-pcm/pcm-participant-ac v0.0.0-20230814074259-99e24e1194d1 h1
gitlink.org.cn/jcce-pcm/pcm-participant-ac v0.0.0-20230814074259-99e24e1194d1/go.mod h1:OflOWySJqYAygcwL7vT2yVtfcUs9TM3kmoD+89Tbu7c=
gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230817103341-2459e5bfc835 h1:WDCPqD8IrepGJXankkpG14Ny6inh9AldB0RX9WWa+ck=
gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230817103341-2459e5bfc835/go.mod h1:r/KLzUpupCV5jdxSfgDhc2pVjP0fBi3VhAWRttsBn30=
gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes v0.0.0-20230830120334-bf6d99c715ef h1:s7JfXjka2MhGaDjKMJ57fj0k3XuDB6w+UlYHFLyJlUY=
gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes v0.0.0-20230830120334-bf6d99c715ef/go.mod h1:SFpXY1gy1ELYTo4P6EU68nTL2vKu1ZuH7nKecL16hJk=
gitlink.org.cn/jcce-pcm/pcm-participant-modelarts v0.0.0-20230719015658-08a29549d86a h1:eSniMdLizPV3RNrz7/URgjT3Kpv1cTZ05jrWfxRJxHs=
gitlink.org.cn/jcce-pcm/pcm-participant-modelarts v0.0.0-20230719015658-08a29549d86a/go.mod h1:BhOgwM1LC+BD46DjTaQyYQVZs1CikwI5Pl/6qzKUexc=
gitlink.org.cn/jcce-pcm/pcm-participant-octopus v0.0.0-20230714030856-601935bc30e2 h1:RcGSqhsod6VXLksSLqNjV0q/SCeoUv6CbThKmV9NTZE=

View File

@ -7,12 +7,7 @@ const (
TASK_TYPE_AI = 3
)
// 任务状态
const (
TASK_STATUS_SAVED = 1
TASK_STATUS_INIT = 2
TASK_STATUS_RUNNING = 3
TASK_STATUS_FAILED = 4
TASK_STATUS_END = 5
TASK_STATUS_UNKNOW = 6
Kubernetes = "Kubernetes"
Slurm = "Slurm"
)

View File

@ -38,6 +38,7 @@ type (
Id int64 `db:"id"` // id
Name string `db:"name"` // 名称
Address string `db:"address"` // 集群地址
RpcAddress string `db:"rpc_address"` // rpc服务链接地址
MetricsUrl string `db:"metrics_url"` // 监控url
NetworkType string `db:"network_type"` // 集群网络类型
NetworkBandwidth string `db:"network_bandwidth"` // 集群网络带宽
@ -46,6 +47,6 @@ type (
StorageAvailSpace string `db:"storage_avail_space"` // 集群存储可用空间
StorageBandwidth string `db:"storage_bandwidth"` // 集群存储带宽
TenantId int64 `db:"tenant_id"` // 租户信息id
Type int64 `db:"type"` // 类型:0-数算集群1-智算集群2-超算集群
Type string `db:"type"` // 类型
}
)

View File

@ -41,12 +41,13 @@ type (
Status string `db:"status"` // 作业状态
Strategy int64 `db:"strategy"` // 策略
SynergyStatus int64 `db:"synergy_status"` // 协同状态0-未协同、1-已协同)
StartTime time.Time `db:"start_time"` // 开始运行时间
CommitTime time.Time `db:"commit_time"` // 提交时间
EndTime string `db:"end_time"` // 结束运行时间
RunningTime int64 `db:"running_time"` // 已运行时间(单位秒)
YamlString string `db:"yaml_string"`
Result string `db:"result"` // 作业结果
DeletedAt gorm.DeletedAt `gorm:"index"`
TenantId int64 `db:"tenant_id"`
}
)

View File

@ -155,6 +155,7 @@ message ParticipantPhyReq {
repeated QueuePhyInfo queueInfo = 16; //
int64 id = 17; // id
string MetricsUrl = 18; //url
string RpcAddress = 19;
}
// NodePhyInfo

File diff suppressed because it is too large Load Diff

View File

@ -2,7 +2,7 @@
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v3.19.4
// source: pb/pcmCore.proto
// source: pcmCore.proto
package pcmCore
@ -146,7 +146,7 @@ var PcmCore_ServiceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pb/pcmCore.proto",
Metadata: "pcmCore.proto",
}
const (
@ -511,5 +511,5 @@ var ParticipantService_ServiceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pb/pcmCore.proto",
Metadata: "pcmCore.proto",
}