other: 移除 writeloop 的 defer recover 行为,发生未处理错误将不再 panic,更改为输出 Error 日志
This commit is contained in:
parent
cc5274ce62
commit
32576fbc79
|
@ -4,41 +4,36 @@ import (
|
||||||
"github.com/kercylan98/minotaur/utils/buffer"
|
"github.com/kercylan98/minotaur/utils/buffer"
|
||||||
"github.com/kercylan98/minotaur/utils/concurrent"
|
"github.com/kercylan98/minotaur/utils/concurrent"
|
||||||
"github.com/kercylan98/minotaur/utils/log"
|
"github.com/kercylan98/minotaur/utils/log"
|
||||||
"runtime/debug"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewWriteLoop 创建写循环
|
// NewWriteLoop 创建写循环
|
||||||
// - pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 WriteLoop 会在写入完成后将 Message 对象放回缓冲池
|
// - pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 WriteLoop 会在写入完成后将 Message 对象放回缓冲池
|
||||||
func NewWriteLoop[Message any](pool *concurrent.Pool[Message], writeHandle func(message Message) error, errorHandle func(err any)) *WriteLoop[Message] {
|
// - writeHandler 写入处理函数
|
||||||
|
// - errorHandler 错误处理函数
|
||||||
|
//
|
||||||
|
// 传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象
|
||||||
|
func NewWriteLoop[Message any](pool *concurrent.Pool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *WriteLoop[Message] {
|
||||||
wl := &WriteLoop[Message]{
|
wl := &WriteLoop[Message]{
|
||||||
buf: buffer.NewUnboundedN[Message](),
|
buf: buffer.NewUnbounded[Message](),
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
for !wl.buf.IsClosed() {
|
for {
|
||||||
select {
|
select {
|
||||||
case message, ok := <-wl.buf.Get():
|
case message, ok := <-wl.buf.Get():
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
wl.buf.Load()
|
wl.buf.Load()
|
||||||
func() {
|
|
||||||
defer func() {
|
|
||||||
pool.Release(message)
|
|
||||||
if err := recover(); err != nil {
|
|
||||||
if errorHandle == nil {
|
|
||||||
log.Error("WriteLoop", log.Any("err", err))
|
|
||||||
debug.PrintStack()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
errorHandle(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
err := writeHandle(message)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
}()
|
err := writeHandler(message)
|
||||||
|
pool.Release(message)
|
||||||
|
if err != nil {
|
||||||
|
if errorHandler == nil {
|
||||||
|
log.Error("WriteLoop", log.Err(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
errorHandler(err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -52,7 +47,7 @@ type WriteLoop[Message any] struct {
|
||||||
buf *buffer.Unbounded[Message]
|
buf *buffer.Unbounded[Message]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put 将数据放入写循环
|
// Put 将数据放入写循环,message 应该来源于 concurrent.Pool
|
||||||
func (slf *WriteLoop[Message]) Put(message Message) {
|
func (slf *WriteLoop[Message]) Put(message Message) {
|
||||||
slf.buf.Put(message)
|
slf.buf.Put(message)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue