diff --git a/server/options.go b/server/options.go index 3039366..1aa4cf9 100644 --- a/server/options.go +++ b/server/options.go @@ -2,6 +2,7 @@ package server import ( "github.com/gin-contrib/pprof" + "github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/timer" "google.golang.org/grpc" @@ -229,3 +230,22 @@ func WithPProf(pattern ...string) Option { pprof.Register(srv.ginServer, pattern...) } } + +// WithShunt 通过连接数据包分流的方式创建服务器 +// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况 +// - channelGenerator:用于生成分流通道的函数 +// - shuntMatcher:用于匹配连接的函数,返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道 +// +// 将被分流的消息类型(更多类型有待斟酌): +// - MessageTypePacket +func WithShunt(channelGenerator func(guid int64) chan *Message, shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool)) Option { + return func(srv *Server) { + if channelGenerator == nil || shuntMatcher == nil { + log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "channelGenerator or shuntMatcher is nil")) + return + } + srv.shuntChannels = concurrent.NewBalanceMap[int64, chan *Message]() + srv.channelGenerator = channelGenerator + srv.shuntMatcher = shuntMatcher + } +} diff --git a/server/server.go b/server/server.go index e7ec63a..d957cf6 100644 --- a/server/server.go +++ b/server/server.go @@ -72,26 +72,29 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { - *event // 事件 - *runtime // 运行时 - *option // 可选项 - network Network // 网络类型 - addr string // 侦听地址 - systemSignal chan os.Signal // 系统信号 - online *concurrent.BalanceMap[string, *Conn] // 在线连接 - ginServer *gin.Engine // HTTP模式下的路由器 - httpServer *http.Server // HTTP模式下的服务器 - grpcServer *grpc.Server // GRPC模式下的服务器 - gServer *gNet // TCP或UDP模式下的服务器 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 - ants *ants.Pool // 协程池 - messagePool *concurrent.Pool[*Message] // 消息池 - messageChannel chan *Message // 消息管道 - multiple *MultipleServer // 多服务器模式下的服务器 - multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 - runMode RunMode // 运行模式 + *event // 事件 + *runtime // 运行时 + *option // 可选项 + network Network // 网络类型 + addr string // 侦听地址 + systemSignal chan os.Signal // 系统信号 + online *concurrent.BalanceMap[string, *Conn] // 在线连接 + ginServer *gin.Engine // HTTP模式下的路由器 + httpServer *http.Server // HTTP模式下的服务器 + grpcServer *grpc.Server // GRPC模式下的服务器 + gServer *gNet // TCP或UDP模式下的服务器 + isRunning bool // 是否正在运行 + isShutdown atomic.Bool // 是否已关闭 + closeChannel chan struct{} // 关闭信号 + ants *ants.Pool // 协程池 + messagePool *concurrent.Pool[*Message] // 消息池 + messageChannel chan *Message // 消息管道 + multiple *MultipleServer // 多服务器模式下的服务器 + multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 + runMode RunMode // 运行模式 + shuntChannels *concurrent.BalanceMap[int64, chan *Message] // 分流管道 + channelGenerator func(guid int64) chan *Message // 消息管道生成器 + shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器 } // Run 使用特定地址运行服务器 @@ -417,6 +420,14 @@ func (slf *Server) shutdown(err error) { slf.messagePool.Close() slf.messageChannel = nil } + if slf.shuntChannels != nil { + slf.shuntChannels.Range(func(key int64, c chan *Message) bool { + close(c) + return false + }) + slf.shuntChannels.Clear() + slf.shuntChannels = nil + } if slf.grpcServer != nil && slf.isRunning { slf.grpcServer.GracefulStop() } @@ -469,12 +480,42 @@ func (slf *Server) HttpRouter() gin.IRouter { return slf.ginServer } +// ShuntChannelFreed 释放分流通道 +func (slf *Server) ShuntChannelFreed(channelGuid int64) { + if slf.shuntChannels == nil { + return + } + channel, exist := slf.shuntChannels.GetExist(channelGuid) + if exist { + close(channel) + slf.shuntChannels.Delete(channelGuid) + } +} + // pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求 func (slf *Server) pushMessage(message *Message) { if slf.messagePool.IsClose() { slf.messagePool.Release(message) return } + if slf.shuntChannels != nil && (message.t == MessageTypePacket) { + conn := message.attrs[0].(*Conn) + channelGuid, allowToCreate := slf.shuntMatcher(conn) + channel, exist := slf.shuntChannels.GetExist(channelGuid) + if !exist && allowToCreate { + channel = slf.channelGenerator(channelGuid) + slf.shuntChannels.Set(channelGuid, channel) + go func(channel chan *Message) { + for message := range channel { + slf.dispatchMessage(message) + } + }(channel) + } + if channel != nil { + channel <- message + return + } + } slf.messageChannel <- message }