perf: server.Conn 写循环更改为采用无界缓冲区的写入,优化整体逻辑

This commit is contained in:
kercylan98 2023-09-19 16:49:28 +08:00
parent 31c0e1b735
commit 551a3e5c51
1 changed files with 58 additions and 83 deletions

View File

@ -2,11 +2,16 @@ package server
import (
"context"
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/kercylan98/minotaur/server/writeloop"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log"
"github.com/panjf2000/gnet"
"github.com/xtaci/kcp-go/v5"
"net"
"runtime/debug"
"strings"
"sync"
)
@ -16,7 +21,6 @@ func newKcpConn(server *Server, session *kcp.UDPSession) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
packets: make(chan *connPacket, DefaultConnectionChannelSize),
server: server,
remoteAddr: session.RemoteAddr(),
ip: session.RemoteAddr().String(),
@ -27,10 +31,7 @@ func newKcpConn(server *Server, session *kcp.UDPSession) *Conn {
if index := strings.LastIndex(c.ip, ":"); index != -1 {
c.ip = c.ip[0:index]
}
var wait = new(sync.WaitGroup)
wait.Add(1)
go c.writeLoop(wait)
wait.Wait()
c.writeLoop()
return c
}
@ -39,7 +40,6 @@ func newGNetConn(server *Server, conn gnet.Conn) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
packets: make(chan *connPacket, DefaultConnectionChannelSize),
server: server,
remoteAddr: conn.RemoteAddr(),
ip: conn.RemoteAddr().String(),
@ -50,10 +50,7 @@ func newGNetConn(server *Server, conn gnet.Conn) *Conn {
if index := strings.LastIndex(c.ip, ":"); index != -1 {
c.ip = c.ip[0:index]
}
var wait = new(sync.WaitGroup)
wait.Add(1)
go c.writeLoop(wait)
wait.Wait()
c.writeLoop()
return c
}
@ -62,7 +59,6 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
packets: make(chan *connPacket, DefaultConnectionChannelSize),
server: server,
remoteAddr: ws.RemoteAddr(),
ip: ip,
@ -70,10 +66,7 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn {
data: map[any]any{},
},
}
var wait = new(sync.WaitGroup)
wait.Add(1)
go c.writeLoop(wait)
wait.Wait()
c.writeLoop()
return c
}
@ -82,7 +75,6 @@ func newGatewayConn(conn *Conn, connId string) *Conn {
c := &Conn{
//ctx: server.ctx,
connection: &connection{
packets: make(chan *connPacket, DefaultConnectionChannelSize),
server: conn.server,
data: map[any]any{},
},
@ -98,17 +90,13 @@ func NewEmptyConn(server *Server) *Conn {
c := &Conn{
ctx: server.ctx,
connection: &connection{
packets: make(chan *connPacket, DefaultConnectionChannelSize),
server: server,
remoteAddr: &net.TCPAddr{},
ip: "0.0.0.0:0",
data: map[any]any{},
},
}
var wait = new(sync.WaitGroup)
wait.Add(1)
go c.writeLoop(wait)
wait.Wait()
c.writeLoop()
return c
}
@ -121,9 +109,6 @@ type Conn struct {
// connection 长久保持的连接
type connection struct {
server *Server
close sync.Once
closed bool
closeL sync.Mutex
remoteAddr net.Addr
ip string
ws *websocket.Conn
@ -131,8 +116,10 @@ type connection struct {
kcp *kcp.UDPSession
gw func(packet []byte)
data map[any]any
packetPool *concurrent.Pool[*connPacket]
packets chan *connPacket
closed bool
pool *concurrent.Pool[*connPacket]
loop *writeloop.WriteLoop[*connPacket]
mu sync.Mutex
}
// IsEmpty 是否是空连接
@ -158,6 +145,8 @@ func (slf *Conn) GetIP() string {
// IsClosed 是否已经关闭
func (slf *Conn) IsClosed() bool {
slf.mu.Lock()
defer slf.mu.Unlock()
return slf.closed
}
@ -216,23 +205,23 @@ func (slf *Conn) Write(packet []byte, callback ...func(err error)) {
return
}
packet = slf.server.OnConnectionWritePacketBeforeEvent(slf, packet)
slf.closeL.Lock()
defer slf.closeL.Unlock()
if slf.packetPool == nil || slf.packets == nil {
slf.mu.Lock()
defer slf.mu.Unlock()
if slf.closed {
return
}
cp := slf.packetPool.Get()
cp := slf.pool.Get()
cp.wst = slf.GetWST()
cp.packet = packet
if len(callback) > 0 {
cp.callback = callback[0]
}
slf.packets <- cp
slf.loop.Put(cp)
}
// writeLoop 写循环
func (slf *Conn) writeLoop(wait *sync.WaitGroup) {
slf.packetPool = concurrent.NewPool[*connPacket](10*1024,
func (slf *Conn) writeLoop() {
slf.pool = concurrent.NewPool[*connPacket](10*1024,
func() *connPacket {
return &connPacket{}
}, func(data *connPacket) {
@ -241,21 +230,7 @@ func (slf *Conn) writeLoop(wait *sync.WaitGroup) {
data.callback = nil
},
)
defer func() {
if err := recover(); err != nil {
slf.Close()
slf.packets = nil
}
}()
wait.Done()
for {
packet, ok := <-slf.packets
if !ok {
slf.packets = nil
break
}
data := packet
slf.loop = writeloop.NewWriteLoop[*connPacket](slf.pool, func(data *connPacket) error {
var err error
if slf.IsWebsocket() {
err = slf.ws.WriteMessage(data.wst, data.packet)
@ -271,24 +246,29 @@ func (slf *Conn) writeLoop(wait *sync.WaitGroup) {
_, err = slf.kcp.Write(data.packet)
}
}
callback := data.callback
slf.closeL.Lock()
slf.packetPool.Release(data)
slf.closeL.Unlock()
if callback != nil {
callback(err)
}
if err != nil {
panic(err)
}
if data.callback != nil {
data.callback(err)
}
return err
}, func(err any) {
slf.Close(errors.New(fmt.Sprint(err)))
})
}
// Close 关闭连接
func (slf *Conn) Close(err ...error) {
slf.close.Do(func() {
slf.closeL.Lock()
defer slf.closeL.Unlock()
slf.mu.Lock()
if slf.closed {
slf.mu.Unlock()
return
}
defer func() {
if err := recover(); err != nil {
log.Error("Conn.Close", log.String("State", "Panic"), log.Any("Error", err))
debug.PrintStack()
slf.mu.Unlock()
}
}()
slf.closed = true
if slf.ws != nil {
_ = slf.ws.Close()
@ -297,17 +277,12 @@ func (slf *Conn) Close(err ...error) {
} else if slf.kcp != nil {
_ = slf.kcp.Close()
}
if slf.packetPool != nil {
slf.packetPool.Close()
}
slf.packetPool = nil
if slf.packets != nil {
close(slf.packets)
}
slf.pool.Close()
slf.loop.Close()
slf.mu.Unlock()
if len(err) > 0 {
slf.server.OnConnectionClosedEvent(slf, err[0])
return
}
slf.server.OnConnectionClosedEvent(slf, nil)
})
}