From 810a9fdb73f4460f181ff7a60937615a6c926db8 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Mon, 18 Sep 2023 15:52:03 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20server=20=E5=8C=85=E9=87=8D?= =?UTF-8?q?=E6=9E=84=E6=B6=88=E6=81=AF=E9=80=9A=E9=81=93=EF=BC=8C=E9=87=87?= =?UTF-8?q?=E7=94=A8=E6=97=A0=E7=95=8C=E7=BC=93=E5=86=B2=E5=8C=BA=E6=9B=BF?= =?UTF-8?q?=E4=BB=A3=E5=8E=9F=E6=9C=AC=E7=9A=84=20chan=EF=BC=8C=E8=A7=A3?= =?UTF-8?q?=E5=86=B3=E6=B6=88=E6=81=AF=E9=80=9A=E9=81=93=E7=9A=84=E7=BC=93?= =?UTF-8?q?=E5=86=B2=E5=8C=BA=E8=BE=BE=E5=88=B0=E4=B8=8A=E9=99=90=E6=97=B6?= =?UTF-8?q?=E9=80=A0=E6=88=90=E6=B0=B8=E4=B9=85=E9=98=BB=E5=A1=9E=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98=EF=BC=8C=E7=A7=BB=E9=99=A4=20WithMessageChan?= =?UTF-8?q?nelSize=20=E5=8F=AF=E9=80=89=E9=A1=B9=EF=BC=8C=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=20WithShunt=20=E5=8F=AF=E9=80=89=E9=A1=B9=E4=B8=8D?= =?UTF-8?q?=E5=86=8D=E9=9C=80=E8=A6=81=20channelGenerator=20=E5=8F=82?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/constants.go | 1 - server/event.go | 32 ++++++++---- server/options.go | 24 ++------- server/server.go | 102 ++++++++++++++++++++++---------------- utils/buffer/unbounded.go | 12 ++--- 5 files changed, 89 insertions(+), 82 deletions(-) diff --git a/server/constants.go b/server/constants.go index 3ef2107..fe375d2 100644 --- a/server/constants.go +++ b/server/constants.go @@ -22,7 +22,6 @@ const ( const ( DefaultMessageBufferSize = 1024 - DefaultMessageChannelSize = 1024 * 1024 DefaultAsyncPoolSize = 256 DefaultWebsocketReadDeadline = 30 * time.Second DefaultConnectionChannelSize = 1024 * 10 diff --git a/server/event.go b/server/event.go index 3191413..5999264 100644 --- a/server/event.go +++ b/server/event.go @@ -253,11 +253,21 @@ func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle, priority } func (slf *event) OnMessageErrorEvent(message *Message, err error) { + if slf.messageErrorEventHandles.Len() == 0 { + return + } + defer func() { + if err := recover(); err != nil { + log.Error("Server", log.String("OnMessageErrorEvent", messageNames[message.t]), log.Any("Error", err)) + debug.PrintStack() + } + }() + slf.messageErrorEventHandles.RangeValue(func(index int, value MessageErrorEventHandle) bool { + value(slf.Server, message, err) + return true + }) PushSystemMessage(slf.Server, func() { - slf.messageErrorEventHandles.RangeValue(func(index int, value MessageErrorEventHandle) bool { - value(slf.Server, message, err) - return true - }) + }, "MessageErrorEvent") } @@ -268,12 +278,14 @@ func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle, prior } func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) { - PushSystemMessage(slf.Server, func() { - slf.messageLowExecEventHandles.RangeValue(func(index int, value MessageLowExecEventHandle) bool { - value(slf.Server, message, cost) - return true - }) - }, "MessageLowExecEvent") + if slf.messageLowExecEventHandles.Len() == 0 { + return + } + // 慢消息不再占用消息通道 + slf.messageLowExecEventHandles.RangeValue(func(index int, value MessageLowExecEventHandle) bool { + value(slf.Server, message, cost) + return true + }) } // RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数 diff --git a/server/options.go b/server/options.go index 3a104f8..ad4de9f 100644 --- a/server/options.go +++ b/server/options.go @@ -2,6 +2,7 @@ package server import ( "github.com/gin-contrib/pprof" + "github.com/kercylan98/minotaur/utils/buffer" "github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/timer" @@ -36,7 +37,6 @@ type runtime struct { supportMessageTypes map[int]bool // websocket模式下支持的消息类型 certFile, keyFile string // TLS文件 messagePoolSize int // 消息池大小 - messageChannelSize int // 消息通道大小 ticker *timer.Ticker // 定时器 websocketReadDeadline time.Duration // websocket连接超时时间 websocketCompression int // websocket压缩等级 @@ -90,18 +90,6 @@ func WithWebsocketCompression(level int) Option { } } -// WithMessageChannelSize 通过指定消息通道大小的方式创建服务器 -// - 足够大的消息通道可以确保服务器在短时间内接收到大量的消息而不至于阻塞 -// - 默认值为 DefaultMessageChannelSize -func WithMessageChannelSize(size int) Option { - return func(srv *Server) { - if size <= 0 { - size = DefaultMessageChannelSize - } - srv.messageChannelSize = size - } -} - // WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器 // - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock" // - 默认不开启死锁检测 @@ -255,7 +243,6 @@ func WithPProf(pattern ...string) Option { // WithShunt 通过连接数据包分流的方式创建服务器 // - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况 -// - channelGenerator:用于生成分流通道的函数 // - shuntMatcher:用于匹配连接的函数,返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道 // // 将被分流的消息类型(更多类型有待斟酌): @@ -263,14 +250,13 @@ func WithPProf(pattern ...string) Option { // // 注意事项: // - 需要在分流通道使用完成后主动调用 Server.ShuntChannelFreed 函数释放分流通道,避免内存泄漏 -func WithShunt(channelGenerator func(guid int64) chan *Message, shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool)) Option { +func WithShunt(shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool)) Option { return func(srv *Server) { - if channelGenerator == nil || shuntMatcher == nil { - log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "channelGenerator or shuntMatcher is nil")) + if shuntMatcher == nil { + log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil")) return } - srv.shuntChannels = concurrent.NewBalanceMap[int64, chan *Message]() - srv.channelGenerator = channelGenerator + srv.shuntChannels = concurrent.NewBalanceMap[int64, *buffer.Unbounded[*Message]]() srv.shuntMatcher = shuntMatcher } } diff --git a/server/server.go b/server/server.go index 709995a..36d8d72 100644 --- a/server/server.go +++ b/server/server.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" + "github.com/kercylan98/minotaur/utils/buffer" "github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/str" @@ -33,7 +34,6 @@ func New(network Network, options ...Option) *Server { server := &Server{ runtime: &runtime{ messagePoolSize: DefaultMessageBufferSize, - messageChannelSize: DefaultMessageChannelSize, connMessageChannelSize: DefaultConnectionChannelSize, }, option: &option{}, @@ -78,31 +78,31 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { - *event // 事件 - *runtime // 运行时 - *option // 可选项 - network Network // 网络类型 - addr string // 侦听地址 - systemSignal chan os.Signal // 系统信号 - online *concurrent.BalanceMap[string, *Conn] // 在线连接 - ginServer *gin.Engine // HTTP模式下的路由器 - httpServer *http.Server // HTTP模式下的服务器 - grpcServer *grpc.Server // GRPC模式下的服务器 - gServer *gNet // TCP或UDP模式下的服务器 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 - ants *ants.Pool // 协程池 - messagePool *concurrent.Pool[*Message] // 消息池 - messageChannel chan *Message // 消息管道 - multiple *MultipleServer // 多服务器模式下的服务器 - multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 - runMode RunMode // 运行模式 - shuntChannels *concurrent.BalanceMap[int64, chan *Message] // 分流管道 - channelGenerator func(guid int64) chan *Message // 消息管道生成器 - shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器 - messageCounter atomic.Int64 // 消息计数器 - ctx context.Context // 上下文 + *event // 事件 + *runtime // 运行时 + *option // 可选项 + network Network // 网络类型 + addr string // 侦听地址 + systemSignal chan os.Signal // 系统信号 + online *concurrent.BalanceMap[string, *Conn] // 在线连接 + ginServer *gin.Engine // HTTP模式下的路由器 + httpServer *http.Server // HTTP模式下的服务器 + grpcServer *grpc.Server // GRPC模式下的服务器 + gServer *gNet // TCP或UDP模式下的服务器 + isRunning bool // 是否正在运行 + isShutdown atomic.Bool // 是否已关闭 + closeChannel chan struct{} // 关闭信号 + ants *ants.Pool // 协程池 + messagePool *concurrent.Pool[*Message] // 消息池 + //messageChannel chan *Message // 消息管道 + messageChannel *buffer.Unbounded[*Message] // 消息无界缓冲区 + multiple *MultipleServer // 多服务器模式下的服务器 + multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 + runMode RunMode // 运行模式 + shuntChannels *concurrent.BalanceMap[int64, *buffer.Unbounded[*Message]] // 分流管道 + shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器 + messageCounter atomic.Int64 // 消息计数器 + ctx context.Context // 上下文 } // Run 使用特定地址运行服务器 @@ -138,20 +138,28 @@ func (slf *Server) Run(addr string) error { data.attrs = nil }, ) - slf.messageChannel = make(chan *Message, slf.messageChannelSize) + slf.messageChannel = buffer.NewUnbounded[*Message](func() *Message { + return nil + }) if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC { slf.gServer = &gNet{Server: slf} } if callback != nil { go callback() } - go func(messageChannel <-chan *Message) { + go func() { messageInitFinish <- struct{}{} - for message := range messageChannel { - msg := message - slf.dispatchMessage(msg) + for { + select { + case msg, ok := <-slf.messageChannel.Get(): + if !ok { + return + } + slf.messageChannel.Load() + slf.dispatchMessage(msg) + } } - }(slf.messageChannel) + }() } switch slf.network { @@ -486,13 +494,12 @@ func (slf *Server) shutdown(err error) { cross.Release() } if slf.messageChannel != nil { - close(slf.messageChannel) + slf.messageChannel.Close() slf.messagePool.Close() - slf.messageChannel = nil } if slf.shuntChannels != nil { - slf.shuntChannels.Range(func(key int64, c chan *Message) bool { - close(c) + slf.shuntChannels.Range(func(key int64, c *buffer.Unbounded[*Message]) bool { + c.Close() return false }) slf.shuntChannels.Clear() @@ -575,7 +582,7 @@ func (slf *Server) ShuntChannelFreed(channelGuid int64) { } channel, exist := slf.shuntChannels.GetExist(channelGuid) if exist { - close(channel) + channel.Close() slf.shuntChannels.Delete(channelGuid) slf.OnShuntChannelClosedEvent(channelGuid) } @@ -592,23 +599,32 @@ func (slf *Server) pushMessage(message *Message) { channelGuid, allowToCreate := slf.shuntMatcher(conn) channel, exist := slf.shuntChannels.GetExist(channelGuid) if !exist && allowToCreate { - channel = slf.channelGenerator(channelGuid) + channel = buffer.NewUnbounded[*Message](func() *Message { + return nil + }) slf.shuntChannels.Set(channelGuid, channel) - go func(channel chan *Message) { - for message := range channel { - slf.dispatchMessage(message) + go func(channel *buffer.Unbounded[*Message]) { + for { + select { + case msg, ok := <-channel.Get(): + if !ok { + return + } + channel.Load() + slf.dispatchMessage(msg) + } } }(channel) defer slf.OnShuntChannelCreatedEvent(channelGuid) } if channel != nil { slf.messageCounter.Add(1) - channel <- message + channel.Put(message) return } } slf.messageCounter.Add(1) - slf.messageChannel <- message + slf.messageChannel.Put(message) } func (slf *Server) low(message *Message, present time.Time, expect time.Duration, messageReplace ...string) { diff --git a/utils/buffer/unbounded.go b/utils/buffer/unbounded.go index dfb3230..ba3a6b4 100644 --- a/utils/buffer/unbounded.go +++ b/utils/buffer/unbounded.go @@ -40,6 +40,7 @@ func (slf *Unbounded[V]) Put(t V) { } // Load 将缓冲区中的数据发送到读取通道中,如果缓冲区中没有数据,则不会发送 +// - 在每次 Get 后都应该执行该函数 func (slf *Unbounded[V]) Load() { slf.mu.Lock() defer slf.mu.Unlock() @@ -56,19 +57,12 @@ func (slf *Unbounded[V]) Load() { } } -// Get returns a read channel on which values added to the buffer, via Put(), -// are sent on. -// -// Upon reading a value from this channel, users are expected to call Load() to -// send the next buffered value onto the channel if there is any. -// -// If the unbounded buffer is closed, the read channel returned by this method -// is closed. +// Get 获取读取通道 func (slf *Unbounded[V]) Get() <-chan V { return slf.c } -// Close closes the unbounded buffer. +// Close 关闭 func (slf *Unbounded[V]) Close() { slf.mu.Lock() defer slf.mu.Unlock()