style: 优化 server 包代码可读性

This commit is contained in:
kercylan98 2023-07-11 10:10:36 +08:00
parent 9f2242b6f7
commit 74c8f215d7
4 changed files with 65 additions and 46 deletions

10
server/constants.go Normal file
View File

@ -0,0 +1,10 @@
package server
import "time"
const (
DefaultMessageBufferSize = 1024
DefaultMessageChannelSize = 1024 * 4096
DefaultAsyncPoolSize = 256
DefaultWebsocketReadDeadline = 30 * time.Second
)

View File

@ -39,7 +39,7 @@ func (slf *MultipleServer) Run() {
wait.Add(1) wait.Add(1)
go func(address string, server *Server) { go func(address string, server *Server) {
var startFinish bool var startFinish bool
server.RegStartFinishEvent(func(srv *Server) { server.startFinishEventHandles = append(server.startFinishEventHandles, func(srv *Server) {
if !startFinish { if !startFinish {
startFinish = true startFinish = true
wait.Done() wait.Done()

View File

@ -30,11 +30,32 @@ type option struct {
} }
type runtime struct { type runtime struct {
deadlockDetect time.Duration // 是否开启死锁检测 id int64 // 服务器id
cross map[string]Cross // 跨服
deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小
messageChannelSize int // 消息通道大小
prod bool // 是否为生产模式
ticker *timer.Ticker // 定时器
websocketReadDeadline time.Duration // websocket连接超时时间
}
// WithMessageChannelSize 通过指定消息通道大小的方式创建服务器
// - 默认值为 DefaultMessageChannelSize
func WithMessageChannelSize(size int) Option {
return func(srv *Server) {
if size <= 0 {
size = DefaultMessageChannelSize
}
srv.messageChannelSize = size
}
} }
// WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器 // WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器
// - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock" // - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock"
// - 默认不开启死锁检测
func WithDeadlockDetect(t time.Duration) Option { func WithDeadlockDetect(t time.Duration) Option {
return func(srv *Server) { return func(srv *Server) {
if t > 0 { if t > 0 {
@ -53,7 +74,7 @@ func WithDisableAsyncMessage() Option {
// WithAsyncPoolSize 通过指定异步消息池大小的方式创建服务器 // WithAsyncPoolSize 通过指定异步消息池大小的方式创建服务器
// - 当通过 WithDisableAsyncMessage 禁用异步消息时,此选项无效 // - 当通过 WithDisableAsyncMessage 禁用异步消息时,此选项无效
// - 默认值为 256 // - 默认值为 DefaultAsyncPoolSize
func WithAsyncPoolSize(size int) Option { func WithAsyncPoolSize(size int) Option {
return func(srv *Server) { return func(srv *Server) {
srv.antsPoolSize = size srv.antsPoolSize = size
@ -61,7 +82,7 @@ func WithAsyncPoolSize(size int) Option {
} }
// WithWebsocketReadDeadline 设置 Websocket 读取超时时间 // WithWebsocketReadDeadline 设置 Websocket 读取超时时间
// - 默认: 30 * time.Second // - 默认: DefaultWebsocketReadDeadline
// - 当 t <= 0 时,表示不设置超时时间 // - 当 t <= 0 时,表示不设置超时时间
func WithWebsocketReadDeadline(t time.Duration) Option { func WithWebsocketReadDeadline(t time.Duration) Option {
return func(srv *Server) { return func(srv *Server) {
@ -87,8 +108,8 @@ func WithTicker(size int, autonomy bool) Option {
} }
// WithCross 通过跨服的方式创建服务器 // WithCross 通过跨服的方式创建服务器
// - 推送跨服消息时,将推送到对应crossName的跨服中间件中crossName可以满足不同功能采用不同的跨服/消息中间件 // - 推送跨服消息时,将推送到对应 crossName 的跨服中间件中crossName 可以满足不同功能采用不同的跨服/消息中间件
// - 通常情况下crossName仅需一个即可 // - 通常情况下 crossName 仅需一个即可
func WithCross(crossName string, serverId int64, cross Cross) Option { func WithCross(crossName string, serverId int64, cross Cross) Option {
return func(srv *Server) { return func(srv *Server) {
start: start:
@ -161,7 +182,7 @@ func WithWebsocketMessageType(messageTypes ...int) Option {
} }
// WithMessageBufferSize 通过特定的消息缓冲池大小运行服务器 // WithMessageBufferSize 通过特定的消息缓冲池大小运行服务器
// - 默认大小为 1024 // - 默认大小为 DefaultMessageBufferSize
// - 消息数量超出这个值的时候,消息处理将会造成更大的开销(频繁创建新的结构体),同时服务器将输出警告内容 // - 消息数量超出这个值的时候,消息处理将会造成更大的开销(频繁创建新的结构体),同时服务器将输出警告内容
func WithMessageBufferSize(size int) Option { func WithMessageBufferSize(size int) Option {
return func(srv *Server) { return func(srv *Server) {

View File

@ -29,13 +29,12 @@ import (
// New 根据特定网络类型创建一个服务器 // New 根据特定网络类型创建一个服务器
func New(network Network, options ...Option) *Server { func New(network Network, options ...Option) *Server {
server := &Server{ server := &Server{
event: &event{}, event: &event{},
runtime: &runtime{}, runtime: &runtime{messagePoolSize: DefaultMessageBufferSize, messageChannelSize: DefaultMessageChannelSize},
option: &option{}, option: &option{},
network: network, network: network,
closeChannel: make(chan struct{}, 1), closeChannel: make(chan struct{}, 1),
systemSignal: make(chan os.Signal, 1), systemSignal: make(chan os.Signal, 1),
messagePoolSize: 1024,
} }
server.event.Server = server server.event.Server = server
@ -48,7 +47,7 @@ func New(network Network, options ...Option) *Server {
case NetworkGRPC: case NetworkGRPC:
server.grpcServer = grpc.NewServer() server.grpcServer = grpc.NewServer()
case NetworkWebsocket: case NetworkWebsocket:
server.websocketReadDeadline = time.Second * 30 server.websocketReadDeadline = DefaultWebsocketReadDeadline
} }
for _, option := range options { for _, option := range options {
@ -57,7 +56,7 @@ func New(network Network, options ...Option) *Server {
if !server.disableAnts { if !server.disableAnts {
if server.antsPoolSize <= 0 { if server.antsPoolSize <= 0 {
server.antsPoolSize = 256 server.antsPoolSize = DefaultAsyncPoolSize
} }
var err error var err error
server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(log.Logger())) server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(log.Logger()))
@ -72,35 +71,24 @@ func New(network Network, options ...Option) *Server {
// Server 网络服务器 // Server 网络服务器
type Server struct { type Server struct {
*event // 事件 *event // 事件
*runtime // 运行时 *runtime // 运行时
*option // 可选项 *option // 可选项
systemSignal chan os.Signal // 系统信号 network Network // 网络类型
cross map[string]Cross // 跨服 addr string // 侦听地址
id int64 // 服务器id systemSignal chan os.Signal // 系统信号
network Network // 网络类型 ginServer *gin.Engine // HTTP模式下的路由器
addr string // 侦听地址 httpServer *http.Server // HTTP模式下的服务器
ginServer *gin.Engine // HTTP模式下的路由器 grpcServer *grpc.Server // GRPC模式下的服务器
httpServer *http.Server // HTTP模式下的服务器 gServer *gNet // TCP或UDP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器 isRunning bool // 是否正在运行
supportMessageTypes map[int]bool // websocket模式下支持的消息类型 isShutdown atomic.Bool // 是否已关闭
certFile, keyFile string // TLS文件 closeChannel chan struct{} // 关闭信号
isRunning bool // 是否正在运行 ants *ants.Pool // 协程池
isShutdown atomic.Bool // 是否已关闭 messagePool *synchronization.Pool[*Message] // 消息池
closeChannel chan struct{} // 关闭信号 messageChannel chan *Message // 消息管道
ants *ants.Pool // 协程池 multiple *MultipleServer // 多服务器模式下的服务器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
gServer *gNet // TCP或UDP模式下的服务器
messagePool *synchronization.Pool[*Message] // 消息池
messagePoolSize int // 消息池大小
messageChannel chan *Message // 消息管道
multiple *MultipleServer // 多服务器模式下的服务器
prod bool // 是否为生产模式
ticker *timer.Ticker // 定时器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
websocketReadDeadline time.Duration // websocket连接超时时间
} }
// Run 使用特定地址运行服务器 // Run 使用特定地址运行服务器
@ -132,7 +120,7 @@ func (slf *Server) Run(addr string) error {
data.attrs = nil data.attrs = nil
}, },
) )
slf.messageChannel = make(chan *Message, 4096*1000) slf.messageChannel = make(chan *Message, slf.messageChannelSize)
if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC { if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC {
slf.gServer = &gNet{Server: slf} slf.gServer = &gNet{Server: slf}
} }