diff --git a/server/constants.go b/server/constants.go new file mode 100644 index 0000000..a2aecd9 --- /dev/null +++ b/server/constants.go @@ -0,0 +1,10 @@ +package server + +import "time" + +const ( + DefaultMessageBufferSize = 1024 + DefaultMessageChannelSize = 1024 * 4096 + DefaultAsyncPoolSize = 256 + DefaultWebsocketReadDeadline = 30 * time.Second +) diff --git a/server/multiple.go b/server/multiple.go index c76346d..f72164e 100644 --- a/server/multiple.go +++ b/server/multiple.go @@ -39,7 +39,7 @@ func (slf *MultipleServer) Run() { wait.Add(1) go func(address string, server *Server) { var startFinish bool - server.RegStartFinishEvent(func(srv *Server) { + server.startFinishEventHandles = append(server.startFinishEventHandles, func(srv *Server) { if !startFinish { startFinish = true wait.Done() diff --git a/server/options.go b/server/options.go index 69e8df5..14af5b4 100644 --- a/server/options.go +++ b/server/options.go @@ -30,11 +30,32 @@ type option struct { } type runtime struct { - deadlockDetect time.Duration // 是否开启死锁检测 + id int64 // 服务器id + cross map[string]Cross // 跨服 + deadlockDetect time.Duration // 是否开启死锁检测 + supportMessageTypes map[int]bool // websocket模式下支持的消息类型 + certFile, keyFile string // TLS文件 + messagePoolSize int // 消息池大小 + messageChannelSize int // 消息通道大小 + prod bool // 是否为生产模式 + ticker *timer.Ticker // 定时器 + websocketReadDeadline time.Duration // websocket连接超时时间 +} + +// WithMessageChannelSize 通过指定消息通道大小的方式创建服务器 +// - 默认值为 DefaultMessageChannelSize +func WithMessageChannelSize(size int) Option { + return func(srv *Server) { + if size <= 0 { + size = DefaultMessageChannelSize + } + srv.messageChannelSize = size + } } // WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器 // - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock" +// - 默认不开启死锁检测 func WithDeadlockDetect(t time.Duration) Option { return func(srv *Server) { if t > 0 { @@ -53,7 +74,7 @@ func WithDisableAsyncMessage() Option { // WithAsyncPoolSize 通过指定异步消息池大小的方式创建服务器 // - 当通过 WithDisableAsyncMessage 禁用异步消息时,此选项无效 -// - 默认值为 256 +// - 默认值为 DefaultAsyncPoolSize func WithAsyncPoolSize(size int) Option { return func(srv *Server) { srv.antsPoolSize = size @@ -61,7 +82,7 @@ func WithAsyncPoolSize(size int) Option { } // WithWebsocketReadDeadline 设置 Websocket 读取超时时间 -// - 默认: 30 * time.Second +// - 默认: DefaultWebsocketReadDeadline // - 当 t <= 0 时,表示不设置超时时间 func WithWebsocketReadDeadline(t time.Duration) Option { return func(srv *Server) { @@ -87,8 +108,8 @@ func WithTicker(size int, autonomy bool) Option { } // WithCross 通过跨服的方式创建服务器 -// - 推送跨服消息时,将推送到对应crossName的跨服中间件中,crossName可以满足不同功能采用不同的跨服/消息中间件 -// - 通常情况下crossName仅需一个即可 +// - 推送跨服消息时,将推送到对应 crossName 的跨服中间件中,crossName 可以满足不同功能采用不同的跨服/消息中间件 +// - 通常情况下 crossName 仅需一个即可 func WithCross(crossName string, serverId int64, cross Cross) Option { return func(srv *Server) { start: @@ -161,7 +182,7 @@ func WithWebsocketMessageType(messageTypes ...int) Option { } // WithMessageBufferSize 通过特定的消息缓冲池大小运行服务器 -// - 默认大小为 1024 +// - 默认大小为 DefaultMessageBufferSize // - 消息数量超出这个值的时候,消息处理将会造成更大的开销(频繁创建新的结构体),同时服务器将输出警告内容 func WithMessageBufferSize(size int) Option { return func(srv *Server) { diff --git a/server/server.go b/server/server.go index a1d973a..d778c10 100644 --- a/server/server.go +++ b/server/server.go @@ -29,13 +29,12 @@ import ( // New 根据特定网络类型创建一个服务器 func New(network Network, options ...Option) *Server { server := &Server{ - event: &event{}, - runtime: &runtime{}, - option: &option{}, - network: network, - closeChannel: make(chan struct{}, 1), - systemSignal: make(chan os.Signal, 1), - messagePoolSize: 1024, + event: &event{}, + runtime: &runtime{messagePoolSize: DefaultMessageBufferSize, messageChannelSize: DefaultMessageChannelSize}, + option: &option{}, + network: network, + closeChannel: make(chan struct{}, 1), + systemSignal: make(chan os.Signal, 1), } server.event.Server = server @@ -48,7 +47,7 @@ func New(network Network, options ...Option) *Server { case NetworkGRPC: server.grpcServer = grpc.NewServer() case NetworkWebsocket: - server.websocketReadDeadline = time.Second * 30 + server.websocketReadDeadline = DefaultWebsocketReadDeadline } for _, option := range options { @@ -57,7 +56,7 @@ func New(network Network, options ...Option) *Server { if !server.disableAnts { if server.antsPoolSize <= 0 { - server.antsPoolSize = 256 + server.antsPoolSize = DefaultAsyncPoolSize } var err error server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(log.Logger())) @@ -72,35 +71,24 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { - *event // 事件 - *runtime // 运行时 - *option // 可选项 - systemSignal chan os.Signal // 系统信号 - cross map[string]Cross // 跨服 - id int64 // 服务器id - network Network // 网络类型 - addr string // 侦听地址 - ginServer *gin.Engine // HTTP模式下的路由器 - httpServer *http.Server // HTTP模式下的服务器 - grpcServer *grpc.Server // GRPC模式下的服务器 - supportMessageTypes map[int]bool // websocket模式下支持的消息类型 - certFile, keyFile string // TLS文件 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 - ants *ants.Pool // 协程池 - - gServer *gNet // TCP或UDP模式下的服务器 - messagePool *synchronization.Pool[*Message] // 消息池 - messagePoolSize int // 消息池大小 - messageChannel chan *Message // 消息管道 - multiple *MultipleServer // 多服务器模式下的服务器 - prod bool // 是否为生产模式 - ticker *timer.Ticker // 定时器 - - multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 - - websocketReadDeadline time.Duration // websocket连接超时时间 + *event // 事件 + *runtime // 运行时 + *option // 可选项 + network Network // 网络类型 + addr string // 侦听地址 + systemSignal chan os.Signal // 系统信号 + ginServer *gin.Engine // HTTP模式下的路由器 + httpServer *http.Server // HTTP模式下的服务器 + grpcServer *grpc.Server // GRPC模式下的服务器 + gServer *gNet // TCP或UDP模式下的服务器 + isRunning bool // 是否正在运行 + isShutdown atomic.Bool // 是否已关闭 + closeChannel chan struct{} // 关闭信号 + ants *ants.Pool // 协程池 + messagePool *synchronization.Pool[*Message] // 消息池 + messageChannel chan *Message // 消息管道 + multiple *MultipleServer // 多服务器模式下的服务器 + multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 } // Run 使用特定地址运行服务器 @@ -132,7 +120,7 @@ func (slf *Server) Run(addr string) error { data.attrs = nil }, ) - slf.messageChannel = make(chan *Message, 4096*1000) + slf.messageChannel = make(chan *Message, slf.messageChannelSize) if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC { slf.gServer = &gNet{Server: slf} }