diff --git a/api/desc/ai/pcm-ai.api b/api/desc/ai/pcm-ai.api index d41f7f4a..da96190a 100644 --- a/api/desc/ai/pcm-ai.api +++ b/api/desc/ai/pcm-ai.api @@ -341,7 +341,7 @@ type ImportTasks { TotalFileCount uint32 `json:"totalFileCount,omitempty"` CreateTime uint32 `json:"createTime,omitempty"` ElapsedTime uint32 `json:"elapsedTime,omitempty"` - AnnotationFormatConfig []interface{} `json:"annotationFormatConfig,omitempty"` + AnnotationFormatConfig []interface{} `json:"annotationFormatConfig,omitempty"` } /******************taskList end*************************/ /******************ListTrainingJobs start*************************/ diff --git a/api/internal/cron/participant.go b/api/internal/cron/participant.go index 76f02d89..1d1026e6 100644 --- a/api/internal/cron/participant.go +++ b/api/internal/cron/participant.go @@ -8,9 +8,6 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient" ) -type client struct { -} - func SyncParticipantRpc(svc *svc.ServiceContext) { // 查询出所有p端信息 var participants []*models.ScParticipantPhyInfo diff --git a/api/internal/logic/core/scheduletaskbyyamllogic.go b/api/internal/logic/core/scheduletaskbyyamllogic.go index 6b5dbf7e..763cd798 100644 --- a/api/internal/logic/core/scheduletaskbyyamllogic.go +++ b/api/internal/logic/core/scheduletaskbyyamllogic.go @@ -56,14 +56,7 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa logx.Error(err) return err } - switch task.TaskType { - case "cloud": - l.svcCtx.ScheduleCloudClient.Push(string(reqMessage)) - case "hpc": - l.svcCtx.ScheduleHpcClient.Push(string(reqMessage)) - case "ai": - l.svcCtx.ScheduleAiClient.Push(string(reqMessage)) - } + l.svcCtx.RedisClient.Publish(context.Background(), task.TaskType, reqMessage) } return nil } diff --git a/api/internal/mqs/kq/ScheduleAi.go b/api/internal/mqs/ScheduleAi.go similarity index 77% rename from api/internal/mqs/kq/ScheduleAi.go rename to api/internal/mqs/ScheduleAi.go index fa01a01f..8446c955 100644 --- a/api/internal/mqs/kq/ScheduleAi.go +++ b/api/internal/mqs/ScheduleAi.go @@ -1,4 +1,4 @@ -package kq +package mqs import ( "context" @@ -10,19 +10,19 @@ import ( * Listening to the payment flow status change notification message queue */ -type ScheduleAiMq struct { +type AiQueue struct { ctx context.Context svcCtx *svc.ServiceContext } -func NewScheduleAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleAiMq { - return &ScheduleAiMq{ +func NewAiMq(ctx context.Context, svcCtx *svc.ServiceContext) *AiQueue { + return &AiQueue{ ctx: ctx, svcCtx: svcCtx, } } -func (l *ScheduleAiMq) Consume(_, val string) error { +func (l *AiQueue) Consume(val string) error { // 接受消息, 根据标签筛选过滤 aiSchdl := scheduler2.NewAiScheduler(val) schdl, err := scheduler2.NewScheduler(aiSchdl, val, l.svcCtx.DbEngin) diff --git a/api/internal/mqs/kq/ScheduleCloud.go b/api/internal/mqs/ScheduleCloud.go similarity index 76% rename from api/internal/mqs/kq/ScheduleCloud.go rename to api/internal/mqs/ScheduleCloud.go index 2c080d6c..e24677a2 100644 --- a/api/internal/mqs/kq/ScheduleCloud.go +++ b/api/internal/mqs/ScheduleCloud.go @@ -1,4 +1,4 @@ -package kq +package mqs import ( "context" @@ -10,19 +10,19 @@ import ( * Listening to the payment flow status change notification message queue */ -type ScheduleCloudMq struct { +type CloudMq struct { ctx context.Context svcCtx *svc.ServiceContext } -func NewScheduleCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCloudMq { - return &ScheduleCloudMq{ +func NewCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *CloudMq { + return &CloudMq{ ctx: ctx, svcCtx: svcCtx, } } -func (l *ScheduleCloudMq) Consume(_, val string) error { +func (l *CloudMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 cloudScheduler := scheduler2.NewCloudScheduler() schdl, err := scheduler2.NewScheduler(cloudScheduler, val, l.svcCtx.DbEngin) diff --git a/api/internal/mqs/kq/ScheduleHpc.go b/api/internal/mqs/ScheduleHpc.go similarity index 77% rename from api/internal/mqs/kq/ScheduleHpc.go rename to api/internal/mqs/ScheduleHpc.go index 825b03af..981626fa 100644 --- a/api/internal/mqs/kq/ScheduleHpc.go +++ b/api/internal/mqs/ScheduleHpc.go @@ -1,4 +1,4 @@ -package kq +package mqs import ( "context" @@ -10,19 +10,19 @@ import ( * Listening to the payment flow status change notification message queue */ -type ScheduleHpcMq struct { +type HpcMq struct { ctx context.Context svcCtx *svc.ServiceContext } -func NewScheduleHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleHpcMq { - return &ScheduleHpcMq{ +func NewHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *HpcMq { + return &HpcMq{ ctx: ctx, svcCtx: svcCtx, } } -func (l *ScheduleHpcMq) Consume(_, val string) error { +func (l *HpcMq) Consume(val string) error { // 接受消息, 根据标签筛选过滤 hpcSchdl := scheduler2.NewHpcScheduler(val) schdl, err := scheduler2.NewScheduler(hpcSchdl, val, l.svcCtx.DbEngin) diff --git a/api/internal/mqs/queue.go b/api/internal/mqs/queue.go new file mode 100644 index 00000000..34c77acc --- /dev/null +++ b/api/internal/mqs/queue.go @@ -0,0 +1,115 @@ +package mqs + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "github.com/redis/go-redis/v9" + "github.com/zeromicro/go-zero/core/queue" + "github.com/zeromicro/go-zero/core/service" + "github.com/zeromicro/go-zero/core/threading" +) + +type ( + ConsumeHandle func(v string) error + ConsumeHandler interface { + Consume(value string) error + } + redisQueues struct { + queues []queue.MessageQueue + group *service.ServiceGroup + } + redisQueue struct { + topic string + channel chan redis.Message + client *redis.Client + handler ConsumeHandler + consumerRoutines *threading.RoutineGroup + producerRoutines *threading.RoutineGroup + } +) + +func (r *redisQueue) Start() { + r.startConsumers() + r.startProducers() + + r.producerRoutines.Wait() + close(r.channel) + r.consumerRoutines.Wait() +} + +func (r *redisQueue) Stop() { +} + +func (r redisQueues) Start() { + for _, each := range r.queues { + r.group.Add(each) + } + r.group.Start() +} + +func (r redisQueues) Stop() { + r.group.Stop() +} + +func (r *redisQueue) startConsumers() { + r.consumerRoutines.Run(func() { + for message := range r.channel { + if err := r.consumeOne(message.Payload); err != nil { + fmt.Errorf("consume: %s, error: %v", message.Payload, err) + } + } + }) + +} + +func (r *redisQueue) consumeOne(value string) error { + err := r.handler.Consume(value) + return err +} + +func (r *redisQueue) startProducers() { + r.producerRoutines.Run(func() { + for { + channel := r.client.Subscribe(context.Background(), r.topic).Channel() + for msg := range channel { + fmt.Println("生产者获取的值:", msg.Payload) + r.channel <- *msg + } + + } + }) + +} + +func newRedisQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue { + return &redisQueue{ + topic: topic, + client: redisClient, + channel: make(chan redis.Message), + producerRoutines: threading.NewRoutineGroup(), + consumerRoutines: threading.NewRoutineGroup(), + handler: handler} +} + +func MustNewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) queue.MessageQueue { + q, err := NewQueue(topic, redisClient, handler) + if err != nil { + fmt.Println("NewQueue报错") + } + + return q +} + +func NewQueue(topic string, redisClient *redis.Client, handler ConsumeHandler) (queue.MessageQueue, error) { + if len(topic) == 0 { + return nil, errors.New("topic不能为空") + } + + r := redisQueues{ + group: service.NewServiceGroup(), + } + r.queues = append(r.queues, newRedisQueue(topic, redisClient, handler)) + + return r, nil +} diff --git a/api/internal/svc/servicecontext.go b/api/internal/svc/servicecontext.go index f8b761bf..d8314df5 100644 --- a/api/internal/svc/servicecontext.go +++ b/api/internal/svc/servicecontext.go @@ -6,7 +6,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/docker/docker/client" - "github.com/go-redis/redis" + "github.com/redis/go-redis/v9" "github.com/robfig/cron/v3" "github.com/zeromicro/go-queue/kq" "github.com/zeromicro/go-zero/core/logx" @@ -73,25 +73,23 @@ func NewServiceContext(c config.Config) *ServiceContext { logx.Error(err.Error()) return nil } + redisClient := redis.NewClient(&redis.Options{ + Addr: c.Redis.Host, + Password: c.Redis.Pass, + }) return &ServiceContext{ - Cron: cron.New(cron.WithSeconds()), - DbEngin: dbEngin, - Config: c, - RedisClient: redis.NewClient(&redis.Options{ - Addr: c.Redis.Host, - Password: c.Redis.Pass, - }), - ScheduleHpcClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.HpcTopic), - ScheduleCloudClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.CloudTopic), - ScheduleAiClient: kq.NewPusher(c.KqProducerConf.Brokers, c.KqProducerConf.AiTopic), - ModelArtsRpc: modelartsclient.NewModelArts(zrpc.MustNewClient(c.ModelArtsRpcConf)), - CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)), - ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)), - OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), - OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), - K8sRpc: make(map[int64]kubernetesclient.Kubernetes), - DockerClient: dockerClient, - Downloader: downloader, - Uploader: uploader, + Cron: cron.New(cron.WithSeconds()), + DbEngin: dbEngin, + Config: c, + RedisClient: redisClient, + ModelArtsRpc: modelartsclient.NewModelArts(zrpc.MustNewClient(c.ModelArtsRpcConf)), + CephRpc: cephclient.NewCeph(zrpc.MustNewClient(c.CephRpcConf)), + ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)), + OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), + OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), + K8sRpc: make(map[int64]kubernetesclient.Kubernetes), + DockerClient: dockerClient, + Downloader: downloader, + Uploader: uploader, } } diff --git a/api/pcm.go b/api/pcm.go index 08739034..d32ea51d 100644 --- a/api/pcm.go +++ b/api/pcm.go @@ -3,7 +3,6 @@ package main import ( "context" "flag" - "github.com/zeromicro/go-queue/kq" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/service" @@ -11,7 +10,7 @@ import ( "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/cron" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/handler" - kq3 "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs/kq" + "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/mqs" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" commonConfig "gitlink.org.cn/jcce-pcm/utils/nacos" ) @@ -56,12 +55,9 @@ func main() { serviceGroup.Add(server) services := []service.Service{ - //Listening for changes in consumption flow status - kq.MustNewQueue(c.HpcConsumerConf, kq3.NewScheduleHpcMq(context.Background(), ctx)), - kq.MustNewQueue(c.CloudConsumerConf, kq3.NewScheduleCloudMq(context.Background(), ctx)), - kq.MustNewQueue(c.AiConsumerConf, kq3.NewScheduleAiMq(context.Background(), ctx)), - - //..... + mqs.MustNewQueue("ai", ctx.RedisClient, mqs.NewAiMq(context.Background(), ctx)), + mqs.MustNewQueue("cloud", ctx.RedisClient, mqs.NewCloudMq(context.Background(), ctx)), + mqs.MustNewQueue("hpc", ctx.RedisClient, mqs.NewHpcMq(context.Background(), ctx)), } for _, mq := range services { serviceGroup.Add(mq) diff --git a/go.mod b/go.mod index baa5d7da..3226abd1 100644 --- a/go.mod +++ b/go.mod @@ -99,6 +99,7 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect + github.com/redis/go-redis/v9 v9.2.1 // indirect github.com/segmentio/kafka-go v0.4.38 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect go.etcd.io/etcd/api/v3 v3.5.9 // indirect diff --git a/go.sum b/go.sum index 1675478c..654548e8 100644 --- a/go.sum +++ b/go.sum @@ -944,6 +944,8 @@ github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPH github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= +github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=