perf: server 包连接关闭逻辑优化
This commit is contained in:
parent
376ff779e1
commit
483ace2fa9
|
@ -127,6 +127,8 @@ type Conn struct {
|
||||||
type connection struct {
|
type connection struct {
|
||||||
server *Server
|
server *Server
|
||||||
mutex *sync.Mutex
|
mutex *sync.Mutex
|
||||||
|
close sync.Once
|
||||||
|
closed bool
|
||||||
remoteAddr net.Addr
|
remoteAddr net.Addr
|
||||||
ip string
|
ip string
|
||||||
ws *websocket.Conn
|
ws *websocket.Conn
|
||||||
|
@ -159,23 +161,36 @@ func (slf *Conn) GetIP() string {
|
||||||
return slf.ip
|
return slf.ip
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsClosed 是否已经关闭
|
||||||
|
func (slf *Conn) IsClosed() bool {
|
||||||
|
return slf.closed
|
||||||
|
}
|
||||||
|
|
||||||
// Close 关闭连接
|
// Close 关闭连接
|
||||||
func (slf *Conn) Close() {
|
func (slf *Conn) Close(err ...error) {
|
||||||
if slf.ws != nil {
|
slf.close.Do(func() {
|
||||||
_ = slf.ws.Close()
|
slf.closed = true
|
||||||
} else if slf.gn != nil {
|
if slf.ws != nil {
|
||||||
_ = slf.gn.Close()
|
_ = slf.ws.Close()
|
||||||
} else if slf.kcp != nil {
|
} else if slf.gn != nil {
|
||||||
_ = slf.kcp.Close()
|
_ = slf.gn.Close()
|
||||||
}
|
} else if slf.kcp != nil {
|
||||||
if slf.packetPool != nil {
|
_ = slf.kcp.Close()
|
||||||
slf.packetPool.Close()
|
}
|
||||||
}
|
if slf.packetPool != nil {
|
||||||
slf.packetPool = nil
|
slf.packetPool.Close()
|
||||||
if slf.packets != nil {
|
}
|
||||||
close(slf.packets)
|
slf.packetPool = nil
|
||||||
slf.packets = nil
|
if slf.packets != nil {
|
||||||
}
|
close(slf.packets)
|
||||||
|
slf.packets = nil
|
||||||
|
}
|
||||||
|
if len(err) > 0 {
|
||||||
|
slf.server.OnConnectionClosedEvent(slf, err[0])
|
||||||
|
return
|
||||||
|
}
|
||||||
|
slf.server.OnConnectionClosedEvent(slf, nil)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetData 设置连接数据,该数据将在连接关闭前始终存在
|
// SetData 设置连接数据,该数据将在连接关闭前始终存在
|
||||||
|
@ -261,9 +276,6 @@ func (slf *Conn) writeLoop(wait *sync.WaitGroup) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
slf.Close()
|
slf.Close()
|
||||||
// TODO: 以下代码是否需要?
|
|
||||||
// log.Error("WriteLoop", log.Any("Error", err))
|
|
||||||
// debug.PrintStack()
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
wait.Done()
|
wait.Done()
|
||||||
|
|
|
@ -184,12 +184,11 @@ func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle, p
|
||||||
|
|
||||||
func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
|
func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
|
||||||
PushSystemMessage(slf.Server, func() {
|
PushSystemMessage(slf.Server, func() {
|
||||||
|
slf.Server.online.Delete(conn.GetID())
|
||||||
slf.connectionClosedEventHandles.RangeValue(func(index int, value ConnectionClosedEventHandle) bool {
|
slf.connectionClosedEventHandles.RangeValue(func(index int, value ConnectionClosedEventHandle) bool {
|
||||||
value(slf.Server, conn, err)
|
value(slf.Server, conn, err)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
conn.Close()
|
|
||||||
slf.Server.online.Delete(conn.GetID())
|
|
||||||
}, "ConnectionClosedEvent")
|
}, "ConnectionClosedEvent")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,8 @@ func (slf *gNet) OnOpened(c gnet.Conn) (out []byte, action gnet.Action) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *gNet) OnClosed(c gnet.Conn, err error) (action gnet.Action) {
|
func (slf *gNet) OnClosed(c gnet.Conn, err error) (action gnet.Action) {
|
||||||
slf.OnConnectionClosedEvent(c.Context().(*Conn), err)
|
conn := c.Context().(*Conn)
|
||||||
|
conn.Close(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -205,14 +205,21 @@ func (slf *Server) Run(addr string) error {
|
||||||
go func(conn *Conn) {
|
go func(conn *Conn) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
slf.OnConnectionClosedEvent(conn, err)
|
e, ok := err.(error)
|
||||||
|
if !ok {
|
||||||
|
e = fmt.Errorf("%v", err)
|
||||||
|
}
|
||||||
|
conn.Close(e)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
buf := make([]byte, 4096)
|
buf := make([]byte, 4096)
|
||||||
for {
|
for !conn.IsClosed() {
|
||||||
n, err := conn.kcp.Read(buf)
|
n, err := conn.kcp.Read(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if conn.IsClosed() {
|
||||||
|
break
|
||||||
|
}
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
PushPacketMessage(slf, conn, 0, buf[:n])
|
PushPacketMessage(slf, conn, 0, buf[:n])
|
||||||
|
@ -292,15 +299,22 @@ func (slf *Server) Run(addr string) error {
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
slf.OnConnectionClosedEvent(conn, err)
|
e, ok := err.(error)
|
||||||
|
if !ok {
|
||||||
|
e = fmt.Errorf("%v", err)
|
||||||
|
}
|
||||||
|
conn.Close(e)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
for {
|
for !conn.IsClosed() {
|
||||||
if err := ws.SetReadDeadline(super.If(slf.websocketReadDeadline <= 0, times.Zero, time.Now().Add(slf.websocketReadDeadline))); err != nil {
|
if err := ws.SetReadDeadline(super.If(slf.websocketReadDeadline <= 0, times.Zero, time.Now().Add(slf.websocketReadDeadline))); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
messageType, packet, readErr := ws.ReadMessage()
|
messageType, packet, readErr := ws.ReadMessage()
|
||||||
if readErr != nil {
|
if readErr != nil {
|
||||||
|
if conn.IsClosed() {
|
||||||
|
break
|
||||||
|
}
|
||||||
panic(readErr)
|
panic(readErr)
|
||||||
}
|
}
|
||||||
if len(slf.supportMessageTypes) > 0 && !slf.supportMessageTypes[messageType] {
|
if len(slf.supportMessageTypes) > 0 && !slf.supportMessageTypes[messageType] {
|
||||||
|
|
|
@ -21,14 +21,15 @@ func TestNew(t *testing.T) {
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
var current *server.Conn
|
srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) {
|
||||||
|
fmt.Println("关闭", conn.GetID(), err, "Count", srv.GetOnlineCount())
|
||||||
|
})
|
||||||
srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) {
|
srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) {
|
||||||
if current != nil {
|
if srv.GetOnlineCount() > 1 {
|
||||||
current.Reuse(conn)
|
conn.Close()
|
||||||
} else {
|
|
||||||
current = conn
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
|
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
|
||||||
conn.Write(packet)
|
conn.Write(packet)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue