From cc3573b792e93210c3acd929596587d45454102a Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 9 Apr 2024 20:23:04 +0800 Subject: [PATCH] =?UTF-8?q?other:=20=E6=9C=8D=E5=8A=A1=E5=99=A8=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E7=BB=84=E4=BB=B6=E6=8A=BD=E7=A6=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/internal/v2/server.go | 16 +++---- toolkit/message/events/synchronous.go | 37 ---------------- toolkit/message/producer.go | 5 --- toolkit/{message => nexus}/broker.go | 2 +- .../brokers/sparse_goroutine.go | 42 +++++++++---------- toolkit/{message => nexus}/event.go | 2 +- .../{message => nexus}/events/asynchronous.go | 14 +++---- toolkit/nexus/events/synchronous.go | 37 ++++++++++++++++ toolkit/{message => nexus}/queue.go | 2 +- .../queues/non_blocking_rw.go | 20 ++++----- 10 files changed, 86 insertions(+), 91 deletions(-) delete mode 100644 toolkit/message/events/synchronous.go delete mode 100644 toolkit/message/producer.go rename toolkit/{message => nexus}/broker.go (89%) rename toolkit/{message => nexus}/brokers/sparse_goroutine.go (77%) rename toolkit/{message => nexus}/event.go (97%) rename toolkit/{message => nexus}/events/asynchronous.go (84%) create mode 100644 toolkit/nexus/events/synchronous.go rename toolkit/{message => nexus}/queue.go (96%) rename toolkit/{message => nexus}/queues/non_blocking_rw.go (89%) diff --git a/server/internal/v2/server.go b/server/internal/v2/server.go index 39ccf4b..c6d4d63 100644 --- a/server/internal/v2/server.go +++ b/server/internal/v2/server.go @@ -3,10 +3,10 @@ package server import ( "context" "fmt" - "github.com/kercylan98/minotaur/toolkit/message" - "github.com/kercylan98/minotaur/toolkit/message/brokers" - messageEvents "github.com/kercylan98/minotaur/toolkit/message/events" - "github.com/kercylan98/minotaur/toolkit/message/queues" + "github.com/kercylan98/minotaur/toolkit/nexus" + "github.com/kercylan98/minotaur/toolkit/nexus/brokers" + messageEvents "github.com/kercylan98/minotaur/toolkit/nexus/events" + "github.com/kercylan98/minotaur/toolkit/nexus/queues" "github.com/kercylan98/minotaur/utils/collection" "github.com/kercylan98/minotaur/utils/log/v2" "github.com/kercylan98/minotaur/utils/random" @@ -47,7 +47,7 @@ type server struct { ctx context.Context cancel context.CancelFunc network Network - broker message.Broker[int, string] + broker nexus.Broker[int, string] } func NewServer(network Network, options ...*Options) Server { @@ -61,9 +61,9 @@ func NewServer(network Network, options ...*Options) Server { srv.controller = new(controller).init(srv) srv.events = new(events).init(srv) srv.state = new(State).init(srv) - srv.broker = brokers.NewSparseGoroutine(func(index int) message.Queue[int, string] { + srv.broker = brokers.NewSparseGoroutine(func(index int) nexus.Queue[int, string] { return queues.NewNonBlockingRW[int, string](index, 1024*8, 1024) - }, func(handler message.EventExecutor) { + }, func(handler nexus.EventExecutor) { handler() }) srv.Options.init(srv).Apply(options...) @@ -119,7 +119,7 @@ func (s *server) getSysQueue() string { return s.queue } -func (s *server) PublishMessage(topic string, event message.Event[int, string]) { +func (s *server) PublishMessage(topic string, event nexus.Event[int, string]) { s.broker.Publish(topic, event) } diff --git a/toolkit/message/events/synchronous.go b/toolkit/message/events/synchronous.go deleted file mode 100644 index 9d48e40..0000000 --- a/toolkit/message/events/synchronous.go +++ /dev/null @@ -1,37 +0,0 @@ -package events - -import ( - "context" - "github.com/kercylan98/minotaur/toolkit/message" - "time" -) - -type ( - SynchronousHandler func(context.Context) -) - -func Synchronous[I, T comparable](handler SynchronousHandler) message.Event[I, T] { - return &synchronous[I, T]{ - handler: handler, - } -} - -type synchronous[I, T comparable] struct { - ctx context.Context - handler SynchronousHandler -} - -func (s *synchronous[I, T]) OnInitialize(ctx context.Context, broker message.Broker[I, T]) { - s.ctx = ctx -} - -func (s *synchronous[I, T]) OnPublished(topic T, queue message.Queue[I, T]) { - -} - -func (s *synchronous[I, T]) OnProcess(topic T, queue message.Queue[I, T], startAt time.Time) { - s.handler(s.ctx) -} - -func (s *synchronous[I, T]) OnProcessed(topic T, queue message.Queue[I, T], endAt time.Time) { -} diff --git a/toolkit/message/producer.go b/toolkit/message/producer.go deleted file mode 100644 index 3396efd..0000000 --- a/toolkit/message/producer.go +++ /dev/null @@ -1,5 +0,0 @@ -package message - -type Producer[T comparable] interface { - GetTopic() T -} diff --git a/toolkit/message/broker.go b/toolkit/nexus/broker.go similarity index 89% rename from toolkit/message/broker.go rename to toolkit/nexus/broker.go index 6d67962..f20f5e3 100644 --- a/toolkit/message/broker.go +++ b/toolkit/nexus/broker.go @@ -1,4 +1,4 @@ -package message +package nexus // Broker 消息核心的接口定义 type Broker[I, T comparable] interface { diff --git a/toolkit/message/brokers/sparse_goroutine.go b/toolkit/nexus/brokers/sparse_goroutine.go similarity index 77% rename from toolkit/message/brokers/sparse_goroutine.go rename to toolkit/nexus/brokers/sparse_goroutine.go index be5a3f0..2a480bc 100644 --- a/toolkit/message/brokers/sparse_goroutine.go +++ b/toolkit/nexus/brokers/sparse_goroutine.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "github.com/kercylan98/minotaur/toolkit/loadbalancer" - "github.com/kercylan98/minotaur/toolkit/message" + "github.com/kercylan98/minotaur/toolkit/nexus" "runtime" "sync" "sync/atomic" @@ -18,13 +18,13 @@ const ( ) type ( - SparseGoroutineMessageHandler func(handler message.EventExecutor) + SparseGoroutineMessageHandler func(handler nexus.EventExecutor) ) -func NewSparseGoroutine[I, T comparable](queueFactory func(index int) message.Queue[I, T], handler SparseGoroutineMessageHandler) message.Broker[I, T] { +func NewSparseGoroutine[I, T comparable](queueFactory func(index int) nexus.Queue[I, T], handler SparseGoroutineMessageHandler) nexus.Broker[I, T] { s := &SparseGoroutine[I, T]{ - lb: loadbalancer.NewRoundRobin[I, message.Queue[I, T]](), - queues: make(map[I]message.Queue[I, T]), + lb: loadbalancer.NewRoundRobin[I, nexus.Queue[I, T]](), + queues: make(map[I]nexus.Queue[I, T]), state: sparseGoroutineStatusNone, location: make(map[T]I), handler: handler, @@ -47,18 +47,18 @@ func NewSparseGoroutine[I, T comparable](queueFactory func(index int) message.Qu } type SparseGoroutine[I, T comparable] struct { - state int32 // 状态 - queueSize int // 队列管道大小 - queueBufferSize int // 队列缓冲区大小 - queues map[I]message.Queue[I, T] // 所有使用的队列 - queueRW sync.RWMutex // 队列读写锁 - location map[T]I // Topic 所在队列 Id 映射 - locationRW sync.RWMutex // 所在队列 ID 映射锁 - lb *loadbalancer.RoundRobin[I, message.Queue[I, T]] // 负载均衡器 - wg sync.WaitGroup // 等待组 - handler SparseGoroutineMessageHandler // 消息处理器 + state int32 // 状态 + queueSize int // 队列管道大小 + queueBufferSize int // 队列缓冲区大小 + queues map[I]nexus.Queue[I, T] // 所有使用的队列 + queueRW sync.RWMutex // 队列读写锁 + location map[T]I // Topic 所在队列 Id 映射 + locationRW sync.RWMutex // 所在队列 ID 映射锁 + lb *loadbalancer.RoundRobin[I, nexus.Queue[I, T]] // 负载均衡器 + wg sync.WaitGroup // 等待组 + handler SparseGoroutineMessageHandler // 消息处理器 - queueFactory func(index int) message.Queue[I, T] + queueFactory func(index int) nexus.Queue[I, T] } // Run 启动 Reactor,运行队列 @@ -72,12 +72,12 @@ func (s *SparseGoroutine[I, T]) Run() { s.wg.Add(1) go queue.Run() - go func(r *SparseGoroutine[I, T], queue message.Queue[I, T]) { + go func(r *SparseGoroutine[I, T], queue nexus.Queue[I, T]) { defer r.wg.Done() for h := range queue.Consume() { h.Exec( // onProcess - func(topic T, event message.EventExecutor) { + func(topic T, event nexus.EventExecutor) { s.handler(event) }, // onFinish @@ -105,7 +105,7 @@ func (s *SparseGoroutine[I, T]) Close() { var wg sync.WaitGroup wg.Add(len(s.queues)) for _, queue := range s.queues { - go func(queue message.Queue[I, T]) { + go func(queue nexus.Queue[I, T]) { defer wg.Done() queue.Close() }(queue) @@ -117,14 +117,14 @@ func (s *SparseGoroutine[I, T]) Close() { // Publish 将消息分发到特定 topic,当 topic 首次使用时,将会根据负载均衡策略选择一个队列 // - 设置 count 会增加消息的外部计数,当 SparseGoroutine 关闭时会等待外部计数归零 -func (s *SparseGoroutine[I, T]) Publish(topic T, event message.Event[I, T]) error { +func (s *SparseGoroutine[I, T]) Publish(topic T, event nexus.Event[I, T]) error { s.queueRW.RLock() if atomic.LoadInt32(&s.state) > sparseGoroutineStatusClosing { s.queueRW.RUnlock() return fmt.Errorf("broker closing or closed") } - var next message.Queue[I, T] + var next nexus.Queue[I, T] s.locationRW.RLock() i, exist := s.location[topic] s.locationRW.RUnlock() diff --git a/toolkit/message/event.go b/toolkit/nexus/event.go similarity index 97% rename from toolkit/message/event.go rename to toolkit/nexus/event.go index 4b69df8..bcb9d22 100644 --- a/toolkit/message/event.go +++ b/toolkit/nexus/event.go @@ -1,4 +1,4 @@ -package message +package nexus import ( "context" diff --git a/toolkit/message/events/asynchronous.go b/toolkit/nexus/events/asynchronous.go similarity index 84% rename from toolkit/message/events/asynchronous.go rename to toolkit/nexus/events/asynchronous.go index 608bb29..c87b854 100644 --- a/toolkit/message/events/asynchronous.go +++ b/toolkit/nexus/events/asynchronous.go @@ -2,7 +2,7 @@ package events import ( "context" - "github.com/kercylan98/minotaur/toolkit/message" + "github.com/kercylan98/minotaur/toolkit/nexus" "time" ) @@ -27,7 +27,7 @@ func Asynchronous[I, T comparable]( actuator AsynchronousActuator, handler AsynchronousHandler, callback AsynchronousCallbackHandler, -) message.Event[I, T] { +) nexus.Event[I, T] { m := &asynchronous[I, T]{ actuator: actuator, handler: handler, @@ -44,22 +44,22 @@ func Asynchronous[I, T comparable]( type asynchronous[I, T comparable] struct { ctx context.Context - broker message.Broker[I, T] + broker nexus.Broker[I, T] actuator AsynchronousActuator handler AsynchronousHandler callback AsynchronousCallbackHandler } -func (s *asynchronous[I, T]) OnInitialize(ctx context.Context, broker message.Broker[I, T]) { +func (s *asynchronous[I, T]) OnInitialize(ctx context.Context, broker nexus.Broker[I, T]) { s.ctx = ctx s.broker = broker } -func (s *asynchronous[I, T]) OnPublished(topic T, queue message.Queue[I, T]) { +func (s *asynchronous[I, T]) OnPublished(topic T, queue nexus.Queue[I, T]) { queue.IncrementCustomMessageCount(topic, 1) } -func (s *asynchronous[I, T]) OnProcess(topic T, queue message.Queue[I, T], startAt time.Time) { +func (s *asynchronous[I, T]) OnProcess(topic T, queue nexus.Queue[I, T], startAt time.Time) { s.actuator(s.ctx, func(ctx context.Context) { var err error if s.handler != nil { @@ -74,6 +74,6 @@ func (s *asynchronous[I, T]) OnProcess(topic T, queue message.Queue[I, T], start }) } -func (s *asynchronous[I, T]) OnProcessed(topic T, queue message.Queue[I, T], endAt time.Time) { +func (s *asynchronous[I, T]) OnProcessed(topic T, queue nexus.Queue[I, T], endAt time.Time) { queue.IncrementCustomMessageCount(topic, -1) } diff --git a/toolkit/nexus/events/synchronous.go b/toolkit/nexus/events/synchronous.go new file mode 100644 index 0000000..36ba4fa --- /dev/null +++ b/toolkit/nexus/events/synchronous.go @@ -0,0 +1,37 @@ +package events + +import ( + "context" + "github.com/kercylan98/minotaur/toolkit/nexus" + "time" +) + +type ( + SynchronousHandler func(context.Context) +) + +func Synchronous[I, T comparable](handler SynchronousHandler) nexus.Event[I, T] { + return &synchronous[I, T]{ + handler: handler, + } +} + +type synchronous[I, T comparable] struct { + ctx context.Context + handler SynchronousHandler +} + +func (s *synchronous[I, T]) OnInitialize(ctx context.Context, broker nexus.Broker[I, T]) { + s.ctx = ctx +} + +func (s *synchronous[I, T]) OnPublished(topic T, queue nexus.Queue[I, T]) { + +} + +func (s *synchronous[I, T]) OnProcess(topic T, queue nexus.Queue[I, T], startAt time.Time) { + s.handler(s.ctx) +} + +func (s *synchronous[I, T]) OnProcessed(topic T, queue nexus.Queue[I, T], endAt time.Time) { +} diff --git a/toolkit/message/queue.go b/toolkit/nexus/queue.go similarity index 96% rename from toolkit/message/queue.go rename to toolkit/nexus/queue.go index da38f3b..1099546 100644 --- a/toolkit/message/queue.go +++ b/toolkit/nexus/queue.go @@ -1,4 +1,4 @@ -package message +package nexus type Queue[I, T comparable] interface { // GetId 获取队列 Id diff --git a/toolkit/message/queues/non_blocking_rw.go b/toolkit/nexus/queues/non_blocking_rw.go similarity index 89% rename from toolkit/message/queues/non_blocking_rw.go rename to toolkit/nexus/queues/non_blocking_rw.go index 5c9ce5e..308ed13 100644 --- a/toolkit/message/queues/non_blocking_rw.go +++ b/toolkit/nexus/queues/non_blocking_rw.go @@ -2,7 +2,7 @@ package queues import ( "errors" - "github.com/kercylan98/minotaur/toolkit/message" + "github.com/kercylan98/minotaur/toolkit/nexus" "github.com/kercylan98/minotaur/utils/buffer" "sync" "sync/atomic" @@ -25,25 +25,25 @@ type NonBlockingRWState = int32 type nonBlockingRWEventInfo[I, T comparable] struct { topic T - event message.Event[I, T] - exec func(handler message.EventHandler[T], finisher message.EventFinisher[I, T]) + event nexus.Event[I, T] + exec func(handler nexus.EventHandler[T], finisher nexus.EventFinisher[I, T]) } func (e *nonBlockingRWEventInfo[I, T]) GetTopic() T { return e.topic } -func (e *nonBlockingRWEventInfo[I, T]) Exec(handler message.EventHandler[T], finisher message.EventFinisher[I, T]) { +func (e *nonBlockingRWEventInfo[I, T]) Exec(handler nexus.EventHandler[T], finisher nexus.EventFinisher[I, T]) { e.exec(handler, finisher) } // NewNonBlockingRW 创建一个并发安全的队列 NonBlockingRW,该队列支持通过 chanSize 自定义读取 channel 的大小,同支持使用 bufferSize 指定 buffer.Ring 的初始大小 // - closedHandler 可选的设置队列关闭处理函数,在队列关闭后将执行该函数。此刻队列不再可用 -func NewNonBlockingRW[I, T comparable](id I, chanSize, bufferSize int) message.Queue[I, T] { +func NewNonBlockingRW[I, T comparable](id I, chanSize, bufferSize int) nexus.Queue[I, T] { q := &NonBlockingRW[I, T]{ id: id, status: NonBlockingRWStatusNone, - c: make(chan message.EventInfo[I, T], chanSize), + c: make(chan nexus.EventInfo[I, T], chanSize), buf: buffer.NewRing[nonBlockingRWEventInfo[I, T]](bufferSize), condRW: &sync.RWMutex{}, topics: make(map[T]int64), @@ -62,7 +62,7 @@ type NonBlockingRW[I, T comparable] struct { total int64 // 消息总计数 topics map[T]int64 // 主题对应的消息计数映射 buf *buffer.Ring[nonBlockingRWEventInfo[I, T]] // 消息缓冲区 - c chan message.EventInfo[I, T] // 消息读取通道 + c chan nexus.EventInfo[I, T] // 消息读取通道 cs chan struct{} // 关闭信号 cond *sync.Cond // 条件变量 condRW *sync.RWMutex // 条件变量的读写锁 @@ -101,7 +101,7 @@ func (n *NonBlockingRW[I, T]) Run() { } // Consume 获取队列消息的只读通道, -func (n *NonBlockingRW[I, T]) Consume() <-chan message.EventInfo[I, T] { +func (n *NonBlockingRW[I, T]) Consume() <-chan nexus.EventInfo[I, T] { return n.c } @@ -128,7 +128,7 @@ func (n *NonBlockingRW[I, T]) GetTopicMessageCount(topic T) int64 { return n.topics[topic] } -func (n *NonBlockingRW[I, T]) Publish(topic T, event message.Event[I, T]) error { +func (n *NonBlockingRW[I, T]) Publish(topic T, event nexus.Event[I, T]) error { if atomic.LoadInt32(&n.status) > NonBlockingRWStatusClosing { return ErrorQueueClosed } @@ -136,7 +136,7 @@ func (n *NonBlockingRW[I, T]) Publish(topic T, event message.Event[I, T]) error ei := nonBlockingRWEventInfo[I, T]{ topic: topic, event: event, - exec: func(handler message.EventHandler[T], finisher message.EventFinisher[I, T]) { + exec: func(handler nexus.EventHandler[T], finisher nexus.EventFinisher[I, T]) { defer func() { event.OnProcessed(topic, n, time.Now())