refactor: writeloop.WriteLoop 更名为 Unbounded,新增基于 chan 实现的 WriteLoop

This commit is contained in:
kercylan98 2023-12-23 18:27:40 +08:00
parent c3e1581289
commit 4b85ceaf13
7 changed files with 129 additions and 67 deletions

View File

@ -31,7 +31,7 @@ type Client struct {
mutex sync.Mutex
closed bool // 是否已关闭
pool *concurrent.Pool[*Packet] // 数据包缓冲池
loop *writeloop.WriteLoop[*Packet] // 写入循环
loop *writeloop.Unbounded[*Packet] // 写入循环
block chan struct{} // 以阻塞方式运行
}
@ -69,7 +69,7 @@ func (slf *Client) Run(block ...bool) error {
data.data = nil
data.callback = nil
})
slf.loop = writeloop.NewWriteLoop[*Packet](slf.pool, func(message *Packet) error {
slf.loop = writeloop.NewUnbounded[*Packet](slf.pool, func(message *Packet) error {
err := slf.core.Write(message)
if message.callback != nil {
message.callback(err)

View File

@ -0,0 +1,56 @@
package writeloop
import (
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log"
)
// NewChannel 创建基于 Channel 的写循环
// - pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Channel 会在写入完成后将 Message 对象放回缓冲池
// - channelSize Channel 的大小
// - writeHandler 写入处理函数
// - errorHandler 错误处理函数
//
// 传入 writeHandler 的消息对象是从 Channel 中获取的,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象
func NewChannel[Message any](pool *concurrent.Pool[Message], channelSize int, writeHandler func(message Message) error, errorHandler func(err any)) *Channel[Message] {
wl := &Channel[Message]{
c: make(chan Message, channelSize),
}
go func() {
for {
select {
case message, ok := <-wl.c:
if !ok {
return
}
err := writeHandler(message)
pool.Release(message)
if err != nil {
if errorHandler == nil {
log.Error("Channel", log.Err(err))
continue
}
errorHandler(err)
}
}
}
}()
return wl
}
// Channel 基于 chan 的写循环,与 Unbounded 相同,但是使用 Channel 实现
type Channel[T any] struct {
c chan T
}
// Put 将数据放入写循环message 应该来源于 concurrent.Pool
func (slf *Channel[T]) Put(message T) {
slf.c <- message
}
// Close 关闭写循环
func (slf *Channel[T]) Close() {
close(slf.c)
}

View File

@ -0,0 +1,58 @@
package writeloop
import (
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log"
)
// NewUnbounded 创建写循环
// - pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Unbounded 会在写入完成后将 Message 对象放回缓冲池
// - writeHandler 写入处理函数
// - errorHandler 错误处理函数
//
// 传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象
func NewUnbounded[Message any](pool *concurrent.Pool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *Unbounded[Message] {
wl := &Unbounded[Message]{
buf: buffer.NewUnbounded[Message](),
}
go func() {
for {
select {
case message, ok := <-wl.buf.Get():
if !ok {
return
}
wl.buf.Load()
err := writeHandler(message)
pool.Release(message)
if err != nil {
if errorHandler == nil {
log.Error("Unbounded", log.Err(err))
continue
}
errorHandler(err)
}
}
}
}()
return wl
}
// Unbounded 写循环
// - 用于将数据并发安全的写入到底层连接
type Unbounded[Message any] struct {
buf *buffer.Unbounded[Message]
}
// Put 将数据放入写循环message 应该来源于 concurrent.Pool
func (slf *Unbounded[Message]) Put(message Message) {
slf.buf.Put(message)
}
// Close 关闭写循环
func (slf *Unbounded[Message]) Close() {
slf.buf.Close()
}

View File

@ -5,8 +5,8 @@ import (
"testing"
)
func BenchmarkWriteLoop_Put(b *testing.B) {
wl := writeloop.NewWriteLoop(wp, func(message *Message) error {
func BenchmarkUnbounded_Put(b *testing.B) {
wl := writeloop.NewUnbounded(wp, func(message *Message) error {
return nil
}, nil)

View File

@ -7,7 +7,7 @@ import (
"sync"
)
func ExampleNewWriteLoop() {
func ExampleNewUnbounded() {
pool := concurrent.NewPool[Message](func() *Message {
return &Message{}
}, func(data *Message) {
@ -15,7 +15,7 @@ func ExampleNewWriteLoop() {
})
var wait sync.WaitGroup
wait.Add(10)
wl := writeloop.NewWriteLoop(pool, func(message *Message) error {
wl := writeloop.NewUnbounded(pool, func(message *Message) error {
fmt.Println(message.ID)
wait.Done()
return nil

View File

@ -17,8 +17,8 @@ var wp = concurrent.NewPool(func() *Message {
data.ID = 0
})
func TestNewWriteLoop(t *testing.T) {
wl := writeloop.NewWriteLoop(wp, func(message *Message) error {
func TestNewUnbounded(t *testing.T) {
wl := writeloop.NewUnbounded(wp, func(message *Message) error {
t.Log(message.ID)
return nil
}, func(err any) {
@ -28,8 +28,8 @@ func TestNewWriteLoop(t *testing.T) {
wl.Close()
}
func TestWriteLoop_Put(t *testing.T) {
wl := writeloop.NewWriteLoop(wp, func(message *Message) error {
func TestUnbounded_Put(t *testing.T) {
wl := writeloop.NewUnbounded(wp, func(message *Message) error {
t.Log(message.ID)
return nil
}, func(err any) {
@ -46,8 +46,8 @@ func TestWriteLoop_Put(t *testing.T) {
wl.Close()
}
func TestWriteLoop_Close(t *testing.T) {
wl := writeloop.NewWriteLoop(wp, func(message *Message) error {
func TestUnbounded_Close(t *testing.T) {
wl := writeloop.NewUnbounded(wp, func(message *Message) error {
t.Log(message.ID)
return nil
}, func(err any) {

View File

@ -1,58 +1,6 @@
package writeloop
import (
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log"
)
// NewWriteLoop 创建写循环
// - pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 WriteLoop 会在写入完成后将 Message 对象放回缓冲池
// - writeHandler 写入处理函数
// - errorHandler 错误处理函数
//
// 传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象
func NewWriteLoop[Message any](pool *concurrent.Pool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *WriteLoop[Message] {
wl := &WriteLoop[Message]{
buf: buffer.NewUnbounded[Message](),
}
go func() {
for {
select {
case message, ok := <-wl.buf.Get():
if !ok {
return
}
wl.buf.Load()
err := writeHandler(message)
pool.Release(message)
if err != nil {
if errorHandler == nil {
log.Error("WriteLoop", log.Err(err))
continue
}
errorHandler(err)
}
}
}
}()
return wl
}
// WriteLoop 写循环
// - 用于将数据并发安全的写入到底层连接
type WriteLoop[Message any] struct {
buf *buffer.Unbounded[Message]
}
// Put 将数据放入写循环message 应该来源于 concurrent.Pool
func (slf *WriteLoop[Message]) Put(message Message) {
slf.buf.Put(message)
}
// Close 关闭写循环
func (slf *WriteLoop[Message]) Close() {
slf.buf.Close()
type WriteLoop[Message any] interface {
Put(message Message)
Close()
}