fix: 修复 server.Server 部分事件中发生 panic 导致程序退出的问题

This commit is contained in:
kercylan98 2023-07-14 12:40:13 +08:00
parent 10fcb54322
commit 0215d9ff8c
3 changed files with 75 additions and 34 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/kercylan98/minotaur/utils/runtimes" "github.com/kercylan98/minotaur/utils/runtimes"
"go.uber.org/zap" "go.uber.org/zap"
"reflect" "reflect"
"runtime/debug"
"sync" "sync"
"time" "time"
) )
@ -45,9 +46,11 @@ func (slf *event) RegStopEvent(handle StopEventHandle) {
} }
func (slf *event) OnStopEvent() { func (slf *event) OnStopEvent() {
for _, handle := range slf.stopEventHandles { PushSystemMessage(slf.Server, func() {
handle(slf.Server) for _, handle := range slf.stopEventHandles {
} handle(slf.Server)
}
})
} }
// RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数 // RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数
@ -69,20 +72,22 @@ func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEv
} }
func (slf *event) OnConsoleCommandEvent(command string) { func (slf *event) OnConsoleCommandEvent(command string) {
handles, exist := slf.consoleCommandEventHandles[command] PushSystemMessage(slf.Server, func() {
if !exist { handles, exist := slf.consoleCommandEventHandles[command]
switch command { if !exist {
case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN": switch command {
log.Info("Console", zap.String("Receive", command), zap.String("Action", "Shutdown")) case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN":
slf.Server.shutdown(nil) log.Info("Console", zap.String("Receive", command), zap.String("Action", "Shutdown"))
return slf.Server.shutdown(nil)
return
}
log.Warn("Server", zap.String("Command", "unregistered"))
} else {
for _, handle := range handles {
handle(slf.Server)
}
} }
log.Warn("Server", zap.String("Command", "unregistered")) })
} else {
for _, handle := range handles {
handle(slf.Server)
}
}
} }
// RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数
@ -92,6 +97,12 @@ func (slf *event) RegStartBeforeEvent(handle StartBeforeEventHandle) {
} }
func (slf *event) OnStartBeforeEvent() { func (slf *event) OnStartBeforeEvent() {
defer func() {
if err := recover(); err != nil {
log.Error("Server", zap.String("OnStartBeforeEvent", fmt.Sprintf("%v", err)))
debug.PrintStack()
}
}()
for _, handle := range slf.startBeforeEventHandles { for _, handle := range slf.startBeforeEventHandles {
handle(slf.Server) handle(slf.Server)
} }
@ -104,9 +115,11 @@ func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle) {
} }
func (slf *event) OnStartFinishEvent() { func (slf *event) OnStartFinishEvent() {
for _, handle := range slf.startFinishEventHandles { PushSystemMessage(slf.Server, func() {
handle(slf.Server) for _, handle := range slf.startFinishEventHandles {
} handle(slf.Server)
}
})
} }
// RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数 // RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数
@ -119,11 +132,13 @@ func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle) {
} }
func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
for _, handle := range slf.connectionClosedEventHandles { PushSystemMessage(slf.Server, func() {
handle(slf.Server, conn, err) for _, handle := range slf.connectionClosedEventHandles {
} handle(slf.Server, conn, err)
conn.Close() }
slf.Server.online.Delete(conn.GetID()) conn.Close()
slf.Server.online.Delete(conn.GetID())
})
} }
// RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数 // RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数
@ -136,10 +151,12 @@ func (slf *event) RegConnectionOpenedEvent(handle ConnectionOpenedEventHandle) {
} }
func (slf *event) OnConnectionOpenedEvent(conn *Conn) { func (slf *event) OnConnectionOpenedEvent(conn *Conn) {
slf.Server.online.Set(conn.GetID(), conn) PushSystemMessage(slf.Server, func() {
for _, handle := range slf.connectionOpenedEventHandles { slf.Server.online.Set(conn.GetID(), conn)
handle(slf.Server, conn) for _, handle := range slf.connectionOpenedEventHandles {
} handle(slf.Server, conn)
}
})
} }
// RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数 // RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数
@ -176,9 +193,11 @@ func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle) {
} }
func (slf *event) OnMessageErrorEvent(message *Message, err error) { func (slf *event) OnMessageErrorEvent(message *Message, err error) {
for _, handle := range slf.messageErrorEventHandles { PushSystemMessage(slf.Server, func() {
handle(slf.Server, message, err) for _, handle := range slf.messageErrorEventHandles {
} handle(slf.Server, message, err)
}
})
} }
// RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数 // RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数
@ -188,9 +207,11 @@ func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle) {
} }
func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) { func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) {
for _, handle := range slf.messageLowExecEventHandles { PushSystemMessage(slf.Server, func() {
handle(slf.Server, message, cost) for _, handle := range slf.messageLowExecEventHandles {
} handle(slf.Server, message, cost)
}
})
} }
func (slf *event) check() { func (slf *event) check() {

View File

@ -21,6 +21,9 @@ const (
// MessageTypeAsync 异步消息类型 // MessageTypeAsync 异步消息类型
MessageTypeAsync MessageTypeAsync
// MessageTypeSystem 系统消息类型
MessageTypeSystem
) )
var messageNames = map[MessageType]string{ var messageNames = map[MessageType]string{
@ -29,6 +32,7 @@ var messageNames = map[MessageType]string{
MessageTypeCross: "MessageTypeCross", MessageTypeCross: "MessageTypeCross",
MessageTypeTicker: "MessageTypeTicker", MessageTypeTicker: "MessageTypeTicker",
MessageTypeAsync: "MessageTypeAsync", MessageTypeAsync: "MessageTypeAsync",
MessageTypeSystem: "MessageTypeSystem",
} }
const ( const (
@ -146,6 +150,14 @@ func PushAsyncMessage(srv *Server, caller func() error, callback func(err error)
srv.pushMessage(msg) srv.pushMessage(msg)
} }
// PushSystemMessage 向特定服务器中推送 MessageTypeSystem 消息
func PushSystemMessage(srv *Server, handle func(), mark ...any) {
msg := srv.messagePool.Get()
msg.t = MessageTypeSystem
msg.attrs = append([]any{handle}, mark...)
srv.pushMessage(msg)
}
// SetMessagePacketVisualizer 设置消息可视化函数 // SetMessagePacketVisualizer 设置消息可视化函数
// - 消息可视化将在慢消息等情况用于打印,使用自定消息可视化函数可以便于开发者进行调试 // - 消息可视化将在慢消息等情况用于打印,使用自定消息可视化函数可以便于开发者进行调试
// - 默认的消息可视化函数将直接返回消息的字符串表示 // - 默认的消息可视化函数将直接返回消息的字符串表示

View File

@ -114,6 +114,7 @@ func (slf *Server) Run(addr string) error {
slf.event.check() slf.event.check()
slf.addr = addr slf.addr = addr
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr) var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
var messageInitFinish = make(chan struct{}, 1)
var connectionInitHandle = func(callback func()) { var connectionInitHandle = func(callback func()) {
slf.messagePool = synchronization.NewPool[*Message](slf.messagePoolSize, slf.messagePool = synchronization.NewPool[*Message](slf.messagePoolSize,
func() *Message { func() *Message {
@ -132,6 +133,7 @@ func (slf *Server) Run(addr string) error {
go callback() go callback()
} }
go func() { go func() {
messageInitFinish <- struct{}{}
for message := range slf.messageChannel { for message := range slf.messageChannel {
slf.dispatchMessage(message) slf.dispatchMessage(message)
} }
@ -312,6 +314,10 @@ func (slf *Server) Run(addr string) error {
return ErrCanNotSupportNetwork return ErrCanNotSupportNetwork
} }
<-messageInitFinish
close(messageInitFinish)
messageInitFinish = nil
fmt.Println("messageInitFinish")
if slf.multiple == nil { if slf.multiple == nil {
log.Info("Server", zap.String(serverMark, "====================================================================")) log.Info("Server", zap.String(serverMark, "===================================================================="))
log.Info("Server", zap.String(serverMark, "RunningInfo"), log.Info("Server", zap.String(serverMark, "RunningInfo"),
@ -568,6 +574,8 @@ func (slf *Server) dispatchMessage(msg *Message) {
}); err != nil { }); err != nil {
panic(err) panic(err)
} }
case MessageTypeSystem:
attrs[0].(func())()
default: default:
log.Warn("Server", zap.String("not support message type", msg.t.String())) log.Warn("Server", zap.String("not support message type", msg.t.String()))
} }