From 7cb5dd069a015c2322045e2a156c64998fc51f50 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 3 Apr 2024 16:51:40 +0800 Subject: [PATCH] =?UTF-8?q?other:=20=E6=96=B0=E7=89=88=20server=20?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E3=80=81=E5=BC=82=E6=AD=A5=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/internal/v2/conn.go | 28 +++- server/internal/v2/controller.go | 8 +- server/internal/v2/events.go | 10 +- server/internal/v2/message.go | 86 +++++++++- server/internal/v2/notify.go | 7 + server/internal/v2/queue/errors.go | 10 ++ server/internal/v2/queue/message.go | 6 + server/internal/v2/queue/message_handler.go | 9 ++ server/internal/v2/queue/message_wrapper.go | 23 +++ server/internal/v2/queue/queue.go | 168 ++++++++++++++++++++ server/internal/v2/queue/state.go | 34 ++++ server/internal/v2/reactor/handlers.go | 6 +- server/internal/v2/reactor/queue.go | 82 ---------- server/internal/v2/reactor/queue_message.go | 6 - server/internal/v2/reactor/queue_state.go | 39 ----- server/internal/v2/reactor/reactor.go | 163 ++++++++++--------- server/internal/v2/reactor/reactor_test.go | 21 +-- server/internal/v2/reactor/ref.go | 6 - server/internal/v2/server.go | 49 +++++- server/internal/v2/server_test.go | 10 +- 20 files changed, 532 insertions(+), 239 deletions(-) create mode 100644 server/internal/v2/queue/errors.go create mode 100644 server/internal/v2/queue/message.go create mode 100644 server/internal/v2/queue/message_handler.go create mode 100644 server/internal/v2/queue/message_wrapper.go create mode 100644 server/internal/v2/queue/queue.go create mode 100644 server/internal/v2/queue/state.go delete mode 100644 server/internal/v2/reactor/queue.go delete mode 100644 server/internal/v2/reactor/queue_message.go delete mode 100644 server/internal/v2/reactor/queue_state.go delete mode 100644 server/internal/v2/reactor/ref.go diff --git a/server/internal/v2/conn.go b/server/internal/v2/conn.go index c2a038a..282b764 100644 --- a/server/internal/v2/conn.go +++ b/server/internal/v2/conn.go @@ -25,16 +25,22 @@ type Conn interface { // WriteContext 写入数据 WriteContext(data []byte, context interface{}) error + + PushSyncMessage(handler func(srv Server, conn Conn)) + + PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error)) } -func newConn(c net.Conn, connWriter ConnWriter) *conn { +func newConn(srv *server, c net.Conn, connWriter ConnWriter) *conn { return &conn{ + server: srv, conn: c, writer: connWriter, } } type conn struct { + server *server conn net.Conn // 连接 writer ConnWriter // 写入器 actor string // Actor 名称 @@ -64,3 +70,23 @@ func (c *conn) WriteBytes(data []byte) error { func (c *conn) WriteContext(data []byte, context interface{}) error { return c.writer(NewPacket(data).SetContext(context)) } + +func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) { + if err := c.server.reactor.Dispatch(c.actor, SyncMessage(c.server, func(srv *server) { + handler(srv, c) + })); err != nil { + panic(err) + } +} + +func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error)) { + if err := c.server.reactor.Dispatch(c.actor, AsyncMessage(c.server, c.actor, func(srv *server) error { + return handler(srv, c) + }, func(srv *server, err error) { + for _, callback := range callbacks { + callback(srv, c, err) + } + })); err != nil { + panic(err) + } +} diff --git a/server/internal/v2/controller.go b/server/internal/v2/controller.go index d4600e0..f22c772 100644 --- a/server/internal/v2/controller.go +++ b/server/internal/v2/controller.go @@ -20,8 +20,8 @@ func (s *controller) init(srv *server) *controller { } func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) { - if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { - c := newConn(conn, writer) + if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { + c := newConn(s.server, conn, writer) srv.connections[conn] = c s.events.onConnectionOpened(c) })); err != nil { @@ -30,7 +30,7 @@ func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) { } func (s *controller) EliminateConnection(conn net.Conn, err error) { - if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { c, exist := srv.connections[conn] if !exist { return @@ -43,7 +43,7 @@ func (s *controller) EliminateConnection(conn net.Conn, err error) { } func (s *controller) ReactPacket(conn net.Conn, packet Packet) { - if err := s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + if err := s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { c, exist := srv.connections[conn] if !exist { return diff --git a/server/internal/v2/events.go b/server/internal/v2/events.go index 7d5ea8d..0e8f0a1 100644 --- a/server/internal/v2/events.go +++ b/server/internal/v2/events.go @@ -66,7 +66,7 @@ func (s *events) onLaunched() { opt.logger.Info("Minotaur Server", log.String("", "============================================================================")) }) - _ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + _ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { s.launchedEventHandlers.RangeValue(func(index int, value LaunchedEventHandler) bool { value(s.server, s.server.state.Ip, s.server.state.LaunchedAt) return true @@ -79,7 +79,7 @@ func (s *events) RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHand } func (s *events) onConnectionOpened(conn Conn) { - _ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + _ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { s.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool { value(s.server, conn) return true @@ -92,7 +92,7 @@ func (s *events) RegisterConnectionClosedEvent(handler ConnectionClosedEventHand } func (s *events) onConnectionClosed(conn Conn, err error) { - _ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + _ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { s.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool { value(s.server, conn, err) return true @@ -105,7 +105,7 @@ func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceiveP } func (s *events) onConnectionReceivePacket(conn Conn, packet Packet) { - _ = s.server.reactor.AutoDispatch(conn.GetActor(), NativeMessage(s.server, func(srv *server) { + _ = s.server.reactor.AutoDispatch(conn.GetActor(), SyncMessage(s.server, func(srv *server) { s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool { value(s.server, conn, packet) return true @@ -118,7 +118,7 @@ func (s *events) RegisterShutdownEvent(handler ShutdownEventHandler, priority .. } func (s *events) onShutdown() { - _ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) { + _ = s.server.reactor.DispatchWithSystem(SyncMessage(s.server, func(srv *server) { s.shutdownEventHandlers.RangeValue(func(index int, value ShutdownEventHandler) bool { value(s.server) return true diff --git a/server/internal/v2/message.go b/server/internal/v2/message.go index 77c737b..6039c32 100644 --- a/server/internal/v2/message.go +++ b/server/internal/v2/message.go @@ -1,18 +1,96 @@ package server +import ( + "github.com/kercylan98/minotaur/server/internal/v2/queue" + "github.com/kercylan98/minotaur/server/internal/v2/reactor" + "github.com/kercylan98/minotaur/utils/log/v2" + "github.com/kercylan98/minotaur/utils/super" + "runtime/debug" +) + type Message interface { Execute() } -func NativeMessage(srv *server, handler func(srv *server)) Message { - return &nativeMessage{srv: srv, handler: handler} +func SyncMessage(srv *server, handler func(srv *server)) Message { + return &syncMessage{srv: srv, handler: handler} } -type nativeMessage struct { +type syncMessage struct { srv *server handler func(srv *server) } -func (s *nativeMessage) Execute() { +func (s *syncMessage) Execute() { s.handler(s.srv) } + +func AsyncMessage(srv *server, ident string, handler func(srv *server) error, callback func(srv *server, err error)) Message { + return &asyncMessage{ + ident: ident, + srv: srv, + handler: handler, + callback: callback, + } +} + +type asyncMessage struct { + ident string + srv *server + handler func(srv *server) error + callback func(srv *server, err error) +} + +func (s *asyncMessage) Execute() { + var q *queue.Queue[int, string, Message] + var dispatch = func(ident string, message Message, beforeHandler ...func(queue *queue.Queue[int, string, Message], msg Message)) { + if ident == "" { + _ = s.srv.reactor.DispatchWithSystem(message, beforeHandler...) + } else { + _ = s.srv.reactor.Dispatch(ident, message, beforeHandler...) + } + } + + dispatch( + s.ident, + SyncMessage(s.srv, func(srv *server) { + _ = srv.ants.Submit(func() { + defer func(srv *server, msg *asyncMessage) { + if err := super.RecoverTransform(recover()); err != nil { + if errHandler := srv.GetMessageErrorHandler(); errHandler != nil { + errHandler(srv, msg, err) + } else { + srv.GetLogger().Error("Message", log.Err(err)) + debug.PrintStack() + } + } + }(s.srv, s) + + err := s.handler(srv) + var msg Message + msg = SyncMessage(srv, func(srv *server) { + defer func() { + q.WaitAdd(s.ident, -1) + if err := super.RecoverTransform(recover()); err != nil { + if errHandler := srv.GetMessageErrorHandler(); errHandler != nil { + errHandler(srv, msg, err) + } else { + srv.GetLogger().Error("Message", log.Err(err)) + debug.PrintStack() + } + } + }() + if s.callback != nil { + s.callback(srv, err) + } + }) + dispatch(s.ident, msg) + + }) + }), + func(queue *queue.Queue[int, string, Message], msg Message) { + queue.WaitAdd(reactor.SysIdent, 1) + q = queue + }, + ) +} diff --git a/server/internal/v2/notify.go b/server/internal/v2/notify.go index 118a6a9..e071d4b 100644 --- a/server/internal/v2/notify.go +++ b/server/internal/v2/notify.go @@ -29,11 +29,18 @@ func (n *notify) init(srv *server) *notify { return n } +func (n *notify) close() { + close(n.systemSignal) + close(n.lifeCycleLimit) + close(n.lifeCycleTime) +} + func (n *notify) run() { defer func() { if err := n.server.Shutdown(); err != nil { panic(err) } + n.close() }() for { select { diff --git a/server/internal/v2/queue/errors.go b/server/internal/v2/queue/errors.go new file mode 100644 index 0000000..9675217 --- /dev/null +++ b/server/internal/v2/queue/errors.go @@ -0,0 +1,10 @@ +package queue + +import ( + "errors" +) + +var ( + ErrorQueueClosed = errors.New("queue closed") // 队列已关闭 + ErrorQueueInvalid = errors.New("queue invalid") // 无效的队列 +) diff --git a/server/internal/v2/queue/message.go b/server/internal/v2/queue/message.go new file mode 100644 index 0000000..4bafd43 --- /dev/null +++ b/server/internal/v2/queue/message.go @@ -0,0 +1,6 @@ +package queue + +// Message 消息接口定义 +type Message interface { + // 保留 +} diff --git a/server/internal/v2/queue/message_handler.go b/server/internal/v2/queue/message_handler.go new file mode 100644 index 0000000..aabab48 --- /dev/null +++ b/server/internal/v2/queue/message_handler.go @@ -0,0 +1,9 @@ +package queue + +// MessageHandler 消息处理器支持传入两个函数对消息进行处理 +// - 在 handler 内可以执行对消息的逻辑 +// - 在 finisher 函数中可以接收到该消息是否是最后一条消息 +type MessageHandler[Id, Ident comparable, M Message] func( + handler func(m MessageWrapper[Id, Ident, M]), + finisher func(m MessageWrapper[Id, Ident, M], last bool), +) diff --git a/server/internal/v2/queue/message_wrapper.go b/server/internal/v2/queue/message_wrapper.go new file mode 100644 index 0000000..0dc345c --- /dev/null +++ b/server/internal/v2/queue/message_wrapper.go @@ -0,0 +1,23 @@ +package queue + +// MessageWrapper 提供了对外部消息的包装,用于方便的获取消息信息 +type MessageWrapper[Id, Ident comparable, M Message] struct { + queue *Queue[Id, Ident, M] // 处理消息的队列 + ident Ident // 消息所有人 + msg M // 消息信息 +} + +// Queue 返回处理该消息的队列 +func (m MessageWrapper[Id, Ident, M]) Queue() *Queue[Id, Ident, M] { + return m.queue +} + +// Ident 返回消息的所有人 +func (m MessageWrapper[Id, Ident, M]) Ident() Ident { + return m.ident +} + +// Message 返回消息的具体实例 +func (m MessageWrapper[Id, Ident, M]) Message() M { + return m.msg +} diff --git a/server/internal/v2/queue/queue.go b/server/internal/v2/queue/queue.go new file mode 100644 index 0000000..c5806d6 --- /dev/null +++ b/server/internal/v2/queue/queue.go @@ -0,0 +1,168 @@ +package queue + +import ( + "github.com/kercylan98/minotaur/utils/buffer" + "sync" + "sync/atomic" +) + +// New 创建一个并发安全的队列 Queue,该队列支持通过 chanSize 自定义读取 channel 的大小,同支持使用 bufferSize 指定 buffer.Ring 的初始大小 +func New[Id, Ident comparable, M Message](id Id, chanSize, bufferSize int) *Queue[Id, Ident, M] { + q := &Queue[Id, Ident, M]{ + c: make(chan MessageHandler[Id, Ident, M], chanSize), + buf: buffer.NewRing[MessageWrapper[Id, Ident, M]](bufferSize), + condRW: &sync.RWMutex{}, + identifiers: make(map[Ident]int64), + } + q.cond = sync.NewCond(q.condRW) + q.state = &State[Id, Ident, M]{ + queue: q, + id: id, + status: StatusNone, + } + return q +} + +// Queue 队列是一个适用于消息处理等场景的并发安全的数据结构 +// - 该队列接收自定义的消息 M,并将消息有序的传入 Read 函数所返回的 channel 中以供处理 +// - 该结构主要实现目标为读写分离且并发安全的非阻塞传输队列,当消费阻塞时以牺牲内存为代价换取消息的生产不阻塞,适用于服务器消息处理等 +// - 该队列保证了消息的完整性,确保消息不丢失,在队列关闭后会等待所有消息处理完毕后进行关闭,并提供 SetClosedHandler 函数来监听队列的关闭信号 +type Queue[Id, Ident comparable, M Message] struct { + state *State[Id, Ident, M] // 队列状态信息 + c chan MessageHandler[Id, Ident, M] // 消息读取通道 + buf *buffer.Ring[MessageWrapper[Id, Ident, M]] // 消息缓冲区 + cond *sync.Cond // 条件变量 + condRW *sync.RWMutex // 条件变量的读写锁 + closedHandler func(q *Queue[Id, Ident, M]) // 关闭处理函数 + identifiers map[Ident]int64 // 标识符在队列的消息计数映射 +} + +// Id 获取队列 Id +func (q *Queue[Id, Ident, M]) Id() Id { + return q.state.id +} + +// SetId 设置队列 Id +func (q *Queue[Id, Ident, M]) SetId(id Id) { + q.state.id = id +} + +// SetClosedHandler 设置队列关闭处理函数,在队列关闭后将执行该函数。此刻队列不再可用 +// - Close 函数为非阻塞调用,调用后不会立即关闭队列,会等待消息处理完毕且处理期间不再有新消息介入 +func (q *Queue[Id, Ident, M]) SetClosedHandler(handler func(q *Queue[Id, Ident, M])) { + q.closedHandler = handler +} + +// Run 阻塞的运行队列,当队列非首次运行时,将会引发来自 ErrorQueueInvalid 的 panic +func (q *Queue[Id, Ident, M]) Run() { + if atomic.LoadInt32(&q.state.status) != StatusNone { + panic(ErrorQueueInvalid) + } + atomic.StoreInt32(&q.state.status, StatusRunning) + defer func(q *Queue[Id, Ident, M]) { + if q.closedHandler != nil { + q.closedHandler(q) + } + }(q) + for { + q.cond.L.Lock() + for q.buf.IsEmpty() { + if atomic.LoadInt32(&q.state.status) >= StatusClosing && q.state.total == 0 { + q.cond.L.Unlock() + atomic.StoreInt32(&q.state.status, StatusClosed) + close(q.c) + return + } + q.cond.Wait() + } + items := q.buf.ReadAll() + q.cond.L.Unlock() + for _, item := range items { + q.c <- func(handler func(MessageWrapper[Id, Ident, M]), finisher func(m MessageWrapper[Id, Ident, M], last bool)) { + defer func(msg MessageWrapper[Id, Ident, M]) { + q.cond.L.Lock() + q.state.total-- + curr := q.identifiers[msg.Ident()] - 1 + if curr != 0 { + q.identifiers[msg.Ident()] = curr + } else { + delete(q.identifiers, msg.Ident()) + } + if finisher != nil { + finisher(msg, curr == 0) + } + //log.Info("消息总计数", log.Int64("计数", q.state.total)) + q.cond.Signal() + q.cond.L.Unlock() + }(item) + + handler(item) + } + } + } +} + +// Push 向队列中推送来自 ident 的消息 m,当队列已关闭时将会返回 ErrorQueueClosed +func (q *Queue[Id, Ident, M]) Push(ident Ident, m M) error { + if atomic.LoadInt32(&q.state.status) > StatusClosing { + return ErrorQueueClosed + } + q.cond.L.Lock() + q.identifiers[ident]++ + q.state.total++ + q.buf.Write(MessageWrapper[Id, Ident, M]{ + ident: ident, + msg: m, + }) + //log.Info("消息总计数", log.Int64("计数", q.state.total)) + q.cond.Signal() + q.cond.L.Unlock() + return nil +} + +// WaitAdd 向队列增加来自外部的等待计数,在队列关闭时会等待该计数归零 +func (q *Queue[Id, Ident, M]) WaitAdd(ident Ident, delta int64) { + q.cond.L.Lock() + + currIdent := q.identifiers[ident] + currIdent += delta + q.identifiers[ident] = currIdent + q.state.total += delta + //log.Info("消息总计数", log.Int64("计数", q.state.total)) + + q.cond.Signal() + q.cond.L.Unlock() +} + +// Read 获取队列消息的只读通道, +func (q *Queue[Id, Ident, M]) Read() <-chan MessageHandler[Id, Ident, M] { + return q.c +} + +// Close 关闭队列 +func (q *Queue[Id, Ident, M]) Close() { + atomic.CompareAndSwapInt32(&q.state.status, StatusRunning, StatusClosing) + q.cond.Broadcast() +} + +// State 获取队列状态 +func (q *Queue[Id, Ident, M]) State() *State[Id, Ident, M] { + return q.state +} + +// GetMessageCount 获取消息数量 +func (q *Queue[Id, Ident, M]) GetMessageCount() (count int64) { + q.condRW.RLock() + defer q.condRW.RUnlock() + for _, i := range q.identifiers { + count += i + } + return +} + +// GetMessageCountWithIdent 获取特定消息人的消息数量 +func (q *Queue[Id, Ident, M]) GetMessageCountWithIdent(ident Ident) int64 { + q.condRW.RLock() + defer q.condRW.RUnlock() + return q.identifiers[ident] +} diff --git a/server/internal/v2/queue/state.go b/server/internal/v2/queue/state.go new file mode 100644 index 0000000..a61173d --- /dev/null +++ b/server/internal/v2/queue/state.go @@ -0,0 +1,34 @@ +package queue + +import ( + "sync/atomic" +) + +const ( + StatusNone = iota - 1 // 队列未运行 + StatusRunning // 队列运行中 + StatusClosing // 队列关闭中 + StatusClosed // 队列已关闭 +) + +type State[Id, Ident comparable, M Message] struct { + queue *Queue[Id, Ident, M] + id Id // 队列 ID + status int32 // 状态标志 + total int64 // 消息总计数 +} + +// IsClosed 判断队列是否已关闭 +func (q *State[Id, Ident, M]) IsClosed() bool { + return atomic.LoadInt32(&q.status) == StatusClosed +} + +// IsClosing 判断队列是否正在关闭 +func (q *State[Id, Ident, M]) IsClosing() bool { + return atomic.LoadInt32(&q.status) == StatusClosing +} + +// IsRunning 判断队列是否正在运行 +func (q *State[Id, Ident, M]) IsRunning() bool { + return atomic.LoadInt32(&q.status) == StatusRunning +} diff --git a/server/internal/v2/reactor/handlers.go b/server/internal/v2/reactor/handlers.go index 59aa4a7..ac5f956 100644 --- a/server/internal/v2/reactor/handlers.go +++ b/server/internal/v2/reactor/handlers.go @@ -1,7 +1,7 @@ package reactor -type queueMessageHandler[M any] func(q *queue[M], ident *identifiable, msg M) +import "github.com/kercylan98/minotaur/server/internal/v2/queue" -type MessageHandler[M any] func(msg M) +type MessageHandler[M any] func(message queue.MessageWrapper[int, string, M]) -type ErrorHandler[M any] func(msg M, err error) +type ErrorHandler[M any] func(message queue.MessageWrapper[int, string, M], err error) diff --git a/server/internal/v2/reactor/queue.go b/server/internal/v2/reactor/queue.go deleted file mode 100644 index 68d8d4b..0000000 --- a/server/internal/v2/reactor/queue.go +++ /dev/null @@ -1,82 +0,0 @@ -package reactor - -import ( - "errors" - "github.com/kercylan98/minotaur/utils/buffer" - "sync" - "sync/atomic" -) - -func newQueue[M any](idx, chanSize, bufferSize int) *queue[M] { - q := &queue[M]{ - c: make(chan queueMessage[M], chanSize), - buf: buffer.NewRing[queueMessage[M]](bufferSize), - cond: sync.NewCond(&sync.Mutex{}), - } - q.QueueState = &QueueState[M]{ - queue: q, - idx: idx, - status: QueueStatusNone, - } - return q -} - -type queue[M any] struct { - *QueueState[M] - c chan queueMessage[M] // 通道 - buf *buffer.Ring[queueMessage[M]] // 缓冲区 - cond *sync.Cond // 条件变量 - closedHandler func(q *queue[M]) // 关闭处理函数 -} - -func (q *queue[M]) Id() int { - return q.idx -} - -func (q *queue[M]) setClosedHandler(handler func(q *queue[M])) { - q.closedHandler = handler -} - -func (q *queue[M]) run() { - atomic.StoreInt32(&q.status, QueueStatusRunning) - defer func(q *queue[M]) { - atomic.StoreInt32(&q.status, QueueStatusClosed) - if q.closedHandler != nil { - q.closedHandler(q) - } - }(q) - for { - q.cond.L.Lock() - for q.buf.IsEmpty() { - if atomic.LoadInt32(&q.status) >= QueueStatusClosing { - q.cond.L.Unlock() - close(q.c) - return - } - q.cond.Wait() - } - items := q.buf.ReadAll() - q.cond.L.Unlock() - for _, item := range items { - q.c <- item - } - } -} - -func (q *queue[M]) push(ident *identifiable, m M) error { - if atomic.LoadInt32(&q.status) > QueueStatusRunning { - return errors.New("queue status exception") - } - q.cond.L.Lock() - q.buf.Write(queueMessage[M]{ - ident: ident, - msg: m, - }) - q.cond.Signal() - q.cond.L.Unlock() - return nil -} - -func (q *queue[M]) read() <-chan queueMessage[M] { - return q.c -} diff --git a/server/internal/v2/reactor/queue_message.go b/server/internal/v2/reactor/queue_message.go deleted file mode 100644 index 930795b..0000000 --- a/server/internal/v2/reactor/queue_message.go +++ /dev/null @@ -1,6 +0,0 @@ -package reactor - -type queueMessage[M any] struct { - ident *identifiable - msg M -} diff --git a/server/internal/v2/reactor/queue_state.go b/server/internal/v2/reactor/queue_state.go deleted file mode 100644 index 72926d4..0000000 --- a/server/internal/v2/reactor/queue_state.go +++ /dev/null @@ -1,39 +0,0 @@ -package reactor - -import ( - "sync/atomic" -) - -const ( - QueueStatusNone = iota - 1 // 队列未运行 - QueueStatusRunning // 队列运行中 - QueueStatusClosing // 队列关闭中 - QueueStatusClosed // 队列已关闭 -) - -type QueueState[M any] struct { - queue *queue[M] - idx int // 队列索引 - status int32 // 状态标志 -} - -// IsClosed 判断队列是否已关闭 -func (q *QueueState[M]) IsClosed() bool { - return atomic.LoadInt32(&q.status) == QueueStatusClosed -} - -// IsClosing 判断队列是否正在关闭 -func (q *QueueState[M]) IsClosing() bool { - return atomic.LoadInt32(&q.status) == QueueStatusClosing -} - -// IsRunning 判断队列是否正在运行 -func (q *QueueState[M]) IsRunning() bool { - return atomic.LoadInt32(&q.status) == QueueStatusRunning -} - -// Close 关闭队列 -func (q *QueueState[M]) Close() { - atomic.CompareAndSwapInt32(&q.status, QueueStatusRunning, QueueStatusClosing) - q.queue.cond.Broadcast() -} diff --git a/server/internal/v2/reactor/reactor.go b/server/internal/v2/reactor/reactor.go index 3b23e6d..f88ae1e 100644 --- a/server/internal/v2/reactor/reactor.go +++ b/server/internal/v2/reactor/reactor.go @@ -2,8 +2,10 @@ package reactor import ( "fmt" - "github.com/alphadose/haxmap" "github.com/kercylan98/minotaur/server/internal/v2/loadbalancer" + "github.com/kercylan98/minotaur/server/internal/v2/queue" + "github.com/kercylan98/minotaur/utils/log/v2" + "github.com/kercylan98/minotaur/utils/str" "github.com/kercylan98/minotaur/utils/super" "runtime" "runtime/debug" @@ -18,109 +20,116 @@ const ( statusClosed // 事件循环已关闭 ) -var sysIdent = &identifiable{ident: "system"} +const SysIdent = str.None // NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列 -func NewReactor[M any](systemQueueSize, queueSize, systemBufferSize, queueBufferSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] { +func NewReactor[M queue.Message](systemQueueSize, queueSize, systemBufferSize, queueBufferSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] { + if handler == nil { + + } r := &Reactor[M]{ - systemQueue: newQueue[M](-1, systemQueueSize, systemBufferSize), - identifiers: haxmap.New[string, *identifiable](), - lb: loadbalancer.NewRoundRobin[int, *queue[M]](), + systemQueue: queue.New[int, string, M](-1, systemQueueSize, systemBufferSize), + lb: loadbalancer.NewRoundRobin[int, *queue.Queue[int, string, M]](), errorHandler: errorHandler, queueSize: queueSize, queueBufferSize: queueBufferSize, state: statusNone, + handler: handler, + location: make(map[string]int), } + r.logger.Store(log.GetLogger()) defaultNum := runtime.NumCPU() - if defaultNum < 1 { - defaultNum = 1 - } - r.queueRW.Lock() for i := 0; i < defaultNum; i++ { r.noneLockAddQueue() } r.queueRW.Unlock() - r.handler = func(q *queue[M], ident *identifiable, msg M) { - defer func(ident *identifiable, msg M) { - if err := super.RecoverTransform(recover()); err != nil { - defer func(msg M) { - if err = super.RecoverTransform(recover()); err != nil { - fmt.Println(err) - debug.PrintStack() - } - }(msg) - if r.errorHandler != nil { - r.errorHandler(msg, err) - } else { - fmt.Println(err) - debug.PrintStack() - } - } - - if atomic.AddInt64(&ident.n, -1) == 0 { - r.queueRW.Lock() - r.identifiers.Del(ident.ident) - r.queueRW.Unlock() - } - - }(ident, msg) - if handler != nil { - handler(msg) - } - } - return r } // Reactor 是一个消息反应器,管理系统级别的队列和多个 Socket 对应的队列 -type Reactor[M any] struct { - state int32 // 状态 - systemQueue *queue[M] // 系统级别的队列 - queueSize int // 队列管道大小 - queueBufferSize int // 队列缓冲区大小 - queues []*queue[M] // 所有使用的队列 - queueRW sync.RWMutex // 队列读写锁 - identifiers *haxmap.Map[string, *identifiable] // 标识符到队列索引的映射及消息计数 - lb *loadbalancer.RoundRobin[int, *queue[M]] // 负载均衡器 - wg sync.WaitGroup // 等待组 - cwg sync.WaitGroup // 关闭等待组 - handler queueMessageHandler[M] // 消息处理器 - errorHandler ErrorHandler[M] // 错误处理器 +type Reactor[M queue.Message] struct { + logger atomic.Pointer[log.Logger] // 日志记录器 + state int32 // 状态 + systemQueue *queue.Queue[int, string, M] // 系统级别的队列 + queueSize int // 队列管道大小 + queueBufferSize int // 队列缓冲区大小 + queues []*queue.Queue[int, string, M] // 所有使用的队列 + queueRW sync.RWMutex // 队列读写锁 + location map[string]int // 所在队列 ID 映射 + locationRW sync.RWMutex // 所在队列 ID 映射锁 + lb *loadbalancer.RoundRobin[int, *queue.Queue[int, string, M]] // 负载均衡器 + wg sync.WaitGroup // 等待组 + cwg sync.WaitGroup // 关闭等待组 + handler MessageHandler[M] // 消息处理器 + errorHandler ErrorHandler[M] // 错误处理器 +} + +// process 消息处理 +func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) { + defer func(msg queue.MessageWrapper[int, string, M]) { + if err := super.RecoverTransform(recover()); err != nil { + if r.errorHandler != nil { + r.errorHandler(msg, err) + } else { + r.logger.Load().Error("Reactor", log.Int("queue", msg.Queue().Id()), log.String("ident", msg.Ident()), log.Err(err)) + debug.PrintStack() + } + } + }(msg) + r.handler(msg) } // AutoDispatch 自动分发,当 ident 为空字符串时,分发到系统级别的队列,否则分发到 ident 使用的队列 func (r *Reactor[M]) AutoDispatch(ident string, msg M) error { - if ident == "" { - return r.SystemDispatch(msg) + if ident == str.None { + return r.DispatchWithSystem(msg) } return r.Dispatch(ident, msg) } -// SystemDispatch 将消息分发到系统级别的队列 -func (r *Reactor[M]) SystemDispatch(msg M) error { - if atomic.LoadInt32(&r.state) > statusRunning { +// DispatchWithSystem 将消息分发到系统级别的队列 +func (r *Reactor[M]) DispatchWithSystem(msg M, beforeHandler ...func(queue *queue.Queue[int, string, M], msg M)) error { + if atomic.LoadInt32(&r.state) > statusClosing { r.queueRW.RUnlock() return fmt.Errorf("reactor closing or closed") } - return r.systemQueue.push(sysIdent, msg) + for _, f := range beforeHandler { + f(r.systemQueue, msg) + } + return r.systemQueue.Push(SysIdent, msg) } // Dispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列 -func (r *Reactor[M]) Dispatch(ident string, msg M) error { +// - 设置 count 会增加消息的外部计数,当 Reactor 关闭时会等待外部计数归零 +func (r *Reactor[M]) Dispatch(ident string, msg M, beforeHandler ...func(queue *queue.Queue[int, string, M], msg M)) error { r.queueRW.RLock() - if atomic.LoadInt32(&r.state) > statusRunning { + if atomic.LoadInt32(&r.state) > statusClosing { r.queueRW.RUnlock() return fmt.Errorf("reactor closing or closed") } - next := r.lb.Next() - i, _ := r.identifiers.GetOrSet(ident, &identifiable{ident: ident}) - q := r.queues[next.Id()] - atomic.AddInt64(&i.n, 1) + + var next *queue.Queue[int, string, M] + r.locationRW.RLock() + i, exist := r.location[ident] + r.locationRW.RUnlock() + if !exist { + r.locationRW.Lock() + if i, exist = r.location[ident]; !exist { + next = r.lb.Next() + r.location[ident] = next.Id() + } + r.locationRW.Unlock() + } else { + next = r.queues[i] + } r.queueRW.RUnlock() - return q.push(i, msg) + for _, f := range beforeHandler { + f(next, msg) + } + return next.Push(ident, msg) } // Run 启动 Reactor,运行系统级别的队列和多个 Socket 对应的队列 @@ -138,35 +147,41 @@ func (r *Reactor[M]) Run() { } func (r *Reactor[M]) noneLockAddQueue() { - q := newQueue[M](len(r.queues), r.queueSize, r.queueBufferSize) + q := queue.New[int, string, M](len(r.queues), r.queueSize, r.queueBufferSize) r.lb.Add(q) // 运行前添加到负载均衡器,未运行时允许接收消息 r.queues = append(r.queues, q) } -func (r *Reactor[M]) noneLockDelQueue(q *queue[M]) { +func (r *Reactor[M]) noneLockDelQueue(q *queue.Queue[int, string, M]) { idx := q.Id() if idx < 0 || idx >= len(r.queues) || r.queues[idx] != q { return } r.queues = append(r.queues[:idx], r.queues[idx+1:]...) for i := idx; i < len(r.queues); i++ { - r.queues[i].idx = i + r.queues[i].SetId(i) } } -func (r *Reactor[M]) runQueue(q *queue[M]) { +func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) { r.wg.Add(1) - q.setClosedHandler(func(q *queue[M]) { + q.SetClosedHandler(func(q *queue.Queue[int, string, M]) { // 关闭时正在等待关闭完成,外部已加锁,无需再次加锁 r.noneLockDelQueue(q) r.cwg.Done() }) - go q.run() + go q.Run() - go func(r *Reactor[M], q *queue[M]) { + go func(r *Reactor[M], q *queue.Queue[int, string, M]) { defer r.wg.Done() - for m := range q.read() { - r.handler(q, m.ident, m.msg) + for m := range q.Read() { + m(r.process, func(m queue.MessageWrapper[int, string, M], last bool) { + if last { + r.queueRW.Lock() + defer r.queueRW.Unlock() + delete(r.location, m.Ident()) + } + }) } }(r, q) } diff --git a/server/internal/v2/reactor/reactor_test.go b/server/internal/v2/reactor/reactor_test.go index 7c37f93..8935ebd 100644 --- a/server/internal/v2/reactor/reactor_test.go +++ b/server/internal/v2/reactor/reactor_test.go @@ -1,6 +1,7 @@ package reactor_test import ( + "github.com/kercylan98/minotaur/server/internal/v2/queue" "github.com/kercylan98/minotaur/server/internal/v2/reactor" "github.com/kercylan98/minotaur/utils/random" "github.com/kercylan98/minotaur/utils/times" @@ -9,11 +10,11 @@ import ( ) func BenchmarkReactor_Dispatch(b *testing.B) { - var r = reactor.NewReactor(1024*16, 1024, func(msg func()) { - msg() - }, func(msg func(), err error) { - b.Error(err) - }).SetDebug(false) + var r = reactor.NewReactor(1024*16, 1024, 1024, 1024, func(message queue.MessageWrapper[int, string, func()]) { + message.Message() + }, func(message queue.MessageWrapper[int, string, func()], err error) { + + }) go r.Run() @@ -28,11 +29,11 @@ func BenchmarkReactor_Dispatch(b *testing.B) { } func TestReactor_Dispatch(t *testing.T) { - var r = reactor.NewReactor(1024*16, 1024, func(msg func()) { - msg() - }, func(msg func(), err error) { - t.Error(err) - }).SetDebug(true) + var r = reactor.NewReactor(1024*16, 1024, 1024, 1024, func(message queue.MessageWrapper[int, string, func()]) { + message.Message() + }, func(message queue.MessageWrapper[int, string, func()], err error) { + + }) go r.Run() diff --git a/server/internal/v2/reactor/ref.go b/server/internal/v2/reactor/ref.go deleted file mode 100644 index 848e6e3..0000000 --- a/server/internal/v2/reactor/ref.go +++ /dev/null @@ -1,6 +0,0 @@ -package reactor - -type identifiable struct { - ident string // 标识 - n int64 // 消息数量 -} diff --git a/server/internal/v2/server.go b/server/internal/v2/server.go index 9093dab..d087c0d 100644 --- a/server/internal/v2/server.go +++ b/server/internal/v2/server.go @@ -2,7 +2,9 @@ package server import ( "context" + "github.com/kercylan98/minotaur/server/internal/v2/queue" "github.com/kercylan98/minotaur/utils/log/v2" + "github.com/panjf2000/ants/v2" "time" "github.com/kercylan98/minotaur/server/internal/v2/reactor" @@ -20,12 +22,19 @@ type Server interface { // GetStatus 获取服务器状态 GetStatus() *State + + // PushSyncMessage 推送同步消息 + PushSyncMessage(handler func(srv Server)) + + // PushAsyncMessage 推送异步消息 + PushAsyncMessage(handler func(srv Server) error, callbacks ...func(srv Server, err error)) } type server struct { *controller *events *Options + ants *ants.Pool state *State notify *notify ctx context.Context @@ -47,14 +56,26 @@ func NewServer(network Network, options ...*Options) Server { srv.reactor = reactor.NewReactor[Message]( srv.GetServerMessageChannelSize(), srv.GetActorMessageChannelSize(), srv.GetServerMessageBufferInitialSize(), srv.GetActorMessageBufferInitialSize(), - func(msg Message) { - msg.Execute() - }, func(msg Message, err error) { + func(message queue.MessageWrapper[int, string, Message]) { + message.Message().Execute() + }, func(message queue.MessageWrapper[int, string, Message], err error) { if handler := srv.GetMessageErrorHandler(); handler != nil { - handler(srv, msg, err) + handler(srv, message.Message(), err) } }) srv.Options.init(srv).Apply(options...) + antsPool, err := ants.NewPool(ants.DefaultAntsPoolSize, ants.WithOptions(ants.Options{ + ExpiryDuration: 10 * time.Second, + Nonblocking: true, + //Logger: &antsLogger{logging.GetDefaultLogger()}, + //PanicHandler: func(i interface{}) { + //logging.Errorf("goroutine pool panic: %v", i) + //}, + })) + if err != nil { + panic(err) + } + srv.ants = antsPool return srv } @@ -91,6 +112,26 @@ func (s *server) Shutdown() (err error) { return } +func (s *server) PushSyncMessage(handler func(srv Server)) { + if err := s.reactor.DispatchWithSystem(SyncMessage(s, func(srv *server) { + handler(srv) + })); err != nil { + panic(err) + } +} + +func (s *server) PushAsyncMessage(handler func(srv Server) error, callbacks ...func(srv Server, err error)) { + if err := s.reactor.DispatchWithSystem(AsyncMessage(s, reactor.SysIdent, func(srv *server) error { + return handler(srv) + }, func(srv *server, err error) { + for _, callback := range callbacks { + callback(srv, err) + } + })); err != nil { + panic(err) + } +} + func (s *server) GetStatus() *State { return s.state.Status() } diff --git a/server/internal/v2/server_test.go b/server/internal/v2/server_test.go index f48f253..14e69f9 100644 --- a/server/internal/v2/server_test.go +++ b/server/internal/v2/server_test.go @@ -23,7 +23,7 @@ func TestNewServer(t *testing.T) { }) }() - srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Second*3)) + srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Day*3)) srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) { if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil { @@ -35,6 +35,14 @@ func TestNewServer(t *testing.T) { if err := conn.WritePacket(packet); err != nil { panic(err) } + srv.PushAsyncMessage(func(srv server.Server) error { + for i := 0; i < 3; i++ { + time.Sleep(time.Second) + } + return nil + }, func(srv server.Server, err error) { + t.Log("callback") + }) }) if err := srv.Run(); err != nil {