refactor: 服务器支持异步消息类型、死锁阻塞、异步慢消息检测

This commit is contained in:
kercylan98 2023-07-07 16:26:19 +08:00
parent f0e3822ecf
commit 1a2c1df289
10 changed files with 176 additions and 35 deletions

View File

@ -39,8 +39,6 @@ func NewLockstep[ClientID comparable, Command any](options ...LockstepOption[Cli
// - 自定帧序列化方式 WithLockstepSerialization // - 自定帧序列化方式 WithLockstepSerialization
// - 从特定帧开始追帧 // - 从特定帧开始追帧
// - 兼容各种基于TCP/UDP/Unix的网络类型可通过客户端实现其他网络类型同步 // - 兼容各种基于TCP/UDP/Unix的网络类型可通过客户端实现其他网络类型同步
//
// 可在 examples 目录下找到示例示例项目simple-server-lockstep
type Lockstep[ClientID comparable, Command any] struct { type Lockstep[ClientID comparable, Command any] struct {
clients *synchronization.Map[ClientID, component.LockstepClient[ClientID]] // 接受广播的客户端 clients *synchronization.Map[ClientID, component.LockstepClient[ClientID]] // 接受广播的客户端
frames *synchronization.Map[int, []Command] // 所有帧指令 frames *synchronization.Map[int, []Command] // 所有帧指令

View File

@ -30,8 +30,8 @@ func (slf *Player[ID]) UseConn(conn *server.Conn) {
} }
// Send 向该玩家发送数据 // Send 向该玩家发送数据
func (slf *Player[ID]) Send(packet []byte, messageType ...int) { func (slf *Player[ID]) Send(packet server.Packet) {
slf.conn.Write(packet, messageType...) slf.conn.Write(packet)
} }
// Close 关闭玩家 // Close 关闭玩家

View File

@ -73,8 +73,8 @@ func (slf *event) OnConsoleCommandEvent(command string) {
if !exist { if !exist {
switch command { switch command {
case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN": case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN":
log.Info("Console", zap.String("Receive", command), zap.String("Action", "Shutdown")) log.Info("Console", zap.String("Receive", command), zap.String("Action", "shutdown"))
slf.Server.Shutdown(nil) slf.Server.shutdown(nil)
return return
} }
log.Warn("Server", zap.String("Command", "unregistered")) log.Warn("Server", zap.String("Command", "unregistered"))

View File

@ -23,7 +23,7 @@ func (slf *gNet) OnShutdown(server gnet.Server) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() defer cancel()
if err := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil { if err := gnet.Stop(ctx, fmt.Sprintf("%s://%s", slf.network, slf.addr)); err != nil {
log.Error("Server", zap.String("Minotaur GNet Server", "Shutdown"), zap.Error(err)) log.Error("Server", zap.String("Minotaur GNet Server", "shutdown"), zap.Error(err))
} }
} }

View File

@ -31,12 +31,12 @@ var messageNames = map[MessageType]string{
const ( const (
MessageErrorActionNone MessageErrorAction = iota // 错误消息类型操作:将不会被进行任何特殊处理,仅进行日志输出 MessageErrorActionNone MessageErrorAction = iota // 错误消息类型操作:将不会被进行任何特殊处理,仅进行日志输出
MessageErrorActionShutdown // 错误消息类型操作:当接收到该类型的操作时,服务器将执行 Server.Shutdown 函数 MessageErrorActionShutdown // 错误消息类型操作:当接收到该类型的操作时,服务器将执行 Server.shutdown 函数
) )
var messageErrorActionNames = map[MessageErrorAction]string{ var messageErrorActionNames = map[MessageErrorAction]string{
MessageErrorActionNone: "None", MessageErrorActionNone: "None",
MessageErrorActionShutdown: "Shutdown", MessageErrorActionShutdown: "shutdown",
} }
type ( type (
@ -60,7 +60,7 @@ func (slf MessageType) String() string {
return messageNames[slf] return messageNames[slf]
} }
// PushPacketMessage 向特定服务器中推送 Packet 消息 // PushPacketMessage 向特定服务器中推送 MessageTypePacket 消息
func PushPacketMessage(srv *Server, conn *Conn, packet []byte) { func PushPacketMessage(srv *Server, conn *Conn, packet []byte) {
msg := srv.messagePool.Get() msg := srv.messagePool.Get()
msg.t = MessageTypePacket msg.t = MessageTypePacket
@ -68,7 +68,7 @@ func PushPacketMessage(srv *Server, conn *Conn, packet []byte) {
srv.pushMessage(msg) srv.pushMessage(msg)
} }
// PushErrorMessage 向特定服务器中推送 Error 消息 // PushErrorMessage 向特定服务器中推送 MessageTypeError 消息
func PushErrorMessage(srv *Server, err error, action MessageErrorAction) { func PushErrorMessage(srv *Server, err error, action MessageErrorAction) {
msg := srv.messagePool.Get() msg := srv.messagePool.Get()
msg.t = MessageTypeError msg.t = MessageTypeError
@ -76,7 +76,7 @@ func PushErrorMessage(srv *Server, err error, action MessageErrorAction) {
srv.pushMessage(msg) srv.pushMessage(msg)
} }
// PushCrossMessage 向特定服务器中推送 Cross 消息 // PushCrossMessage 向特定服务器中推送 MessageTypeCross 消息
func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []byte) { func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []byte) {
if serverId == srv.id { if serverId == srv.id {
msg := srv.messagePool.Get() msg := srv.messagePool.Get()
@ -95,10 +95,18 @@ func PushCrossMessage(srv *Server, crossName string, serverId int64, packet []by
} }
} }
// PushTickerMessage 向特定服务器中推送 Ticker 消息 // PushTickerMessage 向特定服务器中推送 MessageTypeTicker 消息
func PushTickerMessage(srv *Server, caller func()) { func PushTickerMessage(srv *Server, caller func()) {
msg := srv.messagePool.Get() msg := srv.messagePool.Get()
msg.t = MessageTypeTicker msg.t = MessageTypeTicker
msg.attrs = []any{caller} msg.attrs = []any{caller}
srv.pushMessage(msg) srv.pushMessage(msg)
} }
// PushAsyncMessage 向特定服务器中推送 MessageTypeAsync 消息
func PushAsyncMessage(srv *Server, caller func() error, callback ...func(err error)) {
msg := srv.messagePool.Get()
msg.t = MessageTypeAsync
msg.attrs = []any{caller, callback}
srv.pushMessage(msg)
}

View File

@ -60,7 +60,7 @@ func (slf *MultipleServer) Run() {
case err := <-exceptionChannel: case err := <-exceptionChannel:
for len(slf.servers) > 0 { for len(slf.servers) > 0 {
server := slf.servers[0] server := slf.servers[0]
server.Shutdown(err) server.shutdown(err)
slf.servers = slf.servers[1:] slf.servers = slf.servers[1:]
} }
break break
@ -68,7 +68,7 @@ func (slf *MultipleServer) Run() {
for len(slf.servers) > 0 { for len(slf.servers) > 0 {
server := slf.servers[0] server := slf.servers[0]
server.multipleRuntimeErrorChan = nil server.multipleRuntimeErrorChan = nil
server.Shutdown(nil) server.shutdown(nil)
slf.servers = slf.servers[1:] slf.servers = slf.servers[1:]
} }
break break
@ -76,7 +76,7 @@ func (slf *MultipleServer) Run() {
for len(slf.servers) > 0 { for len(slf.servers) > 0 {
server := slf.servers[0] server := slf.servers[0]
server.multipleRuntimeErrorChan = nil server.multipleRuntimeErrorChan = nil
server.Shutdown(nil) server.shutdown(nil)
slf.servers = slf.servers[1:] slf.servers = slf.servers[1:]
} }
break break

View File

@ -23,6 +23,41 @@ const (
) )
type Option func(srv *Server) type Option func(srv *Server)
type option struct {
disableAnts bool // 是否禁用协程池
antsPoolSize int // 协程池大小
}
type runtime struct {
deadlockDetect time.Duration // 是否开启死锁检测
}
// WithDeadlockDetect 通过死锁、死循环、永久阻塞检测的方式创建服务器
// - 当检测到死锁、死循环、永久阻塞时,服务器将会生成 WARN 类型的日志,关键字为 "SuspectedDeadlock"
func WithDeadlockDetect(t time.Duration) Option {
return func(srv *Server) {
if t > 0 {
srv.deadlockDetect = t
log.Info("DeadlockDetect", zap.Any("Time", t))
}
}
}
// WithDisableAsyncMessage 通过禁用异步消息的方式创建服务器
func WithDisableAsyncMessage() Option {
return func(srv *Server) {
srv.disableAnts = true
}
}
// WithAsyncPoolSize 通过指定异步消息池大小的方式创建服务器
// - 当通过 WithDisableAsyncMessage 禁用异步消息时,此选项无效
// - 默认值为 256
func WithAsyncPoolSize(size int) Option {
return func(srv *Server) {
srv.antsPoolSize = size
}
}
// WithWebsocketReadDeadline 设置 Websocket 读取超时时间 // WithWebsocketReadDeadline 设置 Websocket 读取超时时间
// - 默认: 30 * time.Second // - 默认: 30 * time.Second

View File

@ -10,6 +10,7 @@ import (
"github.com/kercylan98/minotaur/utils/synchronization" "github.com/kercylan98/minotaur/utils/synchronization"
"github.com/kercylan98/minotaur/utils/timer" "github.com/kercylan98/minotaur/utils/timer"
"github.com/kercylan98/minotaur/utils/times" "github.com/kercylan98/minotaur/utils/times"
"github.com/panjf2000/ants/v2"
"github.com/panjf2000/gnet" "github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/pkg/logging" "github.com/panjf2000/gnet/pkg/logging"
"github.com/xtaci/kcp-go/v5" "github.com/xtaci/kcp-go/v5"
@ -29,9 +30,11 @@ import (
func New(network Network, options ...Option) *Server { func New(network Network, options ...Option) *Server {
server := &Server{ server := &Server{
event: &event{}, event: &event{},
runtime: &runtime{},
option: &option{},
network: network, network: network,
options: options,
closeChannel: make(chan struct{}, 1), closeChannel: make(chan struct{}, 1),
systemSignal: make(chan os.Signal, 1),
} }
server.event.Server = server server.event.Server = server
@ -50,17 +53,32 @@ func New(network Network, options ...Option) *Server {
for _, option := range options { for _, option := range options {
option(server) option(server)
} }
if !server.disableAnts {
if server.antsPoolSize <= 0 {
server.antsPoolSize = 256
}
var err error
server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(log.Logger()))
if err != nil {
panic(err)
}
}
server.option = nil
return server return server
} }
// Server 网络服务器 // Server 网络服务器
type Server struct { type Server struct {
*event // 事件 *event // 事件
*runtime // 运行时
*option // 可选项
systemSignal chan os.Signal // 系统信号
cross map[string]Cross // 跨服 cross map[string]Cross // 跨服
id int64 // 服务器id id int64 // 服务器id
network Network // 网络类型 network Network // 网络类型
addr string // 侦听地址 addr string // 侦听地址
options []Option // 选项
ginServer *gin.Engine // HTTP模式下的路由器 ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器 httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器 grpcServer *grpc.Server // GRPC模式下的服务器
@ -69,6 +87,7 @@ type Server struct {
isRunning bool // 是否正在运行 isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭 isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号 closeChannel chan struct{} // 关闭信号
ants *ants.Pool // 协程池
gServer *gNet // TCP或UDP模式下的服务器 gServer *gNet // TCP或UDP模式下的服务器
messagePool *synchronization.Pool[*Message] // 消息池 messagePool *synchronization.Pool[*Message] // 消息池
@ -304,11 +323,11 @@ func (slf *Server) Run(addr string) error {
) )
log.Info("Server", zap.String("Minotaur Server", "====================================================================")) log.Info("Server", zap.String("Minotaur Server", "===================================================================="))
slf.OnStartFinishEvent() slf.OnStartFinishEvent()
systemSignal := make(chan os.Signal, 1)
signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) signal.Notify(slf.systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
select { select {
case <-systemSignal: case <-slf.systemSignal:
slf.Shutdown(nil) slf.shutdown(nil)
} }
select { select {
@ -348,8 +367,13 @@ func (slf *Server) Ticker() *timer.Ticker {
return slf.ticker return slf.ticker
} }
// Shutdown 停止运行服务器 // Shutdown 主动停止运行服务器
func (slf *Server) Shutdown(err error, stack ...string) { func (slf *Server) Shutdown() {
slf.systemSignal <- syscall.SIGQUIT
}
// shutdown 停止运行服务器
func (slf *Server) shutdown(err error, stack ...string) {
slf.OnStopEvent() slf.OnStopEvent()
defer func() { defer func() {
if slf.multipleRuntimeErrorChan != nil { if slf.multipleRuntimeErrorChan != nil {
@ -432,8 +456,32 @@ func (slf *Server) pushMessage(message *Message) {
slf.messageChannel <- message slf.messageChannel <- message
} }
func (slf *Server) low(message *Message, present time.Time) {
cost := time.Since(present)
if cost > time.Millisecond*100 {
log.Warn("Server", zap.String("MessageType", messageNames[message.t]), zap.String("LowExecCost", cost.String()))
slf.OnMessageLowExecEvent(message, cost)
}
}
// dispatchMessage 消息分发 // dispatchMessage 消息分发
func (slf *Server) dispatchMessage(msg *Message) { func (slf *Server) dispatchMessage(msg *Message) {
var (
ctx context.Context
cancel context.CancelFunc
)
if slf.deadlockDetect > 0 {
ctx, cancel = context.WithTimeout(context.Background(), slf.deadlockDetect)
go func() {
select {
case <-ctx.Done():
if err := ctx.Err(); err == context.DeadlineExceeded {
log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("SuspectedDeadlock", msg.attrs))
}
}
}()
}
present := time.Now() present := time.Now()
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
@ -443,15 +491,14 @@ func (slf *Server) dispatchMessage(msg *Message) {
} }
} }
cost := time.Since(present) if msg.t != MessageTypeAsync {
if cost > time.Millisecond*100 { super.Handle(cancel)
log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.String("LowExecCost", cost.String()), zap.Any("MessageAttrs", msg.attrs)) slf.low(msg, present)
slf.OnMessageLowExecEvent(msg, cost)
}
if !slf.isShutdown.Load() { if !slf.isShutdown.Load() {
slf.messagePool.Release(msg) slf.messagePool.Release(msg)
} }
}
}() }()
var attrs = msg.attrs var attrs = msg.attrs
switch msg.t { switch msg.t {
@ -465,7 +512,7 @@ func (slf *Server) dispatchMessage(msg *Message) {
case MessageErrorActionNone: case MessageErrorActionNone:
log.ErrorWithStack("Server", stack, zap.Error(err)) log.ErrorWithStack("Server", stack, zap.Error(err))
case MessageErrorActionShutdown: case MessageErrorActionShutdown:
slf.Shutdown(err, stack) slf.shutdown(err, stack)
default: default:
log.Warn("Server", zap.String("not support message error action", action.String())) log.Warn("Server", zap.String("not support message error action", action.String()))
} }
@ -474,7 +521,29 @@ func (slf *Server) dispatchMessage(msg *Message) {
case MessageTypeTicker: case MessageTypeTicker:
attrs[0].(func())() attrs[0].(func())()
case MessageTypeAsync: case MessageTypeAsync:
handle := attrs[0].(func() error)
callbacks := attrs[1].([]func(err error))
if err := slf.ants.Submit(func() {
defer func() {
if err := recover(); err != nil {
log.Error("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("MessageAttrs", msg.attrs), zap.Any("error", err))
if e, ok := err.(error); ok {
slf.OnMessageErrorEvent(msg, e)
}
}
super.Handle(cancel)
if !slf.isShutdown.Load() {
slf.messagePool.Release(msg)
}
}()
if err := handle(); err != nil {
for _, callback := range callbacks {
callback(err)
}
}
}); err != nil {
panic(err)
}
default: default:
log.Warn("Server", zap.String("not support message type", msg.t.String())) log.Warn("Server", zap.String("not support message type", msg.t.String()))
} }

View File

@ -0,0 +1,23 @@
package server_test
import (
"github.com/kercylan98/minotaur/server"
"time"
)
func ExampleNew() {
srv := server.New(server.NetworkWebsocket,
server.WithDeadlockDetect(time.Second*5),
)
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet server.Packet) {
conn.Write(packet)
})
go func() { time.Sleep(1 * time.Second); srv.Shutdown() }()
if err := srv.Run(":9999"); err != nil {
panic(err)
}
// Output:
}

View File

@ -101,8 +101,16 @@ func getWriter(filename string, times int) io.Writer {
return hook return hook
} }
func Logger() *zap.Logger { type MLogger struct {
return logger *zap.Logger
}
func (slf *MLogger) Printf(format string, args ...interface{}) {
slf.Info(fmt.Sprintf(format, args...))
}
func Logger() *MLogger {
return &MLogger{logger}
} }
func Info(msg string, fields ...zap.Field) { func Info(msg string, fields ...zap.Field) {