refactor: server 包重构消息通道,采用无界缓冲区替代原本的 chan,解决消息通道的缓冲区达到上限时造成永久阻塞的问题,移除 WithMessageChannelSize 可选项,修改 WithShunt 可选项不再需要 channelGenerator 参数

This commit is contained in:
kercylan98 2023-09-18 15:52:03 +08:00
parent e95e1ba399
commit 810a9fdb73
5 changed files with 89 additions and 82 deletions

View File

@ -22,7 +22,6 @@ const (
const ( const (
DefaultMessageBufferSize = 1024 DefaultMessageBufferSize = 1024
DefaultMessageChannelSize = 1024 * 1024
DefaultAsyncPoolSize = 256 DefaultAsyncPoolSize = 256
DefaultWebsocketReadDeadline = 30 * time.Second DefaultWebsocketReadDeadline = 30 * time.Second
DefaultConnectionChannelSize = 1024 * 10 DefaultConnectionChannelSize = 1024 * 10

View File

@ -253,11 +253,21 @@ func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle, priority
} }
func (slf *event) OnMessageErrorEvent(message *Message, err error) { func (slf *event) OnMessageErrorEvent(message *Message, err error) {
PushSystemMessage(slf.Server, func() { if slf.messageErrorEventHandles.Len() == 0 {
return
}
defer func() {
if err := recover(); err != nil {
log.Error("Server", log.String("OnMessageErrorEvent", messageNames[message.t]), log.Any("Error", err))
debug.PrintStack()
}
}()
slf.messageErrorEventHandles.RangeValue(func(index int, value MessageErrorEventHandle) bool { slf.messageErrorEventHandles.RangeValue(func(index int, value MessageErrorEventHandle) bool {
value(slf.Server, message, err) value(slf.Server, message, err)
return true return true
}) })
PushSystemMessage(slf.Server, func() {
}, "MessageErrorEvent") }, "MessageErrorEvent")
} }
@ -268,12 +278,14 @@ func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle, prior
} }
func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) { func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) {
PushSystemMessage(slf.Server, func() { if slf.messageLowExecEventHandles.Len() == 0 {
return
}
// 慢消息不再占用消息通道
slf.messageLowExecEventHandles.RangeValue(func(index int, value MessageLowExecEventHandle) bool { slf.messageLowExecEventHandles.RangeValue(func(index int, value MessageLowExecEventHandle) bool {
value(slf.Server, message, cost) value(slf.Server, message, cost)
return true return true
}) })
}, "MessageLowExecEvent")
} }
// RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数 // RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数

View File

@ -2,6 +2,7 @@ package server
import ( import (
"github.com/gin-contrib/pprof" "github.com/gin-contrib/pprof"
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/timer" "github.com/kercylan98/minotaur/utils/timer"
@ -36,7 +37,6 @@ type runtime struct {
supportMessageTypes map[int]bool // websocket模式下支持的消息类型 supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件 certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小 messagePoolSize int // 消息池大小
messageChannelSize int // 消息通道大小
ticker *timer.Ticker // 定时器 ticker *timer.Ticker // 定时器
websocketReadDeadline time.Duration // websocket连接超时时间 websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级 websocketCompression int // websocket压缩等级
@ -90,18 +90,6 @@ func WithWebsocketCompression(level int) Option {
} }
} }
// WithMessageChannelSize 通过指定消息通道大小的方式创建服务器
// - 足够大的消息通道可以确保服务器在短时间内接收到大量的消息而不至于阻塞
// - 默认值为 DefaultMessageChannelSize
func WithMessageChannelSize(size int) Option {
return func(srv *Server) {
if size <= 0 {
size = DefaultMessageChannelSize
}
srv.messageChannelSize = size
}
}
// WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器 // WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器
// - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock" // - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock"
// - 默认不开启死锁检测 // - 默认不开启死锁检测
@ -255,7 +243,6 @@ func WithPProf(pattern ...string) Option {
// WithShunt 通过连接数据包分流的方式创建服务器 // WithShunt 通过连接数据包分流的方式创建服务器
// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况 // - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况
// - channelGenerator用于生成分流通道的函数
// - shuntMatcher用于匹配连接的函数返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道 // - shuntMatcher用于匹配连接的函数返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道
// //
// 将被分流的消息类型(更多类型有待斟酌): // 将被分流的消息类型(更多类型有待斟酌):
@ -263,14 +250,13 @@ func WithPProf(pattern ...string) Option {
// //
// 注意事项: // 注意事项:
// - 需要在分流通道使用完成后主动调用 Server.ShuntChannelFreed 函数释放分流通道,避免内存泄漏 // - 需要在分流通道使用完成后主动调用 Server.ShuntChannelFreed 函数释放分流通道,避免内存泄漏
func WithShunt(channelGenerator func(guid int64) chan *Message, shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool)) Option { func WithShunt(shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool)) Option {
return func(srv *Server) { return func(srv *Server) {
if channelGenerator == nil || shuntMatcher == nil { if shuntMatcher == nil {
log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "channelGenerator or shuntMatcher is nil")) log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil"))
return return
} }
srv.shuntChannels = concurrent.NewBalanceMap[int64, chan *Message]() srv.shuntChannels = concurrent.NewBalanceMap[int64, *buffer.Unbounded[*Message]]()
srv.channelGenerator = channelGenerator
srv.shuntMatcher = shuntMatcher srv.shuntMatcher = shuntMatcher
} }
} }

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/str" "github.com/kercylan98/minotaur/utils/str"
@ -33,7 +34,6 @@ func New(network Network, options ...Option) *Server {
server := &Server{ server := &Server{
runtime: &runtime{ runtime: &runtime{
messagePoolSize: DefaultMessageBufferSize, messagePoolSize: DefaultMessageBufferSize,
messageChannelSize: DefaultMessageChannelSize,
connMessageChannelSize: DefaultConnectionChannelSize, connMessageChannelSize: DefaultConnectionChannelSize,
}, },
option: &option{}, option: &option{},
@ -94,12 +94,12 @@ type Server struct {
closeChannel chan struct{} // 关闭信号 closeChannel chan struct{} // 关闭信号
ants *ants.Pool // 协程池 ants *ants.Pool // 协程池
messagePool *concurrent.Pool[*Message] // 消息池 messagePool *concurrent.Pool[*Message] // 消息池
messageChannel chan *Message // 消息管道 //messageChannel chan *Message // 消息管道
messageChannel *buffer.Unbounded[*Message] // 消息无界缓冲区
multiple *MultipleServer // 多服务器模式下的服务器 multiple *MultipleServer // 多服务器模式下的服务器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
runMode RunMode // 运行模式 runMode RunMode // 运行模式
shuntChannels *concurrent.BalanceMap[int64, chan *Message] // 分流管道 shuntChannels *concurrent.BalanceMap[int64, *buffer.Unbounded[*Message]] // 分流管道
channelGenerator func(guid int64) chan *Message // 消息管道生成器
shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器 shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器
messageCounter atomic.Int64 // 消息计数器 messageCounter atomic.Int64 // 消息计数器
ctx context.Context // 上下文 ctx context.Context // 上下文
@ -138,20 +138,28 @@ func (slf *Server) Run(addr string) error {
data.attrs = nil data.attrs = nil
}, },
) )
slf.messageChannel = make(chan *Message, slf.messageChannelSize) slf.messageChannel = buffer.NewUnbounded[*Message](func() *Message {
return nil
})
if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC { if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC {
slf.gServer = &gNet{Server: slf} slf.gServer = &gNet{Server: slf}
} }
if callback != nil { if callback != nil {
go callback() go callback()
} }
go func(messageChannel <-chan *Message) { go func() {
messageInitFinish <- struct{}{} messageInitFinish <- struct{}{}
for message := range messageChannel { for {
msg := message select {
case msg, ok := <-slf.messageChannel.Get():
if !ok {
return
}
slf.messageChannel.Load()
slf.dispatchMessage(msg) slf.dispatchMessage(msg)
} }
}(slf.messageChannel) }
}()
} }
switch slf.network { switch slf.network {
@ -486,13 +494,12 @@ func (slf *Server) shutdown(err error) {
cross.Release() cross.Release()
} }
if slf.messageChannel != nil { if slf.messageChannel != nil {
close(slf.messageChannel) slf.messageChannel.Close()
slf.messagePool.Close() slf.messagePool.Close()
slf.messageChannel = nil
} }
if slf.shuntChannels != nil { if slf.shuntChannels != nil {
slf.shuntChannels.Range(func(key int64, c chan *Message) bool { slf.shuntChannels.Range(func(key int64, c *buffer.Unbounded[*Message]) bool {
close(c) c.Close()
return false return false
}) })
slf.shuntChannels.Clear() slf.shuntChannels.Clear()
@ -575,7 +582,7 @@ func (slf *Server) ShuntChannelFreed(channelGuid int64) {
} }
channel, exist := slf.shuntChannels.GetExist(channelGuid) channel, exist := slf.shuntChannels.GetExist(channelGuid)
if exist { if exist {
close(channel) channel.Close()
slf.shuntChannels.Delete(channelGuid) slf.shuntChannels.Delete(channelGuid)
slf.OnShuntChannelClosedEvent(channelGuid) slf.OnShuntChannelClosedEvent(channelGuid)
} }
@ -592,23 +599,32 @@ func (slf *Server) pushMessage(message *Message) {
channelGuid, allowToCreate := slf.shuntMatcher(conn) channelGuid, allowToCreate := slf.shuntMatcher(conn)
channel, exist := slf.shuntChannels.GetExist(channelGuid) channel, exist := slf.shuntChannels.GetExist(channelGuid)
if !exist && allowToCreate { if !exist && allowToCreate {
channel = slf.channelGenerator(channelGuid) channel = buffer.NewUnbounded[*Message](func() *Message {
return nil
})
slf.shuntChannels.Set(channelGuid, channel) slf.shuntChannels.Set(channelGuid, channel)
go func(channel chan *Message) { go func(channel *buffer.Unbounded[*Message]) {
for message := range channel { for {
slf.dispatchMessage(message) select {
case msg, ok := <-channel.Get():
if !ok {
return
}
channel.Load()
slf.dispatchMessage(msg)
}
} }
}(channel) }(channel)
defer slf.OnShuntChannelCreatedEvent(channelGuid) defer slf.OnShuntChannelCreatedEvent(channelGuid)
} }
if channel != nil { if channel != nil {
slf.messageCounter.Add(1) slf.messageCounter.Add(1)
channel <- message channel.Put(message)
return return
} }
} }
slf.messageCounter.Add(1) slf.messageCounter.Add(1)
slf.messageChannel <- message slf.messageChannel.Put(message)
} }
func (slf *Server) low(message *Message, present time.Time, expect time.Duration, messageReplace ...string) { func (slf *Server) low(message *Message, present time.Time, expect time.Duration, messageReplace ...string) {

View File

@ -40,6 +40,7 @@ func (slf *Unbounded[V]) Put(t V) {
} }
// Load 将缓冲区中的数据发送到读取通道中,如果缓冲区中没有数据,则不会发送 // Load 将缓冲区中的数据发送到读取通道中,如果缓冲区中没有数据,则不会发送
// - 在每次 Get 后都应该执行该函数
func (slf *Unbounded[V]) Load() { func (slf *Unbounded[V]) Load() {
slf.mu.Lock() slf.mu.Lock()
defer slf.mu.Unlock() defer slf.mu.Unlock()
@ -56,19 +57,12 @@ func (slf *Unbounded[V]) Load() {
} }
} }
// Get returns a read channel on which values added to the buffer, via Put(), // Get 获取读取通道
// are sent on.
//
// Upon reading a value from this channel, users are expected to call Load() to
// send the next buffered value onto the channel if there is any.
//
// If the unbounded buffer is closed, the read channel returned by this method
// is closed.
func (slf *Unbounded[V]) Get() <-chan V { func (slf *Unbounded[V]) Get() <-chan V {
return slf.c return slf.c
} }
// Close closes the unbounded buffer. // Close 关闭
func (slf *Unbounded[V]) Close() { func (slf *Unbounded[V]) Close() {
slf.mu.Lock() slf.mu.Lock()
defer slf.mu.Unlock() defer slf.mu.Unlock()