From dd1acfd017e9f0eccbc23663fa7f871a6b2b7de4 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 19 Sep 2023 12:40:02 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=20writeloop=20?= =?UTF-8?q?=E5=8C=85=EF=BC=8C=E5=86=85=E7=BD=AE=E4=BA=86=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E5=86=99=E5=BE=AA=E7=8E=AF=E7=9A=84=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/writeloop/writeloop.go | 63 +++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 server/writeloop/writeloop.go diff --git a/server/writeloop/writeloop.go b/server/writeloop/writeloop.go new file mode 100644 index 0000000..4d03ea3 --- /dev/null +++ b/server/writeloop/writeloop.go @@ -0,0 +1,63 @@ +package writeloop + +import ( + "github.com/kercylan98/minotaur/utils/buffer" + "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/log" + "runtime/debug" +) + +// NewWriteLoop 创建写循环 +// - pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 WriteLoop 会在写入完成后将 Message 对象放回缓冲池 +func NewWriteLoop[Message any](pool *concurrent.Pool[Message], writeHandle func(message Message) error, errorHandle func(err any)) *WriteLoop[Message] { + wl := &WriteLoop[Message]{ + buf: buffer.NewUnboundedN[Message](), + } + go func() { + for !wl.buf.IsClosed() { + select { + case message, ok := <-wl.buf.Get(): + if !ok { + return + } + wl.buf.Load() + func() { + defer func() { + pool.Release(message) + if err := recover(); err != nil { + if errorHandle == nil { + log.Error("WriteLoop", log.Any("err", err)) + debug.PrintStack() + return + } + errorHandle(err) + } + }() + err := writeHandle(message) + if err != nil { + panic(err) + } + + }() + } + } + }() + + return wl +} + +// WriteLoop 写循环 +// - 用于将数据并发安全的写入到底层连接 +type WriteLoop[Message any] struct { + buf *buffer.Unbounded[Message] +} + +// Put 将数据放入写循环 +func (slf *WriteLoop[Message]) Put(message Message) { + slf.buf.Put(message) +} + +// Close 关闭写循环 +func (slf *WriteLoop[Message]) Close() { + slf.buf.Close() +}