other: server/v2 基本通讯模型实现

This commit is contained in:
kercylan 2024-03-31 23:05:57 +08:00
parent 5b0ea566d5
commit 89e868bd1c
14 changed files with 175 additions and 164 deletions

View File

@ -1,3 +0,0 @@
package server
type

View File

@ -1,7 +0,0 @@
package actor
import "github.com/kercylan98/minotaur/server/internal/v2/dispatcher"
type Actor[M any] struct {
*dispatcher.Dispatcher[M]
}

View File

@ -1,26 +1,58 @@
package server
import (
"context"
"github.com/kercylan98/minotaur/server/internal/v2/dispatcher"
"net"
)
type ConnWriter func(packet Packet) error
type Conn interface {
// SetActor 设置连接使用的 Actor 名称
SetActor(actor string)
// GetActor 获取连接使用的 Actor 名称
GetActor() string
// WritePacket 写入一个 Packet
WritePacket(packet Packet) error
// Write 写入数据
Write(data []byte) (n int, err error)
// WriteBytes 写入数据
WriteBytes(data []byte) error
}
func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter) Conn {
func newConn(c net.Conn, connWriter ConnWriter) *conn {
return &conn{
conn: c,
writer: connWriter,
actor: dispatcher.NewActor[Packet](ctx, handler),
}
}
type conn struct {
conn net.Conn
writer ConnWriter
actor *dispatcher.Actor[Packet]
actor string
}
func (c *conn) SetActor(actor string) {
c.actor = actor
}
func (c *conn) GetActor() string {
return c.actor
}
func (c *conn) WritePacket(packet Packet) error {
return c.writer(packet)
}
func (c *conn) Write(data []byte) (n int, err error) {
return c.conn.Write(data)
}
func (c *conn) WriteBytes(data []byte) error {
_, err := c.conn.Write(data)
return err
}

View File

@ -3,23 +3,51 @@ package server
import "net"
type Controller interface {
Run() error
Shutdown() error
RegisterConnection(conn net.Conn, writer ConnWriter)
EliminateConnection(conn net.Conn, err error)
ReactPacket(conn net.Conn, packet Packet)
}
type controller struct {
*server
connections map[net.Conn]*conn
}
func (s *controller) init(srv *server) *controller {
s.server = srv
s.connections = make(map[net.Conn]*conn)
return s
}
func (s *controller) RegisterConn(conn net.Conn, writer ConnWriter) {
func (s *controller) RegisterConnection(conn net.Conn, writer ConnWriter) {
if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) {
srv.connections[conn] = newConn(conn, writer)
})); err != nil {
panic(err)
}
}
func (s *controller) UnRegisterConn() {
func (s *controller) EliminateConnection(conn net.Conn, err error) {
if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) {
delete(srv.connections, conn)
})); err != nil {
panic(err)
}
}
func (s *controller) ReactPacket(conn net.Conn, packet Packet) {
if err := s.server.reactor.SystemDispatch(HandlerMessage(s.server, func(srv *server) {
c, exist := srv.connections[conn]
if !exist {
return
}
if err := srv.reactor.Dispatch(c.GetActor(), HandlerMessage(srv, func(srv *server) {
srv.events.onConnectionReceivePacket(c, packet)
})); err != nil {
panic(err)
}
})); err != nil {
panic(err)
}
}

View File

@ -1,108 +0,0 @@
package dispatcher
import (
"context"
"errors"
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
"sync"
"sync/atomic"
)
// NewDispatcher 创建一个消息调度器
func NewDispatcher[M any](bufferSize int, handler func(M)) *Dispatcher[M] {
d := &Dispatcher[M]{
buf: buffer.NewRing[M](bufferSize),
bufCond: sync.NewCond(&sync.Mutex{}),
ctx: context.Background(),
}
d.BindErrorHandler(func(err error) {
log.Error("dispatcher", log.Err(err))
})
d.handler = func(m M) {
defer func() {
if err := super.RecoverTransform(recover()); err != nil {
d.errorHandler.Load().(func(error))(err)
}
}()
handler(m)
}
return d
}
// Dispatcher 并发安全且不阻塞的消息调度器
type Dispatcher[M any] struct {
buf *buffer.Ring[M]
bufCond *sync.Cond
handler func(M)
closed bool
ctx context.Context
errorHandler atomic.Value
}
// BindErrorHandler 绑定一个错误处理器到调度器中
func (d *Dispatcher[M]) BindErrorHandler(handler func(error)) {
d.errorHandler.Store(handler)
}
// BindContext 绑定一个上下文到调度器中
func (d *Dispatcher[M]) BindContext(ctx context.Context) {
d.bufCond.L.Lock()
d.ctx = ctx
if _, canceled := d.ctx.Deadline(); canceled {
d.closed = true
d.bufCond.Signal()
d.bufCond.L.Unlock()
return
}
d.bufCond.L.Unlock()
}
// Send 发送消息到调度器中等待处理
func (d *Dispatcher[M]) Send(m M) error {
d.bufCond.L.Lock()
if d.closed {
d.bufCond.L.Unlock()
return errors.New("dispatcher closed")
}
d.buf.Write(m)
d.bufCond.Signal()
d.bufCond.L.Unlock()
return nil
}
// Start 阻塞式启动调度器,调用后将开始处理消息
func (d *Dispatcher[M]) Start() {
for {
select {
case <-d.ctx.Done():
d.Stop()
default:
d.bufCond.L.Lock()
if d.buf.Len() == 0 {
if d.closed {
d.bufCond.L.Unlock()
return
}
d.bufCond.Wait()
}
messages := d.buf.ReadAll()
d.bufCond.L.Unlock()
for _, msg := range messages {
d.handler(msg)
}
}
}
}
// Stop 停止调度器,调用后将不再接受新消息,但会处理完已有消息
func (d *Dispatcher[M]) Stop() {
d.bufCond.L.Lock()
d.closed = true
d.bufCond.Signal()
d.bufCond.L.Unlock()
}

View File

@ -0,0 +1,33 @@
package server
import "github.com/kercylan98/minotaur/utils/collection/listings"
type (
ConnectionReceivePacketEventHandler func(srv Server, conn Conn, packet Packet)
)
type Events interface {
RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int)
}
type events struct {
*server
connectionReceivePacketEventHandlers listings.PrioritySlice[ConnectionReceivePacketEventHandler]
}
func (s *events) init(srv *server) *events {
s.server = srv
return s
}
func (s *events) RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) {
s.connectionReceivePacketEventHandlers.AppendByOptionalPriority(handler, priority...)
}
func (s *events) onConnectionReceivePacket(conn Conn, packet Packet) {
s.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool {
value(s.server, conn, packet)
return true
})
}

View File

@ -0,0 +1,18 @@
package server
type Message interface {
Execute()
}
func HandlerMessage(srv *server, handler func(srv *server)) Message {
return &handlerMessage{srv: srv, handler: handler}
}
type handlerMessage struct {
srv *server
handler func(srv *server)
}
func (s *handlerMessage) Execute() {
s.handler(s.srv)
}

View File

@ -9,7 +9,7 @@ import (
"time"
)
func Http(addr string) server.server {
func Http(addr string) server.Network {
return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()})
}
@ -27,14 +27,14 @@ func HttpWithHandler[H http.Handler](addr string, handler H) server.Network {
}
type httpCore[H http.Handler] struct {
addr string
handler H
srv *http.Server
event server.NetworkCore
addr string
handler H
srv *http.Server
controller server.Controller
}
func (h *httpCore[H]) OnSetup(ctx context.Context, event server.NetworkCore) (err error) {
h.event = event
func (h *httpCore[H]) OnSetup(ctx context.Context, controller server.Controller) (err error) {
h.controller = controller
h.srv.BaseContext = func(listener net.Listener) context.Context {
return ctx
}

View File

@ -9,7 +9,7 @@ import (
"time"
)
func WebSocket(addr string, pattern ...string) server.server {
func WebSocket(addr string, pattern ...string) server.Network {
ws := &websocketCore{
addr: addr,
pattern: collection.FindFirstOrDefaultInSlice(pattern, "/"),
@ -18,17 +18,17 @@ func WebSocket(addr string, pattern ...string) server.server {
}
type websocketCore struct {
ctx context.Context
core server.NetworkCore
handler *websocketHandler
addr string
pattern string
ctx context.Context
controller server.Controller
handler *websocketHandler
addr string
pattern string
}
func (w *websocketCore) OnSetup(ctx context.Context, core server.NetworkCore) (err error) {
func (w *websocketCore) OnSetup(ctx context.Context, controller server.Controller) (err error) {
w.ctx = ctx
w.handler = newWebsocketHandler(w)
w.core = core
w.controller = controller
return
}

View File

@ -12,14 +12,14 @@ import (
func newWebsocketHandler(core *websocketCore) *websocketHandler {
return &websocketHandler{
core: core,
websocketCore: core,
}
}
type websocketHandler struct {
engine *gnet.Engine
upgrader ws.Upgrader
core *websocketCore
*websocketCore
}
func (w *websocketHandler) OnBoot(eng gnet.Engine) (action gnet.Action) {
@ -33,17 +33,16 @@ func (w *websocketHandler) OnShutdown(eng gnet.Engine) {
}
func (w *websocketHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
wrapper := newWebsocketWrapper(w.core.ctx, c)
wrapper := newWebsocketWrapper(c)
c.SetContext(wrapper)
w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.server) error {
return wsutil.WriteServerMessage(c, message.GetContext().(ws.OpCode), message.GetBytes())
w.controller.RegisterConnection(c, func(packet server.Packet) error {
return wsutil.WriteServerMessage(c, packet.GetContext().(ws.OpCode), packet.GetBytes())
})
return
}
func (w *websocketHandler) OnClose(c gnet.Conn, err error) (action gnet.Action) {
wrapper := c.Context().(*websocketWrapper)
wrapper.cancel()
w.controller.EliminateConnection(c, err)
return
}
@ -70,9 +69,9 @@ func (w *websocketHandler) OnTraffic(c gnet.Conn) (action gnet.Action) {
}
for _, message := range messages {
packet := w.core.core.GeneratePacket(message.Payload)
packet := server.NewPacket(message.Payload)
packet.SetContext(message.OpCode)
w.core.core.OnReceivePacket(packet)
w.controller.ReactPacket(c, packet)
}
return
@ -85,7 +84,7 @@ func (w *websocketHandler) OnTick() (delay time.Duration, action gnet.Action) {
func (w *websocketHandler) initUpgrader() {
w.upgrader = ws.Upgrader{
OnRequest: func(uri []byte) (err error) {
if string(uri) != w.core.pattern {
if string(uri) != w.pattern {
err = errors.New("bad request")
}
return

View File

@ -8,26 +8,22 @@ import (
"github.com/gobwas/ws/wsutil"
"github.com/kercylan98/minotaur/utils/super"
"github.com/panjf2000/gnet/v2"
"golang.org/x/net/context"
"io"
"time"
)
// newWebsocketWrapper 创建 websocket 包装器
func newWebsocketWrapper(ctx context.Context, conn gnet.Conn) *websocketWrapper {
func newWebsocketWrapper(conn gnet.Conn) *websocketWrapper {
wrapper := &websocketWrapper{
conn: conn,
upgraded: false,
active: time.Now(),
}
wrapper.ctx, wrapper.cancel = context.WithCancel(ctx)
return wrapper
}
// websocketWrapper websocket 包装器
type websocketWrapper struct {
ctx context.Context
cancel context.CancelFunc
conn gnet.Conn // 连接
upgraded bool // 是否已经升级
hs ws.Handshake // 握手信息

View File

@ -1,8 +1,12 @@
package server
func NewPacket(data []byte) Packet {
return new(packet).init(data)
}
type Packet interface {
GetBytes() []byte
SetContext(ctx any)
SetContext(ctx any) Packet
GetContext() any
}
@ -25,8 +29,9 @@ func (m *packet) GetBytes() []byte {
return m.data
}
func (m *packet) SetContext(ctx any) {
func (m *packet) SetContext(ctx any) Packet {
m.ctx = ctx
return m
}
func (m *packet) GetContext() any {

View File

@ -1,17 +1,24 @@
package server
import "golang.org/x/net/context"
import (
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
"golang.org/x/net/context"
)
type Server interface {
Events
Run() error
Shutdown() error
}
type server struct {
*controller
*events
ctx context.Context
cancel context.CancelFunc
network Network
reactor *reactor.Reactor[Message]
}
func NewServer(network Network) Server {
@ -20,10 +27,15 @@ func NewServer(network Network) Server {
}
srv.ctx, srv.cancel = context.WithCancel(context.Background())
srv.controller = new(controller).init(srv)
srv.events = new(events).init(srv)
srv.reactor = reactor.NewReactor[Message](1024*8, 1024, func(msg Message) {
msg.Execute()
}, nil)
return srv
}
func (s *server) Run() (err error) {
go s.reactor.Run()
if err = s.network.OnSetup(s.ctx, s); err != nil {
return
}
@ -35,7 +47,8 @@ func (s *server) Run() (err error) {
}
func (s *server) Shutdown() (err error) {
defer s.server.cancel()
defer s.cancel()
err = s.network.OnShutdown()
s.reactor.Close()
return
}

View File

@ -7,7 +7,12 @@ import (
)
func TestNewServer(t *testing.T) {
srv := server.server.NewServer(network.WebSocket(":9999"))
srv := server.NewServer(network.WebSocket(":9999"))
srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) {
if err := conn.WritePacket(packet); err != nil {
panic(err)
}
})
if err := srv.Run(); err != nil {
panic(err)
}