Merge remote-tracking branch 'origin/zw'

Former-commit-id: 6037cb3c8547d3f24f1f82fb41ee1480205269f7
This commit is contained in:
zhangwei 2023-10-16 10:42:49 +08:00
commit 3fff406038
11 changed files with 157 additions and 55 deletions

View File

@ -341,7 +341,7 @@ type ImportTasks {
TotalFileCount uint32 `json:"totalFileCount,omitempty"`
CreateTime uint32 `json:"createTime,omitempty"`
ElapsedTime uint32 `json:"elapsedTime,omitempty"`
AnnotationFormatConfig []interface{} `json:"annotationFormatConfigomitempty"`
AnnotationFormatConfig []interface{} `json:"annotationFormatConfig,omitempty"`
}
/******************taskList end*************************/
/******************ListTrainingJobs start*************************/

View File

@ -8,9 +8,6 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient"
)
type client struct {
}
func SyncParticipantRpc(svc *svc.ServiceContext) {
// 查询出所有p端信息
var participants []*models.ScParticipantPhyInfo

View File

@ -56,14 +56,7 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa
logx.Error(err)
return err
}
switch task.TaskType {
case "cloud":
l.svcCtx.ScheduleCloudClient.Push(string(reqMessage))
case "hpc":
l.svcCtx.ScheduleHpcClient.Push(string(reqMessage))
case "ai":
l.svcCtx.ScheduleAiClient.Push(string(reqMessage))
}
l.svcCtx.RedisClient.Publish(context.Background(), task.TaskType, reqMessage)
}
return nil
}

View File

@ -1,4 +1,4 @@
package kq
package mqs
import (
"context"
@ -10,19 +10,19 @@ import (
*
Listening to the payment flow status change notification message queue
*/
type ScheduleAiMq struct {
type AiQueue struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewScheduleAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleAiMq {
return &ScheduleAiMq{
func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
return &AiQueue{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ScheduleAiMq) Consume(_, val string) error {
func (l *AiQueue) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
aiSchdl := scheduler2.NewAiScheduler(val)
schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin)

View File

@ -1,4 +1,4 @@
package kq
package mqs
import (
"context"
@ -10,19 +10,19 @@ import (
*
Listening to the payment flow status change notification message queue
*/
type ScheduleCloudMq struct {
type CloudMq struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewScheduleCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCloudMq {
return &ScheduleCloudMq{
func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq {
return &CloudMq{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ScheduleCloudMq) Consume(_, val string) error {
func (l *CloudMq) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
cloudScheduler := scheduler2.NewCloudScheduler()
schdl, err := scheduler2.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin)

View File

@ -1,4 +1,4 @@
package kq
package mqs
import (
"context"
@ -10,19 +10,19 @@ import (
*
Listening to the payment flow status change notification message queue
*/
type ScheduleHpcMq struct {
type HpcMq struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewScheduleHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleHpcMq {
return &ScheduleHpcMq{
func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq {
return &HpcMq{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ScheduleHpcMq) Consume(_, val string) error {
func (l *HpcMq) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
hpcSchdl := scheduler2.NewHpcScheduler(val)
schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin)

115
api/internal/mqs/queue.go Normal file
View File

@ -0,0 +1,115 @@
package mqs
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"github.com/zeromicro/go-zero/core/queue"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/threading"
)
type (
ConsumeHandle func(v string) error
ConsumeHandler interface {
Consume(value string) error
}
redisQueues struct {
queues []queue.MessageQueue
group *service.ServiceGroup
}
redisQueue struct {
topic string
channel chan redis.Message
client *redis.Client
handler ConsumeHandler
consumerRoutines *threading.RoutineGroup
producerRoutines *threading.RoutineGroup
}
)
func (r *redisQueue) Start() {
r.startConsumers()
r.startProducers()
r.producerRoutines.Wait()
close(r.channel)
r.consumerRoutines.Wait()
}
func (r *redisQueue) Stop() {
}
func (r redisQueues) Start() {
for _, each := range r.queues {
r.group.Add(each)
}
r.group.Start()
}
func (r redisQueues) Stop() {
r.group.Stop()
}
func (r *redisQueue) startConsumers() {
r.consumerRoutines.Run(func() {
for message := range r.channel {
if err := r.consumeOne(message.Payload); err != nil {
fmt.Errorf("consume: %s, error: %v", message.Payload, err)
}
}
})
}
func (r *redisQueue) consumeOne(value string) error {
err := r.handler.Consume(value)
return err
}
func (r *redisQueue) startProducers() {
r.producerRoutines.Run(func() {
for {
channel := r.client.Subscribe(context.Background(), r.topic).Channel()
for msg := range channel {
fmt.Println("生产者获取的值:", msg.Payload)
r.channel <- *msg
}
}
})
}
func newRedisQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue {
return &redisQueue{
topic: topic,
client: redisClient,
channel: make(chan redis.Message),
producerRoutines: threading.NewRoutineGroup(),
consumerRoutines: threading.NewRoutineGroup(),
handler: handler}
}
func MustNewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue {
q, err := NewQueue(topic, redisClient, handler)
if err != nil {
fmt.Println("NewQueue报错")
}
return q
}
func NewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) (queue.MessageQueue, error) {
if len(topic) == 0 {
return nil, errors.New("topic不能为空")
}
r := redisQueues{
group: service.NewServiceGroup(),
}
r.queues = append(r.queues, newRedisQueue(topic, redisClient, handler))
return r, nil
}

View File

@ -6,7 +6,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/docker/docker/client"
"github.com/go-redis/redis"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/logx"
@ -73,25 +73,23 @@ func NewServiceContext(c config.Config) *ServiceContext {
logx.Error(err.Error())
return nil
}
redisClient := redis.NewClient(&redis.Options{
Addr: c.Redis.Host,
Password: c.Redis.Pass,
})
return &ServiceContext{
Cron: cron.New(cron.WithSeconds()),
DbEngin: dbEngin,
Config: c,
RedisClient: redis.NewClient(&redis.Options{
Addr: c.Redis.Host,
Password: c.Redis.Pass,
}),
ScheduleHpcClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.HpcTopic),
ScheduleCloudClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.CloudTopic),
ScheduleAiClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.AiTopic),
ModelArtsRpc: modelartsclient.NewModelArts(zrpc.MustNewClient(c.ModelArtsRpcConf)),
CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)),
ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)),
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
K8sRpc: make(map[int64]kubernetesclient.Kubernetes),
DockerClient: dockerClient,
Downloader: downloader,
Uploader: uploader,
Cron: cron.New(cron.WithSeconds()),
DbEngin: dbEngin,
Config: c,
RedisClient: redisClient,
ModelArtsRpc: modelartsclient.NewModelArts(zrpc.MustNewClient(c.ModelArtsRpcConf)),
CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)),
ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)),
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
K8sRpc: make(map[int64]kubernetesclient.Kubernetes),
DockerClient: dockerClient,
Downloader: downloader,
Uploader: uploader,
}
}

View File

@ -3,7 +3,6 @@ package main
import (
"context"
"flag"
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/service"
@ -11,7 +10,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/cron"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler"
kq3 "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/kq"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
commonConfig "gitlink.org.cn/jcce-pcm/utils/nacos"
)
@ -56,12 +55,9 @@ func main() {
serviceGroup.Add(server)
services := []service.Service{
//Listening for changes in consumption flow status
kq.MustNewQueue(c.HpcConsumerConf, kq3.NewScheduleHpcMq(context.Background(), ctx)),
kq.MustNewQueue(c.CloudConsumerConf, kq3.NewScheduleCloudMq(context.Background(), ctx)),
kq.MustNewQueue(c.AiConsumerConf, kq3.NewScheduleAiMq(context.Background(), ctx)),
//.....
mqs.MustNewQueue("ai", ctx.RedisClient, mqs.NewAiMq(context.Background(), ctx)),
mqs.MustNewQueue("cloud", ctx.RedisClient, mqs.NewCloudMq(context.Background(), ctx)),
mqs.MustNewQueue("hpc", ctx.RedisClient, mqs.NewHpcMq(context.Background(), ctx)),
}
for _, mq := range services {
serviceGroup.Add(mq)

1
go.mod
View File

@ -99,6 +99,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/redis/go-redis/v9 v9.2.1 // indirect
github.com/segmentio/kafka-go v0.4.38 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.etcd.io/etcd/api/v3 v3.5.9 // indirect

2
go.sum
View File

@ -944,6 +944,8 @@ github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPH
github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg=
github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=