diff --git a/api/internal/logic/core/scheduletaskbyyamllogic.go b/api/internal/logic/core/scheduletaskbyyamllogic.go index 6c2d55bf..f1350c7b 100644 --- a/api/internal/logic/core/scheduletaskbyyamllogic.go +++ b/api/internal/logic/core/scheduletaskbyyamllogic.go @@ -51,10 +51,10 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa for _, task := range req.Tasks { task.TaskId = taskModel.Id //taskBytes, err := json.Marshal(task) - if err != nil { - return err - } - //l.svcCtx.FrontMQ.SendMsg(l.ctx, &mq.Msg{ + //if err != nil { + // return err + //} + //l.svcCtx.RedisClient.Publish(context.Background(), "test", &mqs.Msg{ // Topic: task.TaskType, // Body: taskBytes, //}) diff --git a/api/internal/mqs/mq/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go similarity index 77% rename from api/internal/mqs/mq/ScheduleAi.go rename to api/internal/mqs/ScheduleAi.go index d102a049..8446c955 100644 --- a/api/internal/mqs/mq/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -1,4 +1,4 @@ -package mq +package mqs import ( "context" @@ -10,19 +10,19 @@ import ( * Listening to the payment flow status change notification message queue */ -type ScheduleAiMq struct { +type AiQueue struct { ctx context.Context svcCtx *svc.ServiceContext } -func NewScheduleAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleAiMq { - return &ScheduleAiMq{ +func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { + return &AiQueue{ ctx: ctx, svcCtx: svcCtx, } } -func (l *ScheduleAiMq) Consume(_, val string) error { +func (l *AiQueue) Consume(val string) error { // 接受消息, 根据标签筛选过滤 aiSchdl := scheduler2.NewAiScheduler(val) schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin) diff --git a/api/internal/mqs/mq/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go similarity index 76% rename from api/internal/mqs/mq/ScheduleCloud.go rename to api/internal/mqs/ScheduleCloud.go index a1a1e70e..e24677a2 100644 --- a/api/internal/mqs/mq/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -1,4 +1,4 @@ -package mq +package mqs import ( "context" @@ -10,19 +10,19 @@ import ( * Listening to the payment flow status change notification message queue */ -type ScheduleCloudMq struct { +type CloudMq struct { ctx context.Context svcCtx *svc.ServiceContext } -func NewScheduleCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCloudMq { - return &ScheduleCloudMq{ +func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq { + return &CloudMq{ ctx: ctx, svcCtx: svcCtx, } } -func (l *ScheduleCloudMq) Consume(_, val string) error { +func (l *CloudMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 cloudScheduler := scheduler2.NewCloudScheduler() schdl, err := scheduler2.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin) diff --git a/api/internal/mqs/mq/ScheduleHpc.go b/api/internal/mqs/ScheduleHpc.go similarity index 77% rename from api/internal/mqs/mq/ScheduleHpc.go rename to api/internal/mqs/ScheduleHpc.go index 8451a528..981626fa 100644 --- a/api/internal/mqs/mq/ScheduleHpc.go +++ b/api/internal/mqs/ScheduleHpc.go @@ -1,4 +1,4 @@ -package mq +package mqs import ( "context" @@ -10,19 +10,19 @@ import ( * Listening to the payment flow status change notification message queue */ -type ScheduleHpcMq struct { +type HpcMq struct { ctx context.Context svcCtx *svc.ServiceContext } -func NewScheduleHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleHpcMq { - return &ScheduleHpcMq{ +func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq { + return &HpcMq{ ctx: ctx, svcCtx: svcCtx, } } -func (l *ScheduleHpcMq) Consume(_, val string) error { +func (l *HpcMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 hpcSchdl := scheduler2.NewHpcScheduler(val) schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin) diff --git a/api/internal/mqs/queue.go b/api/internal/mqs/queue.go new file mode 100644 index 00000000..34c77acc --- /dev/null +++ b/api/internal/mqs/queue.go @@ -0,0 +1,115 @@ +package mqs + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "github.com/redis/go-redis/v9" + "github.com/zeromicro/go-zero/core/queue" + "github.com/zeromicro/go-zero/core/service" + "github.com/zeromicro/go-zero/core/threading" +) + +type ( + ConsumeHandle func(v string) error + ConsumeHandler interface { + Consume(value string) error + } + redisQueues struct { + queues []queue.MessageQueue + group *service.ServiceGroup + } + redisQueue struct { + topic string + channel chan redis.Message + client *redis.Client + handler ConsumeHandler + consumerRoutines *threading.RoutineGroup + producerRoutines *threading.RoutineGroup + } +) + +func (r *redisQueue) Start() { + r.startConsumers() + r.startProducers() + + r.producerRoutines.Wait() + close(r.channel) + r.consumerRoutines.Wait() +} + +func (r *redisQueue) Stop() { +} + +func (r redisQueues) Start() { + for _, each := range r.queues { + r.group.Add(each) + } + r.group.Start() +} + +func (r redisQueues) Stop() { + r.group.Stop() +} + +func (r *redisQueue) startConsumers() { + r.consumerRoutines.Run(func() { + for message := range r.channel { + if err := r.consumeOne(message.Payload); err != nil { + fmt.Errorf("consume: %s, error: %v", message.Payload, err) + } + } + }) + +} + +func (r *redisQueue) consumeOne(value string) error { + err := r.handler.Consume(value) + return err +} + +func (r *redisQueue) startProducers() { + r.producerRoutines.Run(func() { + for { + channel := r.client.Subscribe(context.Background(), r.topic).Channel() + for msg := range channel { + fmt.Println("生产者获取的值:", msg.Payload) + r.channel <- *msg + } + + } + }) + +} + +func newRedisQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue { + return &redisQueue{ + topic: topic, + client: redisClient, + channel: make(chan redis.Message), + producerRoutines: threading.NewRoutineGroup(), + consumerRoutines: threading.NewRoutineGroup(), + handler: handler} +} + +func MustNewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue { + q, err := NewQueue(topic, redisClient, handler) + if err != nil { + fmt.Println("NewQueue报错") + } + + return q +} + +func NewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) (queue.MessageQueue, error) { + if len(topic) == 0 { + return nil, errors.New("topic不能为空") + } + + r := redisQueues{ + group: service.NewServiceGroup(), + } + r.queues = append(r.queues, newRedisQueue(topic, redisClient, handler)) + + return r, nil +} diff --git a/api/internal/mqs/zzz/handler.go b/api/internal/mqs/zzz/handler.go deleted file mode 100644 index 127e1274..00000000 --- a/api/internal/mqs/zzz/handler.go +++ /dev/null @@ -1,4 +0,0 @@ -package zzz - -// Handler 返回值代表消息是否消费成功 -type Handler func(msg *Msg) error diff --git a/api/internal/mqs/zzz/msg.go b/api/internal/mqs/zzz/msg.go deleted file mode 100644 index 22191de6..00000000 --- a/api/internal/mqs/zzz/msg.go +++ /dev/null @@ -1,11 +0,0 @@ -package zzz - -// Msg 消息 -type Msg struct { - ID string // 消息的编号 - Topic string // 消息的主题 - Body []byte // 消息的Body - Partition int // 分区号 - Group string // 消费者组 - Consumer string // 消费者组里的消费者 -} diff --git a/api/internal/mqs/zzz/queue.go b/api/internal/mqs/zzz/queue.go deleted file mode 100644 index d8c4b36f..00000000 --- a/api/internal/mqs/zzz/queue.go +++ /dev/null @@ -1,71 +0,0 @@ -package zzz - -import ( - "context" - "fmt" - "github.com/pkg/errors" - "github.com/redis/go-redis/v9" - "strconv" - "strings" -) - -type ( - MsgQueue interface { - Consume(ctx context.Context, topic string, partition int, h Handler) error - SendMsg(ctx context.Context, msg *Msg) error - ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error - } - defaultMsgQueue struct { - // Redis客户端 - client *redis.Client - } -) - -func NewMsgQueue(client *redis.Client) MsgQueue { - return &defaultMsgQueue{client: client} -} - -// SendMsg 发送消息 -func (mq *defaultMsgQueue) SendMsg(ctx context.Context, msg *Msg) error { - return mq.client.Publish(ctx, mq.partitionTopic(msg.Topic, msg.Partition), msg.Body).Err() -} - -// Consume 返回值代表消费过程中遇到的无法处理的错误 -func (mq *defaultMsgQueue) Consume(ctx context.Context, topic string, partition int, h Handler) error { - // 订阅频道 - channel := mq.client.Subscribe(ctx, mq.partitionTopic(topic, partition)).Channel() - for msg := range channel { - // 处理消息 - h(&Msg{ - Topic: topic, - Body: []byte(msg.Payload), - Partition: partition, - }) - } - return errors.New("channel closed") -} - -// ConsumeMultiPartitions 返回值代表消费过程中遇到的无法处理的错误 -func (mq *defaultMsgQueue) ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error { - // 订阅频道 - channels := make([]string, len(partitions)) - for i, partition := range partitions { - channels[i] = mq.partitionTopic(topic, partition) - } - channel := mq.client.Subscribe(ctx, channels...).Channel() - for msg := range channel { - // 处理消息 - _, partitionString, _ := strings.Cut(msg.Channel, ":") - partition, _ := strconv.Atoi(partitionString) - h(&Msg{ - Topic: topic, - Body: []byte(msg.Payload), - Partition: partition, - }) - } - return errors.New("channels closed") -} - -func (mq *defaultMsgQueue) partitionTopic(topic string, partition int) string { - return fmt.Sprintf("%s:%d", topic, partition) -} diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index 4cc1287a..d8314df5 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -12,7 +12,6 @@ import ( "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz" "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils" "gitlink.org.cn/jcce-pcm/pcm-participant-ac/hpcacclient" "gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient" @@ -44,7 +43,6 @@ type ServiceContext struct { Downloader *s3manager.Downloader Uploader *s3manager.Uploader K8sRpc map[int64]kubernetesclient.Kubernetes - Mq zzz.MsgQueue } func NewServiceContext(c config.Config) *ServiceContext { @@ -80,22 +78,18 @@ func NewServiceContext(c config.Config) *ServiceContext { Password: c.Redis.Pass, }) return &ServiceContext{ - Cron: cron.New(cron.WithSeconds()), - DbEngin: dbEngin, - Config: c, - RedisClient: redisClient, - Mq: zzz.NewMsgQueue(redisClient), - ScheduleHpcClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.HpcTopic), - ScheduleCloudClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.CloudTopic), - ScheduleAiClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.AiTopic), - ModelArtsRpc: modelartsclient.NewModelArts(zrpc.MustNewClient(c.ModelArtsRpcConf)), - CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)), - ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)), - OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), - OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), - K8sRpc: make(map[int64]kubernetesclient.Kubernetes), - DockerClient: dockerClient, - Downloader: downloader, - Uploader: uploader, + Cron: cron.New(cron.WithSeconds()), + DbEngin: dbEngin, + Config: c, + RedisClient: redisClient, + ModelArtsRpc: modelartsclient.NewModelArts(zrpc.MustNewClient(c.ModelArtsRpcConf)), + CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)), + ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)), + OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), + OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), + K8sRpc: make(map[int64]kubernetesclient.Kubernetes), + DockerClient: dockerClient, + Downloader: downloader, + Uploader: uploader, } } diff --git a/api/pcm.go b/api/pcm.go index b8f75f53..d32ea51d 100644 --- a/api/pcm.go +++ b/api/pcm.go @@ -3,7 +3,6 @@ package main import ( "context" "flag" - "fmt" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/service" @@ -11,10 +10,9 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/cron" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler" - "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" commonConfig "gitlink.org.cn/jcce-pcm/utils/nacos" - "strconv" ) var configFile = flag.String("f", "api/etc/pcm.yaml", "the config file") @@ -56,21 +54,14 @@ func main() { handler.RegisterHandlers(server, ctx) serviceGroup.Add(server) - - go ctx.Mq.Consume(context.Background(), "test", 0, func(msg *zzz.Msg) error { - fmt.Printf("consume partiton0: %+v\n", string(msg.Body)) - - return nil - }) - for i := 1; i < 10; i++ { - - ctx.Mq.SendMsg(context.Background(), &zzz.Msg{ - Topic: "test", - Body: []byte(strconv.Itoa(i)), - Partition: i % 2, - }) + services := []service.Service{ + mqs.MustNewQueue("ai", ctx.RedisClient, mqs.NewAiMq(context.Background(), ctx)), + mqs.MustNewQueue("cloud", ctx.RedisClient, mqs.NewCloudMq(context.Background(), ctx)), + mqs.MustNewQueue("hpc", ctx.RedisClient, mqs.NewHpcMq(context.Background(), ctx)), + } + for _, mq := range services { + serviceGroup.Add(mq) } - logx.Infof("Starting server at %s:%d...\n", c.Host, c.Port) serviceGroup.Start()