From 67974cf7118aba31be397ca21678c1e8cf33f3a3 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Tue, 10 Oct 2023 08:53:10 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 933d2d204fa04665b585612b195dfd2dff97b65b --- api/internal/cron/participant.go | 3 - .../logic/core/scheduletaskbyyamllogic.go | 16 ++--- api/internal/mqs/{kq => mq}/ScheduleAi.go | 2 +- api/internal/mqs/{kq => mq}/ScheduleCloud.go | 2 +- api/internal/mqs/{kq => mq}/ScheduleHpc.go | 2 +- api/internal/mqs/zzz/handler.go | 4 ++ api/internal/mqs/zzz/msg.go | 11 +++ api/internal/mqs/zzz/queue.go | 71 +++++++++++++++++++ api/internal/svc/servicecontext.go | 20 +++--- api/pcm.go | 27 ++++--- go.mod | 1 + go.sum | 2 + 12 files changed, 125 insertions(+), 36 deletions(-) rename api/internal/mqs/{kq => mq}/ScheduleAi.go (98%) rename api/internal/mqs/{kq => mq}/ScheduleCloud.go (98%) rename api/internal/mqs/{kq => mq}/ScheduleHpc.go (98%) create mode 100644 api/internal/mqs/zzz/handler.go create mode 100644 api/internal/mqs/zzz/msg.go create mode 100644 api/internal/mqs/zzz/queue.go diff --git a/api/internal/cron/participant.go b/api/internal/cron/participant.go index 76f02d89..1d1026e6 100644 --- a/api/internal/cron/participant.go +++ b/api/internal/cron/participant.go @@ -8,9 +8,6 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient" ) -type client struct { -} - func SyncParticipantRpc(svc *svc.ServiceContext) { // 查询出所有p端信息 var participants []*models.ScParticipantPhyInfo diff --git a/api/internal/logic/core/scheduletaskbyyamllogic.go b/api/internal/logic/core/scheduletaskbyyamllogic.go index 6b5dbf7e..6c2d55bf 100644 --- a/api/internal/logic/core/scheduletaskbyyamllogic.go +++ b/api/internal/logic/core/scheduletaskbyyamllogic.go @@ -50,20 +50,14 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa // 遍历子任务放入任务队列中 for _, task := range req.Tasks { task.TaskId = taskModel.Id - // 将任务数据转换成消息体 - reqMessage, err := json.Marshal(task) + //taskBytes, err := json.Marshal(task) if err != nil { - logx.Error(err) return err } - switch task.TaskType { - case "cloud": - l.svcCtx.ScheduleCloudClient.Push(string(reqMessage)) - case "hpc": - l.svcCtx.ScheduleHpcClient.Push(string(reqMessage)) - case "ai": - l.svcCtx.ScheduleAiClient.Push(string(reqMessage)) - } + //l.svcCtx.FrontMQ.SendMsg(l.ctx, &mq.Msg{ + // Topic: task.TaskType, + // Body: taskBytes, + //}) } return nil } diff --git a/api/internal/mqs/kq/ScheduleAi.go b/api/internal/mqs/mq/ScheduleAi.go similarity index 98% rename from api/internal/mqs/kq/ScheduleAi.go rename to api/internal/mqs/mq/ScheduleAi.go index fa01a01f..d102a049 100644 --- a/api/internal/mqs/kq/ScheduleAi.go +++ b/api/internal/mqs/mq/ScheduleAi.go @@ -1,4 +1,4 @@ -package kq +package mq import ( "context" diff --git a/api/internal/mqs/kq/ScheduleCloud.go b/api/internal/mqs/mq/ScheduleCloud.go similarity index 98% rename from api/internal/mqs/kq/ScheduleCloud.go rename to api/internal/mqs/mq/ScheduleCloud.go index 2c080d6c..a1a1e70e 100644 --- a/api/internal/mqs/kq/ScheduleCloud.go +++ b/api/internal/mqs/mq/ScheduleCloud.go @@ -1,4 +1,4 @@ -package kq +package mq import ( "context" diff --git a/api/internal/mqs/kq/ScheduleHpc.go b/api/internal/mqs/mq/ScheduleHpc.go similarity index 98% rename from api/internal/mqs/kq/ScheduleHpc.go rename to api/internal/mqs/mq/ScheduleHpc.go index 825b03af..8451a528 100644 --- a/api/internal/mqs/kq/ScheduleHpc.go +++ b/api/internal/mqs/mq/ScheduleHpc.go @@ -1,4 +1,4 @@ -package kq +package mq import ( "context" diff --git a/api/internal/mqs/zzz/handler.go b/api/internal/mqs/zzz/handler.go new file mode 100644 index 00000000..127e1274 --- /dev/null +++ b/api/internal/mqs/zzz/handler.go @@ -0,0 +1,4 @@ +package zzz + +// Handler 返回值代表消息是否消费成功 +type Handler func(msg *Msg) error diff --git a/api/internal/mqs/zzz/msg.go b/api/internal/mqs/zzz/msg.go new file mode 100644 index 00000000..22191de6 --- /dev/null +++ b/api/internal/mqs/zzz/msg.go @@ -0,0 +1,11 @@ +package zzz + +// Msg 消息 +type Msg struct { + ID string // 消息的编号 + Topic string // 消息的主题 + Body []byte // 消息的Body + Partition int // 分区号 + Group string // 消费者组 + Consumer string // 消费者组里的消费者 +} diff --git a/api/internal/mqs/zzz/queue.go b/api/internal/mqs/zzz/queue.go new file mode 100644 index 00000000..d8c4b36f --- /dev/null +++ b/api/internal/mqs/zzz/queue.go @@ -0,0 +1,71 @@ +package zzz + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "github.com/redis/go-redis/v9" + "strconv" + "strings" +) + +type ( + MsgQueue interface { + Consume(ctx context.Context, topic string, partition int, h Handler) error + SendMsg(ctx context.Context, msg *Msg) error + ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error + } + defaultMsgQueue struct { + // Redis客户端 + client *redis.Client + } +) + +func NewMsgQueue(client *redis.Client) MsgQueue { + return &defaultMsgQueue{client: client} +} + +// SendMsg 发送消息 +func (mq *defaultMsgQueue) SendMsg(ctx context.Context, msg *Msg) error { + return mq.client.Publish(ctx, mq.partitionTopic(msg.Topic, msg.Partition), msg.Body).Err() +} + +// Consume 返回值代表消费过程中遇到的无法处理的错误 +func (mq *defaultMsgQueue) Consume(ctx context.Context, topic string, partition int, h Handler) error { + // 订阅频道 + channel := mq.client.Subscribe(ctx, mq.partitionTopic(topic, partition)).Channel() + for msg := range channel { + // 处理消息 + h(&Msg{ + Topic: topic, + Body: []byte(msg.Payload), + Partition: partition, + }) + } + return errors.New("channel closed") +} + +// ConsumeMultiPartitions 返回值代表消费过程中遇到的无法处理的错误 +func (mq *defaultMsgQueue) ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error { + // 订阅频道 + channels := make([]string, len(partitions)) + for i, partition := range partitions { + channels[i] = mq.partitionTopic(topic, partition) + } + channel := mq.client.Subscribe(ctx, channels...).Channel() + for msg := range channel { + // 处理消息 + _, partitionString, _ := strings.Cut(msg.Channel, ":") + partition, _ := strconv.Atoi(partitionString) + h(&Msg{ + Topic: topic, + Body: []byte(msg.Payload), + Partition: partition, + }) + } + return errors.New("channels closed") +} + +func (mq *defaultMsgQueue) partitionTopic(topic string, partition int) string { + return fmt.Sprintf("%s:%d", topic, partition) +} diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index f8b761bf..4cc1287a 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -6,12 +6,13 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/docker/docker/client" - "github.com/go-redis/redis" + "github.com/redis/go-redis/v9" "github.com/robfig/cron/v3" "github.com/zeromicro/go-queue/kq" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-participant-ac/hpcacclient" "gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient" @@ -43,6 +44,7 @@ type ServiceContext struct { Downloader *s3manager.Downloader Uploader *s3manager.Uploader K8sRpc map[int64]kubernetesclient.Kubernetes + Mq zzz.MsgQueue } func NewServiceContext(c config.Config) *ServiceContext { @@ -73,14 +75,16 @@ func NewServiceContext(c config.Config) *ServiceContext { logx.Error(err.Error()) return nil } + redisClient := redis.NewClient(&redis.Options{ + Addr: c.Redis.Host, + Password: c.Redis.Pass, + }) return &ServiceContext{ - Cron: cron.New(cron.WithSeconds()), - DbEngin: dbEngin, - Config: c, - RedisClient: redis.NewClient(&redis.Options{ - Addr: c.Redis.Host, - Password: c.Redis.Pass, - }), + Cron: cron.New(cron.WithSeconds()), + DbEngin: dbEngin, + Config: c, + RedisClient: redisClient, + Mq: zzz.NewMsgQueue(redisClient), ScheduleHpcClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.HpcTopic), ScheduleCloudClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.CloudTopic), ScheduleAiClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.AiTopic), diff --git a/api/pcm.go b/api/pcm.go index 08739034..b8f75f53 100644 --- a/api/pcm.go +++ b/api/pcm.go @@ -3,7 +3,7 @@ package main import ( "context" "flag" - "github.com/zeromicro/go-queue/kq" + "fmt" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/service" @@ -11,9 +11,10 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/cron" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler" - kq3 "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/kq" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" commonConfig "gitlink.org.cn/jcce-pcm/utils/nacos" + "strconv" ) var configFile = flag.String("f", "api/etc/pcm.yaml", "the config file") @@ -55,17 +56,21 @@ func main() { handler.RegisterHandlers(server, ctx) serviceGroup.Add(server) - services := []service.Service{ - //Listening for changes in consumption flow status - kq.MustNewQueue(c.HpcConsumerConf, kq3.NewScheduleHpcMq(context.Background(), ctx)), - kq.MustNewQueue(c.CloudConsumerConf, kq3.NewScheduleCloudMq(context.Background(), ctx)), - kq.MustNewQueue(c.AiConsumerConf, kq3.NewScheduleAiMq(context.Background(), ctx)), - //..... - } - for _, mq := range services { - serviceGroup.Add(mq) + go ctx.Mq.Consume(context.Background(), "test", 0, func(msg *zzz.Msg) error { + fmt.Printf("consume partiton0: %+v\n", string(msg.Body)) + + return nil + }) + for i := 1; i < 10; i++ { + + ctx.Mq.SendMsg(context.Background(), &zzz.Msg{ + Topic: "test", + Body: []byte(strconv.Itoa(i)), + Partition: i % 2, + }) } + logx.Infof("Starting server at %s:%d...\n", c.Host, c.Port) serviceGroup.Start() diff --git a/go.mod b/go.mod index de0ef4c2..4b08fc5a 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,7 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect + github.com/redis/go-redis/v9 v9.2.1 // indirect github.com/segmentio/kafka-go v0.4.38 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect go.etcd.io/etcd/api/v3 v3.5.9 // indirect diff --git a/go.sum b/go.sum index a5c650b2..1c63c99a 100644 --- a/go.sum +++ b/go.sum @@ -944,6 +944,8 @@ github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPH github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= +github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= From f66c9b8b7b1f4b85dc1df8d27baa0cb954359daa Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Tue, 10 Oct 2023 17:20:52 +0800 Subject: [PATCH 2/3] =?UTF-8?q?redis=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97?= =?UTF-8?q?=E6=9B=BF=E6=8D=A2kafka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: cb6030d4face691ad530a3bbfb53dac7eaf0676a --- .../logic/core/scheduletaskbyyamllogic.go | 8 +- api/internal/mqs/{mq => }/ScheduleAi.go | 10 +- api/internal/mqs/{mq => }/ScheduleCloud.go | 10 +- api/internal/mqs/{mq => }/ScheduleHpc.go | 10 +- api/internal/mqs/queue.go | 115 ++++++++++++++++++ api/internal/mqs/zzz/handler.go | 4 - api/internal/mqs/zzz/msg.go | 11 -- api/internal/mqs/zzz/queue.go | 71 ----------- api/internal/svc/servicecontext.go | 32 ++--- api/pcm.go | 25 ++-- 10 files changed, 155 insertions(+), 141 deletions(-) rename api/internal/mqs/{mq => }/ScheduleAi.go (77%) rename api/internal/mqs/{mq => }/ScheduleCloud.go (76%) rename api/internal/mqs/{mq => }/ScheduleHpc.go (77%) create mode 100644 api/internal/mqs/queue.go delete mode 100644 api/internal/mqs/zzz/handler.go delete mode 100644 api/internal/mqs/zzz/msg.go delete mode 100644 api/internal/mqs/zzz/queue.go diff --git a/api/internal/logic/core/scheduletaskbyyamllogic.go b/api/internal/logic/core/scheduletaskbyyamllogic.go index 6c2d55bf..f1350c7b 100644 --- a/api/internal/logic/core/scheduletaskbyyamllogic.go +++ b/api/internal/logic/core/scheduletaskbyyamllogic.go @@ -51,10 +51,10 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa for _, task := range req.Tasks { task.TaskId = taskModel.Id //taskBytes, err := json.Marshal(task) - if err != nil { - return err - } - //l.svcCtx.FrontMQ.SendMsg(l.ctx, &mq.Msg{ + //if err != nil { + // return err + //} + //l.svcCtx.RedisClient.Publish(context.Background(), "test", &mqs.Msg{ // Topic: task.TaskType, // Body: taskBytes, //}) diff --git a/api/internal/mqs/mq/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go similarity index 77% rename from api/internal/mqs/mq/ScheduleAi.go rename to api/internal/mqs/ScheduleAi.go index d102a049..8446c955 100644 --- a/api/internal/mqs/mq/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -1,4 +1,4 @@ -package mq +package mqs import ( "context" @@ -10,19 +10,19 @@ import ( * Listening to the payment flow status change notification message queue */ -type ScheduleAiMq struct { +type AiQueue struct { ctx context.Context svcCtx *svc.ServiceContext } -func NewScheduleAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleAiMq { - return &ScheduleAiMq{ +func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { + return &AiQueue{ ctx: ctx, svcCtx: svcCtx, } } -func (l *ScheduleAiMq) Consume(_, val string) error { +func (l *AiQueue) Consume(val string) error { // 接受消息, 根据标签筛选过滤 aiSchdl := scheduler2.NewAiScheduler(val) schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin) diff --git a/api/internal/mqs/mq/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go similarity index 76% rename from api/internal/mqs/mq/ScheduleCloud.go rename to api/internal/mqs/ScheduleCloud.go index a1a1e70e..e24677a2 100644 --- a/api/internal/mqs/mq/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -1,4 +1,4 @@ -package mq +package mqs import ( "context" @@ -10,19 +10,19 @@ import ( * Listening to the payment flow status change notification message queue */ -type ScheduleCloudMq struct { +type CloudMq struct { ctx context.Context svcCtx *svc.ServiceContext } -func NewScheduleCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCloudMq { - return &ScheduleCloudMq{ +func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq { + return &CloudMq{ ctx: ctx, svcCtx: svcCtx, } } -func (l *ScheduleCloudMq) Consume(_, val string) error { +func (l *CloudMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 cloudScheduler := scheduler2.NewCloudScheduler() schdl, err := scheduler2.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin) diff --git a/api/internal/mqs/mq/ScheduleHpc.go b/api/internal/mqs/ScheduleHpc.go similarity index 77% rename from api/internal/mqs/mq/ScheduleHpc.go rename to api/internal/mqs/ScheduleHpc.go index 8451a528..981626fa 100644 --- a/api/internal/mqs/mq/ScheduleHpc.go +++ b/api/internal/mqs/ScheduleHpc.go @@ -1,4 +1,4 @@ -package mq +package mqs import ( "context" @@ -10,19 +10,19 @@ import ( * Listening to the payment flow status change notification message queue */ -type ScheduleHpcMq struct { +type HpcMq struct { ctx context.Context svcCtx *svc.ServiceContext } -func NewScheduleHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleHpcMq { - return &ScheduleHpcMq{ +func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq { + return &HpcMq{ ctx: ctx, svcCtx: svcCtx, } } -func (l *ScheduleHpcMq) Consume(_, val string) error { +func (l *HpcMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 hpcSchdl := scheduler2.NewHpcScheduler(val) schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin) diff --git a/api/internal/mqs/queue.go b/api/internal/mqs/queue.go new file mode 100644 index 00000000..34c77acc --- /dev/null +++ b/api/internal/mqs/queue.go @@ -0,0 +1,115 @@ +package mqs + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "github.com/redis/go-redis/v9" + "github.com/zeromicro/go-zero/core/queue" + "github.com/zeromicro/go-zero/core/service" + "github.com/zeromicro/go-zero/core/threading" +) + +type ( + ConsumeHandle func(v string) error + ConsumeHandler interface { + Consume(value string) error + } + redisQueues struct { + queues []queue.MessageQueue + group *service.ServiceGroup + } + redisQueue struct { + topic string + channel chan redis.Message + client *redis.Client + handler ConsumeHandler + consumerRoutines *threading.RoutineGroup + producerRoutines *threading.RoutineGroup + } +) + +func (r *redisQueue) Start() { + r.startConsumers() + r.startProducers() + + r.producerRoutines.Wait() + close(r.channel) + r.consumerRoutines.Wait() +} + +func (r *redisQueue) Stop() { +} + +func (r redisQueues) Start() { + for _, each := range r.queues { + r.group.Add(each) + } + r.group.Start() +} + +func (r redisQueues) Stop() { + r.group.Stop() +} + +func (r *redisQueue) startConsumers() { + r.consumerRoutines.Run(func() { + for message := range r.channel { + if err := r.consumeOne(message.Payload); err != nil { + fmt.Errorf("consume: %s, error: %v", message.Payload, err) + } + } + }) + +} + +func (r *redisQueue) consumeOne(value string) error { + err := r.handler.Consume(value) + return err +} + +func (r *redisQueue) startProducers() { + r.producerRoutines.Run(func() { + for { + channel := r.client.Subscribe(context.Background(), r.topic).Channel() + for msg := range channel { + fmt.Println("生产者获取的值:", msg.Payload) + r.channel <- *msg + } + + } + }) + +} + +func newRedisQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue { + return &redisQueue{ + topic: topic, + client: redisClient, + channel: make(chan redis.Message), + producerRoutines: threading.NewRoutineGroup(), + consumerRoutines: threading.NewRoutineGroup(), + handler: handler} +} + +func MustNewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue { + q, err := NewQueue(topic, redisClient, handler) + if err != nil { + fmt.Println("NewQueue报错") + } + + return q +} + +func NewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) (queue.MessageQueue, error) { + if len(topic) == 0 { + return nil, errors.New("topic不能为空") + } + + r := redisQueues{ + group: service.NewServiceGroup(), + } + r.queues = append(r.queues, newRedisQueue(topic, redisClient, handler)) + + return r, nil +} diff --git a/api/internal/mqs/zzz/handler.go b/api/internal/mqs/zzz/handler.go deleted file mode 100644 index 127e1274..00000000 --- a/api/internal/mqs/zzz/handler.go +++ /dev/null @@ -1,4 +0,0 @@ -package zzz - -// Handler 返回值代表消息是否消费成功 -type Handler func(msg *Msg) error diff --git a/api/internal/mqs/zzz/msg.go b/api/internal/mqs/zzz/msg.go deleted file mode 100644 index 22191de6..00000000 --- a/api/internal/mqs/zzz/msg.go +++ /dev/null @@ -1,11 +0,0 @@ -package zzz - -// Msg 消息 -type Msg struct { - ID string // 消息的编号 - Topic string // 消息的主题 - Body []byte // 消息的Body - Partition int // 分区号 - Group string // 消费者组 - Consumer string // 消费者组里的消费者 -} diff --git a/api/internal/mqs/zzz/queue.go b/api/internal/mqs/zzz/queue.go deleted file mode 100644 index d8c4b36f..00000000 --- a/api/internal/mqs/zzz/queue.go +++ /dev/null @@ -1,71 +0,0 @@ -package zzz - -import ( - "context" - "fmt" - "github.com/pkg/errors" - "github.com/redis/go-redis/v9" - "strconv" - "strings" -) - -type ( - MsgQueue interface { - Consume(ctx context.Context, topic string, partition int, h Handler) error - SendMsg(ctx context.Context, msg *Msg) error - ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error - } - defaultMsgQueue struct { - // Redis客户端 - client *redis.Client - } -) - -func NewMsgQueue(client *redis.Client) MsgQueue { - return &defaultMsgQueue{client: client} -} - -// SendMsg 发送消息 -func (mq *defaultMsgQueue) SendMsg(ctx context.Context, msg *Msg) error { - return mq.client.Publish(ctx, mq.partitionTopic(msg.Topic, msg.Partition), msg.Body).Err() -} - -// Consume 返回值代表消费过程中遇到的无法处理的错误 -func (mq *defaultMsgQueue) Consume(ctx context.Context, topic string, partition int, h Handler) error { - // 订阅频道 - channel := mq.client.Subscribe(ctx, mq.partitionTopic(topic, partition)).Channel() - for msg := range channel { - // 处理消息 - h(&Msg{ - Topic: topic, - Body: []byte(msg.Payload), - Partition: partition, - }) - } - return errors.New("channel closed") -} - -// ConsumeMultiPartitions 返回值代表消费过程中遇到的无法处理的错误 -func (mq *defaultMsgQueue) ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error { - // 订阅频道 - channels := make([]string, len(partitions)) - for i, partition := range partitions { - channels[i] = mq.partitionTopic(topic, partition) - } - channel := mq.client.Subscribe(ctx, channels...).Channel() - for msg := range channel { - // 处理消息 - _, partitionString, _ := strings.Cut(msg.Channel, ":") - partition, _ := strconv.Atoi(partitionString) - h(&Msg{ - Topic: topic, - Body: []byte(msg.Payload), - Partition: partition, - }) - } - return errors.New("channels closed") -} - -func (mq *defaultMsgQueue) partitionTopic(topic string, partition int) string { - return fmt.Sprintf("%s:%d", topic, partition) -} diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index 4cc1287a..d8314df5 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -12,7 +12,6 @@ import ( "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-participant-ac/hpcacclient" "gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient" @@ -44,7 +43,6 @@ type ServiceContext struct { Downloader *s3manager.Downloader Uploader *s3manager.Uploader K8sRpc map[int64]kubernetesclient.Kubernetes - Mq zzz.MsgQueue } func NewServiceContext(c config.Config) *ServiceContext { @@ -80,22 +78,18 @@ func NewServiceContext(c config.Config) *ServiceContext { Password: c.Redis.Pass, }) return &ServiceContext{ - Cron: cron.New(cron.WithSeconds()), - DbEngin: dbEngin, - Config: c, - RedisClient: redisClient, - Mq: zzz.NewMsgQueue(redisClient), - ScheduleHpcClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.HpcTopic), - ScheduleCloudClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.CloudTopic), - ScheduleAiClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.AiTopic), - ModelArtsRpc: modelartsclient.NewModelArts(zrpc.MustNewClient(c.ModelArtsRpcConf)), - CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)), - ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)), - OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), - OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), - K8sRpc: make(map[int64]kubernetesclient.Kubernetes), - DockerClient: dockerClient, - Downloader: downloader, - Uploader: uploader, + Cron: cron.New(cron.WithSeconds()), + DbEngin: dbEngin, + Config: c, + RedisClient: redisClient, + ModelArtsRpc: modelartsclient.NewModelArts(zrpc.MustNewClient(c.ModelArtsRpcConf)), + CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)), + ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)), + OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), + OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), + K8sRpc: make(map[int64]kubernetesclient.Kubernetes), + DockerClient: dockerClient, + Downloader: downloader, + Uploader: uploader, } } diff --git a/api/pcm.go b/api/pcm.go index b8f75f53..d32ea51d 100644 --- a/api/pcm.go +++ b/api/pcm.go @@ -3,7 +3,6 @@ package main import ( "context" "flag" - "fmt" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/service" @@ -11,10 +10,9 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/cron" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" commonConfig "gitlink.org.cn/jcce-pcm/utils/nacos" - "strconv" ) var configFile = flag.String("f", "api/etc/pcm.yaml", "the config file") @@ -56,21 +54,14 @@ func main() { handler.RegisterHandlers(server, ctx) serviceGroup.Add(server) - - go ctx.Mq.Consume(context.Background(), "test", 0, func(msg *zzz.Msg) error { - fmt.Printf("consume partiton0: %+v\n", string(msg.Body)) - - return nil - }) - for i := 1; i < 10; i++ { - - ctx.Mq.SendMsg(context.Background(), &zzz.Msg{ - Topic: "test", - Body: []byte(strconv.Itoa(i)), - Partition: i % 2, - }) + services := []service.Service{ + mqs.MustNewQueue("ai", ctx.RedisClient, mqs.NewAiMq(context.Background(), ctx)), + mqs.MustNewQueue("cloud", ctx.RedisClient, mqs.NewCloudMq(context.Background(), ctx)), + mqs.MustNewQueue("hpc", ctx.RedisClient, mqs.NewHpcMq(context.Background(), ctx)), + } + for _, mq := range services { + serviceGroup.Add(mq) } - logx.Infof("Starting server at %s:%d...\n", c.Host, c.Port) serviceGroup.Start() From bdb1ad3d59b71ada208eedac64ae5dc35799f6c7 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Mon, 16 Oct 2023 10:27:49 +0800 Subject: [PATCH 3/3] =?UTF-8?q?redis=E9=98=9F=E5=88=97=E6=9B=BF=E6=8D=A2ka?= =?UTF-8?q?fka?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 744a11a0d88e451f77fd3ea525f219e968bba701 --- api/desc/ai/pcm-ai.api | 2 +- .../logic/core/scheduletaskbyyamllogic.go | 15 +++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/api/desc/ai/pcm-ai.api b/api/desc/ai/pcm-ai.api index d41f7f4a..da96190a 100644 --- a/api/desc/ai/pcm-ai.api +++ b/api/desc/ai/pcm-ai.api @@ -341,7 +341,7 @@ type ImportTasks { TotalFileCount uint32 `json:"totalFileCount,omitempty"` CreateTime uint32 `json:"createTime,omitempty"` ElapsedTime uint32 `json:"elapsedTime,omitempty"` - AnnotationFormatConfig []interface{} `json:"annotationFormatConfig,omitempty"` + AnnotationFormatConfig []interface{} `json:"annotationFormatConfig,omitempty"` } /******************taskList end*************************/ /******************ListTrainingJobs start*************************/ diff --git a/api/internal/logic/core/scheduletaskbyyamllogic.go b/api/internal/logic/core/scheduletaskbyyamllogic.go index f1350c7b..763cd798 100644 --- a/api/internal/logic/core/scheduletaskbyyamllogic.go +++ b/api/internal/logic/core/scheduletaskbyyamllogic.go @@ -50,14 +50,13 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa // 遍历子任务放入任务队列中 for _, task := range req.Tasks { task.TaskId = taskModel.Id - //taskBytes, err := json.Marshal(task) - //if err != nil { - // return err - //} - //l.svcCtx.RedisClient.Publish(context.Background(), "test", &mqs.Msg{ - // Topic: task.TaskType, - // Body: taskBytes, - //}) + // 将任务数据转换成消息体 + reqMessage, err := json.Marshal(task) + if err != nil { + logx.Error(err) + return err + } + l.svcCtx.RedisClient.Publish(context.Background(), task.TaskType, reqMessage) } return nil }