Former-commit-id: 933d2d204fa04665b585612b195dfd2dff97b65b
This commit is contained in:
zhangwei 2023-10-10 08:53:10 +08:00
parent 7c61eb220a
commit 67974cf711
12 changed files with 125 additions and 36 deletions

View File

@ -8,9 +8,6 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient"
)
type client struct {
}
func SyncParticipantRpc(svc *svc.ServiceContext) {
// 查询出所有p端信息
var participants []*models.ScParticipantPhyInfo

View File

@ -50,20 +50,14 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa
// 遍历子任务放入任务队列中
for _, task := range req.Tasks {
task.TaskId = taskModel.Id
// 将任务数据转换成消息体
reqMessage, err := json.Marshal(task)
//taskBytes, err := json.Marshal(task)
if err != nil {
logx.Error(err)
return err
}
switch task.TaskType {
case "cloud":
l.svcCtx.ScheduleCloudClient.Push(string(reqMessage))
case "hpc":
l.svcCtx.ScheduleHpcClient.Push(string(reqMessage))
case "ai":
l.svcCtx.ScheduleAiClient.Push(string(reqMessage))
}
//l.svcCtx.FrontMQ.SendMsg(l.ctx, &mq.Msg{
// Topic: task.TaskType,
// Body: taskBytes,
//})
}
return nil
}

View File

@ -1,4 +1,4 @@
package kq
package mq
import (
"context"

View File

@ -1,4 +1,4 @@
package kq
package mq
import (
"context"

View File

@ -1,4 +1,4 @@
package kq
package mq
import (
"context"

View File

@ -0,0 +1,4 @@
package zzz
// Handler 返回值代表消息是否消费成功
type Handler func(msg *Msg) error

View File

@ -0,0 +1,11 @@
package zzz
// Msg 消息
type Msg struct {
ID string // 消息的编号
Topic string // 消息的主题
Body []byte // 消息的Body
Partition int // 分区号
Group string // 消费者组
Consumer string // 消费者组里的消费者
}

View File

@ -0,0 +1,71 @@
package zzz
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"strconv"
"strings"
)
type (
MsgQueue interface {
Consume(ctx context.Context, topic string, partition int, h Handler) error
SendMsg(ctx context.Context, msg *Msg) error
ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error
}
defaultMsgQueue struct {
// Redis客户端
client *redis.Client
}
)
func NewMsgQueue(client *redis.Client) MsgQueue {
return &defaultMsgQueue{client: client}
}
// SendMsg 发送消息
func (mq *defaultMsgQueue) SendMsg(ctx context.Context, msg *Msg) error {
return mq.client.Publish(ctx, mq.partitionTopic(msg.Topic, msg.Partition), msg.Body).Err()
}
// Consume 返回值代表消费过程中遇到的无法处理的错误
func (mq *defaultMsgQueue) Consume(ctx context.Context, topic string, partition int, h Handler) error {
// 订阅频道
channel := mq.client.Subscribe(ctx, mq.partitionTopic(topic, partition)).Channel()
for msg := range channel {
// 处理消息
h(&Msg{
Topic: topic,
Body: []byte(msg.Payload),
Partition: partition,
})
}
return errors.New("channel closed")
}
// ConsumeMultiPartitions 返回值代表消费过程中遇到的无法处理的错误
func (mq *defaultMsgQueue) ConsumeMultiPartitions(ctx context.Context, topic string, partitions []int, h Handler) error {
// 订阅频道
channels := make([]string, len(partitions))
for i, partition := range partitions {
channels[i] = mq.partitionTopic(topic, partition)
}
channel := mq.client.Subscribe(ctx, channels...).Channel()
for msg := range channel {
// 处理消息
_, partitionString, _ := strings.Cut(msg.Channel, ":")
partition, _ := strconv.Atoi(partitionString)
h(&Msg{
Topic: topic,
Body: []byte(msg.Payload),
Partition: partition,
})
}
return errors.New("channels closed")
}
func (mq *defaultMsgQueue) partitionTopic(topic string, partition int) string {
return fmt.Sprintf("%s:%d", topic, partition)
}

View File

@ -6,12 +6,13 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/docker/docker/client"
"github.com/go-redis/redis"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/zrpc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-participant-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient"
@ -43,6 +44,7 @@ type ServiceContext struct {
Downloader *s3manager.Downloader
Uploader *s3manager.Uploader
K8sRpc map[int64]kubernetesclient.Kubernetes
Mq zzz.MsgQueue
}
func NewServiceContext(c config.Config) *ServiceContext {
@ -73,14 +75,16 @@ func NewServiceContext(c config.Config) *ServiceContext {
logx.Error(err.Error())
return nil
}
redisClient := redis.NewClient(&redis.Options{
Addr: c.Redis.Host,
Password: c.Redis.Pass,
})
return &ServiceContext{
Cron: cron.New(cron.WithSeconds()),
DbEngin: dbEngin,
Config: c,
RedisClient: redis.NewClient(&redis.Options{
Addr: c.Redis.Host,
Password: c.Redis.Pass,
}),
Cron: cron.New(cron.WithSeconds()),
DbEngin: dbEngin,
Config: c,
RedisClient: redisClient,
Mq: zzz.NewMsgQueue(redisClient),
ScheduleHpcClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.HpcTopic),
ScheduleCloudClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.CloudTopic),
ScheduleAiClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.AiTopic),

View File

@ -3,7 +3,7 @@ package main
import (
"context"
"flag"
"github.com/zeromicro/go-queue/kq"
"fmt"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/service"
@ -11,9 +11,10 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/cron"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler"
kq3 "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/kq"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/zzz"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
commonConfig "gitlink.org.cn/jcce-pcm/utils/nacos"
"strconv"
)
var configFile = flag.String("f", "api/etc/pcm.yaml", "the config file")
@ -55,17 +56,21 @@ func main() {
handler.RegisterHandlers(server, ctx)
serviceGroup.Add(server)
services := []service.Service{
//Listening for changes in consumption flow status
kq.MustNewQueue(c.HpcConsumerConf, kq3.NewScheduleHpcMq(context.Background(), ctx)),
kq.MustNewQueue(c.CloudConsumerConf, kq3.NewScheduleCloudMq(context.Background(), ctx)),
kq.MustNewQueue(c.AiConsumerConf, kq3.NewScheduleAiMq(context.Background(), ctx)),
//.....
}
for _, mq := range services {
serviceGroup.Add(mq)
go ctx.Mq.Consume(context.Background(), "test", 0, func(msg *zzz.Msg) error {
fmt.Printf("consume partiton0: %+v\n", string(msg.Body))
return nil
})
for i := 1; i < 10; i++ {
ctx.Mq.SendMsg(context.Background(), &zzz.Msg{
Topic: "test",
Body: []byte(strconv.Itoa(i)),
Partition: i % 2,
})
}
logx.Infof("Starting server at %s:%d...\n", c.Host, c.Port)
serviceGroup.Start()

1
go.mod
View File

@ -99,6 +99,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/redis/go-redis/v9 v9.2.1 // indirect
github.com/segmentio/kafka-go v0.4.38 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.etcd.io/etcd/api/v3 v3.5.9 // indirect

2
go.sum
View File

@ -944,6 +944,8 @@ github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPH
github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg=
github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=