diff --git a/server/options.go b/server/options.go index fcecd2a..2145d0a 100644 --- a/server/options.go +++ b/server/options.go @@ -36,16 +36,6 @@ func WithWebsocketReadDeadline(t time.Duration) Option { } } -// WithDiversion 通过分流的方式创建服务器 -// - diversion:分流函数,返回一个函数通道,用于接收分流的消息 -// - 需要确保能够通过 conn 和 packet 确定分流通道 -// - 多核模式下将导致消息顺序不一致,通过结果依然是单核处理的,因为分流通道仅有一个 -func WithDiversion(diversion func(conn ConnReadonly, packet []byte) chan func()) Option { - return func(srv *Server) { - srv.diversion = diversion - } -} - // WithTicker 通过定时器创建服务器,为服务器添加定时器功能 // - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题) // - 多核与分流情况下需要考虑是否有必要 autonomy @@ -159,17 +149,3 @@ func WithMessageBufferSize(size int) Option { srv.messagePoolSize = size } } - -// WithMultiCore 通过特定核心数量运行服务器,默认为单核 -// - count > 1 的情况下,将会有对应数量的 goroutine 来处理消息 -// - 注意:HTTP和GRPC网络模式下不会生效 -// - 在需要分流的场景推荐采用多核模式,如游戏以房间的形式进行,每个房间互不干扰,这种情况下便可以每个房间单独维护数据包消息进行处理 -func WithMultiCore(count int) Option { - return func(srv *Server) { - srv.core = count - if srv.core < 1 { - log.Warn("WithMultiCore", zap.Int("count", count), zap.String("tips", "wrong core count configuration, corrected to 1, currently in single-core mode")) - srv.core = 1 - } - } -} diff --git a/server/server.go b/server/server.go index 4017bb2..2aae110 100644 --- a/server/server.go +++ b/server/server.go @@ -32,7 +32,6 @@ func New(network Network, options ...Option) *Server { event: &event{}, network: network, options: options, - core: 1, closeChannel: make(chan struct{}, 1), websocketWriteMessageType: WebsocketMessageTypeBinary, } @@ -58,30 +57,27 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { - *event // 事件 - cross map[string]Cross // 跨服 - id int64 // 服务器id - network Network // 网络类型 - addr string // 侦听地址 - options []Option // 选项 - ginServer *gin.Engine // HTTP模式下的路由器 - httpServer *http.Server // HTTP模式下的服务器 - grpcServer *grpc.Server // GRPC模式下的服务器 - supportMessageTypes map[int]bool // websocket模式下支持的消息类型 - certFile, keyFile string // TLS文件 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 - diversion func(conn ConnReadonly, packet []byte) chan func() // 分流器 + *event // 事件 + cross map[string]Cross // 跨服 + id int64 // 服务器id + network Network // 网络类型 + addr string // 侦听地址 + options []Option // 选项 + ginServer *gin.Engine // HTTP模式下的路由器 + httpServer *http.Server // HTTP模式下的服务器 + grpcServer *grpc.Server // GRPC模式下的服务器 + supportMessageTypes map[int]bool // websocket模式下支持的消息类型 + certFile, keyFile string // TLS文件 + isRunning bool // 是否正在运行 + isShutdown atomic.Bool // 是否已关闭 + closeChannel chan struct{} // 关闭信号 gServer *gNet // TCP或UDP模式下的服务器 messagePool *synchronization.Pool[*Message] // 消息池 messagePoolSize int // 消息池大小 - messageChannel map[int]chan *Message // 消息管道 - initMessageChannel bool // 消息管道是否已经初始化 + messageChannel chan *Message // 消息管道 multiple *MultipleServer // 多服务器模式下的服务器 prod bool // 是否为生产模式 - core int // 消息处理核心数 websocketWriteMessageType int // websocket写入的消息类型 ticker *timer.Ticker // 定时器 @@ -110,7 +106,6 @@ func (slf *Server) Run(addr string) error { slf.addr = addr var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr) var connectionInitHandle = func(callback func()) { - slf.initMessageChannel = true if slf.messagePoolSize <= 0 { slf.messagePoolSize = 100 } @@ -123,24 +118,18 @@ func (slf *Server) Run(addr string) error { data.attrs = nil }, ) - slf.messageChannel = map[int]chan *Message{} - for i := 0; i < slf.core; i++ { - slf.messageChannel[i] = make(chan *Message, 4096*1000) - } + slf.messageChannel = make(chan *Message, 4096*1000) if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC { slf.gServer = &gNet{Server: slf} } if callback != nil { go callback() } - for _, messageChannel := range slf.messageChannel { - messageChannel := messageChannel - go func() { - for message := range messageChannel { - slf.dispatchMessage(message, slf.diversion != nil) - } - }() - } + go func() { + for message := range slf.messageChannel { + slf.dispatchMessage(message) + } + }() } switch slf.network { @@ -377,12 +366,10 @@ func (slf *Server) Shutdown(err error, stack ...string) { for _, cross := range slf.cross { cross.Release() } - if slf.initMessageChannel { - for _, messageChannel := range slf.messageChannel { - close(messageChannel) - } + if slf.messageChannel != nil { + close(slf.messageChannel) slf.messagePool.Close() - slf.initMessageChannel = false + slf.messageChannel = nil } if slf.grpcServer != nil && slf.isRunning { slf.grpcServer.GracefulStop() @@ -450,10 +437,7 @@ func (slf *Server) PushMessage(messageType MessageType, attrs ...any) { if msg.t == MessageTypeError { msg.attrs = append(msg.attrs, string(debug.Stack())) } - for _, channel := range slf.messageChannel { - channel <- msg - break - } + slf.messageChannel <- msg } // PushCrossMessage 推送跨服消息到特定跨服的服务器中 @@ -469,16 +453,7 @@ func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []b } // dispatchMessage 消息分发 -func (slf *Server) dispatchMessage(msg *Message, isRedirect bool) { - if slf.diversion != nil && isRedirect && msg.t == MessageTypePacket { - conn, packet, _ := msg.t.deconstructWebSocketPacket(msg.attrs...) - if redirect := slf.diversion(conn, packet); redirect != nil { - redirect <- func() { - slf.dispatchMessage(msg, false) - } - } - return - } +func (slf *Server) dispatchMessage(msg *Message) { present := time.Now() defer func() { if err := recover(); err != nil {