diff --git a/server/internal/v2/actor.go b/server/internal/v2/actor.go deleted file mode 100644 index d70516f..0000000 --- a/server/internal/v2/actor.go +++ /dev/null @@ -1,3 +0,0 @@ -package server - -type diff --git a/server/internal/v2/actor/actor.go b/server/internal/v2/actor/actor.go deleted file mode 100644 index 8acb236..0000000 --- a/server/internal/v2/actor/actor.go +++ /dev/null @@ -1,7 +0,0 @@ -package actor - -import "github.com/kercylan98/minotaur/server/internal/v2/dispatcher" - -type Actor[M any] struct { - *dispatcher.Dispatcher[M] -} diff --git a/server/internal/v2/conn.go b/server/internal/v2/conn.go index 51bb9e9..ce2e92f 100644 --- a/server/internal/v2/conn.go +++ b/server/internal/v2/conn.go @@ -1,26 +1,58 @@ package server import ( - "context" - "github.com/kercylan98/minotaur/server/internal/v2/dispatcher" "net" ) type ConnWriter func(packet Packet) error type Conn interface { + // SetActor 设置连接使用的 Actor 名称 + SetActor(actor string) + + // GetActor 获取连接使用的 Actor 名称 + GetActor() string + + // WritePacket 写入一个 Packet + WritePacket(packet Packet) error + + // Write 写入数据 + Write(data []byte) (n int, err error) + + // WriteBytes 写入数据 + WriteBytes(data []byte) error } -func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter) Conn { +func newConn(c net.Conn, connWriter ConnWriter) *conn { return &conn{ conn: c, writer: connWriter, - actor: dispatcher.NewActor[Packet](ctx, handler), } } type conn struct { conn net.Conn writer ConnWriter - actor *dispatcher.Actor[Packet] + actor string +} + +func (c *conn) SetActor(actor string) { + c.actor = actor +} + +func (c *conn) GetActor() string { + return c.actor +} + +func (c *conn) WritePacket(packet Packet) error { + return c.writer(packet) +} + +func (c *conn) Write(data []byte) (n int, err error) { + return c.conn.Write(data) +} + +func (c *conn) WriteBytes(data []byte) error { + _, err := c.conn.Write(data) + return err } diff --git a/server/internal/v2/controller.go b/server/internal/v2/controller.go index 74ee17d..4f3f54b 100644 --- a/server/internal/v2/controller.go +++ b/server/internal/v2/controller.go @@ -3,23 +3,51 @@ package server import "net" type Controller interface { - Run() error - Shutdown() error + RegisterConnection(conn net.Conn, writer ConnWriter) + EliminateConnection(conn net.Conn, err error) + ReactPacket(conn net.Conn, packet Packet) } type controller struct { *server + connections map[net.Conn]*conn } func (s *controller) init(srv *server) *controller { s.server = srv + s.connections = make(map[net.Conn]*conn) return s } -func (s *controller) RegisterConn(conn net.Conn, writer ConnWriter) { - +func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) { + if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) { + srv.connections[conn] = newConn(conn, writer) + })); err != nil { + panic(err) + } } -func (s *controller) UnRegisterConn() { - +func (s *controller) EliminateConnection(conn net.Conn, err error) { + if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) { + delete(srv.connections, conn) + })); err != nil { + panic(err) + } +} + +func (s *controller) ReactPacket(conn net.Conn, packet Packet) { + if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) { + c, exist := srv.connections[conn] + if !exist { + return + } + + if err := srv.reactor.Dispatch(c.GetActor(), HandlerMessage(srv, func(srv *server) { + srv.events.onConnectionReceivePacket(c, packet) + })); err != nil { + panic(err) + } + })); err != nil { + panic(err) + } } diff --git a/server/internal/v2/dispatcher/dispatcher.go b/server/internal/v2/dispatcher/dispatcher.go deleted file mode 100644 index 8ef6a02..0000000 --- a/server/internal/v2/dispatcher/dispatcher.go +++ /dev/null @@ -1,108 +0,0 @@ -package dispatcher - -import ( - "context" - "errors" - "github.com/kercylan98/minotaur/utils/buffer" - "github.com/kercylan98/minotaur/utils/log" - "github.com/kercylan98/minotaur/utils/super" - "sync" - "sync/atomic" -) - -// NewDispatcher 创建一个消息调度器 -func NewDispatcher[M any](bufferSize int, handler func(M)) *Dispatcher[M] { - d := &Dispatcher[M]{ - buf: buffer.NewRing[M](bufferSize), - bufCond: sync.NewCond(&sync.Mutex{}), - ctx: context.Background(), - } - - d.BindErrorHandler(func(err error) { - log.Error("dispatcher", log.Err(err)) - }) - - d.handler = func(m M) { - defer func() { - if err := super.RecoverTransform(recover()); err != nil { - d.errorHandler.Load().(func(error))(err) - } - }() - handler(m) - } - - return d -} - -// Dispatcher 并发安全且不阻塞的消息调度器 -type Dispatcher[M any] struct { - buf *buffer.Ring[M] - bufCond *sync.Cond - handler func(M) - closed bool - ctx context.Context - errorHandler atomic.Value -} - -// BindErrorHandler 绑定一个错误处理器到调度器中 -func (d *Dispatcher[M]) BindErrorHandler(handler func(error)) { - d.errorHandler.Store(handler) -} - -// BindContext 绑定一个上下文到调度器中 -func (d *Dispatcher[M]) BindContext(ctx context.Context) { - d.bufCond.L.Lock() - d.ctx = ctx - if _, canceled := d.ctx.Deadline(); canceled { - d.closed = true - d.bufCond.Signal() - d.bufCond.L.Unlock() - return - } - d.bufCond.L.Unlock() -} - -// Send 发送消息到调度器中等待处理 -func (d *Dispatcher[M]) Send(m M) error { - d.bufCond.L.Lock() - if d.closed { - d.bufCond.L.Unlock() - return errors.New("dispatcher closed") - } - d.buf.Write(m) - d.bufCond.Signal() - d.bufCond.L.Unlock() - return nil -} - -// Start 阻塞式启动调度器,调用后将开始处理消息 -func (d *Dispatcher[M]) Start() { - for { - select { - case <-d.ctx.Done(): - d.Stop() - default: - d.bufCond.L.Lock() - if d.buf.Len() == 0 { - if d.closed { - d.bufCond.L.Unlock() - return - } - d.bufCond.Wait() - } - messages := d.buf.ReadAll() - d.bufCond.L.Unlock() - for _, msg := range messages { - d.handler(msg) - } - } - } -} - -// Stop 停止调度器,调用后将不再接受新消息,但会处理完已有消息 -func (d *Dispatcher[M]) Stop() { - d.bufCond.L.Lock() - d.closed = true - d.bufCond.Signal() - d.bufCond.L.Unlock() -} diff --git a/server/internal/v2/events.go b/server/internal/v2/events.go new file mode 100644 index 0000000..c806dd6 --- /dev/null +++ b/server/internal/v2/events.go @@ -0,0 +1,33 @@ +package server + +import "github.com/kercylan98/minotaur/utils/collection/listings" + +type ( + ConnectionReceivePacketEventHandler func(srv Server, conn Conn, packet Packet) +) + +type Events interface { + RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) +} + +type events struct { + *server + + connectionReceivePacketEventHandlers listings.PrioritySlice[ConnectionReceivePacketEventHandler] +} + +func (s *events) init(srv *server) *events { + s.server = srv + return s +} + +func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) { + s.connectionReceivePacketEventHandlers.AppendByOptionalPriority(handler, priority...) +} + +func (s *events) onConnectionReceivePacket(conn Conn, packet Packet) { + s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool { + value(s.server, conn, packet) + return true + }) +} diff --git a/server/internal/v2/message.go b/server/internal/v2/message.go new file mode 100644 index 0000000..60fbfb2 --- /dev/null +++ b/server/internal/v2/message.go @@ -0,0 +1,18 @@ +package server + +type Message interface { + Execute() +} + +func HandlerMessage(srv *server, handler func(srv *server)) Message { + return &handlerMessage{srv: srv, handler: handler} +} + +type handlerMessage struct { + srv *server + handler func(srv *server) +} + +func (s *handlerMessage) Execute() { + s.handler(s.srv) +} diff --git a/server/internal/v2/network/http.go b/server/internal/v2/network/http.go index 88faa1d..00273ee 100644 --- a/server/internal/v2/network/http.go +++ b/server/internal/v2/network/http.go @@ -9,7 +9,7 @@ import ( "time" ) -func Http(addr string) server.server { +func Http(addr string) server.Network { return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()}) } @@ -27,14 +27,14 @@ func HttpWithHandler[H http.Handler](addr string, handler H) server.Network { } type httpCore[H http.Handler] struct { - addr string - handler H - srv *http.Server - event server.NetworkCore + addr string + handler H + srv *http.Server + controller server.Controller } -func (h *httpCore[H]) OnSetup(ctx context.Context, event server.NetworkCore) (err error) { - h.event = event +func (h *httpCore[H]) OnSetup(ctx context.Context, controller server.Controller) (err error) { + h.controller = controller h.srv.BaseContext = func(listener net.Listener) context.Context { return ctx } diff --git a/server/internal/v2/network/websocket.go b/server/internal/v2/network/websocket.go index f10c746..b75109a 100644 --- a/server/internal/v2/network/websocket.go +++ b/server/internal/v2/network/websocket.go @@ -9,7 +9,7 @@ import ( "time" ) -func WebSocket(addr string, pattern ...string) server.server { +func WebSocket(addr string, pattern ...string) server.Network { ws := &websocketCore{ addr: addr, pattern: collection.FindFirstOrDefaultInSlice(pattern, "/"), @@ -18,17 +18,17 @@ func WebSocket(addr string, pattern ...string) server.server { } type websocketCore struct { - ctx context.Context - core server.NetworkCore - handler *websocketHandler - addr string - pattern string + ctx context.Context + controller server.Controller + handler *websocketHandler + addr string + pattern string } -func (w *websocketCore) OnSetup(ctx context.Context, core server.NetworkCore) (err error) { +func (w *websocketCore) OnSetup(ctx context.Context, controller server.Controller) (err error) { w.ctx = ctx w.handler = newWebsocketHandler(w) - w.core = core + w.controller = controller return } diff --git a/server/internal/v2/network/websocket_handler.go b/server/internal/v2/network/websocket_handler.go index c88e3dd..1f0de8d 100644 --- a/server/internal/v2/network/websocket_handler.go +++ b/server/internal/v2/network/websocket_handler.go @@ -12,14 +12,14 @@ import ( func newWebsocketHandler(core *websocketCore) *websocketHandler { return &websocketHandler{ - core: core, + websocketCore: core, } } type websocketHandler struct { engine *gnet.Engine upgrader ws.Upgrader - core *websocketCore + *websocketCore } func (w *websocketHandler) OnBoot(eng gnet.Engine) (action gnet.Action) { @@ -33,17 +33,16 @@ func (w *websocketHandler) OnShutdown(eng gnet.Engine) { } func (w *websocketHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { - wrapper := newWebsocketWrapper(w.core.ctx, c) + wrapper := newWebsocketWrapper(c) c.SetContext(wrapper) - w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.server) error { - return wsutil.WriteServerMessage(c, message.GetContext().(ws.OpCode), message.GetBytes()) + w.controller.RegisterConnection(c, func(packet server.Packet) error { + return wsutil.WriteServerMessage(c, packet.GetContext().(ws.OpCode), packet.GetBytes()) }) return } func (w *websocketHandler) OnClose(c gnet.Conn, err error) (action gnet.Action) { - wrapper := c.Context().(*websocketWrapper) - wrapper.cancel() + w.controller.EliminateConnection(c, err) return } @@ -70,9 +69,9 @@ func (w *websocketHandler) OnTraffic(c gnet.Conn) (action gnet.Action) { } for _, message := range messages { - packet := w.core.core.GeneratePacket(message.Payload) + packet := server.NewPacket(message.Payload) packet.SetContext(message.OpCode) - w.core.core.OnReceivePacket(packet) + w.controller.ReactPacket(c, packet) } return @@ -85,7 +84,7 @@ func (w *websocketHandler) OnTick() (delay time.Duration, action gnet.Action) { func (w *websocketHandler) initUpgrader() { w.upgrader = ws.Upgrader{ OnRequest: func(uri []byte) (err error) { - if string(uri) != w.core.pattern { + if string(uri) != w.pattern { err = errors.New("bad request") } return diff --git a/server/internal/v2/network/wsbsocket_wrapper.go b/server/internal/v2/network/wsbsocket_wrapper.go index f77862c..3bc155e 100644 --- a/server/internal/v2/network/wsbsocket_wrapper.go +++ b/server/internal/v2/network/wsbsocket_wrapper.go @@ -8,26 +8,22 @@ import ( "github.com/gobwas/ws/wsutil" "github.com/kercylan98/minotaur/utils/super" "github.com/panjf2000/gnet/v2" - "golang.org/x/net/context" "io" "time" ) // newWebsocketWrapper 创建 websocket 包装器 -func newWebsocketWrapper(ctx context.Context, conn gnet.Conn) *websocketWrapper { +func newWebsocketWrapper(conn gnet.Conn) *websocketWrapper { wrapper := &websocketWrapper{ conn: conn, upgraded: false, active: time.Now(), } - wrapper.ctx, wrapper.cancel = context.WithCancel(ctx) return wrapper } // websocketWrapper websocket 包装器 type websocketWrapper struct { - ctx context.Context - cancel context.CancelFunc conn gnet.Conn // 连接 upgraded bool // 是否已经升级 hs ws.Handshake // 握手信息 diff --git a/server/internal/v2/packet.go b/server/internal/v2/packet.go index ab9d03b..6055b73 100644 --- a/server/internal/v2/packet.go +++ b/server/internal/v2/packet.go @@ -1,8 +1,12 @@ package server +func NewPacket(data []byte) Packet { + return new(packet).init(data) +} + type Packet interface { GetBytes() []byte - SetContext(ctx any) + SetContext(ctx any) Packet GetContext() any } @@ -25,8 +29,9 @@ func (m *packet) GetBytes() []byte { return m.data } -func (m *packet) SetContext(ctx any) { +func (m *packet) SetContext(ctx any) Packet { m.ctx = ctx + return m } func (m *packet) GetContext() any { diff --git a/server/internal/v2/server.go b/server/internal/v2/server.go index 3934b2c..005b842 100644 --- a/server/internal/v2/server.go +++ b/server/internal/v2/server.go @@ -1,17 +1,24 @@ package server -import "golang.org/x/net/context" +import ( + "github.com/kercylan98/minotaur/server/internal/v2/reactor" + "golang.org/x/net/context" +) type Server interface { + Events + Run() error Shutdown() error } type server struct { *controller + *events ctx context.Context cancel context.CancelFunc network Network + reactor *reactor.Reactor[Message] } func NewServer(network Network) Server { @@ -20,10 +27,15 @@ func NewServer(network Network) Server { } srv.ctx, srv.cancel = context.WithCancel(context.Background()) srv.controller = new(controller).init(srv) + srv.events = new(events).init(srv) + srv.reactor = reactor.NewReactor[Message](1024*8, 1024, func(msg Message) { + msg.Execute() + }, nil) return srv } func (s *server) Run() (err error) { + go s.reactor.Run() if err = s.network.OnSetup(s.ctx, s); err != nil { return } @@ -35,7 +47,8 @@ func (s *server) Run() (err error) { } func (s *server) Shutdown() (err error) { - defer s.server.cancel() + defer s.cancel() err = s.network.OnShutdown() + s.reactor.Close() return } diff --git a/server/internal/v2/server_test.go b/server/internal/v2/server_test.go index 74170d0..9e339e8 100644 --- a/server/internal/v2/server_test.go +++ b/server/internal/v2/server_test.go @@ -7,7 +7,12 @@ import ( ) func TestNewServer(t *testing.T) { - srv := server.server.NewServer(network.WebSocket(":9999")) + srv := server.NewServer(network.WebSocket(":9999")) + srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) { + if err := conn.WritePacket(packet); err != nil { + panic(err) + } + }) if err := srv.Run(); err != nil { panic(err) }