other: 服务器消息组件抽离

This commit is contained in:
kercylan98 2024-04-09 20:23:04 +08:00
parent 7ecb13b7c8
commit cc3573b792
10 changed files with 86 additions and 91 deletions

View File

@ -3,10 +3,10 @@ package server
import (
"context"
"fmt"
"github.com/kercylan98/minotaur/toolkit/message"
"github.com/kercylan98/minotaur/toolkit/message/brokers"
messageEvents "github.com/kercylan98/minotaur/toolkit/message/events"
"github.com/kercylan98/minotaur/toolkit/message/queues"
"github.com/kercylan98/minotaur/toolkit/nexus"
"github.com/kercylan98/minotaur/toolkit/nexus/brokers"
messageEvents "github.com/kercylan98/minotaur/toolkit/nexus/events"
"github.com/kercylan98/minotaur/toolkit/nexus/queues"
"github.com/kercylan98/minotaur/utils/collection"
"github.com/kercylan98/minotaur/utils/log/v2"
"github.com/kercylan98/minotaur/utils/random"
@ -47,7 +47,7 @@ type server struct {
ctx context.Context
cancel context.CancelFunc
network Network
broker message.Broker[int, string]
broker nexus.Broker[int, string]
}
func NewServer(network Network, options ...*Options) Server {
@ -61,9 +61,9 @@ func NewServer(network Network, options ...*Options) Server {
srv.controller = new(controller).init(srv)
srv.events = new(events).init(srv)
srv.state = new(State).init(srv)
srv.broker = brokers.NewSparseGoroutine(func(index int) message.Queue[int, string] {
srv.broker = brokers.NewSparseGoroutine(func(index int) nexus.Queue[int, string] {
return queues.NewNonBlockingRW[int, string](index, 1024*8, 1024)
}, func(handler message.EventExecutor) {
}, func(handler nexus.EventExecutor) {
handler()
})
srv.Options.init(srv).Apply(options...)
@ -119,7 +119,7 @@ func (s *server) getSysQueue() string {
return s.queue
}
func (s *server) PublishMessage(topic string, event message.Event[int, string]) {
func (s *server) PublishMessage(topic string, event nexus.Event[int, string]) {
s.broker.Publish(topic, event)
}

View File

@ -1,37 +0,0 @@
package events
import (
"context"
"github.com/kercylan98/minotaur/toolkit/message"
"time"
)
type (
SynchronousHandler func(context.Context)
)
func Synchronous[I, T comparable](handler SynchronousHandler) message.Event[I, T] {
return &synchronous[I, T]{
handler: handler,
}
}
type synchronous[I, T comparable] struct {
ctx context.Context
handler SynchronousHandler
}
func (s *synchronous[I, T]) OnInitialize(ctx context.Context, broker message.Broker[I, T]) {
s.ctx = ctx
}
func (s *synchronous[I, T]) OnPublished(topic T, queue message.Queue[I, T]) {
}
func (s *synchronous[I, T]) OnProcess(topic T, queue message.Queue[I, T], startAt time.Time) {
s.handler(s.ctx)
}
func (s *synchronous[I, T]) OnProcessed(topic T, queue message.Queue[I, T], endAt time.Time) {
}

View File

@ -1,5 +0,0 @@
package message
type Producer[T comparable] interface {
GetTopic() T
}

View File

@ -1,4 +1,4 @@
package message
package nexus
// Broker 消息核心的接口定义
type Broker[I, T comparable] interface {

View File

@ -4,7 +4,7 @@ import (
"context"
"fmt"
"github.com/kercylan98/minotaur/toolkit/loadbalancer"
"github.com/kercylan98/minotaur/toolkit/message"
"github.com/kercylan98/minotaur/toolkit/nexus"
"runtime"
"sync"
"sync/atomic"
@ -18,13 +18,13 @@ const (
)
type (
SparseGoroutineMessageHandler func(handler message.EventExecutor)
SparseGoroutineMessageHandler func(handler nexus.EventExecutor)
)
func NewSparseGoroutine[I, T comparable](queueFactory func(index int) message.Queue[I, T], handler SparseGoroutineMessageHandler) message.Broker[I, T] {
func NewSparseGoroutine[I, T comparable](queueFactory func(index int) nexus.Queue[I, T], handler SparseGoroutineMessageHandler) nexus.Broker[I, T] {
s := &SparseGoroutine[I, T]{
lb: loadbalancer.NewRoundRobin[I, message.Queue[I, T]](),
queues: make(map[I]message.Queue[I, T]),
lb: loadbalancer.NewRoundRobin[I, nexus.Queue[I, T]](),
queues: make(map[I]nexus.Queue[I, T]),
state: sparseGoroutineStatusNone,
location: make(map[T]I),
handler: handler,
@ -47,18 +47,18 @@ func NewSparseGoroutine[I, T comparable](queueFactory func(index int) message.Qu
}
type SparseGoroutine[I, T comparable] struct {
state int32 // 状态
queueSize int // 队列管道大小
queueBufferSize int // 队列缓冲区大小
queues map[I]message.Queue[I, T] // 所有使用的队列
queueRW sync.RWMutex // 队列读写锁
location map[T]I // Topic 所在队列 Id 映射
locationRW sync.RWMutex // 所在队列 ID 映射锁
lb *loadbalancer.RoundRobin[I, message.Queue[I, T]] // 负载均衡器
wg sync.WaitGroup // 等待组
handler SparseGoroutineMessageHandler // 消息处理器
state int32 // 状态
queueSize int // 队列管道大小
queueBufferSize int // 队列缓冲区大小
queues map[I]nexus.Queue[I, T] // 所有使用的队列
queueRW sync.RWMutex // 队列读写锁
location map[T]I // Topic 所在队列 Id 映射
locationRW sync.RWMutex // 所在队列 ID 映射锁
lb *loadbalancer.RoundRobin[I, nexus.Queue[I, T]] // 负载均衡器
wg sync.WaitGroup // 等待组
handler SparseGoroutineMessageHandler // 消息处理器
queueFactory func(index int) message.Queue[I, T]
queueFactory func(index int) nexus.Queue[I, T]
}
// Run 启动 Reactor运行队列
@ -72,12 +72,12 @@ func (s *SparseGoroutine[I, T]) Run() {
s.wg.Add(1)
go queue.Run()
go func(r *SparseGoroutine[I, T], queue message.Queue[I, T]) {
go func(r *SparseGoroutine[I, T], queue nexus.Queue[I, T]) {
defer r.wg.Done()
for h := range queue.Consume() {
h.Exec(
// onProcess
func(topic T, event message.EventExecutor) {
func(topic T, event nexus.EventExecutor) {
s.handler(event)
},
// onFinish
@ -105,7 +105,7 @@ func (s *SparseGoroutine[I, T]) Close() {
var wg sync.WaitGroup
wg.Add(len(s.queues))
for _, queue := range s.queues {
go func(queue message.Queue[I, T]) {
go func(queue nexus.Queue[I, T]) {
defer wg.Done()
queue.Close()
}(queue)
@ -117,14 +117,14 @@ func (s *SparseGoroutine[I, T]) Close() {
// Publish 将消息分发到特定 topic当 topic 首次使用时,将会根据负载均衡策略选择一个队列
// - 设置 count 会增加消息的外部计数,当 SparseGoroutine 关闭时会等待外部计数归零
func (s *SparseGoroutine[I, T]) Publish(topic T, event message.Event[I, T]) error {
func (s *SparseGoroutine[I, T]) Publish(topic T, event nexus.Event[I, T]) error {
s.queueRW.RLock()
if atomic.LoadInt32(&s.state) > sparseGoroutineStatusClosing {
s.queueRW.RUnlock()
return fmt.Errorf("broker closing or closed")
}
var next message.Queue[I, T]
var next nexus.Queue[I, T]
s.locationRW.RLock()
i, exist := s.location[topic]
s.locationRW.RUnlock()

View File

@ -1,4 +1,4 @@
package message
package nexus
import (
"context"

View File

@ -2,7 +2,7 @@ package events
import (
"context"
"github.com/kercylan98/minotaur/toolkit/message"
"github.com/kercylan98/minotaur/toolkit/nexus"
"time"
)
@ -27,7 +27,7 @@ func Asynchronous[I, T comparable](
actuator AsynchronousActuator,
handler AsynchronousHandler,
callback AsynchronousCallbackHandler,
) message.Event[I, T] {
) nexus.Event[I, T] {
m := &asynchronous[I, T]{
actuator: actuator,
handler: handler,
@ -44,22 +44,22 @@ func Asynchronous[I, T comparable](
type asynchronous[I, T comparable] struct {
ctx context.Context
broker message.Broker[I, T]
broker nexus.Broker[I, T]
actuator AsynchronousActuator
handler AsynchronousHandler
callback AsynchronousCallbackHandler
}
func (s *asynchronous[I, T]) OnInitialize(ctx context.Context, broker message.Broker[I, T]) {
func (s *asynchronous[I, T]) OnInitialize(ctx context.Context, broker nexus.Broker[I, T]) {
s.ctx = ctx
s.broker = broker
}
func (s *asynchronous[I, T]) OnPublished(topic T, queue message.Queue[I, T]) {
func (s *asynchronous[I, T]) OnPublished(topic T, queue nexus.Queue[I, T]) {
queue.IncrementCustomMessageCount(topic, 1)
}
func (s *asynchronous[I, T]) OnProcess(topic T, queue message.Queue[I, T], startAt time.Time) {
func (s *asynchronous[I, T]) OnProcess(topic T, queue nexus.Queue[I, T], startAt time.Time) {
s.actuator(s.ctx, func(ctx context.Context) {
var err error
if s.handler != nil {
@ -74,6 +74,6 @@ func (s *asynchronous[I, T]) OnProcess(topic T, queue message.Queue[I, T], start
})
}
func (s *asynchronous[I, T]) OnProcessed(topic T, queue message.Queue[I, T], endAt time.Time) {
func (s *asynchronous[I, T]) OnProcessed(topic T, queue nexus.Queue[I, T], endAt time.Time) {
queue.IncrementCustomMessageCount(topic, -1)
}

View File

@ -0,0 +1,37 @@
package events
import (
"context"
"github.com/kercylan98/minotaur/toolkit/nexus"
"time"
)
type (
SynchronousHandler func(context.Context)
)
func Synchronous[I, T comparable](handler SynchronousHandler) nexus.Event[I, T] {
return &synchronous[I, T]{
handler: handler,
}
}
type synchronous[I, T comparable] struct {
ctx context.Context
handler SynchronousHandler
}
func (s *synchronous[I, T]) OnInitialize(ctx context.Context, broker nexus.Broker[I, T]) {
s.ctx = ctx
}
func (s *synchronous[I, T]) OnPublished(topic T, queue nexus.Queue[I, T]) {
}
func (s *synchronous[I, T]) OnProcess(topic T, queue nexus.Queue[I, T], startAt time.Time) {
s.handler(s.ctx)
}
func (s *synchronous[I, T]) OnProcessed(topic T, queue nexus.Queue[I, T], endAt time.Time) {
}

View File

@ -1,4 +1,4 @@
package message
package nexus
type Queue[I, T comparable] interface {
// GetId 获取队列 Id

View File

@ -2,7 +2,7 @@ package queues
import (
"errors"
"github.com/kercylan98/minotaur/toolkit/message"
"github.com/kercylan98/minotaur/toolkit/nexus"
"github.com/kercylan98/minotaur/utils/buffer"
"sync"
"sync/atomic"
@ -25,25 +25,25 @@ type NonBlockingRWState = int32
type nonBlockingRWEventInfo[I, T comparable] struct {
topic T
event message.Event[I, T]
exec func(handler message.EventHandler[T], finisher message.EventFinisher[I, T])
event nexus.Event[I, T]
exec func(handler nexus.EventHandler[T], finisher nexus.EventFinisher[I, T])
}
func (e *nonBlockingRWEventInfo[I, T]) GetTopic() T {
return e.topic
}
func (e *nonBlockingRWEventInfo[I, T]) Exec(handler message.EventHandler[T], finisher message.EventFinisher[I, T]) {
func (e *nonBlockingRWEventInfo[I, T]) Exec(handler nexus.EventHandler[T], finisher nexus.EventFinisher[I, T]) {
e.exec(handler, finisher)
}
// NewNonBlockingRW 创建一个并发安全的队列 NonBlockingRW该队列支持通过 chanSize 自定义读取 channel 的大小,同支持使用 bufferSize 指定 buffer.Ring 的初始大小
// - closedHandler 可选的设置队列关闭处理函数,在队列关闭后将执行该函数。此刻队列不再可用
func NewNonBlockingRW[I, T comparable](id I, chanSize, bufferSize int) message.Queue[I, T] {
func NewNonBlockingRW[I, T comparable](id I, chanSize, bufferSize int) nexus.Queue[I, T] {
q := &NonBlockingRW[I, T]{
id: id,
status: NonBlockingRWStatusNone,
c: make(chan message.EventInfo[I, T], chanSize),
c: make(chan nexus.EventInfo[I, T], chanSize),
buf: buffer.NewRing[nonBlockingRWEventInfo[I, T]](bufferSize),
condRW: &sync.RWMutex{},
topics: make(map[T]int64),
@ -62,7 +62,7 @@ type NonBlockingRW[I, T comparable] struct {
total int64 // 消息总计数
topics map[T]int64 // 主题对应的消息计数映射
buf *buffer.Ring[nonBlockingRWEventInfo[I, T]] // 消息缓冲区
c chan message.EventInfo[I, T] // 消息读取通道
c chan nexus.EventInfo[I, T] // 消息读取通道
cs chan struct{} // 关闭信号
cond *sync.Cond // 条件变量
condRW *sync.RWMutex // 条件变量的读写锁
@ -101,7 +101,7 @@ func (n *NonBlockingRW[I, T]) Run() {
}
// Consume 获取队列消息的只读通道,
func (n *NonBlockingRW[I, T]) Consume() <-chan message.EventInfo[I, T] {
func (n *NonBlockingRW[I, T]) Consume() <-chan nexus.EventInfo[I, T] {
return n.c
}
@ -128,7 +128,7 @@ func (n *NonBlockingRW[I, T]) GetTopicMessageCount(topic T) int64 {
return n.topics[topic]
}
func (n *NonBlockingRW[I, T]) Publish(topic T, event message.Event[I, T]) error {
func (n *NonBlockingRW[I, T]) Publish(topic T, event nexus.Event[I, T]) error {
if atomic.LoadInt32(&n.status) > NonBlockingRWStatusClosing {
return ErrorQueueClosed
}
@ -136,7 +136,7 @@ func (n *NonBlockingRW[I, T]) Publish(topic T, event message.Event[I, T]) error
ei := nonBlockingRWEventInfo[I, T]{
topic: topic,
event: event,
exec: func(handler message.EventHandler[T], finisher message.EventFinisher[I, T]) {
exec: func(handler nexus.EventHandler[T], finisher nexus.EventFinisher[I, T]) {
defer func() {
event.OnProcessed(topic, n, time.Now())