From 32576fbc79c41e21098b10efaa7ea0b999556781 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Sat, 23 Dec 2023 18:10:40 +0800 Subject: [PATCH] =?UTF-8?q?other:=20=E7=A7=BB=E9=99=A4=20writeloop=20?= =?UTF-8?q?=E7=9A=84=20defer=20recover=20=E8=A1=8C=E4=B8=BA=EF=BC=8C?= =?UTF-8?q?=E5=8F=91=E7=94=9F=E6=9C=AA=E5=A4=84=E7=90=86=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E5=B0=86=E4=B8=8D=E5=86=8D=20panic=EF=BC=8C=E6=9B=B4=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E8=BE=93=E5=87=BA=20Error=20=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/writeloop/writeloop.go | 39 +++++++++++++++-------------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/server/writeloop/writeloop.go b/server/writeloop/writeloop.go index 4d03ea3..7076e9c 100644 --- a/server/writeloop/writeloop.go +++ b/server/writeloop/writeloop.go @@ -4,41 +4,36 @@ import ( "github.com/kercylan98/minotaur/utils/buffer" "github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/log" - "runtime/debug" ) // NewWriteLoop 创建写循环 // - pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 WriteLoop 会在写入完成后将 Message 对象放回缓冲池 -func NewWriteLoop[Message any](pool *concurrent.Pool[Message], writeHandle func(message Message) error, errorHandle func(err any)) *WriteLoop[Message] { +// - writeHandler 写入处理函数 +// - errorHandler 错误处理函数 +// +// 传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象 +func NewWriteLoop[Message any](pool *concurrent.Pool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *WriteLoop[Message] { wl := &WriteLoop[Message]{ - buf: buffer.NewUnboundedN[Message](), + buf: buffer.NewUnbounded[Message](), } go func() { - for !wl.buf.IsClosed() { + for { select { case message, ok := <-wl.buf.Get(): if !ok { return } wl.buf.Load() - func() { - defer func() { - pool.Release(message) - if err := recover(); err != nil { - if errorHandle == nil { - log.Error("WriteLoop", log.Any("err", err)) - debug.PrintStack() - return - } - errorHandle(err) - } - }() - err := writeHandle(message) - if err != nil { - panic(err) - } - }() + err := writeHandler(message) + pool.Release(message) + if err != nil { + if errorHandler == nil { + log.Error("WriteLoop", log.Err(err)) + continue + } + errorHandler(err) + } } } }() @@ -52,7 +47,7 @@ type WriteLoop[Message any] struct { buf *buffer.Unbounded[Message] } -// Put 将数据放入写循环 +// Put 将数据放入写循环,message 应该来源于 concurrent.Pool func (slf *WriteLoop[Message]) Put(message Message) { slf.buf.Put(message) }