From 551a3e5c51c048eac13bf31c0fc1665d2b7b8431 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 19 Sep 2023 16:49:28 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20server.Conn=20=E5=86=99=E5=BE=AA?= =?UTF-8?q?=E7=8E=AF=E6=9B=B4=E6=94=B9=E4=B8=BA=E9=87=87=E7=94=A8=E6=97=A0?= =?UTF-8?q?=E7=95=8C=E7=BC=93=E5=86=B2=E5=8C=BA=E7=9A=84=E5=86=99=E5=85=A5?= =?UTF-8?q?=EF=BC=8C=E4=BC=98=E5=8C=96=E6=95=B4=E4=BD=93=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/conn.go | 141 ++++++++++++++++++++----------------------------- 1 file changed, 58 insertions(+), 83 deletions(-) diff --git a/server/conn.go b/server/conn.go index b4911c5..86a046a 100644 --- a/server/conn.go +++ b/server/conn.go @@ -2,11 +2,16 @@ package server import ( "context" + "errors" + "fmt" "github.com/gorilla/websocket" + "github.com/kercylan98/minotaur/server/writeloop" "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/log" "github.com/panjf2000/gnet" "github.com/xtaci/kcp-go/v5" "net" + "runtime/debug" "strings" "sync" ) @@ -16,7 +21,6 @@ func newKcpConn(server *Server, session *kcp.UDPSession) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ - packets: make(chan *connPacket, DefaultConnectionChannelSize), server: server, remoteAddr: session.RemoteAddr(), ip: session.RemoteAddr().String(), @@ -27,10 +31,7 @@ func newKcpConn(server *Server, session *kcp.UDPSession) *Conn { if index := strings.LastIndex(c.ip, ":"); index != -1 { c.ip = c.ip[0:index] } - var wait = new(sync.WaitGroup) - wait.Add(1) - go c.writeLoop(wait) - wait.Wait() + c.writeLoop() return c } @@ -39,7 +40,6 @@ func newGNetConn(server *Server, conn gnet.Conn) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ - packets: make(chan *connPacket, DefaultConnectionChannelSize), server: server, remoteAddr: conn.RemoteAddr(), ip: conn.RemoteAddr().String(), @@ -50,10 +50,7 @@ func newGNetConn(server *Server, conn gnet.Conn) *Conn { if index := strings.LastIndex(c.ip, ":"); index != -1 { c.ip = c.ip[0:index] } - var wait = new(sync.WaitGroup) - wait.Add(1) - go c.writeLoop(wait) - wait.Wait() + c.writeLoop() return c } @@ -62,7 +59,6 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ - packets: make(chan *connPacket, DefaultConnectionChannelSize), server: server, remoteAddr: ws.RemoteAddr(), ip: ip, @@ -70,10 +66,7 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn { data: map[any]any{}, }, } - var wait = new(sync.WaitGroup) - wait.Add(1) - go c.writeLoop(wait) - wait.Wait() + c.writeLoop() return c } @@ -82,9 +75,8 @@ func newGatewayConn(conn *Conn, connId string) *Conn { c := &Conn{ //ctx: server.ctx, connection: &connection{ - packets: make(chan *connPacket, DefaultConnectionChannelSize), - server: conn.server, - data: map[any]any{}, + server: conn.server, + data: map[any]any{}, }, } c.gw = func(packet []byte) { @@ -98,17 +90,13 @@ func NewEmptyConn(server *Server) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ - packets: make(chan *connPacket, DefaultConnectionChannelSize), server: server, remoteAddr: &net.TCPAddr{}, ip: "0.0.0.0:0", data: map[any]any{}, }, } - var wait = new(sync.WaitGroup) - wait.Add(1) - go c.writeLoop(wait) - wait.Wait() + c.writeLoop() return c } @@ -121,9 +109,6 @@ type Conn struct { // connection 长久保持的连接 type connection struct { server *Server - close sync.Once - closed bool - closeL sync.Mutex remoteAddr net.Addr ip string ws *websocket.Conn @@ -131,8 +116,10 @@ type connection struct { kcp *kcp.UDPSession gw func(packet []byte) data map[any]any - packetPool *concurrent.Pool[*connPacket] - packets chan *connPacket + closed bool + pool *concurrent.Pool[*connPacket] + loop *writeloop.WriteLoop[*connPacket] + mu sync.Mutex } // IsEmpty 是否是空连接 @@ -158,6 +145,8 @@ func (slf *Conn) GetIP() string { // IsClosed 是否已经关闭 func (slf *Conn) IsClosed() bool { + slf.mu.Lock() + defer slf.mu.Unlock() return slf.closed } @@ -216,23 +205,23 @@ func (slf *Conn) Write(packet []byte, callback ...func(err error)) { return } packet = slf.server.OnConnectionWritePacketBeforeEvent(slf, packet) - slf.closeL.Lock() - defer slf.closeL.Unlock() - if slf.packetPool == nil || slf.packets == nil { + slf.mu.Lock() + defer slf.mu.Unlock() + if slf.closed { return } - cp := slf.packetPool.Get() + cp := slf.pool.Get() cp.wst = slf.GetWST() cp.packet = packet if len(callback) > 0 { cp.callback = callback[0] } - slf.packets <- cp + slf.loop.Put(cp) } // writeLoop 写循环 -func (slf *Conn) writeLoop(wait *sync.WaitGroup) { - slf.packetPool = concurrent.NewPool[*connPacket](10*1024, +func (slf *Conn) writeLoop() { + slf.pool = concurrent.NewPool[*connPacket](10*1024, func() *connPacket { return &connPacket{} }, func(data *connPacket) { @@ -241,21 +230,7 @@ func (slf *Conn) writeLoop(wait *sync.WaitGroup) { data.callback = nil }, ) - defer func() { - if err := recover(); err != nil { - slf.Close() - slf.packets = nil - } - }() - wait.Done() - for { - packet, ok := <-slf.packets - if !ok { - slf.packets = nil - break - } - - data := packet + slf.loop = writeloop.NewWriteLoop[*connPacket](slf.pool, func(data *connPacket) error { var err error if slf.IsWebsocket() { err = slf.ws.WriteMessage(data.wst, data.packet) @@ -271,43 +246,43 @@ func (slf *Conn) writeLoop(wait *sync.WaitGroup) { _, err = slf.kcp.Write(data.packet) } } - callback := data.callback - slf.closeL.Lock() - slf.packetPool.Release(data) - slf.closeL.Unlock() - if callback != nil { - callback(err) + if data.callback != nil { + data.callback(err) } - if err != nil { - panic(err) - } - } + return err + }, func(err any) { + slf.Close(errors.New(fmt.Sprint(err))) + }) } // Close 关闭连接 func (slf *Conn) Close(err ...error) { - slf.close.Do(func() { - slf.closeL.Lock() - defer slf.closeL.Unlock() - slf.closed = true - if slf.ws != nil { - _ = slf.ws.Close() - } else if slf.gn != nil { - _ = slf.gn.Close() - } else if slf.kcp != nil { - _ = slf.kcp.Close() + slf.mu.Lock() + if slf.closed { + slf.mu.Unlock() + return + } + defer func() { + if err := recover(); err != nil { + log.Error("Conn.Close", log.String("State", "Panic"), log.Any("Error", err)) + debug.PrintStack() + slf.mu.Unlock() } - if slf.packetPool != nil { - slf.packetPool.Close() - } - slf.packetPool = nil - if slf.packets != nil { - close(slf.packets) - } - if len(err) > 0 { - slf.server.OnConnectionClosedEvent(slf, err[0]) - return - } - slf.server.OnConnectionClosedEvent(slf, nil) - }) + }() + slf.closed = true + if slf.ws != nil { + _ = slf.ws.Close() + } else if slf.gn != nil { + _ = slf.gn.Close() + } else if slf.kcp != nil { + _ = slf.kcp.Close() + } + slf.pool.Close() + slf.loop.Close() + slf.mu.Unlock() + if len(err) > 0 { + slf.server.OnConnectionClosedEvent(slf, err[0]) + return + } + slf.server.OnConnectionClosedEvent(slf, nil) }