feat: 服务器支持通过 server.WithDiversion 可选项对数据包消息进行分流处理
适用于类似房间这样的模式中,每个房间的消息将不会对其他房间消息造成阻塞
This commit is contained in:
parent
01bafe6fc0
commit
73cefc9b48
|
@ -0,0 +1,13 @@
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ConnReadonly interface {
|
||||||
|
RemoteAddr() net.Addr
|
||||||
|
GetID() string
|
||||||
|
GetIP() string // GetData 获取连接数据
|
||||||
|
GetData(key any) any
|
||||||
|
IsWebsocket() bool
|
||||||
|
}
|
|
@ -24,6 +24,16 @@ const (
|
||||||
|
|
||||||
type Option func(srv *Server)
|
type Option func(srv *Server)
|
||||||
|
|
||||||
|
// WithDiversion 通过分流的方式创建服务器
|
||||||
|
// - diversion:分流函数,返回一个函数通道,用于接收分流的消息
|
||||||
|
// - 需要确保能够通过 conn 和 packet 确定分流通道
|
||||||
|
// - 多核模式下将导致消息顺序不一致,通过结果依然是单核处理的,因为分流通道仅有一个
|
||||||
|
func WithDiversion(diversion func(conn ConnReadonly, packet []byte) chan func()) Option {
|
||||||
|
return func(srv *Server) {
|
||||||
|
srv.diversion = diversion
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithTicker 通过定时器创建服务器,为服务器添加定时器功能
|
// WithTicker 通过定时器创建服务器,为服务器添加定时器功能
|
||||||
// - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题)
|
// - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题)
|
||||||
// - 多核与分流情况下需要考虑是否有必要 autonomy
|
// - 多核与分流情况下需要考虑是否有必要 autonomy
|
||||||
|
|
|
@ -53,20 +53,21 @@ func New(network Network, options ...Option) *Server {
|
||||||
|
|
||||||
// Server 网络服务器
|
// Server 网络服务器
|
||||||
type Server struct {
|
type Server struct {
|
||||||
*event // 事件
|
*event // 事件
|
||||||
cross map[string]Cross // 跨服
|
cross map[string]Cross // 跨服
|
||||||
id int64 // 服务器id
|
id int64 // 服务器id
|
||||||
network Network // 网络类型
|
network Network // 网络类型
|
||||||
addr string // 侦听地址
|
addr string // 侦听地址
|
||||||
options []Option // 选项
|
options []Option // 选项
|
||||||
ginServer *gin.Engine // HTTP模式下的路由器
|
ginServer *gin.Engine // HTTP模式下的路由器
|
||||||
httpServer *http.Server // HTTP模式下的服务器
|
httpServer *http.Server // HTTP模式下的服务器
|
||||||
grpcServer *grpc.Server // GRPC模式下的服务器
|
grpcServer *grpc.Server // GRPC模式下的服务器
|
||||||
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
|
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
|
||||||
certFile, keyFile string // TLS文件
|
certFile, keyFile string // TLS文件
|
||||||
isRunning bool // 是否正在运行
|
isRunning bool // 是否正在运行
|
||||||
isShutdown atomic.Bool // 是否已关闭
|
isShutdown atomic.Bool // 是否已关闭
|
||||||
closeChannel chan struct{} // 关闭信号
|
closeChannel chan struct{} // 关闭信号
|
||||||
|
diversion func(conn ConnReadonly, packet []byte) chan func() // 分流器
|
||||||
|
|
||||||
gServer *gNet // TCP或UDP模式下的服务器
|
gServer *gNet // TCP或UDP模式下的服务器
|
||||||
messagePool *synchronization.Pool[*Message] // 消息池
|
messagePool *synchronization.Pool[*Message] // 消息池
|
||||||
|
@ -129,7 +130,7 @@ func (slf *Server) Run(addr string) error {
|
||||||
messageChannel := messageChannel
|
messageChannel := messageChannel
|
||||||
go func() {
|
go func() {
|
||||||
for message := range messageChannel {
|
for message := range messageChannel {
|
||||||
slf.dispatchMessage(message)
|
slf.dispatchMessage(message, slf.diversion != nil)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -456,7 +457,16 @@ func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []b
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatchMessage 消息分发
|
// dispatchMessage 消息分发
|
||||||
func (slf *Server) dispatchMessage(msg *Message) {
|
func (slf *Server) dispatchMessage(msg *Message, isRedirect bool) {
|
||||||
|
if slf.diversion != nil && isRedirect && msg.t == MessageTypePacket {
|
||||||
|
conn, packet, _ := msg.t.deconstructWebSocketPacket(msg.attrs...)
|
||||||
|
if redirect := slf.diversion(conn, packet); redirect != nil {
|
||||||
|
redirect <- func() {
|
||||||
|
slf.dispatchMessage(msg, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
present := time.Now()
|
present := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
|
|
Loading…
Reference in New Issue