refactor: 迁移 concurrent.Pool 至 hub.ObjectPool,并将 concurrent 包更名为 hub

This commit is contained in:
kercylan98 2024-01-12 12:34:09 +08:00
parent e28a5a259f
commit 161fbfe4e3
12 changed files with 37 additions and 37 deletions

View File

@ -4,7 +4,7 @@ import (
"errors"
"fmt"
"github.com/kercylan98/minotaur/server/writeloop"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/hub"
"sync"
)
@ -30,7 +30,7 @@ type Client struct {
core Core
mutex sync.Mutex
closed bool // 是否已关闭
pool *concurrent.Pool[*Packet] // 数据包缓冲池
pool *hub.ObjectPool[*Packet] // 数据包缓冲池
loop *writeloop.Channel[*Packet] // 写入循环
loopBufferSize int // 写入循环缓冲区大小
block chan struct{} // 以阻塞方式运行
@ -73,7 +73,7 @@ func (slf *Client) RunByBufferSize(size int, block ...bool) error {
return err
}
slf.closed = false
slf.pool = concurrent.NewPool[Packet](func() *Packet {
slf.pool = hub.NewObjectPool[Packet](func() *Packet {
return new(Packet)
}, func(data *Packet) {
data.wst = 0

View File

@ -7,7 +7,7 @@ import (
"github.com/gorilla/websocket"
"github.com/kercylan98/minotaur/server/writeloop"
"github.com/kercylan98/minotaur/utils/collection"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/hub"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/random"
"github.com/kercylan98/minotaur/utils/timer"
@ -125,7 +125,7 @@ type connection struct {
gw func(packet []byte)
data map[any]any
closed bool
pool *concurrent.Pool[*connPacket]
pool *hub.Pool[*connPacket]
loop writeloop.WriteLoop[*connPacket]
mu sync.Mutex
openTime time.Time
@ -286,7 +286,7 @@ func (slf *Conn) init() {
}))
}
}
slf.pool = concurrent.NewPool[connPacket](
slf.pool = hub.NewObjectPool[connPacket](
func() *connPacket {
return &connPacket{}
}, func(data *connPacket) {

View File

@ -8,7 +8,7 @@ import (
"github.com/kercylan98/minotaur/server/internal/dispatcher"
"github.com/kercylan98/minotaur/server/internal/logger"
"github.com/kercylan98/minotaur/utils/collection"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/hub"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/network"
"github.com/kercylan98/minotaur/utils/str"
@ -76,7 +76,7 @@ type Server struct {
gServer *gNet // TCP或UDP模式下的服务器
multiple *MultipleServer // 多服务器模式下的服务器
ants *ants.Pool // 协程池
messagePool *concurrent.Pool[*Message] // 消息池
messagePool *hub.Pool[*Message] // 消息池
ctx context.Context // 上下文
cancel context.CancelFunc // 停止上下文
systemSignal chan os.Signal // 系统信号
@ -676,7 +676,7 @@ func onServicesInit(srv *Server) {
// onMessageSystemInit 消息系统初始化
func onMessageSystemInit(srv *Server) {
srv.messagePool = concurrent.NewPool[Message](
srv.messagePool = hub.NewObjectPool[Message](
func() *Message {
return &Message{}
},

View File

@ -31,7 +31,7 @@ package main
import (
"fmt"
"github.com/kercylan98/minotaur/server/writeloop"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/hub"
)
func main() {

View File

@ -1,7 +1,7 @@
package writeloop
import (
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/hub"
"github.com/kercylan98/minotaur/utils/log"
)
@ -12,7 +12,7 @@ import (
// - errorHandler 错误处理函数
//
// 传入 writeHandler 的消息对象是从 Channel 中获取的,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象
func NewChannel[Message any](pool *concurrent.Pool[Message], channelSize int, writeHandler func(message Message) error, errorHandler func(err any)) *Channel[Message] {
func NewChannel[Message any](pool *hub.ObjectPool[Message], channelSize int, writeHandler func(message Message) error, errorHandler func(err any)) *Channel[Message] {
wl := &Channel[Message]{
c: make(chan Message, channelSize),
}
@ -45,7 +45,7 @@ type Channel[T any] struct {
c chan T
}
// Put 将数据放入写循环message 应该来源于 concurrent.Pool
// Put 将数据放入写循环message 应该来源于 hub.ObjectPool
func (slf *Channel[T]) Put(message T) {
slf.c <- message
}

View File

@ -2,7 +2,7 @@ package writeloop
import (
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/hub"
"github.com/kercylan98/minotaur/utils/log"
)
@ -12,7 +12,7 @@ import (
// - errorHandler 错误处理函数
//
// 传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象
func NewUnbounded[Message any](pool *concurrent.Pool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *Unbounded[Message] {
func NewUnbounded[Message any](pool *hub.ObjectPool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *Unbounded[Message] {
wl := &Unbounded[Message]{
buf: buffer.NewUnbounded[Message](),
}
@ -47,7 +47,7 @@ type Unbounded[Message any] struct {
buf *buffer.Unbounded[Message]
}
// Put 将数据放入写循环message 应该来源于 concurrent.Pool
// Put 将数据放入写循环message 应该来源于 hub.ObjectPool
func (slf *Unbounded[Message]) Put(message Message) {
slf.buf.Put(message)
}

View File

@ -3,12 +3,12 @@ package writeloop_test
import (
"fmt"
"github.com/kercylan98/minotaur/server/writeloop"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/hub"
"sync"
)
func ExampleNewUnbounded() {
pool := concurrent.NewPool[Message](func() *Message {
pool := hub.NewObjectPool[Message](func() *Message {
return &Message{}
}, func(data *Message) {
data.ID = 0

View File

@ -2,7 +2,7 @@ package writeloop_test
import (
"github.com/kercylan98/minotaur/server/writeloop"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/hub"
"github.com/stretchr/testify/assert"
"testing"
)
@ -11,7 +11,7 @@ type Message struct {
ID int
}
var wp = concurrent.NewPool(func() *Message {
var wp = hub.NewObjectPool(func() *Message {
return &Message{}
}, func(data *Message) {
data.ID = 0

View File

@ -1 +0,0 @@
package concurrent_test

View File

@ -1,18 +1,18 @@
package concurrent
package hub
import (
"errors"
"sync"
)
// NewPool 创建一个线程安全的对象缓冲池
// NewObjectPool 创建一个线程安全的对象缓冲池
// - 通过 Get 获取一个对象,如果缓冲区内存在可用对象则直接返回,否则新建一个进行返回
// - 通过 Release 将使用完成的对象放回缓冲区,超出缓冲区大小的对象将被放弃
func NewPool[T any](generator func() *T, releaser func(data *T)) *Pool[*T] {
func NewObjectPool[T any](generator func() *T, releaser func(data *T)) *ObjectPool[*T] {
if generator == nil || releaser == nil {
panic(errors.New("generator or releaser is nil"))
}
return &Pool[*T]{
return &ObjectPool[*T]{
releaser: releaser,
p: sync.Pool{
New: func() interface{} {
@ -22,22 +22,22 @@ func NewPool[T any](generator func() *T, releaser func(data *T)) *Pool[*T] {
}
}
// Pool 线程安全的对象缓冲池
// - 一些高频临时生成使用的对象可以通过 Pool 进行管理,例如属性计算等
// ObjectPool 线程安全的对象缓冲池
// - 一些高频临时生成使用的对象可以通过 ObjectPool 进行管理,例如属性计算等
// - 缓冲区内存在可用对象时直接返回,否则新建一个进行返回
// - 通过 Release 将使用完成的对象放回缓冲区,超出缓冲区大小的对象将被放弃
type Pool[T any] struct {
type ObjectPool[T any] struct {
p sync.Pool
releaser func(data T)
}
// Get 获取一个对象
func (slf *Pool[T]) Get() T {
return slf.p.Get().(T)
func (op *ObjectPool[T]) Get() T {
return op.p.Get().(T)
}
// Release 将使用完成的对象放回缓冲区
func (slf *Pool[T]) Release(data T) {
slf.releaser(data)
slf.p.Put(data)
func (op *ObjectPool[T]) Release(data T) {
op.releaser(data)
op.p.Put(data)
}

View File

@ -1,12 +1,12 @@
package concurrent_test
package hub_test
import (
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/hub"
"testing"
)
func BenchmarkPool_Get2Put(b *testing.B) {
var pool = concurrent.NewPool[map[string]int](func() *map[string]int {
var pool = hub.NewObjectPool[map[string]int](func() *map[string]int {
return &map[string]int{}
}, func(data *map[string]int) {
for k := range *data {

View File

@ -0,0 +1 @@
package hub_test