other: 新版 server 同步、异步消息实现

This commit is contained in:
kercylan98 2024-04-03 16:51:40 +08:00
parent e4eee31ede
commit 7cb5dd069a
20 changed files with 532 additions and 239 deletions

View File

@ -25,16 +25,22 @@ type Conn interface {
// WriteContext 写入数据
WriteContext(data []byte, context interface{}) error
PushSyncMessage(handler func(srv Server, conn Conn))
PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error))
}
func newConn(c net.Conn, connWriter ConnWriter) *conn {
func newConn(srv *server, c net.Conn, connWriter ConnWriter) *conn {
return &conn{
server: srv,
conn: c,
writer: connWriter,
}
}
type conn struct {
server *server
conn net.Conn // 连接
writer ConnWriter // 写入器
actor string // Actor 名称
@ -64,3 +70,23 @@ func (c *conn) WriteBytes(data []byte) error {
func (c *conn) WriteContext(data []byte, context interface{}) error {
return c.writer(NewPacket(data).SetContext(context))
}
func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) {
if err := c.server.reactor.Dispatch(c.actor, SyncMessage(c.server, func(srv *server) {
handler(srv, c)
})); err != nil {
panic(err)
}
}
func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error)) {
if err := c.server.reactor.Dispatch(c.actor, AsyncMessage(c.server, c.actor, func(srv *server) error {
return handler(srv, c)
}, func(srv *server, err error) {
for _, callback := range callbacks {
callback(srv, c, err)
}
})); err != nil {
panic(err)
}
}

View File

@ -20,8 +20,8 @@ func (s *controller) init(srv *server) *controller {
}
func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) {
if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
c := newConn(conn, writer)
if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
c := newConn(s.server, conn, writer)
srv.connections[conn] = c
s.events.onConnectionOpened(c)
})); err != nil {
@ -30,7 +30,7 @@ func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) {
}
func (s *controller) EliminateConnection(conn net.Conn, err error) {
if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
c, exist := srv.connections[conn]
if !exist {
return
@ -43,7 +43,7 @@ func (s *controller) EliminateConnection(conn net.Conn, err error) {
}
func (s *controller) ReactPacket(conn net.Conn, packet Packet) {
if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
c, exist := srv.connections[conn]
if !exist {
return

View File

@ -66,7 +66,7 @@ func (s *events) onLaunched() {
opt.logger.Info("Minotaur Server", log.String("", "============================================================================"))
})
_ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
_ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
s.launchedEventHandlers.RangeValue(func(index int, value LaunchedEventHandler) bool {
value(s.server, s.server.state.Ip, s.server.state.LaunchedAt)
return true
@ -79,7 +79,7 @@ func (s *events) RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHand
}
func (s *events) onConnectionOpened(conn Conn) {
_ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
_ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
s.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool {
value(s.server, conn)
return true
@ -92,7 +92,7 @@ func (s *events) RegisterConnectionClosedEvent(handler ConnectionClosedEventHand
}
func (s *events) onConnectionClosed(conn Conn, err error) {
_ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
_ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
s.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool {
value(s.server, conn, err)
return true
@ -105,7 +105,7 @@ func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceiveP
}
func (s *events) onConnectionReceivePacket(conn Conn, packet Packet) {
_ = s.server.reactor.AutoDispatch(conn.GetActor(), NativeMessage(s.server, func(srv *server) {
_ = s.server.reactor.AutoDispatch(conn.GetActor(), SyncMessage(s.server, func(srv *server) {
s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool {
value(s.server, conn, packet)
return true
@ -118,7 +118,7 @@ func (s *events) RegisterShutdownEvent(handler ShutdownEventHandler, priority ..
}
func (s *events) onShutdown() {
_ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
_ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) {
s.shutdownEventHandlers.RangeValue(func(index int, value ShutdownEventHandler) bool {
value(s.server)
return true

View File

@ -1,18 +1,96 @@
package server
import (
"github.com/kercylan98/minotaur/server/internal/v2/queue"
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
"github.com/kercylan98/minotaur/utils/log/v2"
"github.com/kercylan98/minotaur/utils/super"
"runtime/debug"
)
type Message interface {
Execute()
}
func NativeMessage(srv *server, handler func(srv *server)) Message {
return &nativeMessage{srv: srv, handler: handler}
func SyncMessage(srv *server, handler func(srv *server)) Message {
return &syncMessage{srv: srv, handler: handler}
}
type nativeMessage struct {
type syncMessage struct {
srv *server
handler func(srv *server)
}
func (s *nativeMessage) Execute() {
func (s *syncMessage) Execute() {
s.handler(s.srv)
}
func AsyncMessage(srv *server, ident string, handler func(srv *server) error, callback func(srv *server, err error)) Message {
return &asyncMessage{
ident: ident,
srv: srv,
handler: handler,
callback: callback,
}
}
type asyncMessage struct {
ident string
srv *server
handler func(srv *server) error
callback func(srv *server, err error)
}
func (s *asyncMessage) Execute() {
var q *queue.Queue[int, string, Message]
var dispatch = func(ident string, message Message, beforeHandler ...func(queue *queue.Queue[int, string, Message], msg Message)) {
if ident == "" {
_ = s.srv.reactor.DispatchWithSystem(message, beforeHandler...)
} else {
_ = s.srv.reactor.Dispatch(ident, message, beforeHandler...)
}
}
dispatch(
s.ident,
SyncMessage(s.srv, func(srv *server) {
_ = srv.ants.Submit(func() {
defer func(srv *server, msg *asyncMessage) {
if err := super.RecoverTransform(recover()); err != nil {
if errHandler := srv.GetMessageErrorHandler(); errHandler != nil {
errHandler(srv, msg, err)
} else {
srv.GetLogger().Error("Message", log.Err(err))
debug.PrintStack()
}
}
}(s.srv, s)
err := s.handler(srv)
var msg Message
msg = SyncMessage(srv, func(srv *server) {
defer func() {
q.WaitAdd(s.ident, -1)
if err := super.RecoverTransform(recover()); err != nil {
if errHandler := srv.GetMessageErrorHandler(); errHandler != nil {
errHandler(srv, msg, err)
} else {
srv.GetLogger().Error("Message", log.Err(err))
debug.PrintStack()
}
}
}()
if s.callback != nil {
s.callback(srv, err)
}
})
dispatch(s.ident, msg)
})
}),
func(queue *queue.Queue[int, string, Message], msg Message) {
queue.WaitAdd(reactor.SysIdent, 1)
q = queue
},
)
}

View File

@ -29,11 +29,18 @@ func (n *notify) init(srv *server) *notify {
return n
}
func (n *notify) close() {
close(n.systemSignal)
close(n.lifeCycleLimit)
close(n.lifeCycleTime)
}
func (n *notify) run() {
defer func() {
if err := n.server.Shutdown(); err != nil {
panic(err)
}
n.close()
}()
for {
select {

View File

@ -0,0 +1,10 @@
package queue
import (
"errors"
)
var (
ErrorQueueClosed = errors.New("queue closed") // 队列已关闭
ErrorQueueInvalid = errors.New("queue invalid") // 无效的队列
)

View File

@ -0,0 +1,6 @@
package queue
// Message 消息接口定义
type Message interface {
// 保留
}

View File

@ -0,0 +1,9 @@
package queue
// MessageHandler 消息处理器支持传入两个函数对消息进行处理
// - 在 handler 内可以执行对消息的逻辑
// - 在 finisher 函数中可以接收到该消息是否是最后一条消息
type MessageHandler[Id, Ident comparable, M Message] func(
handler func(m MessageWrapper[Id, Ident, M]),
finisher func(m MessageWrapper[Id, Ident, M], last bool),
)

View File

@ -0,0 +1,23 @@
package queue
// MessageWrapper 提供了对外部消息的包装,用于方便的获取消息信息
type MessageWrapper[Id, Ident comparable, M Message] struct {
queue *Queue[Id, Ident, M] // 处理消息的队列
ident Ident // 消息所有人
msg M // 消息信息
}
// Queue 返回处理该消息的队列
func (m MessageWrapper[Id, Ident, M]) Queue() *Queue[Id, Ident, M] {
return m.queue
}
// Ident 返回消息的所有人
func (m MessageWrapper[Id, Ident, M]) Ident() Ident {
return m.ident
}
// Message 返回消息的具体实例
func (m MessageWrapper[Id, Ident, M]) Message() M {
return m.msg
}

View File

@ -0,0 +1,168 @@
package queue
import (
"github.com/kercylan98/minotaur/utils/buffer"
"sync"
"sync/atomic"
)
// New 创建一个并发安全的队列 Queue该队列支持通过 chanSize 自定义读取 channel 的大小,同支持使用 bufferSize 指定 buffer.Ring 的初始大小
func New[Id, Ident comparable, M Message](id Id, chanSize, bufferSize int) *Queue[Id, Ident, M] {
q := &Queue[Id, Ident, M]{
c: make(chan MessageHandler[Id, Ident, M], chanSize),
buf: buffer.NewRing[MessageWrapper[Id, Ident, M]](bufferSize),
condRW: &sync.RWMutex{},
identifiers: make(map[Ident]int64),
}
q.cond = sync.NewCond(q.condRW)
q.state = &State[Id, Ident, M]{
queue: q,
id: id,
status: StatusNone,
}
return q
}
// Queue 队列是一个适用于消息处理等场景的并发安全的数据结构
// - 该队列接收自定义的消息 M并将消息有序的传入 Read 函数所返回的 channel 中以供处理
// - 该结构主要实现目标为读写分离且并发安全的非阻塞传输队列,当消费阻塞时以牺牲内存为代价换取消息的生产不阻塞,适用于服务器消息处理等
// - 该队列保证了消息的完整性,确保消息不丢失,在队列关闭后会等待所有消息处理完毕后进行关闭,并提供 SetClosedHandler 函数来监听队列的关闭信号
type Queue[Id, Ident comparable, M Message] struct {
state *State[Id, Ident, M] // 队列状态信息
c chan MessageHandler[Id, Ident, M] // 消息读取通道
buf *buffer.Ring[MessageWrapper[Id, Ident, M]] // 消息缓冲区
cond *sync.Cond // 条件变量
condRW *sync.RWMutex // 条件变量的读写锁
closedHandler func(q *Queue[Id, Ident, M]) // 关闭处理函数
identifiers map[Ident]int64 // 标识符在队列的消息计数映射
}
// Id 获取队列 Id
func (q *Queue[Id, Ident, M]) Id() Id {
return q.state.id
}
// SetId 设置队列 Id
func (q *Queue[Id, Ident, M]) SetId(id Id) {
q.state.id = id
}
// SetClosedHandler 设置队列关闭处理函数,在队列关闭后将执行该函数。此刻队列不再可用
// - Close 函数为非阻塞调用,调用后不会立即关闭队列,会等待消息处理完毕且处理期间不再有新消息介入
func (q *Queue[Id, Ident, M]) SetClosedHandler(handler func(q *Queue[Id, Ident, M])) {
q.closedHandler = handler
}
// Run 阻塞的运行队列,当队列非首次运行时,将会引发来自 ErrorQueueInvalid 的 panic
func (q *Queue[Id, Ident, M]) Run() {
if atomic.LoadInt32(&q.state.status) != StatusNone {
panic(ErrorQueueInvalid)
}
atomic.StoreInt32(&q.state.status, StatusRunning)
defer func(q *Queue[Id, Ident, M]) {
if q.closedHandler != nil {
q.closedHandler(q)
}
}(q)
for {
q.cond.L.Lock()
for q.buf.IsEmpty() {
if atomic.LoadInt32(&q.state.status) >= StatusClosing && q.state.total == 0 {
q.cond.L.Unlock()
atomic.StoreInt32(&q.state.status, StatusClosed)
close(q.c)
return
}
q.cond.Wait()
}
items := q.buf.ReadAll()
q.cond.L.Unlock()
for _, item := range items {
q.c <- func(handler func(MessageWrapper[Id, Ident, M]), finisher func(m MessageWrapper[Id, Ident, M], last bool)) {
defer func(msg MessageWrapper[Id, Ident, M]) {
q.cond.L.Lock()
q.state.total--
curr := q.identifiers[msg.Ident()] - 1
if curr != 0 {
q.identifiers[msg.Ident()] = curr
} else {
delete(q.identifiers, msg.Ident())
}
if finisher != nil {
finisher(msg, curr == 0)
}
//log.Info("消息总计数", log.Int64("计数", q.state.total))
q.cond.Signal()
q.cond.L.Unlock()
}(item)
handler(item)
}
}
}
}
// Push 向队列中推送来自 ident 的消息 m当队列已关闭时将会返回 ErrorQueueClosed
func (q *Queue[Id, Ident, M]) Push(ident Ident, m M) error {
if atomic.LoadInt32(&q.state.status) > StatusClosing {
return ErrorQueueClosed
}
q.cond.L.Lock()
q.identifiers[ident]++
q.state.total++
q.buf.Write(MessageWrapper[Id, Ident, M]{
ident: ident,
msg: m,
})
//log.Info("消息总计数", log.Int64("计数", q.state.total))
q.cond.Signal()
q.cond.L.Unlock()
return nil
}
// WaitAdd 向队列增加来自外部的等待计数,在队列关闭时会等待该计数归零
func (q *Queue[Id, Ident, M]) WaitAdd(ident Ident, delta int64) {
q.cond.L.Lock()
currIdent := q.identifiers[ident]
currIdent += delta
q.identifiers[ident] = currIdent
q.state.total += delta
//log.Info("消息总计数", log.Int64("计数", q.state.total))
q.cond.Signal()
q.cond.L.Unlock()
}
// Read 获取队列消息的只读通道,
func (q *Queue[Id, Ident, M]) Read() <-chan MessageHandler[Id, Ident, M] {
return q.c
}
// Close 关闭队列
func (q *Queue[Id, Ident, M]) Close() {
atomic.CompareAndSwapInt32(&q.state.status, StatusRunning, StatusClosing)
q.cond.Broadcast()
}
// State 获取队列状态
func (q *Queue[Id, Ident, M]) State() *State[Id, Ident, M] {
return q.state
}
// GetMessageCount 获取消息数量
func (q *Queue[Id, Ident, M]) GetMessageCount() (count int64) {
q.condRW.RLock()
defer q.condRW.RUnlock()
for _, i := range q.identifiers {
count += i
}
return
}
// GetMessageCountWithIdent 获取特定消息人的消息数量
func (q *Queue[Id, Ident, M]) GetMessageCountWithIdent(ident Ident) int64 {
q.condRW.RLock()
defer q.condRW.RUnlock()
return q.identifiers[ident]
}

View File

@ -0,0 +1,34 @@
package queue
import (
"sync/atomic"
)
const (
StatusNone = iota - 1 // 队列未运行
StatusRunning // 队列运行中
StatusClosing // 队列关闭中
StatusClosed // 队列已关闭
)
type State[Id, Ident comparable, M Message] struct {
queue *Queue[Id, Ident, M]
id Id // 队列 ID
status int32 // 状态标志
total int64 // 消息总计数
}
// IsClosed 判断队列是否已关闭
func (q *State[Id, Ident, M]) IsClosed() bool {
return atomic.LoadInt32(&q.status) == StatusClosed
}
// IsClosing 判断队列是否正在关闭
func (q *State[Id, Ident, M]) IsClosing() bool {
return atomic.LoadInt32(&q.status) == StatusClosing
}
// IsRunning 判断队列是否正在运行
func (q *State[Id, Ident, M]) IsRunning() bool {
return atomic.LoadInt32(&q.status) == StatusRunning
}

View File

@ -1,7 +1,7 @@
package reactor
type queueMessageHandler[M any] func(q *queue[M], ident *identifiable, msg M)
import "github.com/kercylan98/minotaur/server/internal/v2/queue"
type MessageHandler[M any] func(msg M)
type MessageHandler[M any] func(message queue.MessageWrapper[int, string, M])
type ErrorHandler[M any] func(msg M, err error)
type ErrorHandler[M any] func(message queue.MessageWrapper[int, string, M], err error)

View File

@ -1,82 +0,0 @@
package reactor
import (
"errors"
"github.com/kercylan98/minotaur/utils/buffer"
"sync"
"sync/atomic"
)
func newQueue[M any](idx, chanSize, bufferSize int) *queue[M] {
q := &queue[M]{
c: make(chan queueMessage[M], chanSize),
buf: buffer.NewRing[queueMessage[M]](bufferSize),
cond: sync.NewCond(&sync.Mutex{}),
}
q.QueueState = &QueueState[M]{
queue: q,
idx: idx,
status: QueueStatusNone,
}
return q
}
type queue[M any] struct {
*QueueState[M]
c chan queueMessage[M] // 通道
buf *buffer.Ring[queueMessage[M]] // 缓冲区
cond *sync.Cond // 条件变量
closedHandler func(q *queue[M]) // 关闭处理函数
}
func (q *queue[M]) Id() int {
return q.idx
}
func (q *queue[M]) setClosedHandler(handler func(q *queue[M])) {
q.closedHandler = handler
}
func (q *queue[M]) run() {
atomic.StoreInt32(&q.status, QueueStatusRunning)
defer func(q *queue[M]) {
atomic.StoreInt32(&q.status, QueueStatusClosed)
if q.closedHandler != nil {
q.closedHandler(q)
}
}(q)
for {
q.cond.L.Lock()
for q.buf.IsEmpty() {
if atomic.LoadInt32(&q.status) >= QueueStatusClosing {
q.cond.L.Unlock()
close(q.c)
return
}
q.cond.Wait()
}
items := q.buf.ReadAll()
q.cond.L.Unlock()
for _, item := range items {
q.c <- item
}
}
}
func (q *queue[M]) push(ident *identifiable, m M) error {
if atomic.LoadInt32(&q.status) > QueueStatusRunning {
return errors.New("queue status exception")
}
q.cond.L.Lock()
q.buf.Write(queueMessage[M]{
ident: ident,
msg: m,
})
q.cond.Signal()
q.cond.L.Unlock()
return nil
}
func (q *queue[M]) read() <-chan queueMessage[M] {
return q.c
}

View File

@ -1,6 +0,0 @@
package reactor
type queueMessage[M any] struct {
ident *identifiable
msg M
}

View File

@ -1,39 +0,0 @@
package reactor
import (
"sync/atomic"
)
const (
QueueStatusNone = iota - 1 // 队列未运行
QueueStatusRunning // 队列运行中
QueueStatusClosing // 队列关闭中
QueueStatusClosed // 队列已关闭
)
type QueueState[M any] struct {
queue *queue[M]
idx int // 队列索引
status int32 // 状态标志
}
// IsClosed 判断队列是否已关闭
func (q *QueueState[M]) IsClosed() bool {
return atomic.LoadInt32(&q.status) == QueueStatusClosed
}
// IsClosing 判断队列是否正在关闭
func (q *QueueState[M]) IsClosing() bool {
return atomic.LoadInt32(&q.status) == QueueStatusClosing
}
// IsRunning 判断队列是否正在运行
func (q *QueueState[M]) IsRunning() bool {
return atomic.LoadInt32(&q.status) == QueueStatusRunning
}
// Close 关闭队列
func (q *QueueState[M]) Close() {
atomic.CompareAndSwapInt32(&q.status, QueueStatusRunning, QueueStatusClosing)
q.queue.cond.Broadcast()
}

View File

@ -2,8 +2,10 @@ package reactor
import (
"fmt"
"github.com/alphadose/haxmap"
"github.com/kercylan98/minotaur/server/internal/v2/loadbalancer"
"github.com/kercylan98/minotaur/server/internal/v2/queue"
"github.com/kercylan98/minotaur/utils/log/v2"
"github.com/kercylan98/minotaur/utils/str"
"github.com/kercylan98/minotaur/utils/super"
"runtime"
"runtime/debug"
@ -18,109 +20,116 @@ const (
statusClosed // 事件循环已关闭
)
var sysIdent = &identifiable{ident: "system"}
const SysIdent = str.None
// NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列
func NewReactor[M any](systemQueueSize, queueSize, systemBufferSize, queueBufferSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] {
func NewReactor[M queue.Message](systemQueueSize, queueSize, systemBufferSize, queueBufferSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] {
if handler == nil {
}
r := &Reactor[M]{
systemQueue: newQueue[M](-1, systemQueueSize, systemBufferSize),
identifiers: haxmap.New[string, *identifiable](),
lb: loadbalancer.NewRoundRobin[int, *queue[M]](),
systemQueue: queue.New[int, string, M](-1, systemQueueSize, systemBufferSize),
lb: loadbalancer.NewRoundRobin[int, *queue.Queue[int, string, M]](),
errorHandler: errorHandler,
queueSize: queueSize,
queueBufferSize: queueBufferSize,
state: statusNone,
handler: handler,
location: make(map[string]int),
}
r.logger.Store(log.GetLogger())
defaultNum := runtime.NumCPU()
if defaultNum < 1 {
defaultNum = 1
}
r.queueRW.Lock()
for i := 0; i < defaultNum; i++ {
r.noneLockAddQueue()
}
r.queueRW.Unlock()
r.handler = func(q *queue[M], ident *identifiable, msg M) {
defer func(ident *identifiable, msg M) {
if err := super.RecoverTransform(recover()); err != nil {
defer func(msg M) {
if err = super.RecoverTransform(recover()); err != nil {
fmt.Println(err)
debug.PrintStack()
}
}(msg)
if r.errorHandler != nil {
r.errorHandler(msg, err)
} else {
fmt.Println(err)
debug.PrintStack()
}
}
if atomic.AddInt64(&ident.n, -1) == 0 {
r.queueRW.Lock()
r.identifiers.Del(ident.ident)
r.queueRW.Unlock()
}
}(ident, msg)
if handler != nil {
handler(msg)
}
}
return r
}
// Reactor 是一个消息反应器,管理系统级别的队列和多个 Socket 对应的队列
type Reactor[M any] struct {
state int32 // 状态
systemQueue *queue[M] // 系统级别的队列
queueSize int // 队列管道大小
queueBufferSize int // 队列缓冲区大小
queues []*queue[M] // 所有使用的队列
queueRW sync.RWMutex // 队列读写锁
identifiers *haxmap.Map[string, *identifiable] // 标识符到队列索引的映射及消息计数
lb *loadbalancer.RoundRobin[int, *queue[M]] // 负载均衡器
wg sync.WaitGroup // 等待组
cwg sync.WaitGroup // 关闭等待组
handler queueMessageHandler[M] // 消息处理器
errorHandler ErrorHandler[M] // 错误处理器
type Reactor[M queue.Message] struct {
logger atomic.Pointer[log.Logger] // 日志记录器
state int32 // 状态
systemQueue *queue.Queue[int, string, M] // 系统级别的队列
queueSize int // 队列管道大小
queueBufferSize int // 队列缓冲区大小
queues []*queue.Queue[int, string, M] // 所有使用的队列
queueRW sync.RWMutex // 队列读写锁
location map[string]int // 所在队列 ID 映射
locationRW sync.RWMutex // 所在队列 ID 映射锁
lb *loadbalancer.RoundRobin[int, *queue.Queue[int, string, M]] // 负载均衡器
wg sync.WaitGroup // 等待组
cwg sync.WaitGroup // 关闭等待组
handler MessageHandler[M] // 消息处理器
errorHandler ErrorHandler[M] // 错误处理器
}
// process 消息处理
func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) {
defer func(msg queue.MessageWrapper[int, string, M]) {
if err := super.RecoverTransform(recover()); err != nil {
if r.errorHandler != nil {
r.errorHandler(msg, err)
} else {
r.logger.Load().Error("Reactor", log.Int("queue", msg.Queue().Id()), log.String("ident", msg.Ident()), log.Err(err))
debug.PrintStack()
}
}
}(msg)
r.handler(msg)
}
// AutoDispatch 自动分发,当 ident 为空字符串时,分发到系统级别的队列,否则分发到 ident 使用的队列
func (r *Reactor[M]) AutoDispatch(ident string, msg M) error {
if ident == "" {
return r.SystemDispatch(msg)
if ident == str.None {
return r.DispatchWithSystem(msg)
}
return r.Dispatch(ident, msg)
}
// SystemDispatch 将消息分发到系统级别的队列
func (r *Reactor[M]) SystemDispatch(msg M) error {
if atomic.LoadInt32(&r.state) > statusRunning {
// DispatchWithSystem 将消息分发到系统级别的队列
func (r *Reactor[M]) DispatchWithSystem(msg M, beforeHandler ...func(queue *queue.Queue[int, string, M], msg M)) error {
if atomic.LoadInt32(&r.state) > statusClosing {
r.queueRW.RUnlock()
return fmt.Errorf("reactor closing or closed")
}
return r.systemQueue.push(sysIdent, msg)
for _, f := range beforeHandler {
f(r.systemQueue, msg)
}
return r.systemQueue.Push(SysIdent, msg)
}
// Dispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列
func (r *Reactor[M]) Dispatch(ident string, msg M) error {
// - 设置 count 会增加消息的外部计数,当 Reactor 关闭时会等待外部计数归零
func (r *Reactor[M]) Dispatch(ident string, msg M, beforeHandler ...func(queue *queue.Queue[int, string, M], msg M)) error {
r.queueRW.RLock()
if atomic.LoadInt32(&r.state) > statusRunning {
if atomic.LoadInt32(&r.state) > statusClosing {
r.queueRW.RUnlock()
return fmt.Errorf("reactor closing or closed")
}
next := r.lb.Next()
i, _ := r.identifiers.GetOrSet(ident, &identifiable{ident: ident})
q := r.queues[next.Id()]
atomic.AddInt64(&i.n, 1)
var next *queue.Queue[int, string, M]
r.locationRW.RLock()
i, exist := r.location[ident]
r.locationRW.RUnlock()
if !exist {
r.locationRW.Lock()
if i, exist = r.location[ident]; !exist {
next = r.lb.Next()
r.location[ident] = next.Id()
}
r.locationRW.Unlock()
} else {
next = r.queues[i]
}
r.queueRW.RUnlock()
return q.push(i, msg)
for _, f := range beforeHandler {
f(next, msg)
}
return next.Push(ident, msg)
}
// Run 启动 Reactor运行系统级别的队列和多个 Socket 对应的队列
@ -138,35 +147,41 @@ func (r *Reactor[M]) Run() {
}
func (r *Reactor[M]) noneLockAddQueue() {
q := newQueue[M](len(r.queues), r.queueSize, r.queueBufferSize)
q := queue.New[int, string, M](len(r.queues), r.queueSize, r.queueBufferSize)
r.lb.Add(q) // 运行前添加到负载均衡器,未运行时允许接收消息
r.queues = append(r.queues, q)
}
func (r *Reactor[M]) noneLockDelQueue(q *queue[M]) {
func (r *Reactor[M]) noneLockDelQueue(q *queue.Queue[int, string, M]) {
idx := q.Id()
if idx < 0 || idx >= len(r.queues) || r.queues[idx] != q {
return
}
r.queues = append(r.queues[:idx], r.queues[idx+1:]...)
for i := idx; i < len(r.queues); i++ {
r.queues[i].idx = i
r.queues[i].SetId(i)
}
}
func (r *Reactor[M]) runQueue(q *queue[M]) {
func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) {
r.wg.Add(1)
q.setClosedHandler(func(q *queue[M]) {
q.SetClosedHandler(func(q *queue.Queue[int, string, M]) {
// 关闭时正在等待关闭完成,外部已加锁,无需再次加锁
r.noneLockDelQueue(q)
r.cwg.Done()
})
go q.run()
go q.Run()
go func(r *Reactor[M], q *queue[M]) {
go func(r *Reactor[M], q *queue.Queue[int, string, M]) {
defer r.wg.Done()
for m := range q.read() {
r.handler(q, m.ident, m.msg)
for m := range q.Read() {
m(r.process, func(m queue.MessageWrapper[int, string, M], last bool) {
if last {
r.queueRW.Lock()
defer r.queueRW.Unlock()
delete(r.location, m.Ident())
}
})
}
}(r, q)
}

View File

@ -1,6 +1,7 @@
package reactor_test
import (
"github.com/kercylan98/minotaur/server/internal/v2/queue"
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
"github.com/kercylan98/minotaur/utils/random"
"github.com/kercylan98/minotaur/utils/times"
@ -9,11 +10,11 @@ import (
)
func BenchmarkReactor_Dispatch(b *testing.B) {
var r = reactor.NewReactor(1024*16, 1024, func(msg func()) {
msg()
}, func(msg func(), err error) {
b.Error(err)
}).SetDebug(false)
var r = reactor.NewReactor(1024*16, 1024, 1024, 1024, func(message queue.MessageWrapper[int, string, func()]) {
message.Message()
}, func(message queue.MessageWrapper[int, string, func()], err error) {
})
go r.Run()
@ -28,11 +29,11 @@ func BenchmarkReactor_Dispatch(b *testing.B) {
}
func TestReactor_Dispatch(t *testing.T) {
var r = reactor.NewReactor(1024*16, 1024, func(msg func()) {
msg()
}, func(msg func(), err error) {
t.Error(err)
}).SetDebug(true)
var r = reactor.NewReactor(1024*16, 1024, 1024, 1024, func(message queue.MessageWrapper[int, string, func()]) {
message.Message()
}, func(message queue.MessageWrapper[int, string, func()], err error) {
})
go r.Run()

View File

@ -1,6 +0,0 @@
package reactor
type identifiable struct {
ident string // 标识
n int64 // 消息数量
}

View File

@ -2,7 +2,9 @@ package server
import (
"context"
"github.com/kercylan98/minotaur/server/internal/v2/queue"
"github.com/kercylan98/minotaur/utils/log/v2"
"github.com/panjf2000/ants/v2"
"time"
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
@ -20,12 +22,19 @@ type Server interface {
// GetStatus 获取服务器状态
GetStatus() *State
// PushSyncMessage 推送同步消息
PushSyncMessage(handler func(srv Server))
// PushAsyncMessage 推送异步消息
PushAsyncMessage(handler func(srv Server) error, callbacks ...func(srv Server, err error))
}
type server struct {
*controller
*events
*Options
ants *ants.Pool
state *State
notify *notify
ctx context.Context
@ -47,14 +56,26 @@ func NewServer(network Network, options ...*Options) Server {
srv.reactor = reactor.NewReactor[Message](
srv.GetServerMessageChannelSize(), srv.GetActorMessageChannelSize(),
srv.GetServerMessageBufferInitialSize(), srv.GetActorMessageBufferInitialSize(),
func(msg Message) {
msg.Execute()
}, func(msg Message, err error) {
func(message queue.MessageWrapper[int, string, Message]) {
message.Message().Execute()
}, func(message queue.MessageWrapper[int, string, Message], err error) {
if handler := srv.GetMessageErrorHandler(); handler != nil {
handler(srv, msg, err)
handler(srv, message.Message(), err)
}
})
srv.Options.init(srv).Apply(options...)
antsPool, err := ants.NewPool(ants.DefaultAntsPoolSize, ants.WithOptions(ants.Options{
ExpiryDuration: 10 * time.Second,
Nonblocking: true,
//Logger: &antsLogger{logging.GetDefaultLogger()},
//PanicHandler: func(i interface{}) {
//logging.Errorf("goroutine pool panic: %v", i)
//},
}))
if err != nil {
panic(err)
}
srv.ants = antsPool
return srv
}
@ -91,6 +112,26 @@ func (s *server) Shutdown() (err error) {
return
}
func (s *server) PushSyncMessage(handler func(srv Server)) {
if err := s.reactor.DispatchWithSystem(SyncMessage(s, func(srv *server) {
handler(srv)
})); err != nil {
panic(err)
}
}
func (s *server) PushAsyncMessage(handler func(srv Server) error, callbacks ...func(srv Server, err error)) {
if err := s.reactor.DispatchWithSystem(AsyncMessage(s, reactor.SysIdent, func(srv *server) error {
return handler(srv)
}, func(srv *server, err error) {
for _, callback := range callbacks {
callback(srv, err)
}
})); err != nil {
panic(err)
}
}
func (s *server) GetStatus() *State {
return s.state.Status()
}

View File

@ -23,7 +23,7 @@ func TestNewServer(t *testing.T) {
})
}()
srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Second*3))
srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Day*3))
srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) {
if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil {
@ -35,6 +35,14 @@ func TestNewServer(t *testing.T) {
if err := conn.WritePacket(packet); err != nil {
panic(err)
}
srv.PushAsyncMessage(func(srv server.Server) error {
for i := 0; i < 3; i++ {
time.Sleep(time.Second)
}
return nil
}, func(srv server.Server, err error) {
t.Log("callback")
})
})
if err := srv.Run(); err != nil {