refactor: server.Server 兼容新的 concurrent.Pool 和 buffer.Unbounded

This commit is contained in:
kercylan98 2023-12-23 18:15:34 +08:00
parent 8f4e65219e
commit eb28d42bf1
8 changed files with 148 additions and 187 deletions

3
go.mod
View File

@ -16,6 +16,7 @@ require (
github.com/smartystreets/goconvey v1.8.1
github.com/sony/sonyflake v1.2.0
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.3
github.com/tealeg/xlsx v1.0.5
github.com/tidwall/gjson v1.16.0
github.com/xtaci/kcp-go/v5 v5.6.3
@ -29,6 +30,7 @@ require (
require (
github.com/bytedance/sonic v1.9.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
@ -48,6 +50,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/smarty/assertions v1.15.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/templexxx/cpu v0.1.0 // indirect

View File

@ -62,7 +62,7 @@ func (slf *Client) Run(block ...bool) error {
return err
}
slf.closed = false
slf.pool = concurrent.NewPool[*Packet](10*1024, func() *Packet {
slf.pool = concurrent.NewPool[Packet](func() *Packet {
return new(Packet)
}, func(data *Packet) {
data.wst = 0
@ -100,7 +100,6 @@ func (slf *Client) Close(err ...error) {
slf.closed = true
slf.core.Close()
slf.loop.Close()
slf.pool.Close()
slf.mutex.Unlock()
if len(err) > 0 {
slf.OnConnectionClosedEvent(slf, err[0])

View File

@ -286,7 +286,7 @@ func (slf *Conn) init() {
}))
}
}
slf.pool = concurrent.NewPool[*connPacket](10*1024,
slf.pool = concurrent.NewPool[connPacket](
func() *connPacket {
return &connPacket{}
}, func(data *connPacket) {
@ -360,7 +360,6 @@ func (slf *Conn) Close(err ...error) {
slf.ticker.Release()
}
slf.server.releaseDispatcher(slf)
slf.pool.Close()
slf.loop.Close()
slf.mu.Unlock()
if len(err) > 0 {

View File

@ -11,7 +11,6 @@ const (
)
const (
DefaultMessageBufferSize = 1024
DefaultAsyncPoolSize = 256
DefaultWebsocketReadDeadline = 30 * time.Second
DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB

View File

@ -11,7 +11,7 @@ var dispatcherUnique = struct{}{}
func generateDispatcher(name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
return &dispatcher{
name: name,
buffer: buffer.NewUnboundedN[*Message](),
buffer: buffer.NewUnbounded[*Message](),
handler: handler,
uniques: haxmap.New[string, struct{}](),
}

View File

@ -33,7 +33,6 @@ type runtime struct {
deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小
tickerPool *timer.Pool // 定时器池
ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行
@ -211,18 +210,6 @@ func WithWebsocketMessageType(messageTypes ...int) Option {
}
}
// WithMessageBufferSize 通过特定的消息缓冲池大小运行服务器
// - 默认大小为 DefaultMessageBufferSize
// - 消息数量超出这个值的时候,消息处理将会造成更大的开销(频繁创建新的结构体),同时服务器将输出警告内容
func WithMessageBufferSize(size int) Option {
return func(srv *Server) {
if size <= 0 {
size = 1024
}
srv.messagePoolSize = size
}
}
// WithPProf 通过性能分析工具PProf创建服务器
func WithPProf(pattern ...string) Option {
return func(srv *Server) {

View File

@ -33,7 +33,6 @@ import (
func New(network Network, options ...Option) *Server {
server := &Server{
runtime: &runtime{
messagePoolSize: DefaultMessageBufferSize,
packetWarnSize: DefaultPacketWarnSize,
},
option: &option{},
@ -101,7 +100,6 @@ type Server struct {
systemSignal chan os.Signal // 系统信号
closeChannel chan struct{} // 关闭信号
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
messageLock sync.RWMutex // 消息锁
dispatcherLock sync.RWMutex // 消息分发器锁
isShutdown atomic.Bool // 是否已关闭
messageCounter atomic.Int64 // 消息计数器
@ -134,11 +132,7 @@ func (slf *Server) Run(addr string) error {
slf.addr = addr
slf.startMessageStatistics()
slf.systemDispatcher = generateDispatcher(serverSystemDispatcher, slf.dispatchMessage)
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
var messageInitFinish = make(chan struct{}, 1)
var connectionInitHandle = func(callback func()) {
slf.messageLock.Lock()
slf.messagePool = concurrent.NewPool[*Message](slf.messagePoolSize,
slf.messagePool = concurrent.NewPool[Message](
func() *Message {
return &Message{}
},
@ -146,31 +140,21 @@ func (slf *Server) Run(addr string) error {
data.reset()
},
)
slf.messageLock.Unlock()
if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC {
slf.gServer = &gNet{Server: slf}
}
if callback != nil {
go callback()
}
go func() {
messageInitFinish <- struct{}{}
slf.systemDispatcher.start()
}()
}
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
go slf.systemDispatcher.start()
switch slf.network {
case NetworkNone:
go connectionInitHandle(func() {
slf.isRunning = true
slf.OnStartBeforeEvent()
})
case NetworkGRPC:
listener, err := net.Listen(string(NetworkTcp), slf.addr)
if err != nil {
return err
}
go connectionInitHandle(nil)
go func() {
slf.isRunning = true
slf.OnStartBeforeEvent()
@ -180,7 +164,6 @@ func (slf *Server) Run(addr string) error {
}
}()
case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix:
go connectionInitHandle(func() {
slf.isRunning = true
slf.OnStartBeforeEvent()
if err := gnet.Serve(slf.gServer, protoAddr,
@ -191,13 +174,11 @@ func (slf *Server) Run(addr string) error {
slf.isRunning = false
slf.PushErrorMessage(err, MessageErrorActionShutdown)
}
})
case NetworkKcp:
listener, err := kcp.ListenWithOptions(slf.addr, nil, 0, 0)
if err != nil {
return err
}
go connectionInitHandle(func() {
slf.isRunning = true
slf.OnStartBeforeEvent()
for {
@ -233,7 +214,6 @@ func (slf *Server) Run(addr string) error {
}
}(conn)
}
})
case NetworkHttp:
go func() {
slf.isRunning = true
@ -248,7 +228,6 @@ func (slf *Server) Run(addr string) error {
log.String("ip", c.ClientIP()), log.String("path", c.Request.URL.Path),
log.Duration("cost", time.Since(t)))
})
go connectionInitHandle(nil)
if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil {
slf.isRunning = false
@ -263,7 +242,6 @@ func (slf *Server) Run(addr string) error {
}()
case NetworkWebsocket:
go connectionInitHandle(func() {
var pattern string
var index = strings.Index(addr, "/")
if index == -1 {
@ -350,7 +328,6 @@ func (slf *Server) Run(addr string) error {
}
}()
})
default:
return ErrCanNotSupportNetwork
}
@ -359,9 +336,6 @@ func (slf *Server) Run(addr string) error {
kcp.SystemTimedSched.Close()
}
<-messageInitFinish
close(messageInitFinish)
messageInitFinish = nil
if slf.multiple == nil {
ip, _ := network.IP()
log.Info("Server", log.String(serverMark, "===================================================================="))
@ -652,7 +626,7 @@ func (slf *Server) releaseDispatcher(conn *Conn) {
// pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求
func (slf *Server) pushMessage(message *Message) {
if slf.messagePool.IsClose() || !slf.OnMessageExecBeforeEvent(message) {
if !slf.OnMessageExecBeforeEvent(message) {
slf.messagePool.Release(message)
return
}
@ -863,7 +837,7 @@ func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback
func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToPacketMessage(
&Conn{wst: wst, connection: conn.connection},
packet,
packet, mark...,
))
}

View File

@ -11,7 +11,7 @@ import (
func TestNew(t *testing.T) {
//limiter := rate.NewLimiter(rate.Every(time.Second), 100)
srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 200, 10, false), server.WithMessageBufferSize(1024*1024), server.WithPProf())
srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 200, 10, false), server.WithPProf())
//srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool {
// t, c := srv.TimeoutContext(time.Second * 5)
// defer c()