From e0c63d569d13f6a349544bcea00af43c225d84fc Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 4 Jul 2023 18:56:35 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E9=87=8D=E6=9E=84=20server.Connect?= =?UTF-8?q?ionClosedEventHandle=EF=BC=8C=E4=BF=AE=E5=A4=8D=E9=83=A8?= =?UTF-8?q?=E5=88=86=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 支持在连接关闭时获取到错误信息,修复建立连接立刻发送请求无法被处理的问题 --- examples/simple-server-chatroom/main.go | 2 +- examples/simple-server-lockstep/main.go | 2 +- server/conn.go | 20 ++++++++++++++------ server/event.go | 6 +++--- server/gnet.go | 2 +- server/server.go | 4 ++-- 6 files changed, 22 insertions(+), 14 deletions(-) diff --git a/examples/simple-server-chatroom/main.go b/examples/simple-server-chatroom/main.go index 901f3e5..394ccff 100644 --- a/examples/simple-server-chatroom/main.go +++ b/examples/simple-server-chatroom/main.go @@ -18,7 +18,7 @@ func main() { connections.Set(conn.GetID(), conn) conn.Write([]byte("欢迎加入")) }) - srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn) { + srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) { if connections.DeleteExist(conn.GetID()) { for id, c := range connections.Map() { c.Write([]byte(fmt.Sprintf("%s 退出了聊天室", id))) diff --git a/examples/simple-server-lockstep/main.go b/examples/simple-server-lockstep/main.go index 70fa992..d85cbb1 100644 --- a/examples/simple-server-lockstep/main.go +++ b/examples/simple-server-lockstep/main.go @@ -29,7 +29,7 @@ func main() { players.Set(conn.GetID(), player) lockstep.JoinClient(player) }) - srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn) { + srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) { players.Delete(conn.GetID()) lockstep.LeaveClient(conn.GetID()) if players.Size() == 0 { diff --git a/server/conn.go b/server/conn.go index 24e3707..bd9a330 100644 --- a/server/conn.go +++ b/server/conn.go @@ -23,7 +23,10 @@ func newKcpConn(server *Server, session *kcp.UDPSession) *Conn { if index := strings.LastIndex(c.ip, ":"); index != -1 { c.ip = c.ip[0:index] } - go c.writeLoop() + var wait = new(sync.WaitGroup) + wait.Add(1) + go c.writeLoop(wait) + wait.Wait() return c } @@ -39,7 +42,10 @@ func newGNetConn(server *Server, conn gnet.Conn) *Conn { if index := strings.LastIndex(c.ip, ":"); index != -1 { c.ip = c.ip[0:index] } - go c.writeLoop() + var wait = new(sync.WaitGroup) + wait.Add(1) + go c.writeLoop(wait) + wait.Wait() return c } @@ -52,7 +58,10 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn { ws: ws, data: map[any]any{}, } - go c.writeLoop() + var wait = new(sync.WaitGroup) + wait.Add(1) + go c.writeLoop(wait) + wait.Wait() return c } @@ -84,8 +93,6 @@ func (slf *Conn) GetIP() string { // Close 关闭连接 func (slf *Conn) Close() { - slf.mutex.Lock() - defer slf.mutex.Unlock() if slf.ws != nil { _ = slf.ws.Close() } else if slf.gn != nil { @@ -147,7 +154,7 @@ func (slf *Conn) Write(data []byte, messageType ...int) { } // writeLoop 写循环 -func (slf *Conn) writeLoop() { +func (slf *Conn) writeLoop(wait *sync.WaitGroup) { slf.packetPool = synchronization.NewPool[*connPacket](10*1024, func() *connPacket { return &connPacket{} @@ -161,6 +168,7 @@ func (slf *Conn) writeLoop() { slf.Close() } }() + wait.Done() for { slf.mutex.Lock() if slf.packetPool == nil { diff --git a/server/event.go b/server/event.go index 96d8c39..6105a4d 100644 --- a/server/event.go +++ b/server/event.go @@ -15,7 +15,7 @@ type StartFinishEventHandle func(srv *Server) type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte) type ConnectionReceiveWebsocketPacketEventHandle func(srv *Server, conn *Conn, packet []byte, messageType int) type ConnectionOpenedEventHandle func(srv *Server, conn *Conn) -type ConnectionClosedEventHandle func(srv *Server, conn *Conn) +type ConnectionClosedEventHandle func(srv *Server, conn *Conn, err any) type ReceiveCrossPacketEventHandle func(srv *Server, senderServerId int64, packet []byte) type MessageErrorEventHandle func(srv *Server, message *Message, err error) type MessageLowExecEventHandle func(srv *Server, message *Message, cost time.Duration) @@ -106,10 +106,10 @@ func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle) { log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String())) } -func (slf *event) OnConnectionClosedEvent(conn *Conn) { +func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { log.Debug("Server", zap.String("ConnectionClosed", conn.GetID())) for _, handle := range slf.connectionClosedEventHandles { - handle(slf.Server, conn) + handle(slf.Server, conn, err) } conn.Close() } diff --git a/server/gnet.go b/server/gnet.go index 6cfe774..a21e630 100644 --- a/server/gnet.go +++ b/server/gnet.go @@ -35,7 +35,7 @@ func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) { } func (slf *gNet) OnClosed(c gnet.Conn, err error) (action gnet.Action) { - slf.OnConnectionClosedEvent(c.Context().(*Conn)) + slf.OnConnectionClosedEvent(c.Context().(*Conn), err) return } diff --git a/server/server.go b/server/server.go index 4270764..d862339 100644 --- a/server/server.go +++ b/server/server.go @@ -185,7 +185,7 @@ func (slf *Server) Run(addr string) error { go func(conn *Conn) { defer func() { if err := recover(); err != nil { - slf.OnConnectionClosedEvent(conn) + slf.OnConnectionClosedEvent(conn, err) } }() @@ -265,7 +265,7 @@ func (slf *Server) Run(addr string) error { defer func() { if err := recover(); err != nil { - slf.OnConnectionClosedEvent(conn) + slf.OnConnectionClosedEvent(conn, err) } }()