From 1cbe8ecf56430318ca1f5a190e311cbb1bcbb2a4 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Fri, 1 Sep 2023 14:31:25 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E6=8F=90=E9=AB=98=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=90=9E=E5=90=90=E9=87=8F=EF=BC=8C=E9=99=8D=E4=BD=8E=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=BB=B6=E8=BF=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/client/client.go | 45 ++++++++++++++++++++++++------------ server/client/client_core.go | 3 +++ server/client/tcp.go | 6 +++++ server/client/uds.go | 6 +++++ server/client/websocket.go | 6 +++++ server/conn.go | 30 +++++++++++++----------- 6 files changed, 68 insertions(+), 28 deletions(-) diff --git a/server/client/client.go b/server/client/client.go index baf45d1..f24bf21 100644 --- a/server/client/client.go +++ b/server/client/client.go @@ -3,27 +3,33 @@ package client import ( "github.com/kercylan98/minotaur/utils/concurrent" "sync" - "time" ) // NewClient 创建客户端 func NewClient(core Core) *Client { client := &Client{ + cond: sync.NewCond(&sync.Mutex{}), events: new(events), core: core, } return client } +// CloneClient 克隆客户端 +func CloneClient(client *Client) *Client { + return NewClient(client.core.Clone()) +} + // Client 客户端 type Client struct { *events core Core - mutex sync.Mutex + cond *sync.Cond packetPool *concurrent.Pool[*Packet] packets []*Packet - accumulate []*Packet + accumulate []*Packet + accumulation int // 积压消息数 } func (slf *Client) Run() error { @@ -38,14 +44,14 @@ func (slf *Client) Run() error { }() err := <-runState if err != nil { - slf.mutex.Lock() + slf.cond.L.Lock() if slf.packetPool != nil { slf.packetPool.Close() slf.packetPool = nil } slf.accumulate = append(slf.accumulate, slf.packets...) slf.packets = nil - slf.mutex.Unlock() + slf.cond.L.Unlock() return err } var wait = new(sync.WaitGroup) @@ -98,7 +104,10 @@ func (slf *Client) write(wst int, packet []byte, callback ...func(err error)) { if len(callback) > 0 { p.callback = callback[0] } + slf.cond.L.Lock() slf.accumulate = append(slf.accumulate, p) + slf.accumulation = len(slf.accumulate) + len(slf.packets) + slf.cond.L.Unlock() return } cp := slf.packetPool.Get() @@ -107,9 +116,11 @@ func (slf *Client) write(wst int, packet []byte, callback ...func(err error)) { if len(callback) > 0 { cp.callback = callback[0] } - slf.mutex.Lock() + slf.cond.L.Lock() slf.packets = append(slf.packets, cp) - slf.mutex.Unlock() + slf.accumulation = len(slf.accumulate) + len(slf.packets) + slf.cond.Signal() + slf.cond.L.Unlock() } // writeLoop 写循环 @@ -123,30 +134,29 @@ func (slf *Client) writeLoop(wait *sync.WaitGroup) { data.callback = nil }, ) - slf.mutex.Lock() + slf.cond.L.Lock() slf.packets = append(slf.packets, slf.accumulate...) slf.accumulate = nil - slf.mutex.Unlock() + slf.cond.L.Unlock() defer func() { if err := recover(); err != nil { slf.Close(err.(error)) } }() wait.Done() + for { - slf.mutex.Lock() + slf.cond.L.Lock() if slf.packetPool == nil { - slf.mutex.Unlock() + slf.cond.L.Unlock() return } if len(slf.packets) == 0 { - slf.mutex.Unlock() - time.Sleep(50 * time.Millisecond) - continue + slf.cond.Wait() } packets := slf.packets[0:] slf.packets = slf.packets[0:0] - slf.mutex.Unlock() + slf.cond.L.Unlock() for i := 0; i < len(packets); i++ { data := packets[i] var err = slf.core.Write(data) @@ -170,3 +180,8 @@ func (slf *Client) onReceive(wst int, packet []byte) { func (slf *Client) GetServerAddr() string { return slf.core.GetServerAddr() } + +// GetMessageAccumulationTotal 获取消息积压总数 +func (slf *Client) GetMessageAccumulationTotal() int { + return slf.accumulation +} diff --git a/server/client/client_core.go b/server/client/client_core.go index 2a0444b..d710e30 100644 --- a/server/client/client_core.go +++ b/server/client/client_core.go @@ -14,4 +14,7 @@ type Core interface { // GetServerAddr 获取服务器地址 GetServerAddr() string + + // Clone 克隆客户端 + Clone() Core } diff --git a/server/client/tcp.go b/server/client/tcp.go index 9c4198a..c8c36dc 100644 --- a/server/client/tcp.go +++ b/server/client/tcp.go @@ -34,3 +34,9 @@ func (slf *TCP) Close() { func (slf *TCP) GetServerAddr() string { return slf.addr } + +func (slf *TCP) Clone() Core { + return &TCP{ + addr: slf.addr, + } +} diff --git a/server/client/uds.go b/server/client/uds.go index 727fed9..802a348 100644 --- a/server/client/uds.go +++ b/server/client/uds.go @@ -36,3 +36,9 @@ func (slf *UnixDomainSocket) Close() { func (slf *UnixDomainSocket) GetServerAddr() string { return slf.addr } + +func (slf *UnixDomainSocket) Clone() Core { + return &UnixDomainSocket{ + addr: slf.addr, + } +} diff --git a/server/client/websocket.go b/server/client/websocket.go index b5c7d2c..921c906 100644 --- a/server/client/websocket.go +++ b/server/client/websocket.go @@ -51,3 +51,9 @@ func (slf *Websocket) Close() { func (slf *Websocket) GetServerAddr() string { return slf.addr } + +func (slf *Websocket) Clone() Core { + return &Websocket{ + addr: slf.addr, + } +} diff --git a/server/conn.go b/server/conn.go index e9af1d2..de25fa8 100644 --- a/server/conn.go +++ b/server/conn.go @@ -11,7 +11,6 @@ import ( "runtime/debug" "strings" "sync" - "time" ) // newKcpConn 创建一个处理KCP的连接 @@ -19,6 +18,7 @@ func newKcpConn(server *Server, session *kcp.UDPSession) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ + cond: sync.NewCond(&sync.Mutex{}), server: server, remoteAddr: session.RemoteAddr(), ip: session.RemoteAddr().String(), @@ -41,6 +41,7 @@ func newGNetConn(server *Server, conn gnet.Conn) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ + cond: sync.NewCond(&sync.Mutex{}), server: server, remoteAddr: conn.RemoteAddr(), ip: conn.RemoteAddr().String(), @@ -63,6 +64,7 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ + cond: sync.NewCond(&sync.Mutex{}), server: server, remoteAddr: ws.RemoteAddr(), ip: ip, @@ -82,6 +84,7 @@ func newGatewayConn(conn *Conn, connId string) *Conn { c := &Conn{ //ctx: server.ctx, connection: &connection{ + cond: sync.NewCond(&sync.Mutex{}), server: conn.server, data: map[any]any{}, }, @@ -97,6 +100,7 @@ func NewEmptyConn(server *Server) *Conn { c := &Conn{ ctx: server.ctx, connection: &connection{ + cond: sync.NewCond(&sync.Mutex{}), server: server, remoteAddr: &net.TCPAddr{}, ip: "0.0.0.0:0", @@ -126,7 +130,7 @@ type connection struct { kcp *kcp.UDPSession gw func(packet []byte) data map[any]any - mutex sync.Mutex + cond *sync.Cond packetPool *concurrent.Pool[*connPacket] packets []*connPacket } @@ -140,11 +144,11 @@ func (slf *Conn) IsEmpty() bool { // - 重用连接时,会将当前连接的数据复制到新连接中 // - 通常在于连接断开后,重新连接时使用 func (slf *Conn) Reuse(conn *Conn) { - slf.mutex.Lock() - conn.mutex.Lock() + slf.cond.L.Lock() + conn.cond.L.Lock() defer func() { - slf.mutex.Unlock() - conn.mutex.Unlock() + slf.cond.L.Unlock() + conn.cond.L.Unlock() }() slf.Close() slf.remoteAddr = conn.remoteAddr @@ -253,9 +257,10 @@ func (slf *Conn) Write(packet []byte, callback ...func(err error)) { if len(callback) > 0 { cp.callback = callback[0] } - slf.mutex.Lock() + slf.cond.L.Lock() slf.packets = append(slf.packets, cp) - slf.mutex.Unlock() + slf.cond.Signal() + slf.cond.L.Unlock() } // writeLoop 写循环 @@ -278,18 +283,17 @@ func (slf *Conn) writeLoop(wait *sync.WaitGroup) { }() wait.Done() for { - slf.mutex.Lock() + slf.cond.L.Lock() if slf.packetPool == nil { + slf.cond.L.Unlock() return } if len(slf.packets) == 0 { - slf.mutex.Unlock() - time.Sleep(50 * time.Millisecond) - continue + slf.cond.Wait() } packets := slf.packets[0:] slf.packets = slf.packets[0:0] - slf.mutex.Unlock() + slf.cond.L.Unlock() for i := 0; i < len(packets); i++ { data := packets[i] var err error