refactor: 移除服务器多核和分流模式的可选项
This commit is contained in:
parent
501d84bf13
commit
7e67775157
|
@ -36,16 +36,6 @@ func WithWebsocketReadDeadline(t time.Duration) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithDiversion 通过分流的方式创建服务器
|
|
||||||
// - diversion:分流函数,返回一个函数通道,用于接收分流的消息
|
|
||||||
// - 需要确保能够通过 conn 和 packet 确定分流通道
|
|
||||||
// - 多核模式下将导致消息顺序不一致,通过结果依然是单核处理的,因为分流通道仅有一个
|
|
||||||
func WithDiversion(diversion func(conn ConnReadonly, packet []byte) chan func()) Option {
|
|
||||||
return func(srv *Server) {
|
|
||||||
srv.diversion = diversion
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithTicker 通过定时器创建服务器,为服务器添加定时器功能
|
// WithTicker 通过定时器创建服务器,为服务器添加定时器功能
|
||||||
// - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题)
|
// - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题)
|
||||||
// - 多核与分流情况下需要考虑是否有必要 autonomy
|
// - 多核与分流情况下需要考虑是否有必要 autonomy
|
||||||
|
@ -159,17 +149,3 @@ func WithMessageBufferSize(size int) Option {
|
||||||
srv.messagePoolSize = size
|
srv.messagePoolSize = size
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithMultiCore 通过特定核心数量运行服务器,默认为单核
|
|
||||||
// - count > 1 的情况下,将会有对应数量的 goroutine 来处理消息
|
|
||||||
// - 注意:HTTP和GRPC网络模式下不会生效
|
|
||||||
// - 在需要分流的场景推荐采用多核模式,如游戏以房间的形式进行,每个房间互不干扰,这种情况下便可以每个房间单独维护数据包消息进行处理
|
|
||||||
func WithMultiCore(count int) Option {
|
|
||||||
return func(srv *Server) {
|
|
||||||
srv.core = count
|
|
||||||
if srv.core < 1 {
|
|
||||||
log.Warn("WithMultiCore", zap.Int("count", count), zap.String("tips", "wrong core count configuration, corrected to 1, currently in single-core mode"))
|
|
||||||
srv.core = 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -32,7 +32,6 @@ func New(network Network, options ...Option) *Server {
|
||||||
event: &event{},
|
event: &event{},
|
||||||
network: network,
|
network: network,
|
||||||
options: options,
|
options: options,
|
||||||
core: 1,
|
|
||||||
closeChannel: make(chan struct{}, 1),
|
closeChannel: make(chan struct{}, 1),
|
||||||
websocketWriteMessageType: WebsocketMessageTypeBinary,
|
websocketWriteMessageType: WebsocketMessageTypeBinary,
|
||||||
}
|
}
|
||||||
|
@ -72,16 +71,13 @@ type Server struct {
|
||||||
isRunning bool // 是否正在运行
|
isRunning bool // 是否正在运行
|
||||||
isShutdown atomic.Bool // 是否已关闭
|
isShutdown atomic.Bool // 是否已关闭
|
||||||
closeChannel chan struct{} // 关闭信号
|
closeChannel chan struct{} // 关闭信号
|
||||||
diversion func(conn ConnReadonly, packet []byte) chan func() // 分流器
|
|
||||||
|
|
||||||
gServer *gNet // TCP或UDP模式下的服务器
|
gServer *gNet // TCP或UDP模式下的服务器
|
||||||
messagePool *synchronization.Pool[*Message] // 消息池
|
messagePool *synchronization.Pool[*Message] // 消息池
|
||||||
messagePoolSize int // 消息池大小
|
messagePoolSize int // 消息池大小
|
||||||
messageChannel map[int]chan *Message // 消息管道
|
messageChannel chan *Message // 消息管道
|
||||||
initMessageChannel bool // 消息管道是否已经初始化
|
|
||||||
multiple *MultipleServer // 多服务器模式下的服务器
|
multiple *MultipleServer // 多服务器模式下的服务器
|
||||||
prod bool // 是否为生产模式
|
prod bool // 是否为生产模式
|
||||||
core int // 消息处理核心数
|
|
||||||
websocketWriteMessageType int // websocket写入的消息类型
|
websocketWriteMessageType int // websocket写入的消息类型
|
||||||
ticker *timer.Ticker // 定时器
|
ticker *timer.Ticker // 定时器
|
||||||
|
|
||||||
|
@ -110,7 +106,6 @@ func (slf *Server) Run(addr string) error {
|
||||||
slf.addr = addr
|
slf.addr = addr
|
||||||
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
|
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
|
||||||
var connectionInitHandle = func(callback func()) {
|
var connectionInitHandle = func(callback func()) {
|
||||||
slf.initMessageChannel = true
|
|
||||||
if slf.messagePoolSize <= 0 {
|
if slf.messagePoolSize <= 0 {
|
||||||
slf.messagePoolSize = 100
|
slf.messagePoolSize = 100
|
||||||
}
|
}
|
||||||
|
@ -123,25 +118,19 @@ func (slf *Server) Run(addr string) error {
|
||||||
data.attrs = nil
|
data.attrs = nil
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
slf.messageChannel = map[int]chan *Message{}
|
slf.messageChannel = make(chan *Message, 4096*1000)
|
||||||
for i := 0; i < slf.core; i++ {
|
|
||||||
slf.messageChannel[i] = make(chan *Message, 4096*1000)
|
|
||||||
}
|
|
||||||
if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC {
|
if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC {
|
||||||
slf.gServer = &gNet{Server: slf}
|
slf.gServer = &gNet{Server: slf}
|
||||||
}
|
}
|
||||||
if callback != nil {
|
if callback != nil {
|
||||||
go callback()
|
go callback()
|
||||||
}
|
}
|
||||||
for _, messageChannel := range slf.messageChannel {
|
|
||||||
messageChannel := messageChannel
|
|
||||||
go func() {
|
go func() {
|
||||||
for message := range messageChannel {
|
for message := range slf.messageChannel {
|
||||||
slf.dispatchMessage(message, slf.diversion != nil)
|
slf.dispatchMessage(message)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
switch slf.network {
|
switch slf.network {
|
||||||
case NetworkGRPC:
|
case NetworkGRPC:
|
||||||
|
@ -377,12 +366,10 @@ func (slf *Server) Shutdown(err error, stack ...string) {
|
||||||
for _, cross := range slf.cross {
|
for _, cross := range slf.cross {
|
||||||
cross.Release()
|
cross.Release()
|
||||||
}
|
}
|
||||||
if slf.initMessageChannel {
|
if slf.messageChannel != nil {
|
||||||
for _, messageChannel := range slf.messageChannel {
|
close(slf.messageChannel)
|
||||||
close(messageChannel)
|
|
||||||
}
|
|
||||||
slf.messagePool.Close()
|
slf.messagePool.Close()
|
||||||
slf.initMessageChannel = false
|
slf.messageChannel = nil
|
||||||
}
|
}
|
||||||
if slf.grpcServer != nil && slf.isRunning {
|
if slf.grpcServer != nil && slf.isRunning {
|
||||||
slf.grpcServer.GracefulStop()
|
slf.grpcServer.GracefulStop()
|
||||||
|
@ -450,10 +437,7 @@ func (slf *Server) PushMessage(messageType MessageType, attrs ...any) {
|
||||||
if msg.t == MessageTypeError {
|
if msg.t == MessageTypeError {
|
||||||
msg.attrs = append(msg.attrs, string(debug.Stack()))
|
msg.attrs = append(msg.attrs, string(debug.Stack()))
|
||||||
}
|
}
|
||||||
for _, channel := range slf.messageChannel {
|
slf.messageChannel <- msg
|
||||||
channel <- msg
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushCrossMessage 推送跨服消息到特定跨服的服务器中
|
// PushCrossMessage 推送跨服消息到特定跨服的服务器中
|
||||||
|
@ -469,16 +453,7 @@ func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []b
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatchMessage 消息分发
|
// dispatchMessage 消息分发
|
||||||
func (slf *Server) dispatchMessage(msg *Message, isRedirect bool) {
|
func (slf *Server) dispatchMessage(msg *Message) {
|
||||||
if slf.diversion != nil && isRedirect && msg.t == MessageTypePacket {
|
|
||||||
conn, packet, _ := msg.t.deconstructWebSocketPacket(msg.attrs...)
|
|
||||||
if redirect := slf.diversion(conn, packet); redirect != nil {
|
|
||||||
redirect <- func() {
|
|
||||||
slf.dispatchMessage(msg, false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
present := time.Now()
|
present := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
|
|
Loading…
Reference in New Issue