fix: client 包内存溢出、死循环等问题处理

This commit is contained in:
kercylan98 2023-08-24 10:54:25 +08:00
parent 75a8608bf4
commit 08559d8225
4 changed files with 30 additions and 16 deletions

View File

@ -27,10 +27,6 @@ type Client struct {
} }
func (slf *Client) Run() error { func (slf *Client) Run() error {
var wait = new(sync.WaitGroup)
wait.Add(1)
go slf.writeLoop(wait)
wait.Wait()
var runState = make(chan error) var runState = make(chan error)
go func() { go func() {
defer func() { defer func() {
@ -42,9 +38,20 @@ func (slf *Client) Run() error {
}() }()
err := <-runState err := <-runState
if err != nil { if err != nil {
slf.Close() slf.mutex.Lock()
if slf.packetPool != nil {
slf.packetPool.Close()
slf.packetPool = nil
}
slf.accumulate = append(slf.accumulate, slf.packets...)
slf.packets = nil
slf.mutex.Unlock()
return err return err
} }
var wait = new(sync.WaitGroup)
wait.Add(1)
go slf.writeLoop(wait)
wait.Wait()
slf.OnConnectionOpenedEvent(slf) slf.OnConnectionOpenedEvent(slf)
return nil return nil
} }
@ -83,16 +90,23 @@ func (slf *Client) Write(packet []byte, callback ...func(err error)) {
// write 向连接中写入数据 // write 向连接中写入数据
// - messageType: websocket模式中指定消息类型 // - messageType: websocket模式中指定消息类型
func (slf *Client) write(wst int, packet []byte, callback ...func(err error)) { func (slf *Client) write(wst int, packet []byte, callback ...func(err error)) {
if slf.packetPool == nil {
var p = &Packet{
wst: wst,
data: packet,
}
if len(callback) > 0 {
p.callback = callback[0]
}
slf.accumulate = append(slf.accumulate, p)
return
}
cp := slf.packetPool.Get() cp := slf.packetPool.Get()
cp.wst = wst cp.wst = wst
cp.data = packet cp.data = packet
if len(callback) > 0 { if len(callback) > 0 {
cp.callback = callback[0] cp.callback = callback[0]
} }
if slf.packetPool == nil {
slf.accumulate = append(slf.accumulate, cp)
return
}
slf.mutex.Lock() slf.mutex.Lock()
slf.packets = append(slf.packets, cp) slf.packets = append(slf.packets, cp)
slf.mutex.Unlock() slf.mutex.Unlock()

View File

@ -26,6 +26,7 @@ func (slf *Websocket) Run(runState chan<- error, receive func(wst int, packet []
return return
} }
slf.conn = ws slf.conn = ws
slf.clsoed = false
runState <- nil runState <- nil
for !slf.clsoed { for !slf.clsoed {
messageType, packet, readErr := ws.ReadMessage() messageType, packet, readErr := ws.ReadMessage()

View File

@ -70,7 +70,7 @@ func (slf *Endpoint) Connect() {
slf.state = slf.evaluator(float64(time.Now().UnixNano() - cur)) slf.state = slf.evaluator(float64(time.Now().UnixNano() - cur))
break break
} }
time.Sleep(100 * time.Millisecond) time.Sleep(1000 * time.Millisecond)
} }
} }
@ -98,5 +98,9 @@ func (slf *Endpoint) onConnectionReceivePacket(conn *client.Client, wst int, pac
panic(err) panic(err)
} }
slf.state = slf.evaluator(float64(time.Now().UnixNano() - sendTime)) slf.state = slf.evaluator(float64(time.Now().UnixNano() - sendTime))
slf.GetLink(addr).SetWST(wst).Write(packet) cli := slf.GetLink(addr)
if cli == nil {
return
}
cli.SetWST(wst).Write(packet)
} }

View File

@ -1,7 +1,6 @@
package gateway_test package gateway_test
import ( import (
"fmt"
"github.com/kercylan98/minotaur/server" "github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/server/client" "github.com/kercylan98/minotaur/server/client"
"github.com/kercylan98/minotaur/server/gateway" "github.com/kercylan98/minotaur/server/gateway"
@ -11,9 +10,6 @@ import (
func TestGateway_RunEndpointServer(t *testing.T) { func TestGateway_RunEndpointServer(t *testing.T) {
srv := server.New(server.NetworkWebsocket, server.WithDeadlockDetect(time.Second*3)) srv := server.New(server.NetworkWebsocket, server.WithDeadlockDetect(time.Second*3))
srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) {
fmt.Println(err)
})
srv.RegConnectionPacketPreprocessEvent(func(srv *server.Server, conn *server.Conn, packet []byte, abort func(), usePacket func(newPacket []byte)) { srv.RegConnectionPacketPreprocessEvent(func(srv *server.Server, conn *server.Conn, packet []byte, abort func(), usePacket func(newPacket []byte)) {
addr, packet, err := gateway.UnmarshalGatewayOutPacket(packet) addr, packet, err := gateway.UnmarshalGatewayOutPacket(packet)
if err != nil { if err != nil {
@ -35,7 +31,6 @@ func TestGateway_RunEndpointServer(t *testing.T) {
return packet return packet
}) })
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
fmt.Println("endpoint receive packet", string(packet))
conn.Write(packet) conn.Write(packet)
}) })
if err := srv.Run(":8889"); err != nil { if err := srv.Run(":8889"); err != nil {