diff --git a/server/conn_readonly.go b/server/conn_readonly.go new file mode 100644 index 0000000..b5bafea --- /dev/null +++ b/server/conn_readonly.go @@ -0,0 +1,13 @@ +package server + +import ( + "net" +) + +type ConnReadonly interface { + RemoteAddr() net.Addr + GetID() string + GetIP() string // GetData 获取连接数据 + GetData(key any) any + IsWebsocket() bool +} diff --git a/server/options.go b/server/options.go index 4772713..9783360 100644 --- a/server/options.go +++ b/server/options.go @@ -24,6 +24,16 @@ const ( type Option func(srv *Server) +// WithDiversion 通过分流的方式创建服务器 +// - diversion:分流函数,返回一个函数通道,用于接收分流的消息 +// - 需要确保能够通过 conn 和 packet 确定分流通道 +// - 多核模式下将导致消息顺序不一致,通过结果依然是单核处理的,因为分流通道仅有一个 +func WithDiversion(diversion func(conn ConnReadonly, packet []byte) chan func()) Option { + return func(srv *Server) { + srv.diversion = diversion + } +} + // WithTicker 通过定时器创建服务器,为服务器添加定时器功能 // - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题) // - 多核与分流情况下需要考虑是否有必要 autonomy diff --git a/server/server.go b/server/server.go index e32303c..2a4c3bb 100644 --- a/server/server.go +++ b/server/server.go @@ -53,20 +53,21 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { - *event // 事件 - cross map[string]Cross // 跨服 - id int64 // 服务器id - network Network // 网络类型 - addr string // 侦听地址 - options []Option // 选项 - ginServer *gin.Engine // HTTP模式下的路由器 - httpServer *http.Server // HTTP模式下的服务器 - grpcServer *grpc.Server // GRPC模式下的服务器 - supportMessageTypes map[int]bool // websocket模式下支持的消息类型 - certFile, keyFile string // TLS文件 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 + *event // 事件 + cross map[string]Cross // 跨服 + id int64 // 服务器id + network Network // 网络类型 + addr string // 侦听地址 + options []Option // 选项 + ginServer *gin.Engine // HTTP模式下的路由器 + httpServer *http.Server // HTTP模式下的服务器 + grpcServer *grpc.Server // GRPC模式下的服务器 + supportMessageTypes map[int]bool // websocket模式下支持的消息类型 + certFile, keyFile string // TLS文件 + isRunning bool // 是否正在运行 + isShutdown atomic.Bool // 是否已关闭 + closeChannel chan struct{} // 关闭信号 + diversion func(conn ConnReadonly, packet []byte) chan func() // 分流器 gServer *gNet // TCP或UDP模式下的服务器 messagePool *synchronization.Pool[*Message] // 消息池 @@ -129,7 +130,7 @@ func (slf *Server) Run(addr string) error { messageChannel := messageChannel go func() { for message := range messageChannel { - slf.dispatchMessage(message) + slf.dispatchMessage(message, slf.diversion != nil) } }() } @@ -456,7 +457,16 @@ func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []b } // dispatchMessage 消息分发 -func (slf *Server) dispatchMessage(msg *Message) { +func (slf *Server) dispatchMessage(msg *Message, isRedirect bool) { + if slf.diversion != nil && isRedirect && msg.t == MessageTypePacket { + conn, packet, _ := msg.t.deconstructWebSocketPacket(msg.attrs...) + if redirect := slf.diversion(conn, packet); redirect != nil { + redirect <- func() { + slf.dispatchMessage(msg, false) + } + } + return + } present := time.Now() defer func() { if err := recover(); err != nil {