diff --git a/api/internal/cron/participant.go b/api/internal/cron/participant.go index 76f02d89..1d1026e6 100644 --- a/api/internal/cron/participant.go +++ b/api/internal/cron/participant.go @@ -8,9 +8,6 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient" ) -type client struct { -} - func SyncParticipantRpc(svc *svc.ServiceContext) { // 查询出所有p端信息 var participants []*models.ScParticipantPhyInfo diff --git a/api/internal/logic/core/scheduletaskbyyamllogic.go b/api/internal/logic/core/scheduletaskbyyamllogic.go index 6b5dbf7e..6c2d55bf 100644 --- a/api/internal/logic/core/scheduletaskbyyamllogic.go +++ b/api/internal/logic/core/scheduletaskbyyamllogic.go @@ -50,20 +50,14 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa // 遍历子任务放入任务队列中 for _, task := range req.Tasks { task.TaskId = taskModel.Id - // 将任务数据转换成消息体 - reqMessage, err := json.Marshal(task) + //taskBytes, err := json.Marshal(task) if err != nil { - logx.Error(err) return err } - switch task.TaskType { - case "cloud": - l.svcCtx.ScheduleCloudClient.Push(string(reqMessage)) - case "hpc": - l.svcCtx.ScheduleHpcClient.Push(string(reqMessage)) - case "ai": - l.svcCtx.ScheduleAiClient.Push(string(reqMessage)) - } + //l.svcCtx.FrontMQ.SendMsg(l.ctx, &mq.Msg{ + // Topic: task.TaskType, + // Body: taskBytes, + //}) } return nil } diff --git a/api/internal/mqs/kq/ScheduleAi.go b/api/internal/mqs/mq/ScheduleAi.go similarity index 98% rename from api/internal/mqs/kq/ScheduleAi.go rename to api/internal/mqs/mq/ScheduleAi.go index fa01a01f..d102a049 100644 --- a/api/internal/mqs/kq/ScheduleAi.go +++ b/api/internal/mqs/mq/ScheduleAi.go @@ -1,4 +1,4 @@ -package kq +package mq import ( "context" diff --git a/api/internal/mqs/kq/ScheduleCloud.go b/api/internal/mqs/mq/ScheduleCloud.go similarity index 98% rename from api/internal/mqs/kq/ScheduleCloud.go rename to api/internal/mqs/mq/ScheduleCloud.go index 2c080d6c..a1a1e70e 100644 --- a/api/internal/mqs/kq/ScheduleCloud.go +++ b/api/internal/mqs/mq/ScheduleCloud.go @@ -1,4 +1,4 @@ -package kq +package mq import ( "context" diff --git a/api/internal/mqs/kq/ScheduleHpc.go b/api/internal/mqs/mq/ScheduleHpc.go similarity index 98% rename from api/internal/mqs/kq/ScheduleHpc.go rename to api/internal/mqs/mq/ScheduleHpc.go index 825b03af..8451a528 100644 --- a/api/internal/mqs/kq/ScheduleHpc.go +++ b/api/internal/mqs/mq/ScheduleHpc.go @@ -1,4 +1,4 @@ -package kq +package mq import ( "context" diff --git a/api/internal/mqs/zzz/handler.go b/api/internal/mqs/zzz/handler.go new file mode 100644 index 00000000..127e1274 --- /dev/null +++ b/api/internal/mqs/zzz/handler.go @@ -0,0 +1,4 @@ +package zzz + +// Handler 返回值代表消息是否消费成功 +type Handler func(msg *Msg) error diff --git a/api/internal/mqs/zzz/msg.go b/api/internal/mqs/zzz/msg.go new file mode 100644 index 00000000..22191de6 --- /dev/null +++ b/api/internal/mqs/zzz/msg.go @@ -0,0 +1,11 @@ +package zzz + +// Msg 消息 +type Msg struct { + ID string // 消息的编号 + Topic string // 消息的主题 + Body []byte // 消息的Body + Partition int // 分区号 + Group string // 消费者组 + Consumer string // 消费者组里的消费者 +} diff --git a/api/internal/mqs/zzz/queue.go b/api/internal/mqs/zzz/queue.go new file mode 100644 index 00000000..d8c4b36f --- /dev/null +++ b/api/internal/mqs/zzz/queue.go @@ -0,0 +1,71 @@ +package zzz + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "github.com/redis/go-redis/v9" + "strconv" + "strings" +) + +type ( + MsgQueue interface { + Consume(ctx context.Context, topic string, partition int, h Handler) error + SendMsg(ctx context.Context, msg *Msg) error + ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error + } + defaultMsgQueue struct { + // Redis客户端 + client *redis.Client + } +) + +func NewMsgQueue(client *redis.Client) MsgQueue { + return &defaultMsgQueue{client: client} +} + +// SendMsg 发送消息 +func (mq *defaultMsgQueue) SendMsg(ctx context.Context, msg *Msg) error { + return mq.client.Publish(ctx, mq.partitionTopic(msg.Topic, msg.Partition), msg.Body).Err() +} + +// Consume 返回值代表消费过程中遇到的无法处理的错误 +func (mq *defaultMsgQueue) Consume(ctx context.Context, topic string, partition int, h Handler) error { + // 订阅频道 + channel := mq.client.Subscribe(ctx, mq.partitionTopic(topic, partition)).Channel() + for msg := range channel { + // 处理消息 + h(&Msg{ + Topic: topic, + Body: []byte(msg.Payload), + Partition: partition, + }) + } + return errors.New("channel closed") +} + +// ConsumeMultiPartitions 返回值代表消费过程中遇到的无法处理的错误 +func (mq *defaultMsgQueue) ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error { + // 订阅频道 + channels := make([]string, len(partitions)) + for i, partition := range partitions { + channels[i] = mq.partitionTopic(topic, partition) + } + channel := mq.client.Subscribe(ctx, channels...).Channel() + for msg := range channel { + // 处理消息 + _, partitionString, _ := strings.Cut(msg.Channel, ":") + partition, _ := strconv.Atoi(partitionString) + h(&Msg{ + Topic: topic, + Body: []byte(msg.Payload), + Partition: partition, + }) + } + return errors.New("channels closed") +} + +func (mq *defaultMsgQueue) partitionTopic(topic string, partition int) string { + return fmt.Sprintf("%s:%d", topic, partition) +} diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index f8b761bf..4cc1287a 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -6,12 +6,13 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/docker/docker/client" - "github.com/go-redis/redis" + "github.com/redis/go-redis/v9" "github.com/robfig/cron/v3" "github.com/zeromicro/go-queue/kq" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-participant-ac/hpcacclient" "gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient" @@ -43,6 +44,7 @@ type ServiceContext struct { Downloader *s3manager.Downloader Uploader *s3manager.Uploader K8sRpc map[int64]kubernetesclient.Kubernetes + Mq zzz.MsgQueue } func NewServiceContext(c config.Config) *ServiceContext { @@ -73,14 +75,16 @@ func NewServiceContext(c config.Config) *ServiceContext { logx.Error(err.Error()) return nil } + redisClient := redis.NewClient(&redis.Options{ + Addr: c.Redis.Host, + Password: c.Redis.Pass, + }) return &ServiceContext{ - Cron: cron.New(cron.WithSeconds()), - DbEngin: dbEngin, - Config: c, - RedisClient: redis.NewClient(&redis.Options{ - Addr: c.Redis.Host, - Password: c.Redis.Pass, - }), + Cron: cron.New(cron.WithSeconds()), + DbEngin: dbEngin, + Config: c, + RedisClient: redisClient, + Mq: zzz.NewMsgQueue(redisClient), ScheduleHpcClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.HpcTopic), ScheduleCloudClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.CloudTopic), ScheduleAiClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.AiTopic), diff --git a/api/pcm.go b/api/pcm.go index 08739034..b8f75f53 100644 --- a/api/pcm.go +++ b/api/pcm.go @@ -3,7 +3,7 @@ package main import ( "context" "flag" - "github.com/zeromicro/go-queue/kq" + "fmt" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/service" @@ -11,9 +11,10 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/cron" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler" - kq3 "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/kq" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" commonConfig "gitlink.org.cn/jcce-pcm/utils/nacos" + "strconv" ) var configFile = flag.String("f", "api/etc/pcm.yaml", "the config file") @@ -55,17 +56,21 @@ func main() { handler.RegisterHandlers(server, ctx) serviceGroup.Add(server) - services := []service.Service{ - //Listening for changes in consumption flow status - kq.MustNewQueue(c.HpcConsumerConf, kq3.NewScheduleHpcMq(context.Background(), ctx)), - kq.MustNewQueue(c.CloudConsumerConf, kq3.NewScheduleCloudMq(context.Background(), ctx)), - kq.MustNewQueue(c.AiConsumerConf, kq3.NewScheduleAiMq(context.Background(), ctx)), - //..... - } - for _, mq := range services { - serviceGroup.Add(mq) + go ctx.Mq.Consume(context.Background(), "test", 0, func(msg *zzz.Msg) error { + fmt.Printf("consume partiton0: %+v\n", string(msg.Body)) + + return nil + }) + for i := 1; i < 10; i++ { + + ctx.Mq.SendMsg(context.Background(), &zzz.Msg{ + Topic: "test", + Body: []byte(strconv.Itoa(i)), + Partition: i % 2, + }) } + logx.Infof("Starting server at %s:%d...\n", c.Host, c.Port) serviceGroup.Start() diff --git a/go.mod b/go.mod index de0ef4c2..4b08fc5a 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,7 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect + github.com/redis/go-redis/v9 v9.2.1 // indirect github.com/segmentio/kafka-go v0.4.38 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect go.etcd.io/etcd/api/v3 v3.5.9 // indirect diff --git a/go.sum b/go.sum index a5c650b2..1c63c99a 100644 --- a/go.sum +++ b/go.sum @@ -944,6 +944,8 @@ github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPH github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= +github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=