From 73cefc9b48be3c0f537b4d0ed93b5b73087701da Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Sat, 1 Jul 2023 12:25:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9C=8D=E5=8A=A1=E5=99=A8=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E9=80=9A=E8=BF=87=20server.WithDiversion=20=E5=8F=AF?= =?UTF-8?q?=E9=80=89=E9=A1=B9=E5=AF=B9=E6=95=B0=E6=8D=AE=E5=8C=85=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E8=BF=9B=E8=A1=8C=E5=88=86=E6=B5=81=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 适用于类似房间这样的模式中,每个房间的消息将不会对其他房间消息造成阻塞 --- server/conn_readonly.go | 13 +++++++++++++ server/options.go | 10 ++++++++++ server/server.go | 42 +++++++++++++++++++++++++---------------- 3 files changed, 49 insertions(+), 16 deletions(-) create mode 100644 server/conn_readonly.go diff --git a/server/conn_readonly.go b/server/conn_readonly.go new file mode 100644 index 0000000..b5bafea --- /dev/null +++ b/server/conn_readonly.go @@ -0,0 +1,13 @@ +package server + +import ( + "net" +) + +type ConnReadonly interface { + RemoteAddr() net.Addr + GetID() string + GetIP() string // GetData 获取连接数据 + GetData(key any) any + IsWebsocket() bool +} diff --git a/server/options.go b/server/options.go index 4772713..9783360 100644 --- a/server/options.go +++ b/server/options.go @@ -24,6 +24,16 @@ const ( type Option func(srv *Server) +// WithDiversion 通过分流的方式创建服务器 +// - diversion:分流函数,返回一个函数通道,用于接收分流的消息 +// - 需要确保能够通过 conn 和 packet 确定分流通道 +// - 多核模式下将导致消息顺序不一致,通过结果依然是单核处理的,因为分流通道仅有一个 +func WithDiversion(diversion func(conn ConnReadonly, packet []byte) chan func()) Option { + return func(srv *Server) { + srv.diversion = diversion + } +} + // WithTicker 通过定时器创建服务器,为服务器添加定时器功能 // - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题) // - 多核与分流情况下需要考虑是否有必要 autonomy diff --git a/server/server.go b/server/server.go index e32303c..2a4c3bb 100644 --- a/server/server.go +++ b/server/server.go @@ -53,20 +53,21 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { - *event // 事件 - cross map[string]Cross // 跨服 - id int64 // 服务器id - network Network // 网络类型 - addr string // 侦听地址 - options []Option // 选项 - ginServer *gin.Engine // HTTP模式下的路由器 - httpServer *http.Server // HTTP模式下的服务器 - grpcServer *grpc.Server // GRPC模式下的服务器 - supportMessageTypes map[int]bool // websocket模式下支持的消息类型 - certFile, keyFile string // TLS文件 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 + *event // 事件 + cross map[string]Cross // 跨服 + id int64 // 服务器id + network Network // 网络类型 + addr string // 侦听地址 + options []Option // 选项 + ginServer *gin.Engine // HTTP模式下的路由器 + httpServer *http.Server // HTTP模式下的服务器 + grpcServer *grpc.Server // GRPC模式下的服务器 + supportMessageTypes map[int]bool // websocket模式下支持的消息类型 + certFile, keyFile string // TLS文件 + isRunning bool // 是否正在运行 + isShutdown atomic.Bool // 是否已关闭 + closeChannel chan struct{} // 关闭信号 + diversion func(conn ConnReadonly, packet []byte) chan func() // 分流器 gServer *gNet // TCP或UDP模式下的服务器 messagePool *synchronization.Pool[*Message] // 消息池 @@ -129,7 +130,7 @@ func (slf *Server) Run(addr string) error { messageChannel := messageChannel go func() { for message := range messageChannel { - slf.dispatchMessage(message) + slf.dispatchMessage(message, slf.diversion != nil) } }() } @@ -456,7 +457,16 @@ func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []b } // dispatchMessage 消息分发 -func (slf *Server) dispatchMessage(msg *Message) { +func (slf *Server) dispatchMessage(msg *Message, isRedirect bool) { + if slf.diversion != nil && isRedirect && msg.t == MessageTypePacket { + conn, packet, _ := msg.t.deconstructWebSocketPacket(msg.attrs...) + if redirect := slf.diversion(conn, packet); redirect != nil { + redirect <- func() { + slf.dispatchMessage(msg, false) + } + } + return + } present := time.Now() defer func() { if err := recover(); err != nil {