diff --git a/server/client/client.go b/server/client/client.go index af03fba..ecdcc68 100644 --- a/server/client/client.go +++ b/server/client/client.go @@ -31,7 +31,7 @@ type Client struct { mutex sync.Mutex closed bool // 是否已关闭 pool *concurrent.Pool[*Packet] // 数据包缓冲池 - loop *writeloop.WriteLoop[*Packet] // 写入循环 + loop *writeloop.Unbounded[*Packet] // 写入循环 block chan struct{} // 以阻塞方式运行 } @@ -69,7 +69,7 @@ func (slf *Client) Run(block ...bool) error { data.data = nil data.callback = nil }) - slf.loop = writeloop.NewWriteLoop[*Packet](slf.pool, func(message *Packet) error { + slf.loop = writeloop.NewUnbounded[*Packet](slf.pool, func(message *Packet) error { err := slf.core.Write(message) if message.callback != nil { message.callback(err) diff --git a/server/writeloop/channel.go b/server/writeloop/channel.go new file mode 100644 index 0000000..3ee4c9c --- /dev/null +++ b/server/writeloop/channel.go @@ -0,0 +1,56 @@ +package writeloop + +import ( + "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/log" +) + +// NewChannel 创建基于 Channel 的写循环 +// - pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Channel 会在写入完成后将 Message 对象放回缓冲池 +// - channelSize Channel 的大小 +// - writeHandler 写入处理函数 +// - errorHandler 错误处理函数 +// +// 传入 writeHandler 的消息对象是从 Channel 中获取的,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象 +func NewChannel[Message any](pool *concurrent.Pool[Message], channelSize int, writeHandler func(message Message) error, errorHandler func(err any)) *Channel[Message] { + wl := &Channel[Message]{ + c: make(chan Message, channelSize), + } + go func() { + for { + select { + case message, ok := <-wl.c: + if !ok { + return + } + + err := writeHandler(message) + pool.Release(message) + if err != nil { + if errorHandler == nil { + log.Error("Channel", log.Err(err)) + continue + } + errorHandler(err) + } + } + } + }() + + return wl +} + +// Channel 基于 chan 的写循环,与 Unbounded 相同,但是使用 Channel 实现 +type Channel[T any] struct { + c chan T +} + +// Put 将数据放入写循环,message 应该来源于 concurrent.Pool +func (slf *Channel[T]) Put(message T) { + slf.c <- message +} + +// Close 关闭写循环 +func (slf *Channel[T]) Close() { + close(slf.c) +} diff --git a/server/writeloop/unbounded.go b/server/writeloop/unbounded.go new file mode 100644 index 0000000..21ea6c1 --- /dev/null +++ b/server/writeloop/unbounded.go @@ -0,0 +1,58 @@ +package writeloop + +import ( + "github.com/kercylan98/minotaur/utils/buffer" + "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/log" +) + +// NewUnbounded 创建写循环 +// - pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Unbounded 会在写入完成后将 Message 对象放回缓冲池 +// - writeHandler 写入处理函数 +// - errorHandler 错误处理函数 +// +// 传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象 +func NewUnbounded[Message any](pool *concurrent.Pool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *Unbounded[Message] { + wl := &Unbounded[Message]{ + buf: buffer.NewUnbounded[Message](), + } + go func() { + for { + select { + case message, ok := <-wl.buf.Get(): + if !ok { + return + } + wl.buf.Load() + + err := writeHandler(message) + pool.Release(message) + if err != nil { + if errorHandler == nil { + log.Error("Unbounded", log.Err(err)) + continue + } + errorHandler(err) + } + } + } + }() + + return wl +} + +// Unbounded 写循环 +// - 用于将数据并发安全的写入到底层连接 +type Unbounded[Message any] struct { + buf *buffer.Unbounded[Message] +} + +// Put 将数据放入写循环,message 应该来源于 concurrent.Pool +func (slf *Unbounded[Message]) Put(message Message) { + slf.buf.Put(message) +} + +// Close 关闭写循环 +func (slf *Unbounded[Message]) Close() { + slf.buf.Close() +} diff --git a/server/writeloop/writeloop_benchmark_test.go b/server/writeloop/unbounded_benchmark_test.go similarity index 71% rename from server/writeloop/writeloop_benchmark_test.go rename to server/writeloop/unbounded_benchmark_test.go index 819e791..7f81bba 100644 --- a/server/writeloop/writeloop_benchmark_test.go +++ b/server/writeloop/unbounded_benchmark_test.go @@ -5,8 +5,8 @@ import ( "testing" ) -func BenchmarkWriteLoop_Put(b *testing.B) { - wl := writeloop.NewWriteLoop(wp, func(message *Message) error { +func BenchmarkUnbounded_Put(b *testing.B) { + wl := writeloop.NewUnbounded(wp, func(message *Message) error { return nil }, nil) diff --git a/server/writeloop/writeloop_example_test.go b/server/writeloop/unbounded_example_test.go similarity index 86% rename from server/writeloop/writeloop_example_test.go rename to server/writeloop/unbounded_example_test.go index 819ffee..982c662 100644 --- a/server/writeloop/writeloop_example_test.go +++ b/server/writeloop/unbounded_example_test.go @@ -7,7 +7,7 @@ import ( "sync" ) -func ExampleNewWriteLoop() { +func ExampleNewUnbounded() { pool := concurrent.NewPool[Message](func() *Message { return &Message{} }, func(data *Message) { @@ -15,7 +15,7 @@ func ExampleNewWriteLoop() { }) var wait sync.WaitGroup wait.Add(10) - wl := writeloop.NewWriteLoop(pool, func(message *Message) error { + wl := writeloop.NewUnbounded(pool, func(message *Message) error { fmt.Println(message.ID) wait.Done() return nil diff --git a/server/writeloop/writeloop_test.go b/server/writeloop/unbounded_test.go similarity index 71% rename from server/writeloop/writeloop_test.go rename to server/writeloop/unbounded_test.go index 04e3cfd..1ff6f76 100644 --- a/server/writeloop/writeloop_test.go +++ b/server/writeloop/unbounded_test.go @@ -17,8 +17,8 @@ var wp = concurrent.NewPool(func() *Message { data.ID = 0 }) -func TestNewWriteLoop(t *testing.T) { - wl := writeloop.NewWriteLoop(wp, func(message *Message) error { +func TestNewUnbounded(t *testing.T) { + wl := writeloop.NewUnbounded(wp, func(message *Message) error { t.Log(message.ID) return nil }, func(err any) { @@ -28,8 +28,8 @@ func TestNewWriteLoop(t *testing.T) { wl.Close() } -func TestWriteLoop_Put(t *testing.T) { - wl := writeloop.NewWriteLoop(wp, func(message *Message) error { +func TestUnbounded_Put(t *testing.T) { + wl := writeloop.NewUnbounded(wp, func(message *Message) error { t.Log(message.ID) return nil }, func(err any) { @@ -46,8 +46,8 @@ func TestWriteLoop_Put(t *testing.T) { wl.Close() } -func TestWriteLoop_Close(t *testing.T) { - wl := writeloop.NewWriteLoop(wp, func(message *Message) error { +func TestUnbounded_Close(t *testing.T) { + wl := writeloop.NewUnbounded(wp, func(message *Message) error { t.Log(message.ID) return nil }, func(err any) { diff --git a/server/writeloop/writeloop.go b/server/writeloop/writeloop.go index 7076e9c..36cbc14 100644 --- a/server/writeloop/writeloop.go +++ b/server/writeloop/writeloop.go @@ -1,58 +1,6 @@ package writeloop -import ( - "github.com/kercylan98/minotaur/utils/buffer" - "github.com/kercylan98/minotaur/utils/concurrent" - "github.com/kercylan98/minotaur/utils/log" -) - -// NewWriteLoop 创建写循环 -// - pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 WriteLoop 会在写入完成后将 Message 对象放回缓冲池 -// - writeHandler 写入处理函数 -// - errorHandler 错误处理函数 -// -// 传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象 -func NewWriteLoop[Message any](pool *concurrent.Pool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *WriteLoop[Message] { - wl := &WriteLoop[Message]{ - buf: buffer.NewUnbounded[Message](), - } - go func() { - for { - select { - case message, ok := <-wl.buf.Get(): - if !ok { - return - } - wl.buf.Load() - - err := writeHandler(message) - pool.Release(message) - if err != nil { - if errorHandler == nil { - log.Error("WriteLoop", log.Err(err)) - continue - } - errorHandler(err) - } - } - } - }() - - return wl -} - -// WriteLoop 写循环 -// - 用于将数据并发安全的写入到底层连接 -type WriteLoop[Message any] struct { - buf *buffer.Unbounded[Message] -} - -// Put 将数据放入写循环,message 应该来源于 concurrent.Pool -func (slf *WriteLoop[Message]) Put(message Message) { - slf.buf.Put(message) -} - -// Close 关闭写循环 -func (slf *WriteLoop[Message]) Close() { - slf.buf.Close() +type WriteLoop[Message any] interface { + Put(message Message) + Close() }