feat: server 包新增 WithDisableAutomaticReleaseShunt 可选项,可禁止分流渠道自动释放。增加 ReleaseShunt、HasShunt、GetShuntNum 等函数。优化系统分流渠道将不再能够被释放

This commit is contained in:
kercylan98 2023-12-25 11:01:41 +08:00
parent ceffa2e46f
commit d9ef3474a7
5 changed files with 81 additions and 30 deletions

View File

@ -359,7 +359,11 @@ func (slf *Conn) Close(err ...error) {
if slf.ticker != nil { if slf.ticker != nil {
slf.ticker.Release() slf.ticker.Release()
} }
slf.server.releaseDispatcher(slf) if !slf.server.runtime.disableAutomaticReleaseShunt {
slf.server.releaseDispatcher(slf)
} else {
}
slf.loop.Close() slf.loop.Close()
slf.mu.Unlock() slf.mu.Unlock()
if len(err) > 0 { if len(err) > 0 {

View File

@ -7,7 +7,7 @@ import (
const ( const (
serverMultipleMark = "Minotaur Multiple Server" serverMultipleMark = "Minotaur Multiple Server"
serverMark = "Minotaur Server" serverMark = "Minotaur Server"
serverSystemDispatcher = "system" // 系统消息分发器 serverSystemDispatcher = "__system" // 系统消息分发器
) )
const ( const (

View File

@ -206,7 +206,7 @@ func (slf *event) RegConnectionClosedEvent(handler ConnectionClosedEventHandler,
} }
func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
slf.PushSystemMessage(func() { slf.PushShuntMessage(conn, func() {
slf.Server.online.Delete(conn.GetID()) slf.Server.online.Delete(conn.GetID())
slf.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool { slf.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool {
value(slf.Server, conn, err) value(slf.Server, conn, err)

View File

@ -30,24 +30,33 @@ type option struct {
} }
type runtime struct { type runtime struct {
deadlockDetect time.Duration // 是否开启死锁检测 deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型 supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件 certFile, keyFile string // TLS文件
tickerPool *timer.Pool // 定时器池 tickerPool *timer.Pool // 定时器池
ticker *timer.Ticker // 定时器 ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行 tickerAutonomy bool // 定时器是否独立运行
connTickerSize int // 连接定时器大小 connTickerSize int // 连接定时器大小
websocketReadDeadline time.Duration // websocket连接超时时间 websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级 websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩 websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期 limitLife time.Duration // 限制最大生命周期
packetWarnSize int // 数据包大小警告 packetWarnSize int // 数据包大小警告
messageStatisticsDuration time.Duration // 消息统计时长 messageStatisticsDuration time.Duration // 消息统计时长
messageStatisticsLimit int // 消息统计数量 messageStatisticsLimit int // 消息统计数量
messageStatistics []*atomic.Int64 // 消息统计数量 messageStatistics []*atomic.Int64 // 消息统计数量
messageStatisticsLock *sync.RWMutex // 消息统计锁 messageStatisticsLock *sync.RWMutex // 消息统计锁
dispatcherBufferSize int // 消息分发器缓冲区大小 dispatcherBufferSize int // 消息分发器缓冲区大小
connWriteBufferSize int // 连接写入缓冲区大小 connWriteBufferSize int // 连接写入缓冲区大小
disableAutomaticReleaseShunt bool // 是否禁用自动释放分流渠道
}
// WithDisableAutomaticReleaseShunt 通过禁用自动释放分流渠道的方式创建服务器
// - 默认不开启,当禁用自动释放分流渠道时,服务器将不会在连接断开时自动释放分流渠道,需要手动调用 ReleaseShunt 方法释放
func WithDisableAutomaticReleaseShunt() Option {
return func(srv *Server) {
srv.runtime.disableAutomaticReleaseShunt = true
}
} }
// WithConnWriteBufferSize 通过连接写入缓冲区大小的方式创建服务器 // WithConnWriteBufferSize 通过连接写入缓冲区大小的方式创建服务器

View File

@ -561,6 +561,7 @@ func (slf *Server) GetMessageCount() int64 {
// UseShunt 切换连接所使用的消息分流渠道,当分流渠道 name 不存在时将会创建一个新的分流渠道,否则将会加入已存在的分流渠道 // UseShunt 切换连接所使用的消息分流渠道,当分流渠道 name 不存在时将会创建一个新的分流渠道,否则将会加入已存在的分流渠道
// - 默认情况下,所有连接都使用系统通道进行消息分发,当指定消息分流渠道时,将会使用指定的消息分流渠道进行消息分发 // - 默认情况下,所有连接都使用系统通道进行消息分发,当指定消息分流渠道时,将会使用指定的消息分流渠道进行消息分发
// - 在使用 WithDisableAutomaticReleaseShunt 创建服务器后,必须始终在连接不再使用后主动通过 ReleaseShunt 释放消息分流渠道,否则将造成内存泄漏
func (slf *Server) UseShunt(conn *Conn, name string) { func (slf *Server) UseShunt(conn *Conn, name string) {
slf.dispatcherLock.Lock() slf.dispatcherLock.Lock()
defer slf.dispatcherLock.Unlock() defer slf.dispatcherLock.Unlock()
@ -578,7 +579,7 @@ func (slf *Server) UseShunt(conn *Conn, name string) {
} }
delete(slf.dispatcherMember[curr.name], conn.GetID()) delete(slf.dispatcherMember[curr.name], conn.GetID())
if len(slf.dispatcherMember[curr.name]) == 0 { if curr.name != serverSystemDispatcher && len(slf.dispatcherMember[curr.name]) == 0 {
delete(slf.dispatchers, curr.name) delete(slf.dispatchers, curr.name)
curr.transfer(d) curr.transfer(d)
curr.close() curr.close()
@ -595,6 +596,32 @@ func (slf *Server) UseShunt(conn *Conn, name string) {
member[conn.GetID()] = conn member[conn.GetID()] = conn
} }
// HasShunt 检查特定消息分流渠道是否存在
func (slf *Server) HasShunt(name string) bool {
slf.dispatcherLock.RLock()
defer slf.dispatcherLock.RUnlock()
_, exist := slf.dispatchers[name]
return exist
}
// GetConnCurrShunt 获取连接当前所使用的消息分流渠道
func (slf *Server) GetConnCurrShunt(conn *Conn) string {
slf.dispatcherLock.RLock()
defer slf.dispatcherLock.RUnlock()
d, exist := slf.currDispatcher[conn.GetID()]
if exist {
return d.name
}
return serverSystemDispatcher
}
// GetShuntNum 获取消息分流渠道数量
func (slf *Server) GetShuntNum() int {
slf.dispatcherLock.RLock()
defer slf.dispatcherLock.RUnlock()
return len(slf.dispatchers)
}
// getConnDispatcher 获取连接所使用的消息分发器 // getConnDispatcher 获取连接所使用的消息分发器
func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher { func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher {
if conn == nil { if conn == nil {
@ -609,21 +636,29 @@ func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher {
return slf.systemDispatcher return slf.systemDispatcher
} }
// ReleaseShunt 释放分流渠道中的连接,当分流渠道中不再存在连接时将会自动释放分流渠道
// - 在未使用 WithDisableAutomaticReleaseShunt 选项时,当连接关闭时将会自动释放分流渠道中连接的资源占用
// - 若执行过程中连接正在使用,将会切换至系统通道
func (slf *Server) ReleaseShunt(conn *Conn) {
slf.releaseDispatcher(conn)
}
// releaseDispatcher 关闭消息分发器 // releaseDispatcher 关闭消息分发器
func (slf *Server) releaseDispatcher(conn *Conn) { func (slf *Server) releaseDispatcher(conn *Conn) {
if conn == nil { if conn == nil {
return return
} }
cid := conn.GetID()
slf.dispatcherLock.Lock() slf.dispatcherLock.Lock()
defer slf.dispatcherLock.Unlock() defer slf.dispatcherLock.Unlock()
d, exist := slf.currDispatcher[conn.GetID()] d, exist := slf.currDispatcher[cid]
if exist { if exist && d.name != serverSystemDispatcher {
delete(slf.dispatcherMember[d.name], conn.GetID()) delete(slf.dispatcherMember[d.name], cid)
if len(slf.dispatcherMember[d.name]) == 0 { if len(slf.dispatcherMember[d.name]) == 0 {
d.close() d.close()
delete(slf.dispatchers, d.name) delete(slf.dispatchers, d.name)
} }
delete(slf.currDispatcher, conn.GetID()) delete(slf.currDispatcher, cid)
} }
} }
@ -662,7 +697,10 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration
message.marks = append(message.marks, log.String(fmt.Sprintf("Other-%d", i+1), s)) message.marks = append(message.marks, log.String(fmt.Sprintf("Other-%d", i+1), s))
} }
} }
var fields = make([]log.Field, 0, len(message.marks)+4) var fields = make([]log.Field, 0, len(message.marks)+5)
if message.conn != nil {
fields = append(fields, log.String("shunt", slf.GetConnCurrShunt(message.conn)))
}
fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String())) fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String()))
fields = append(fields, message.marks...) fields = append(fields, message.marks...)
//fields = append(fields, log.Stack("stack")) //fields = append(fields, log.Stack("stack"))
@ -823,20 +861,20 @@ func (slf *Server) PushAsyncCallbackMessage(err error, callback func(err error),
} }
// PushShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致 // PushShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发 // - 需要注意的是,当未指定 UseShunt 时,将会通过 PushAsyncMessage 进行转发
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 // - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) { func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...)) slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...))
} }
// PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 // PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发 // - 需要注意的是,当未指定 UseShunt 时,将会通过 PushAsyncCallbackMessage 进行转发
func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) { func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...)) slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...))
} }
// PushPacketMessage 向服务器中推送 MessageTypePacket 消息 // PushPacketMessage 向服务器中推送 MessageTypePacket 消息
// - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息 // - 当存在 UseShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息
func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) { func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToPacketMessage( slf.pushMessage(slf.messagePool.Get().castToPacketMessage(
&Conn{wst: wst, connection: conn.connection}, &Conn{wst: wst, connection: conn.connection},