fix: 修复 dispatcher.Dispatcher 在消息归零的时候使用协程运行处理函数可能导致不可知问题的情况,修复消息消费时获取生产者可能已经被释放的问题。修复在无消息时候设置消息完成处理函数不会触发一次的问题

This commit is contained in:
kercylan98 2024-01-12 15:29:42 +08:00
parent a2a9199d41
commit 7528dc4a1b
4 changed files with 98 additions and 23 deletions

View File

@ -0,0 +1,30 @@
package dispatcher
// Action 消息分发器操作器,用于暴露外部可操作的消息分发器函数
type Action[P Producer, M Message[P]] struct {
unlock bool
d *Dispatcher[P, M]
}
// Name 获取消息分发器名称
func (a *Action[P, M]) Name() string {
return a.d.Name()
}
// UnExpel 取消特定生产者的驱逐计划
func (a *Action[P, M]) UnExpel() {
if !a.unlock {
a.d.UnExpel()
} else {
a.d.noLockUnExpel()
}
}
// Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭
func (a *Action[P, M]) Expel() {
if !a.unlock {
a.d.Expel()
} else {
a.d.noLockExpel()
}
}

View File

@ -4,6 +4,8 @@ import (
"fmt" "fmt"
"github.com/alphadose/haxmap" "github.com/alphadose/haxmap"
"github.com/kercylan98/minotaur/utils/buffer" "github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
"sync" "sync"
"sync/atomic" "sync/atomic"
) )
@ -24,7 +26,7 @@ func NewDispatcher[P Producer, M Message[P]](bufferSize int, name string, handle
handler: handler, handler: handler,
uniques: haxmap.New[string, struct{}](), uniques: haxmap.New[string, struct{}](),
pmc: make(map[P]int64), pmc: make(map[P]int64),
pmcF: make(map[P]func(p P, dispatcher *Dispatcher[P, M])), pmcF: make(map[P]func(p P, dispatcher *Action[P, M])),
abort: make(chan struct{}), abort: make(chan struct{}),
} }
return d return d
@ -50,28 +52,40 @@ type Dispatcher[P Producer, M Message[P]] struct {
expel bool expel bool
mc int64 mc int64
pmc map[P]int64 pmc map[P]int64
pmcF map[P]func(p P, dispatcher *Dispatcher[P, M]) pmcF map[P]func(p P, dispatcher *Action[P, M])
lock sync.RWMutex lock sync.RWMutex
name string name string
closedHandler atomic.Pointer[func(dispatcher *Dispatcher[P, M])] closedHandler atomic.Pointer[func(dispatcher *Action[P, M])]
abort chan struct{} abort chan struct{}
} }
// SetProducerDoneHandler 设置特定生产者所有消息处理完成时的回调函数 // SetProducerDoneHandler 设置特定生产者所有消息处理完成时的回调函数
// - 如果 handler 为 nil则会删除该生产者的回调函数 // - 如果 handler 为 nil则会删除该生产者的回调函数
func (d *Dispatcher[P, M]) SetProducerDoneHandler(p P, handler func(p P, dispatcher *Dispatcher[P, M])) *Dispatcher[P, M] { //
// 需要注意的是,该 handler 中
func (d *Dispatcher[P, M]) SetProducerDoneHandler(p P, handler func(p P, dispatcher *Action[P, M])) *Dispatcher[P, M] {
d.lock.Lock() d.lock.Lock()
if handler == nil { if handler == nil {
delete(d.pmcF, p) delete(d.pmcF, p)
} else { } else {
d.pmcF[p] = handler d.pmcF[p] = handler
if pmc := d.pmc[p]; pmc <= 0 {
func(producer P, handler func(p P, dispatcher *Action[P, M])) {
defer func(producer P) {
if err := super.RecoverTransform(recover()); err != nil {
log.Error("Dispatcher.ProducerDoneHandler", log.Any("producer", producer), log.Err(err))
}
}(p)
handler(p, &Action[P, M]{d: d, unlock: true})
}(p, handler)
}
} }
d.lock.Unlock() d.lock.Unlock()
return d return d
} }
// SetClosedHandler 设置消息分发器关闭时的回调函数 // SetClosedHandler 设置消息分发器关闭时的回调函数
func (d *Dispatcher[P, M]) SetClosedHandler(handler func(dispatcher *Dispatcher[P, M])) *Dispatcher[P, M] { func (d *Dispatcher[P, M]) SetClosedHandler(handler func(dispatcher *Action[P, M])) *Dispatcher[P, M] {
d.closedHandler.Store(&handler) d.closedHandler.Store(&handler)
return d return d
} }
@ -95,29 +109,38 @@ func (d *Dispatcher[P, M]) AntiUnique(name string) {
// Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭 // Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭
func (d *Dispatcher[P, M]) Expel() { func (d *Dispatcher[P, M]) Expel() {
d.lock.Lock() d.lock.Lock()
d.noLockExpel()
d.lock.Unlock()
}
func (d *Dispatcher[P, M]) noLockExpel() {
d.expel = true d.expel = true
if d.mc <= 0 { if d.mc <= 0 {
d.abort <- struct{}{} d.abort <- struct{}{}
} }
d.lock.Unlock()
} }
// UnExpel 取消特定生产者的驱逐计划 // UnExpel 取消特定生产者的驱逐计划
func (d *Dispatcher[P, M]) UnExpel() { func (d *Dispatcher[P, M]) UnExpel() {
d.lock.Lock() d.lock.Lock()
d.expel = false d.noLockUnExpel()
d.lock.Unlock() d.lock.Unlock()
} }
func (d *Dispatcher[P, M]) noLockUnExpel() {
d.expel = false
}
// IncrCount 主动增量设置特定生产者的消息计数,这在等待异步消息完成后再关闭消息分发器时非常有用 // IncrCount 主动增量设置特定生产者的消息计数,这在等待异步消息完成后再关闭消息分发器时非常有用
// - 如果 i 为负数,则会减少消息计数
func (d *Dispatcher[P, M]) IncrCount(producer P, i int64) { func (d *Dispatcher[P, M]) IncrCount(producer P, i int64) {
d.lock.Lock() d.lock.Lock()
defer d.lock.Unlock()
d.mc += i d.mc += i
d.pmc[producer] += i d.pmc[producer] += i
if d.expel && d.mc <= 0 { if d.expel && d.mc <= 0 {
d.abort <- struct{}{} d.abort <- struct{}{}
} }
d.lock.Unlock()
} }
// Put 将消息放入分发器 // Put 将消息放入分发器
@ -129,7 +152,7 @@ func (d *Dispatcher[P, M]) Put(message M) {
d.buf.Write(message) d.buf.Write(message)
} }
// Start 以阻塞的方式开始进行消息分发,当消息分发器中没有任何消息时,会自动关闭 // Start 以阻塞的方式开始进行消息分发,当消息分发器中没有任何消息并且处于驱逐计划 Expel 时,会自动关闭
func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M] { func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M] {
go func(d *Dispatcher[P, M]) { go func(d *Dispatcher[P, M]) {
process: process:
@ -139,25 +162,33 @@ func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M] {
d.buf.Close() d.buf.Close()
break process break process
case message := <-d.buf.Read(): case message := <-d.buf.Read():
// 先取出生产者信息,避免处理函数中将消息释放
p := message.GetProducer()
d.handler(d, message) d.handler(d, message)
d.lock.Lock() d.lock.Lock()
d.mc-- d.mc--
p := message.GetProducer()
pmc := d.pmc[p] - 1 pmc := d.pmc[p] - 1
d.pmc[p] = pmc d.pmc[p] = pmc
if f := d.pmcF[p]; f != nil && pmc <= 0 { if f := d.pmcF[p]; f != nil && pmc <= 0 {
go f(p, d) func(producer P) {
defer func(producer P) {
if err := super.RecoverTransform(recover()); err != nil {
log.Error("Dispatcher.ProducerDoneHandler", log.Any("producer", producer), log.Err(err))
}
}(p)
f(p, &Action[P, M]{d: d, unlock: true})
}(p)
} }
if d.mc <= 0 && d.expel { if d.mc <= 0 && d.expel {
d.buf.Close() d.buf.Close()
d.lock.Unlock()
break process break process
} }
d.lock.Unlock() d.lock.Unlock()
} }
} }
closedHandler := *(d.closedHandler.Load()) if ch := d.closedHandler.Load(); ch != nil {
if closedHandler != nil { (*ch)(&Action[P, M]{d: d, unlock: true})
closedHandler(d)
} }
close(d.abort) close(d.abort)
}(d) }(d)

View File

@ -12,10 +12,13 @@ func NewManager[P Producer, M Message[P]](bufferSize int, handler Handler[P, M])
handler: handler, handler: handler,
dispatchers: make(map[string]*Dispatcher[P, M]), dispatchers: make(map[string]*Dispatcher[P, M]),
member: make(map[string]map[P]struct{}), member: make(map[string]map[P]struct{}),
sys: NewDispatcher(bufferSize, SystemName, handler).Start(), sys: NewDispatcher(bufferSize, SystemName, handler),
curr: make(map[P]*Dispatcher[P, M]), curr: make(map[P]*Dispatcher[P, M]),
size: bufferSize, size: bufferSize,
} }
mgr.sys.SetClosedHandler(func(dispatcher *Action[P, M]) {
mgr.w.Done()
}).Start()
return mgr return mgr
} }
@ -35,6 +38,14 @@ type Manager[P Producer, M Message[P]] struct {
createdHandler func(name string) createdHandler func(name string)
} }
// Wait 等待所有消息分发器关闭
func (m *Manager[P, M]) Wait() {
m.w.Wait()
m.w.Add(1)
m.sys.Expel()
m.w.Wait()
}
// SetDispatcherClosedHandler 设置消息分发器关闭时的回调函数 // SetDispatcherClosedHandler 设置消息分发器关闭时的回调函数
func (m *Manager[P, M]) SetDispatcherClosedHandler(handler func(name string)) *Manager[P, M] { func (m *Manager[P, M]) SetDispatcherClosedHandler(handler func(name string)) *Manager[P, M] {
m.closedHandler = handler m.closedHandler = handler
@ -111,15 +122,17 @@ func (m *Manager[P, M]) BindProducer(p P, name string) {
dispatcher, exist := m.dispatchers[name] dispatcher, exist := m.dispatchers[name]
if !exist { if !exist {
dispatcher = NewDispatcher(m.size, name, m.handler).SetClosedHandler(func(dispatcher *Dispatcher[P, M]) { m.w.Add(1)
dispatcher = NewDispatcher(m.size, name, m.handler).SetClosedHandler(func(dispatcher *Action[P, M]) {
// 消息分发器关闭时,将会将其从管理器中移除 // 消息分发器关闭时,将会将其从管理器中移除
m.lock.Lock() m.lock.Lock()
delete(m.dispatchers, dispatcher.name) delete(m.dispatchers, dispatcher.Name())
delete(m.member, dispatcher.name) delete(m.member, dispatcher.Name())
m.lock.Unlock() m.lock.Unlock()
if m.closedHandler != nil { if m.closedHandler != nil {
m.closedHandler(dispatcher.name) m.closedHandler(dispatcher.Name())
} }
m.w.Done()
}).Start() }).Start()
m.dispatchers[name] = dispatcher m.dispatchers[name] = dispatcher
defer func(m *Manager[P, M], name string) { defer func(m *Manager[P, M], name string) {
@ -135,18 +148,18 @@ func (m *Manager[P, M]) BindProducer(p P, name string) {
// UnBindProducer 解绑生产者使用特定的消息分发器 // UnBindProducer 解绑生产者使用特定的消息分发器
func (m *Manager[P, M]) UnBindProducer(p P) { func (m *Manager[P, M]) UnBindProducer(p P) {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock()
curr, exist := m.curr[p] curr, exist := m.curr[p]
m.lock.Unlock()
if !exist { if !exist {
return return
} }
curr.SetProducerDoneHandler(p, func(p P, dispatcher *Dispatcher[P, M]) { curr.SetProducerDoneHandler(p, func(p P, dispatcher *Action[P, M]) {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()
delete(m.member[dispatcher.name], p) delete(m.member[dispatcher.Name()], p)
delete(m.curr, p) delete(m.curr, p)
if len(m.member[dispatcher.name]) == 0 { if len(m.member[dispatcher.Name()]) == 0 {
dispatcher.Expel() dispatcher.Expel()
} }
}) })

View File

@ -102,6 +102,7 @@ func (slf *Message) reset() {
slf.name = "" slf.name = ""
slf.t = 0 slf.t = 0
slf.marks = nil slf.marks = nil
slf.producer = ""
} }
// MessageType 返回消息类型 // MessageType 返回消息类型