redis消息队列替换kafka

Former-commit-id: cb6030d4face691ad530a3bbfb53dac7eaf0676a
This commit is contained in:
zhangwei 2023-10-10 17:20:52 +08:00
parent 67974cf711
commit f66c9b8b7b
10 changed files with 155 additions and 141 deletions

View File

@ -51,10 +51,10 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa
for _, task := range req.Tasks {
task.TaskId = taskModel.Id
//taskBytes, err := json.Marshal(task)
if err != nil {
return err
}
//l.svcCtx.FrontMQ.SendMsg(l.ctx, &mq.Msg{
//if err != nil {
// return err
//}
//l.svcCtx.RedisClient.Publish(context.Background(), "test", &mqs.Msg{
// Topic: task.TaskType,
// Body: taskBytes,
//})

View File

@ -1,4 +1,4 @@
package mq
package mqs
import (
"context"
@ -10,19 +10,19 @@ import (
*
Listening to the payment flow status change notification message queue
*/
type ScheduleAiMq struct {
type AiQueue struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewScheduleAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleAiMq {
return &ScheduleAiMq{
func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue {
return &AiQueue{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ScheduleAiMq) Consume(_, val string) error {
func (l *AiQueue) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
aiSchdl := scheduler2.NewAiScheduler(val)
schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin)

View File

@ -1,4 +1,4 @@
package mq
package mqs
import (
"context"
@ -10,19 +10,19 @@ import (
*
Listening to the payment flow status change notification message queue
*/
type ScheduleCloudMq struct {
type CloudMq struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewScheduleCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCloudMq {
return &ScheduleCloudMq{
func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq {
return &CloudMq{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ScheduleCloudMq) Consume(_, val string) error {
func (l *CloudMq) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
cloudScheduler := scheduler2.NewCloudScheduler()
schdl, err := scheduler2.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin)

View File

@ -1,4 +1,4 @@
package mq
package mqs
import (
"context"
@ -10,19 +10,19 @@ import (
*
Listening to the payment flow status change notification message queue
*/
type ScheduleHpcMq struct {
type HpcMq struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewScheduleHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleHpcMq {
return &ScheduleHpcMq{
func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq {
return &HpcMq{
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ScheduleHpcMq) Consume(_, val string) error {
func (l *HpcMq) Consume(val string) error {
// 接受消息, 根据标签筛选过滤
hpcSchdl := scheduler2.NewHpcScheduler(val)
schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin)

115
api/internal/mqs/queue.go Normal file
View File

@ -0,0 +1,115 @@
package mqs
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"github.com/zeromicro/go-zero/core/queue"
"github.com/zeromicro/go-zero/core/service"
"github.com/zeromicro/go-zero/core/threading"
)
type (
ConsumeHandle func(v string) error
ConsumeHandler interface {
Consume(value string) error
}
redisQueues struct {
queues []queue.MessageQueue
group *service.ServiceGroup
}
redisQueue struct {
topic string
channel chan redis.Message
client *redis.Client
handler ConsumeHandler
consumerRoutines *threading.RoutineGroup
producerRoutines *threading.RoutineGroup
}
)
func (r *redisQueue) Start() {
r.startConsumers()
r.startProducers()
r.producerRoutines.Wait()
close(r.channel)
r.consumerRoutines.Wait()
}
func (r *redisQueue) Stop() {
}
func (r redisQueues) Start() {
for _, each := range r.queues {
r.group.Add(each)
}
r.group.Start()
}
func (r redisQueues) Stop() {
r.group.Stop()
}
func (r *redisQueue) startConsumers() {
r.consumerRoutines.Run(func() {
for message := range r.channel {
if err := r.consumeOne(message.Payload); err != nil {
fmt.Errorf("consume: %s, error: %v", message.Payload, err)
}
}
})
}
func (r *redisQueue) consumeOne(value string) error {
err := r.handler.Consume(value)
return err
}
func (r *redisQueue) startProducers() {
r.producerRoutines.Run(func() {
for {
channel := r.client.Subscribe(context.Background(), r.topic).Channel()
for msg := range channel {
fmt.Println("生产者获取的值:", msg.Payload)
r.channel <- *msg
}
}
})
}
func newRedisQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue {
return &redisQueue{
topic: topic,
client: redisClient,
channel: make(chan redis.Message),
producerRoutines: threading.NewRoutineGroup(),
consumerRoutines: threading.NewRoutineGroup(),
handler: handler}
}
func MustNewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue {
q, err := NewQueue(topic, redisClient, handler)
if err != nil {
fmt.Println("NewQueue报错")
}
return q
}
func NewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) (queue.MessageQueue, error) {
if len(topic) == 0 {
return nil, errors.New("topic不能为空")
}
r := redisQueues{
group: service.NewServiceGroup(),
}
r.queues = append(r.queues, newRedisQueue(topic, redisClient, handler))
return r, nil
}

View File

@ -1,4 +0,0 @@
package zzz
// Handler 返回值代表消息是否消费成功
type Handler func(msg *Msg) error

View File

@ -1,11 +0,0 @@
package zzz
// Msg 消息
type Msg struct {
ID string // 消息的编号
Topic string // 消息的主题
Body []byte // 消息的Body
Partition int // 分区号
Group string // 消费者组
Consumer string // 消费者组里的消费者
}

View File

@ -1,71 +0,0 @@
package zzz
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"strconv"
"strings"
)
type (
MsgQueue interface {
Consume(ctx context.Context, topic string, partition int, h Handler) error
SendMsg(ctx context.Context, msg *Msg) error
ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error
}
defaultMsgQueue struct {
// Redis客户端
client *redis.Client
}
)
func NewMsgQueue(client *redis.Client) MsgQueue {
return &defaultMsgQueue{client: client}
}
// SendMsg 发送消息
func (mq *defaultMsgQueue) SendMsg(ctx context.Context, msg *Msg) error {
return mq.client.Publish(ctx, mq.partitionTopic(msg.Topic, msg.Partition), msg.Body).Err()
}
// Consume 返回值代表消费过程中遇到的无法处理的错误
func (mq *defaultMsgQueue) Consume(ctx context.Context, topic string, partition int, h Handler) error {
// 订阅频道
channel := mq.client.Subscribe(ctx, mq.partitionTopic(topic, partition)).Channel()
for msg := range channel {
// 处理消息
h(&Msg{
Topic: topic,
Body: []byte(msg.Payload),
Partition: partition,
})
}
return errors.New("channel closed")
}
// ConsumeMultiPartitions 返回值代表消费过程中遇到的无法处理的错误
func (mq *defaultMsgQueue) ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error {
// 订阅频道
channels := make([]string, len(partitions))
for i, partition := range partitions {
channels[i] = mq.partitionTopic(topic, partition)
}
channel := mq.client.Subscribe(ctx, channels...).Channel()
for msg := range channel {
// 处理消息
_, partitionString, _ := strings.Cut(msg.Channel, ":")
partition, _ := strconv.Atoi(partitionString)
h(&Msg{
Topic: topic,
Body: []byte(msg.Payload),
Partition: partition,
})
}
return errors.New("channels closed")
}
func (mq *defaultMsgQueue) partitionTopic(topic string, partition int) string {
return fmt.Sprintf("%s:%d", topic, partition)
}

View File

@ -12,7 +12,6 @@ import (
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/zrpc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient"
@ -44,7 +43,6 @@ type ServiceContext struct {
Downloader *s3manager.Downloader
Uploader *s3manager.Uploader
K8sRpc map[int64]kubernetesclient.Kubernetes
Mq zzz.MsgQueue
}
func NewServiceContext(c config.Config) *ServiceContext {
@ -80,22 +78,18 @@ func NewServiceContext(c config.Config) *ServiceContext {
Password: c.Redis.Pass,
})
return &ServiceContext{
Cron: cron.New(cron.WithSeconds()),
DbEngin: dbEngin,
Config: c,
RedisClient: redisClient,
Mq: zzz.NewMsgQueue(redisClient),
ScheduleHpcClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.HpcTopic),
ScheduleCloudClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.CloudTopic),
ScheduleAiClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.AiTopic),
ModelArtsRpc: modelartsclient.NewModelArts(zrpc.MustNewClient(c.ModelArtsRpcConf)),
CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)),
ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)),
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
K8sRpc: make(map[int64]kubernetesclient.Kubernetes),
DockerClient: dockerClient,
Downloader: downloader,
Uploader: uploader,
Cron: cron.New(cron.WithSeconds()),
DbEngin: dbEngin,
Config: c,
RedisClient: redisClient,
ModelArtsRpc: modelartsclient.NewModelArts(zrpc.MustNewClient(c.ModelArtsRpcConf)),
CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)),
ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)),
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
K8sRpc: make(map[int64]kubernetesclient.Kubernetes),
DockerClient: dockerClient,
Downloader: downloader,
Uploader: uploader,
}
}

View File

@ -3,7 +3,6 @@ package main
import (
"context"
"flag"
"fmt"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/service"
@ -11,10 +10,9 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/cron"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
commonConfig "gitlink.org.cn/jcce-pcm/utils/nacos"
"strconv"
)
var configFile = flag.String("f", "api/etc/pcm.yaml", "the config file")
@ -56,21 +54,14 @@ func main() {
handler.RegisterHandlers(server, ctx)
serviceGroup.Add(server)
go ctx.Mq.Consume(context.Background(), "test", 0, func(msg *zzz.Msg) error {
fmt.Printf("consume partiton0: %+v\n", string(msg.Body))
return nil
})
for i := 1; i < 10; i++ {
ctx.Mq.SendMsg(context.Background(), &zzz.Msg{
Topic: "test",
Body: []byte(strconv.Itoa(i)),
Partition: i % 2,
})
services := []service.Service{
mqs.MustNewQueue("ai", ctx.RedisClient, mqs.NewAiMq(context.Background(), ctx)),
mqs.MustNewQueue("cloud", ctx.RedisClient, mqs.NewCloudMq(context.Background(), ctx)),
mqs.MustNewQueue("hpc", ctx.RedisClient, mqs.NewHpcMq(context.Background(), ctx)),
}
for _, mq := range services {
serviceGroup.Add(mq)
}
logx.Infof("Starting server at %s:%d...\n", c.Host, c.Port)
serviceGroup.Start()