From d27fa7c246319d2d4119c892b1808643db20a5e7 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Sat, 23 Dec 2023 19:05:39 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E6=9B=B4=E6=94=B9=20server=20=E5=92=8C?= =?UTF-8?q?=20conn=20=E7=9A=84=E6=B6=88=E6=81=AF=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E4=B8=BA=20channel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/conn.go | 4 ++-- server/constants.go | 2 ++ server/dispatcher.go | 29 +++++++++++++++++++++-------- server/options.go | 26 ++++++++++++++++++++++++++ server/server.go | 11 +++++++---- server/server_test.go | 28 ++++++---------------------- server/writeloop/README.md | 10 ++++++---- 7 files changed, 70 insertions(+), 40 deletions(-) diff --git a/server/conn.go b/server/conn.go index 2ee1c09..3e3c7a6 100644 --- a/server/conn.go +++ b/server/conn.go @@ -126,7 +126,7 @@ type connection struct { data map[any]any closed bool pool *concurrent.Pool[*connPacket] - loop *writeloop.WriteLoop[*connPacket] + loop writeloop.WriteLoop[*connPacket] mu sync.Mutex openTime time.Time delay time.Duration @@ -295,7 +295,7 @@ func (slf *Conn) init() { data.callback = nil }, ) - slf.loop = writeloop.NewWriteLoop[*connPacket](slf.pool, func(data *connPacket) error { + slf.loop = writeloop.NewChannel[*connPacket](slf.pool, slf.server.connWriteBufferSize, func(data *connPacket) error { if slf.server.runtime.packetWarnSize > 0 && len(data.packet) > slf.server.runtime.packetWarnSize { log.Warn("Conn.Write", log.String("State", "PacketWarn"), log.String("Reason", "PacketSize"), log.String("ID", slf.GetID()), log.Int("PacketSize", len(data.packet))) } diff --git a/server/constants.go b/server/constants.go index 99b7a91..e190685 100644 --- a/server/constants.go +++ b/server/constants.go @@ -14,4 +14,6 @@ const ( DefaultAsyncPoolSize = 256 DefaultWebsocketReadDeadline = 30 * time.Second DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB + DefaultDispatcherBufferSize = 1024 * 16 + DefaultConnWriteBufferSize = 1024 * 1 ) diff --git a/server/dispatcher.go b/server/dispatcher.go index ef9c2a0..569f542 100644 --- a/server/dispatcher.go +++ b/server/dispatcher.go @@ -2,16 +2,15 @@ package server import ( "github.com/alphadose/haxmap" - "github.com/kercylan98/minotaur/utils/buffer" ) var dispatcherUnique = struct{}{} // generateDispatcher 生成消息分发器 -func generateDispatcher(name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher { +func generateDispatcher(size int, name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher { return &dispatcher{ name: name, - buffer: buffer.NewUnbounded[*Message](), + buffer: make(chan *Message, size), handler: handler, uniques: haxmap.New[string, struct{}](), } @@ -20,7 +19,7 @@ func generateDispatcher(name string, handler func(dispatcher *dispatcher, messag // dispatcher 消息分发器 type dispatcher struct { name string - buffer *buffer.Unbounded[*Message] + buffer chan *Message uniques *haxmap.Map[string, struct{}] handler func(dispatcher *dispatcher, message *Message) } @@ -37,20 +36,34 @@ func (slf *dispatcher) antiUnique(name string) { func (slf *dispatcher) start() { for { select { - case message, ok := <-slf.buffer.Get(): + case message, ok := <-slf.buffer: if !ok { return } - slf.buffer.Load() slf.handler(slf, message) } } } func (slf *dispatcher) put(message *Message) { - slf.buffer.Put(message) + slf.buffer <- message } func (slf *dispatcher) close() { - slf.buffer.Close() + close(slf.buffer) +} + +func (slf *dispatcher) transfer(target *dispatcher) { + if target == nil { + return + } + for { + select { + case message, ok := <-slf.buffer: + if !ok { + return + } + target.buffer <- message + } + } } diff --git a/server/options.go b/server/options.go index 0793d12..d69f147 100644 --- a/server/options.go +++ b/server/options.go @@ -46,6 +46,32 @@ type runtime struct { messageStatisticsLimit int // 消息统计数量 messageStatistics []*atomic.Int64 // 消息统计数量 messageStatisticsLock *sync.RWMutex // 消息统计锁 + dispatcherBufferSize int // 消息分发器缓冲区大小 + connWriteBufferSize int // 连接写入缓冲区大小 +} + +// WithConnWriteBufferSize 通过连接写入缓冲区大小的方式创建服务器 +// - 默认值为 DefaultConnWriteBufferSize +// - 设置合适的缓冲区大小可以提高服务器性能,但是会占用更多的内存 +func WithConnWriteBufferSize(size int) Option { + return func(srv *Server) { + if size <= 0 { + return + } + srv.connWriteBufferSize = size + } +} + +// WithDispatcherBufferSize 通过消息分发器缓冲区大小的方式创建服务器 +// - 默认值为 DefaultDispatcherBufferSize +// - 设置合适的缓冲区大小可以提高服务器性能,但是会占用更多的内存 +func WithDispatcherBufferSize(size int) Option { + return func(srv *Server) { + if size <= 0 { + return + } + srv.dispatcherBufferSize = size + } } // WithMessageStatistics 通过消息统计的方式创建服务器 diff --git a/server/server.go b/server/server.go index 025885b..bd67c8d 100644 --- a/server/server.go +++ b/server/server.go @@ -33,7 +33,9 @@ import ( func New(network Network, options ...Option) *Server { server := &Server{ runtime: &runtime{ - packetWarnSize: DefaultPacketWarnSize, + packetWarnSize: DefaultPacketWarnSize, + dispatcherBufferSize: DefaultDispatcherBufferSize, + connWriteBufferSize: DefaultConnWriteBufferSize, }, option: &option{}, network: network, @@ -131,7 +133,7 @@ func (slf *Server) Run(addr string) error { slf.event.check() slf.addr = addr slf.startMessageStatistics() - slf.systemDispatcher = generateDispatcher(serverSystemDispatcher, slf.dispatchMessage) + slf.systemDispatcher = generateDispatcher(slf.dispatcherBufferSize, serverSystemDispatcher, slf.dispatchMessage) slf.messagePool = concurrent.NewPool[Message]( func() *Message { return &Message{} @@ -564,7 +566,7 @@ func (slf *Server) UseShunt(conn *Conn, name string) { defer slf.dispatcherLock.Unlock() d, exist := slf.dispatchers[name] if !exist { - d = generateDispatcher(name, slf.dispatchMessage) + d = generateDispatcher(slf.dispatcherBufferSize, name, slf.dispatchMessage) go d.start() slf.dispatchers[name] = d } @@ -577,8 +579,9 @@ func (slf *Server) UseShunt(conn *Conn, name string) { delete(slf.dispatcherMember[curr.name], conn.GetID()) if len(slf.dispatcherMember[curr.name]) == 0 { - curr.close() delete(slf.dispatchers, curr.name) + curr.transfer(d) + curr.close() } } slf.currDispatcher[conn.GetID()] = d diff --git a/server/server_test.go b/server/server_test.go index 271e693..1cc8aef 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -10,27 +10,10 @@ import ( ) func TestNew(t *testing.T) { - //limiter := rate.NewLimiter(rate.Every(time.Second), 100) - srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 200, 10, false), server.WithPProf()) - //srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool { - // t, c := srv.TimeoutContext(time.Second * 5) - // defer c() - // if err := limiter.Wait(t); err != nil { - // return false - // } - // return true - //}) + srv := server.New(server.NetworkWebsocket, server.WithPProf()) srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) { fmt.Println("关闭", conn.GetID(), err, "Count", srv.GetOnlineCount()) }) - srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) { - srv.UseShunt(conn, "1") - srv.UseShunt(conn, "2") - srv.UseShunt(conn, "3") - //if srv.GetOnlineCount() > 1 { - // conn.Close() - //} - }) srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { conn.Write(packet) @@ -43,11 +26,13 @@ func TestNew(t *testing.T) { func TestNewClient(t *testing.T) { count := 500 for i := 0; i < count; i++ { - id := i fmt.Println("启动", i+1) - cli := client.NewWebsocket("ws://172.28.102.242:9999") + cli := client.NewWebsocket("ws://172.29.5.138:9999") cli.RegConnectionReceivePacketEvent(func(conn *client.Client, wst int, packet []byte) { - fmt.Println("收到", id+1, string(packet)) + fmt.Println(time.Now().Unix(), "收到", string(packet)) + }) + cli.RegConnectionClosedEvent(func(conn *client.Client, err any) { + fmt.Println("关闭", err) }) cli.RegConnectionOpenedEvent(func(conn *client.Client) { go func() { @@ -55,7 +40,6 @@ func TestNewClient(t *testing.T) { time.Sleep(time.Second) } for { - time.Sleep(time.Second) for i := 0; i < 10; i++ { cli.WriteWS(2, []byte("hello")) } diff --git a/server/writeloop/README.md b/server/writeloop/README.md index 50f6c6f..9364877 100644 --- a/server/writeloop/README.md +++ b/server/writeloop/README.md @@ -4,11 +4,13 @@ 该包提供了一个并发安全的写循环实现。开发者可以使用它来快速构建和管理写入操作。 -## WriteLoop [`写循环`](https://pkg.go.dev/github.com/kercylan98/minotaur/server/writeloop#WriteLoop) - 写循环是一种特殊的循环,它可以并发安全地将数据写入到底层连接。写循环在 `Minotaur` 中是一个泛型类型,可以处理任意类型的消息。 -> [`WriteLoop`](https://pkg.go.dev/github.com/kercylan98/minotaur/server/writeloop#WriteLoop) 使用了 [`Pool`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/concurrent#Pool) 和 [`Unbounded`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/buffer#Unbounded) 进行实现。 +## Unbounded [`写循环`](https://pkg.go.dev/github.com/kercylan98/minotaur/server/writeloop#WriteLoop) + +一个基于无界缓冲区的写循环实现,它可以处理任意数量的消息。它使用 [`Pool`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/concurrent#Pool) 来管理消息对象,使用 [`Unbounded`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/buffer#Unbounded) 来管理消息队列。 + +> [`Unbounded`](https://pkg.go.dev/github.com/kercylan98/minotaur/server/writeloop#Unbounded) 使用了 [`Pool`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/concurrent#Pool) 和 [`Unbounded`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/buffer#Unbounded) 进行实现。 > 通过 [`Pool`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/concurrent#Pool) 创建的消息对象无需手动释放,它会在写循环处理完消息后自动回收。 ### 使用示例 @@ -30,7 +32,7 @@ func main() { }) var wait sync.WaitGroup wait.Add(10) - wl := writeloop.NewWriteLoop(pool, func(message *Message) error { + wl := writeloop.NewUnbounded(pool, func(message *Message) error { fmt.Println(message.ID) wait.Done() return nil