diff --git a/utils/concurrent/pool.go b/utils/concurrent/pool.go index 55c7390..3c8619f 100644 --- a/utils/concurrent/pool.go +++ b/utils/concurrent/pool.go @@ -1,40 +1,25 @@ package concurrent import ( - "github.com/kercylan98/minotaur/utils/log" + "errors" "sync" - "time" ) // NewPool 创建一个线程安全的对象缓冲池 // - 通过 Get 获取一个对象,如果缓冲区内存在可用对象则直接返回,否则新建一个进行返回 // - 通过 Release 将使用完成的对象放回缓冲区,超出缓冲区大小的对象将被放弃 -func NewPool[T any](bufferSize int, generator func() T, releaser func(data T), options ...PoolOption[T]) *Pool[T] { - pool := &Pool[T]{ - bufferSize: bufferSize, - generator: generator, - releaser: releaser, +func NewPool[T any](generator func() *T, releaser func(data *T)) *Pool[*T] { + if generator == nil || releaser == nil { + panic(errors.New("generator or releaser is nil")) } - for _, option := range options { - option(pool) + return &Pool[*T]{ + releaser: releaser, + p: sync.Pool{ + New: func() interface{} { + return generator() + }, + }, } - for i := 0; i < bufferSize; i++ { - pool.put(generator()) - } - return pool -} - -// NewMapPool 创建一个线程安全的 map 缓冲池 -// - 通过 Get 获取一个 map,如果缓冲区内存在可用 map 则直接返回,否则新建一个进行返回 -// - 通过 Release 将使用完成的 map 放回缓冲区,超出缓冲区大小的 map 将被放弃 -func NewMapPool[K comparable, V any](bufferSize int) *Pool[map[K]V] { - return NewPool[map[K]V](bufferSize, func() map[K]V { - return make(map[K]V) - }, func(data map[K]V) { - for k := range data { - delete(data, k) - } - }) } // Pool 线程安全的对象缓冲池 @@ -42,79 +27,17 @@ func NewMapPool[K comparable, V any](bufferSize int) *Pool[map[K]V] { // - 缓冲区内存在可用对象时直接返回,否则新建一个进行返回 // - 通过 Release 将使用完成的对象放回缓冲区,超出缓冲区大小的对象将被放弃 type Pool[T any] struct { - mutex sync.Mutex - buffers []T - bufferSize int - generator func() T - releaser func(data T) - warn int64 - silent bool -} - -// EAC 动态调整缓冲区大小,适用于突发场景使用 -// - 当 size <= 0 时,不进行调整 -// - 当缓冲区大小不足时,会导致大量的新对象生成、销毁,增加 GC 压力。此时应考虑调整缓冲区大小 -// - 当缓冲区大小过大时,会导致大量的对象占用内存,增加内存压力。此时应考虑调整缓冲区大小 -func (slf *Pool[T]) EAC(size int) { - if size <= 0 { - return - } - slf.mutex.Lock() - slf.bufferSize = size - slf.mutex.Unlock() + p sync.Pool + releaser func(data T) } +// Get 获取一个对象 func (slf *Pool[T]) Get() T { - slf.mutex.Lock() - if len(slf.buffers) > 0 { - data := slf.buffers[0] - slf.buffers = slf.buffers[1:] - slf.mutex.Unlock() - return data - } - if !slf.silent { - now := time.Now().Unix() - if now-slf.warn >= 1 { - log.Warn("Pool", log.String("Get", "the number of buffer members is insufficient, consider whether it is due to unreleased or inappropriate buffer size"), log.Stack("stack")) - slf.warn = now - } - } - slf.mutex.Unlock() - - return slf.generator() -} - -func (slf *Pool[T]) IsClose() bool { - return slf.generator == nil + return slf.p.Get().(T) } +// Release 将使用完成的对象放回缓冲区 func (slf *Pool[T]) Release(data T) { - slf.mutex.Lock() - if slf.releaser == nil { - slf.mutex.Unlock() - return - } slf.releaser(data) - slf.mutex.Unlock() - slf.put(data) -} - -func (slf *Pool[T]) Close() { - slf.mutex.Lock() - slf.buffers = nil - slf.bufferSize = 0 - slf.generator = nil - slf.releaser = nil - slf.warn = 0 - slf.mutex.Unlock() -} - -func (slf *Pool[T]) put(data T) { - slf.mutex.Lock() - if len(slf.buffers) > slf.bufferSize { - slf.mutex.Unlock() - return - } - slf.buffers = append(slf.buffers, data) - slf.mutex.Unlock() + slf.p.Put(data) } diff --git a/utils/concurrent/pool_options.go b/utils/concurrent/pool_options.go deleted file mode 100644 index 1fc713f..0000000 --- a/utils/concurrent/pool_options.go +++ /dev/null @@ -1,12 +0,0 @@ -package concurrent - -// PoolOption 线程安全的对象缓冲池选项 -type PoolOption[T any] func(pool *Pool[T]) - -// WithPoolSilent 静默模式 -// - 静默模式下,当缓冲区大小不足时,将不再输出警告日志 -func WithPoolSilent[T any]() PoolOption[T] { - return func(pool *Pool[T]) { - pool.silent = true - } -}