diff --git a/server/client/client.go b/server/client/client.go index 1bfb16b..df2bf07 100644 --- a/server/client/client.go +++ b/server/client/client.go @@ -4,7 +4,7 @@ import ( "errors" "fmt" "github.com/kercylan98/minotaur/server/writeloop" - "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/hub" "sync" ) @@ -30,7 +30,7 @@ type Client struct { core Core mutex sync.Mutex closed bool // 是否已关闭 - pool *concurrent.Pool[*Packet] // 数据包缓冲池 + pool *hub.ObjectPool[*Packet] // 数据包缓冲池 loop *writeloop.Channel[*Packet] // 写入循环 loopBufferSize int // 写入循环缓冲区大小 block chan struct{} // 以阻塞方式运行 @@ -73,7 +73,7 @@ func (slf *Client) RunByBufferSize(size int, block ...bool) error { return err } slf.closed = false - slf.pool = concurrent.NewPool[Packet](func() *Packet { + slf.pool = hub.NewObjectPool[Packet](func() *Packet { return new(Packet) }, func(data *Packet) { data.wst = 0 diff --git a/server/conn.go b/server/conn.go index 1672fd6..c994a74 100644 --- a/server/conn.go +++ b/server/conn.go @@ -7,7 +7,7 @@ import ( "github.com/gorilla/websocket" "github.com/kercylan98/minotaur/server/writeloop" "github.com/kercylan98/minotaur/utils/collection" - "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/hub" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/random" "github.com/kercylan98/minotaur/utils/timer" @@ -125,7 +125,7 @@ type connection struct { gw func(packet []byte) data map[any]any closed bool - pool *concurrent.Pool[*connPacket] + pool *hub.Pool[*connPacket] loop writeloop.WriteLoop[*connPacket] mu sync.Mutex openTime time.Time @@ -286,7 +286,7 @@ func (slf *Conn) init() { })) } } - slf.pool = concurrent.NewPool[connPacket]( + slf.pool = hub.NewObjectPool[connPacket]( func() *connPacket { return &connPacket{} }, func(data *connPacket) { diff --git a/server/server.go b/server/server.go index c946cbb..53d4a29 100644 --- a/server/server.go +++ b/server/server.go @@ -8,7 +8,7 @@ import ( "github.com/kercylan98/minotaur/server/internal/dispatcher" "github.com/kercylan98/minotaur/server/internal/logger" "github.com/kercylan98/minotaur/utils/collection" - "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/hub" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/network" "github.com/kercylan98/minotaur/utils/str" @@ -76,7 +76,7 @@ type Server struct { gServer *gNet // TCP或UDP模式下的服务器 multiple *MultipleServer // 多服务器模式下的服务器 ants *ants.Pool // 协程池 - messagePool *concurrent.Pool[*Message] // 消息池 + messagePool *hub.Pool[*Message] // 消息池 ctx context.Context // 上下文 cancel context.CancelFunc // 停止上下文 systemSignal chan os.Signal // 系统信号 @@ -676,7 +676,7 @@ func onServicesInit(srv *Server) { // onMessageSystemInit 消息系统初始化 func onMessageSystemInit(srv *Server) { - srv.messagePool = concurrent.NewPool[Message]( + srv.messagePool = hub.NewObjectPool[Message]( func() *Message { return &Message{} }, diff --git a/server/writeloop/README.md b/server/writeloop/README.md index f0df274..799c9e7 100644 --- a/server/writeloop/README.md +++ b/server/writeloop/README.md @@ -29,9 +29,9 @@ package main import ( - "fmt" + "fmt" "github.com/kercylan98/minotaur/server/writeloop" - "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/hub" ) func main() { diff --git a/server/writeloop/channel.go b/server/writeloop/channel.go index 3ee4c9c..42d74ed 100644 --- a/server/writeloop/channel.go +++ b/server/writeloop/channel.go @@ -1,7 +1,7 @@ package writeloop import ( - "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/hub" "github.com/kercylan98/minotaur/utils/log" ) @@ -12,7 +12,7 @@ import ( // - errorHandler 错误处理函数 // // 传入 writeHandler 的消息对象是从 Channel 中获取的,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象 -func NewChannel[Message any](pool *concurrent.Pool[Message], channelSize int, writeHandler func(message Message) error, errorHandler func(err any)) *Channel[Message] { +func NewChannel[Message any](pool *hub.ObjectPool[Message], channelSize int, writeHandler func(message Message) error, errorHandler func(err any)) *Channel[Message] { wl := &Channel[Message]{ c: make(chan Message, channelSize), } @@ -45,7 +45,7 @@ type Channel[T any] struct { c chan T } -// Put 将数据放入写循环,message 应该来源于 concurrent.Pool +// Put 将数据放入写循环,message 应该来源于 hub.ObjectPool func (slf *Channel[T]) Put(message T) { slf.c <- message } diff --git a/server/writeloop/unbounded.go b/server/writeloop/unbounded.go index 21ea6c1..f302387 100644 --- a/server/writeloop/unbounded.go +++ b/server/writeloop/unbounded.go @@ -2,7 +2,7 @@ package writeloop import ( "github.com/kercylan98/minotaur/utils/buffer" - "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/hub" "github.com/kercylan98/minotaur/utils/log" ) @@ -12,7 +12,7 @@ import ( // - errorHandler 错误处理函数 // // 传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象 -func NewUnbounded[Message any](pool *concurrent.Pool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *Unbounded[Message] { +func NewUnbounded[Message any](pool *hub.ObjectPool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *Unbounded[Message] { wl := &Unbounded[Message]{ buf: buffer.NewUnbounded[Message](), } @@ -47,7 +47,7 @@ type Unbounded[Message any] struct { buf *buffer.Unbounded[Message] } -// Put 将数据放入写循环,message 应该来源于 concurrent.Pool +// Put 将数据放入写循环,message 应该来源于 hub.ObjectPool func (slf *Unbounded[Message]) Put(message Message) { slf.buf.Put(message) } diff --git a/server/writeloop/unbounded_example_test.go b/server/writeloop/unbounded_example_test.go index 982c662..759e043 100644 --- a/server/writeloop/unbounded_example_test.go +++ b/server/writeloop/unbounded_example_test.go @@ -3,12 +3,12 @@ package writeloop_test import ( "fmt" "github.com/kercylan98/minotaur/server/writeloop" - "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/hub" "sync" ) func ExampleNewUnbounded() { - pool := concurrent.NewPool[Message](func() *Message { + pool := hub.NewObjectPool[Message](func() *Message { return &Message{} }, func(data *Message) { data.ID = 0 diff --git a/server/writeloop/unbounded_test.go b/server/writeloop/unbounded_test.go index 1ff6f76..d87aa4c 100644 --- a/server/writeloop/unbounded_test.go +++ b/server/writeloop/unbounded_test.go @@ -2,7 +2,7 @@ package writeloop_test import ( "github.com/kercylan98/minotaur/server/writeloop" - "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/hub" "github.com/stretchr/testify/assert" "testing" ) @@ -11,7 +11,7 @@ type Message struct { ID int } -var wp = concurrent.NewPool(func() *Message { +var wp = hub.NewObjectPool(func() *Message { return &Message{} }, func(data *Message) { data.ID = 0 diff --git a/utils/concurrent/pool_test.go b/utils/concurrent/pool_test.go deleted file mode 100644 index 0c8c895..0000000 --- a/utils/concurrent/pool_test.go +++ /dev/null @@ -1 +0,0 @@ -package concurrent_test diff --git a/utils/concurrent/pool.go b/utils/hub/object_pool.go similarity index 60% rename from utils/concurrent/pool.go rename to utils/hub/object_pool.go index 3c8619f..c041ae8 100644 --- a/utils/concurrent/pool.go +++ b/utils/hub/object_pool.go @@ -1,18 +1,18 @@ -package concurrent +package hub import ( "errors" "sync" ) -// NewPool 创建一个线程安全的对象缓冲池 +// NewObjectPool 创建一个线程安全的对象缓冲池 // - 通过 Get 获取一个对象,如果缓冲区内存在可用对象则直接返回,否则新建一个进行返回 // - 通过 Release 将使用完成的对象放回缓冲区,超出缓冲区大小的对象将被放弃 -func NewPool[T any](generator func() *T, releaser func(data *T)) *Pool[*T] { +func NewObjectPool[T any](generator func() *T, releaser func(data *T)) *ObjectPool[*T] { if generator == nil || releaser == nil { panic(errors.New("generator or releaser is nil")) } - return &Pool[*T]{ + return &ObjectPool[*T]{ releaser: releaser, p: sync.Pool{ New: func() interface{} { @@ -22,22 +22,22 @@ func NewPool[T any](generator func() *T, releaser func(data *T)) *Pool[*T] { } } -// Pool 线程安全的对象缓冲池 -// - 一些高频临时生成使用的对象可以通过 Pool 进行管理,例如属性计算等 +// ObjectPool 线程安全的对象缓冲池 +// - 一些高频临时生成使用的对象可以通过 ObjectPool 进行管理,例如属性计算等 // - 缓冲区内存在可用对象时直接返回,否则新建一个进行返回 // - 通过 Release 将使用完成的对象放回缓冲区,超出缓冲区大小的对象将被放弃 -type Pool[T any] struct { +type ObjectPool[T any] struct { p sync.Pool releaser func(data T) } // Get 获取一个对象 -func (slf *Pool[T]) Get() T { - return slf.p.Get().(T) +func (op *ObjectPool[T]) Get() T { + return op.p.Get().(T) } // Release 将使用完成的对象放回缓冲区 -func (slf *Pool[T]) Release(data T) { - slf.releaser(data) - slf.p.Put(data) +func (op *ObjectPool[T]) Release(data T) { + op.releaser(data) + op.p.Put(data) } diff --git a/utils/concurrent/pool_benchmark_test.go b/utils/hub/object_pool_benchmark_test.go similarity index 69% rename from utils/concurrent/pool_benchmark_test.go rename to utils/hub/object_pool_benchmark_test.go index 692089a..5f05c49 100644 --- a/utils/concurrent/pool_benchmark_test.go +++ b/utils/hub/object_pool_benchmark_test.go @@ -1,12 +1,12 @@ -package concurrent_test +package hub_test import ( - "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/hub" "testing" ) func BenchmarkPool_Get2Put(b *testing.B) { - var pool = concurrent.NewPool[map[string]int](func() *map[string]int { + var pool = hub.NewObjectPool[map[string]int](func() *map[string]int { return &map[string]int{} }, func(data *map[string]int) { for k := range *data { diff --git a/utils/hub/object_pool_test.go b/utils/hub/object_pool_test.go new file mode 100644 index 0000000..e3fc1df --- /dev/null +++ b/utils/hub/object_pool_test.go @@ -0,0 +1 @@ +package hub_test