From 0215d9ff8c6771bc398149fbaca35ae3862aa329 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Fri, 14 Jul 2023 12:40:13 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20server.Server=20?= =?UTF-8?q?=E9=83=A8=E5=88=86=E4=BA=8B=E4=BB=B6=E4=B8=AD=E5=8F=91=E7=94=9F?= =?UTF-8?q?=20panic=20=E5=AF=BC=E8=87=B4=E7=A8=8B=E5=BA=8F=E9=80=80?= =?UTF-8?q?=E5=87=BA=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/event.go | 89 +++++++++++++++++++++++++++++------------------ server/message.go | 12 +++++++ server/server.go | 8 +++++ 3 files changed, 75 insertions(+), 34 deletions(-) diff --git a/server/event.go b/server/event.go index 02b2402..537d472 100644 --- a/server/event.go +++ b/server/event.go @@ -6,6 +6,7 @@ import ( "github.com/kercylan98/minotaur/utils/runtimes" "go.uber.org/zap" "reflect" + "runtime/debug" "sync" "time" ) @@ -45,9 +46,11 @@ func (slf *event) RegStopEvent(handle StopEventHandle) { } func (slf *event) OnStopEvent() { - for _, handle := range slf.stopEventHandles { - handle(slf.Server) - } + PushSystemMessage(slf.Server, func() { + for _, handle := range slf.stopEventHandles { + handle(slf.Server) + } + }) } // RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数 @@ -69,20 +72,22 @@ func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEv } func (slf *event) OnConsoleCommandEvent(command string) { - handles, exist := slf.consoleCommandEventHandles[command] - if !exist { - switch command { - case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN": - log.Info("Console", zap.String("Receive", command), zap.String("Action", "Shutdown")) - slf.Server.shutdown(nil) - return + PushSystemMessage(slf.Server, func() { + handles, exist := slf.consoleCommandEventHandles[command] + if !exist { + switch command { + case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN": + log.Info("Console", zap.String("Receive", command), zap.String("Action", "Shutdown")) + slf.Server.shutdown(nil) + return + } + log.Warn("Server", zap.String("Command", "unregistered")) + } else { + for _, handle := range handles { + handle(slf.Server) + } } - log.Warn("Server", zap.String("Command", "unregistered")) - } else { - for _, handle := range handles { - handle(slf.Server) - } - } + }) } // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 @@ -92,6 +97,12 @@ func (slf *event) RegStartBeforeEvent(handle StartBeforeEventHandle) { } func (slf *event) OnStartBeforeEvent() { + defer func() { + if err := recover(); err != nil { + log.Error("Server", zap.String("OnStartBeforeEvent", fmt.Sprintf("%v", err))) + debug.PrintStack() + } + }() for _, handle := range slf.startBeforeEventHandles { handle(slf.Server) } @@ -104,9 +115,11 @@ func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle) { } func (slf *event) OnStartFinishEvent() { - for _, handle := range slf.startFinishEventHandles { - handle(slf.Server) - } + PushSystemMessage(slf.Server, func() { + for _, handle := range slf.startFinishEventHandles { + handle(slf.Server) + } + }) } // RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数 @@ -119,11 +132,13 @@ func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle) { } func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { - for _, handle := range slf.connectionClosedEventHandles { - handle(slf.Server, conn, err) - } - conn.Close() - slf.Server.online.Delete(conn.GetID()) + PushSystemMessage(slf.Server, func() { + for _, handle := range slf.connectionClosedEventHandles { + handle(slf.Server, conn, err) + } + conn.Close() + slf.Server.online.Delete(conn.GetID()) + }) } // RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数 @@ -136,10 +151,12 @@ func (slf *event) RegConnectionOpenedEvent(handle ConnectionOpenedEventHandle) { } func (slf *event) OnConnectionOpenedEvent(conn *Conn) { - slf.Server.online.Set(conn.GetID(), conn) - for _, handle := range slf.connectionOpenedEventHandles { - handle(slf.Server, conn) - } + PushSystemMessage(slf.Server, func() { + slf.Server.online.Set(conn.GetID(), conn) + for _, handle := range slf.connectionOpenedEventHandles { + handle(slf.Server, conn) + } + }) } // RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数 @@ -176,9 +193,11 @@ func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle) { } func (slf *event) OnMessageErrorEvent(message *Message, err error) { - for _, handle := range slf.messageErrorEventHandles { - handle(slf.Server, message, err) - } + PushSystemMessage(slf.Server, func() { + for _, handle := range slf.messageErrorEventHandles { + handle(slf.Server, message, err) + } + }) } // RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数 @@ -188,9 +207,11 @@ func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle) { } func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) { - for _, handle := range slf.messageLowExecEventHandles { - handle(slf.Server, message, cost) - } + PushSystemMessage(slf.Server, func() { + for _, handle := range slf.messageLowExecEventHandles { + handle(slf.Server, message, cost) + } + }) } func (slf *event) check() { diff --git a/server/message.go b/server/message.go index 8c9d5cb..6bac3ae 100644 --- a/server/message.go +++ b/server/message.go @@ -21,6 +21,9 @@ const ( // MessageTypeAsync 异步消息类型 MessageTypeAsync + + // MessageTypeSystem 系统消息类型 + MessageTypeSystem ) var messageNames = map[MessageType]string{ @@ -29,6 +32,7 @@ var messageNames = map[MessageType]string{ MessageTypeCross: "MessageTypeCross", MessageTypeTicker: "MessageTypeTicker", MessageTypeAsync: "MessageTypeAsync", + MessageTypeSystem: "MessageTypeSystem", } const ( @@ -146,6 +150,14 @@ func PushAsyncMessage(srv *Server, caller func() error, callback func(err error) srv.pushMessage(msg) } +// PushSystemMessage 向特定服务器中推送 MessageTypeSystem 消息 +func PushSystemMessage(srv *Server, handle func(), mark ...any) { + msg := srv.messagePool.Get() + msg.t = MessageTypeSystem + msg.attrs = append([]any{handle}, mark...) + srv.pushMessage(msg) +} + // SetMessagePacketVisualizer 设置消息可视化函数 // - 消息可视化将在慢消息等情况用于打印,使用自定消息可视化函数可以便于开发者进行调试 // - 默认的消息可视化函数将直接返回消息的字符串表示 diff --git a/server/server.go b/server/server.go index 162aafe..e0d2c61 100644 --- a/server/server.go +++ b/server/server.go @@ -114,6 +114,7 @@ func (slf *Server) Run(addr string) error { slf.event.check() slf.addr = addr var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr) + var messageInitFinish = make(chan struct{}, 1) var connectionInitHandle = func(callback func()) { slf.messagePool = synchronization.NewPool[*Message](slf.messagePoolSize, func() *Message { @@ -132,6 +133,7 @@ func (slf *Server) Run(addr string) error { go callback() } go func() { + messageInitFinish <- struct{}{} for message := range slf.messageChannel { slf.dispatchMessage(message) } @@ -312,6 +314,10 @@ func (slf *Server) Run(addr string) error { return ErrCanNotSupportNetwork } + <-messageInitFinish + close(messageInitFinish) + messageInitFinish = nil + fmt.Println("messageInitFinish") if slf.multiple == nil { log.Info("Server", zap.String(serverMark, "====================================================================")) log.Info("Server", zap.String(serverMark, "RunningInfo"), @@ -568,6 +574,8 @@ func (slf *Server) dispatchMessage(msg *Message) { }); err != nil { panic(err) } + case MessageTypeSystem: + attrs[0].(func())() default: log.Warn("Server", zap.String("not support message type", msg.t.String())) }