perf: 更改 server 和 conn 的消息实现为 channel

This commit is contained in:
kercylan98 2023-12-23 19:05:39 +08:00
parent 4b85ceaf13
commit d27fa7c246
7 changed files with 70 additions and 40 deletions

View File

@ -126,7 +126,7 @@ type connection struct {
data map[any]any
closed bool
pool *concurrent.Pool[*connPacket]
loop *writeloop.WriteLoop[*connPacket]
loop writeloop.WriteLoop[*connPacket]
mu sync.Mutex
openTime time.Time
delay time.Duration
@ -295,7 +295,7 @@ func (slf *Conn) init() {
data.callback = nil
},
)
slf.loop = writeloop.NewWriteLoop[*connPacket](slf.pool, func(data *connPacket) error {
slf.loop = writeloop.NewChannel[*connPacket](slf.pool, slf.server.connWriteBufferSize, func(data *connPacket) error {
if slf.server.runtime.packetWarnSize > 0 && len(data.packet) > slf.server.runtime.packetWarnSize {
log.Warn("Conn.Write", log.String("State", "PacketWarn"), log.String("Reason", "PacketSize"), log.String("ID", slf.GetID()), log.Int("PacketSize", len(data.packet)))
}

View File

@ -14,4 +14,6 @@ const (
DefaultAsyncPoolSize = 256
DefaultWebsocketReadDeadline = 30 * time.Second
DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB
DefaultDispatcherBufferSize = 1024 * 16
DefaultConnWriteBufferSize = 1024 * 1
)

View File

@ -2,16 +2,15 @@ package server
import (
"github.com/alphadose/haxmap"
"github.com/kercylan98/minotaur/utils/buffer"
)
var dispatcherUnique = struct{}{}
// generateDispatcher 生成消息分发器
func generateDispatcher(name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
func generateDispatcher(size int, name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
return &dispatcher{
name: name,
buffer: buffer.NewUnbounded[*Message](),
buffer: make(chan *Message, size),
handler: handler,
uniques: haxmap.New[string, struct{}](),
}
@ -20,7 +19,7 @@ func generateDispatcher(name string, handler func(dispatcher *dispatcher, messag
// dispatcher 消息分发器
type dispatcher struct {
name string
buffer *buffer.Unbounded[*Message]
buffer chan *Message
uniques *haxmap.Map[string, struct{}]
handler func(dispatcher *dispatcher, message *Message)
}
@ -37,20 +36,34 @@ func (slf *dispatcher) antiUnique(name string) {
func (slf *dispatcher) start() {
for {
select {
case message, ok := <-slf.buffer.Get():
case message, ok := <-slf.buffer:
if !ok {
return
}
slf.buffer.Load()
slf.handler(slf, message)
}
}
}
func (slf *dispatcher) put(message *Message) {
slf.buffer.Put(message)
slf.buffer <- message
}
func (slf *dispatcher) close() {
slf.buffer.Close()
close(slf.buffer)
}
func (slf *dispatcher) transfer(target *dispatcher) {
if target == nil {
return
}
for {
select {
case message, ok := <-slf.buffer:
if !ok {
return
}
target.buffer <- message
}
}
}

View File

@ -46,6 +46,32 @@ type runtime struct {
messageStatisticsLimit int // 消息统计数量
messageStatistics []*atomic.Int64 // 消息统计数量
messageStatisticsLock *sync.RWMutex // 消息统计锁
dispatcherBufferSize int // 消息分发器缓冲区大小
connWriteBufferSize int // 连接写入缓冲区大小
}
// WithConnWriteBufferSize 通过连接写入缓冲区大小的方式创建服务器
// - 默认值为 DefaultConnWriteBufferSize
// - 设置合适的缓冲区大小可以提高服务器性能,但是会占用更多的内存
func WithConnWriteBufferSize(size int) Option {
return func(srv *Server) {
if size <= 0 {
return
}
srv.connWriteBufferSize = size
}
}
// WithDispatcherBufferSize 通过消息分发器缓冲区大小的方式创建服务器
// - 默认值为 DefaultDispatcherBufferSize
// - 设置合适的缓冲区大小可以提高服务器性能,但是会占用更多的内存
func WithDispatcherBufferSize(size int) Option {
return func(srv *Server) {
if size <= 0 {
return
}
srv.dispatcherBufferSize = size
}
}
// WithMessageStatistics 通过消息统计的方式创建服务器

View File

@ -33,7 +33,9 @@ import (
func New(network Network, options ...Option) *Server {
server := &Server{
runtime: &runtime{
packetWarnSize: DefaultPacketWarnSize,
packetWarnSize: DefaultPacketWarnSize,
dispatcherBufferSize: DefaultDispatcherBufferSize,
connWriteBufferSize: DefaultConnWriteBufferSize,
},
option: &option{},
network: network,
@ -131,7 +133,7 @@ func (slf *Server) Run(addr string) error {
slf.event.check()
slf.addr = addr
slf.startMessageStatistics()
slf.systemDispatcher = generateDispatcher(serverSystemDispatcher, slf.dispatchMessage)
slf.systemDispatcher = generateDispatcher(slf.dispatcherBufferSize, serverSystemDispatcher, slf.dispatchMessage)
slf.messagePool = concurrent.NewPool[Message](
func() *Message {
return &Message{}
@ -564,7 +566,7 @@ func (slf *Server) UseShunt(conn *Conn, name string) {
defer slf.dispatcherLock.Unlock()
d, exist := slf.dispatchers[name]
if !exist {
d = generateDispatcher(name, slf.dispatchMessage)
d = generateDispatcher(slf.dispatcherBufferSize, name, slf.dispatchMessage)
go d.start()
slf.dispatchers[name] = d
}
@ -577,8 +579,9 @@ func (slf *Server) UseShunt(conn *Conn, name string) {
delete(slf.dispatcherMember[curr.name], conn.GetID())
if len(slf.dispatcherMember[curr.name]) == 0 {
curr.close()
delete(slf.dispatchers, curr.name)
curr.transfer(d)
curr.close()
}
}
slf.currDispatcher[conn.GetID()] = d

View File

@ -10,27 +10,10 @@ import (
)
func TestNew(t *testing.T) {
//limiter := rate.NewLimiter(rate.Every(time.Second), 100)
srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 200, 10, false), server.WithPProf())
//srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool {
// t, c := srv.TimeoutContext(time.Second * 5)
// defer c()
// if err := limiter.Wait(t); err != nil {
// return false
// }
// return true
//})
srv := server.New(server.NetworkWebsocket, server.WithPProf())
srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) {
fmt.Println("关闭", conn.GetID(), err, "Count", srv.GetOnlineCount())
})
srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) {
srv.UseShunt(conn, "1")
srv.UseShunt(conn, "2")
srv.UseShunt(conn, "3")
//if srv.GetOnlineCount() > 1 {
// conn.Close()
//}
})
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
conn.Write(packet)
@ -43,11 +26,13 @@ func TestNew(t *testing.T) {
func TestNewClient(t *testing.T) {
count := 500
for i := 0; i < count; i++ {
id := i
fmt.Println("启动", i+1)
cli := client.NewWebsocket("ws://172.28.102.242:9999")
cli := client.NewWebsocket("ws://172.29.5.138:9999")
cli.RegConnectionReceivePacketEvent(func(conn *client.Client, wst int, packet []byte) {
fmt.Println("收到", id+1, string(packet))
fmt.Println(time.Now().Unix(), "收到", string(packet))
})
cli.RegConnectionClosedEvent(func(conn *client.Client, err any) {
fmt.Println("关闭", err)
})
cli.RegConnectionOpenedEvent(func(conn *client.Client) {
go func() {
@ -55,7 +40,6 @@ func TestNewClient(t *testing.T) {
time.Sleep(time.Second)
}
for {
time.Sleep(time.Second)
for i := 0; i < 10; i++ {
cli.WriteWS(2, []byte("hello"))
}

View File

@ -4,11 +4,13 @@
该包提供了一个并发安全的写循环实现。开发者可以使用它来快速构建和管理写入操作。
## WriteLoop [`写循环`](https://pkg.go.dev/github.com/kercylan98/minotaur/server/writeloop#WriteLoop)
写循环是一种特殊的循环,它可以并发安全地将数据写入到底层连接。写循环在 `Minotaur` 中是一个泛型类型,可以处理任意类型的消息。
> [`WriteLoop`](https://pkg.go.dev/github.com/kercylan98/minotaur/server/writeloop#WriteLoop) 使用了 [`Pool`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/concurrent#Pool) 和 [`Unbounded`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/buffer#Unbounded) 进行实现。
## Unbounded [`写循环`](https://pkg.go.dev/github.com/kercylan98/minotaur/server/writeloop#WriteLoop)
一个基于无界缓冲区的写循环实现,它可以处理任意数量的消息。它使用 [`Pool`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/concurrent#Pool) 来管理消息对象,使用 [`Unbounded`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/buffer#Unbounded) 来管理消息队列。
> [`Unbounded`](https://pkg.go.dev/github.com/kercylan98/minotaur/server/writeloop#Unbounded) 使用了 [`Pool`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/concurrent#Pool) 和 [`Unbounded`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/buffer#Unbounded) 进行实现。
> 通过 [`Pool`](https://pkg.go.dev/github.com/kercylan98/minotaur/utils/concurrent#Pool) 创建的消息对象无需手动释放,它会在写循环处理完消息后自动回收。
### 使用示例
@ -30,7 +32,7 @@ func main() {
})
var wait sync.WaitGroup
wait.Add(10)
wl := writeloop.NewWriteLoop(pool, func(message *Message) error {
wl := writeloop.NewUnbounded(pool, func(message *Message) error {
fmt.Println(message.ID)
wait.Done()
return nil