diff --git a/server/internal/dispatcher/action.go b/server/internal/dispatcher/action.go new file mode 100644 index 0000000..ed943fe --- /dev/null +++ b/server/internal/dispatcher/action.go @@ -0,0 +1,30 @@ +package dispatcher + +// Action 消息分发器操作器,用于暴露外部可操作的消息分发器函数 +type Action[P Producer, M Message[P]] struct { + unlock bool + d *Dispatcher[P, M] +} + +// Name 获取消息分发器名称 +func (a *Action[P, M]) Name() string { + return a.d.Name() +} + +// UnExpel 取消特定生产者的驱逐计划 +func (a *Action[P, M]) UnExpel() { + if !a.unlock { + a.d.UnExpel() + } else { + a.d.noLockUnExpel() + } +} + +// Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭 +func (a *Action[P, M]) Expel() { + if !a.unlock { + a.d.Expel() + } else { + a.d.noLockExpel() + } +} diff --git a/server/internal/dispatcher/dispatcher.go b/server/internal/dispatcher/dispatcher.go index 4c6a4e0..c859c13 100644 --- a/server/internal/dispatcher/dispatcher.go +++ b/server/internal/dispatcher/dispatcher.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/alphadose/haxmap" "github.com/kercylan98/minotaur/utils/buffer" + "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/super" "sync" "sync/atomic" ) @@ -24,7 +26,7 @@ func NewDispatcher[P Producer, M Message[P]](bufferSize int, name string, handle handler: handler, uniques: haxmap.New[string, struct{}](), pmc: make(map[P]int64), - pmcF: make(map[P]func(p P, dispatcher *Dispatcher[P, M])), + pmcF: make(map[P]func(p P, dispatcher *Action[P, M])), abort: make(chan struct{}), } return d @@ -50,28 +52,40 @@ type Dispatcher[P Producer, M Message[P]] struct { expel bool mc int64 pmc map[P]int64 - pmcF map[P]func(p P, dispatcher *Dispatcher[P, M]) + pmcF map[P]func(p P, dispatcher *Action[P, M]) lock sync.RWMutex name string - closedHandler atomic.Pointer[func(dispatcher *Dispatcher[P, M])] + closedHandler atomic.Pointer[func(dispatcher *Action[P, M])] abort chan struct{} } // SetProducerDoneHandler 设置特定生产者所有消息处理完成时的回调函数 // - 如果 handler 为 nil,则会删除该生产者的回调函数 -func (d *Dispatcher[P, M]) SetProducerDoneHandler(p P, handler func(p P, dispatcher *Dispatcher[P, M])) *Dispatcher[P, M] { +// +// 需要注意的是,该 handler 中 +func (d *Dispatcher[P, M]) SetProducerDoneHandler(p P, handler func(p P, dispatcher *Action[P, M])) *Dispatcher[P, M] { d.lock.Lock() if handler == nil { delete(d.pmcF, p) } else { d.pmcF[p] = handler + if pmc := d.pmc[p]; pmc <= 0 { + func(producer P, handler func(p P, dispatcher *Action[P, M])) { + defer func(producer P) { + if err := super.RecoverTransform(recover()); err != nil { + log.Error("Dispatcher.ProducerDoneHandler", log.Any("producer", producer), log.Err(err)) + } + }(p) + handler(p, &Action[P, M]{d: d, unlock: true}) + }(p, handler) + } } d.lock.Unlock() return d } // SetClosedHandler 设置消息分发器关闭时的回调函数 -func (d *Dispatcher[P, M]) SetClosedHandler(handler func(dispatcher *Dispatcher[P, M])) *Dispatcher[P, M] { +func (d *Dispatcher[P, M]) SetClosedHandler(handler func(dispatcher *Action[P, M])) *Dispatcher[P, M] { d.closedHandler.Store(&handler) return d } @@ -95,29 +109,38 @@ func (d *Dispatcher[P, M]) AntiUnique(name string) { // Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭 func (d *Dispatcher[P, M]) Expel() { d.lock.Lock() + d.noLockExpel() + d.lock.Unlock() +} + +func (d *Dispatcher[P, M]) noLockExpel() { d.expel = true if d.mc <= 0 { d.abort <- struct{}{} } - d.lock.Unlock() } // UnExpel 取消特定生产者的驱逐计划 func (d *Dispatcher[P, M]) UnExpel() { d.lock.Lock() - d.expel = false + d.noLockUnExpel() d.lock.Unlock() } +func (d *Dispatcher[P, M]) noLockUnExpel() { + d.expel = false +} + // IncrCount 主动增量设置特定生产者的消息计数,这在等待异步消息完成后再关闭消息分发器时非常有用 +// - 如果 i 为负数,则会减少消息计数 func (d *Dispatcher[P, M]) IncrCount(producer P, i int64) { d.lock.Lock() + defer d.lock.Unlock() d.mc += i d.pmc[producer] += i if d.expel && d.mc <= 0 { d.abort <- struct{}{} } - d.lock.Unlock() } // Put 将消息放入分发器 @@ -129,7 +152,7 @@ func (d *Dispatcher[P, M]) Put(message M) { d.buf.Write(message) } -// Start 以阻塞的方式开始进行消息分发,当消息分发器中没有任何消息时,会自动关闭 +// Start 以非阻塞的方式开始进行消息分发,当消息分发器中没有任何消息并且处于驱逐计划 Expel 时,将会自动关闭 func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M] { go func(d *Dispatcher[P, M]) { process: @@ -139,25 +162,33 @@ func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M] { d.buf.Close() break process case message := <-d.buf.Read(): + // 先取出生产者信息,避免处理函数中将消息释放 + p := message.GetProducer() d.handler(d, message) d.lock.Lock() d.mc-- - p := message.GetProducer() pmc := d.pmc[p] - 1 d.pmc[p] = pmc if f := d.pmcF[p]; f != nil && pmc <= 0 { - go f(p, d) + func(producer P) { + defer func(producer P) { + if err := super.RecoverTransform(recover()); err != nil { + log.Error("Dispatcher.ProducerDoneHandler", log.Any("producer", producer), log.Err(err)) + } + }(p) + f(p, &Action[P, M]{d: d, unlock: true}) + }(p) } if d.mc <= 0 && d.expel { d.buf.Close() + d.lock.Unlock() break process } d.lock.Unlock() } } - closedHandler := *(d.closedHandler.Load()) - if closedHandler != nil { - closedHandler(d) + if ch := d.closedHandler.Load(); ch != nil { + (*ch)(&Action[P, M]{d: d, unlock: true}) } close(d.abort) }(d) diff --git a/server/internal/dispatcher/manager.go b/server/internal/dispatcher/manager.go index 84662e0..b605480 100644 --- a/server/internal/dispatcher/manager.go +++ b/server/internal/dispatcher/manager.go @@ -12,10 +12,13 @@ func NewManager[P Producer, M Message[P]](bufferSize int, handler Handler[P, M]) handler: handler, dispatchers: make(map[string]*Dispatcher[P, M]), member: make(map[string]map[P]struct{}), - sys: NewDispatcher(bufferSize, SystemName, handler).Start(), + sys: NewDispatcher(bufferSize, SystemName, handler), curr: make(map[P]*Dispatcher[P, M]), size: bufferSize, } + mgr.sys.SetClosedHandler(func(dispatcher *Action[P, M]) { + mgr.w.Done() + }).Start() return mgr } @@ -35,6 +38,14 @@ type Manager[P Producer, M Message[P]] struct { createdHandler func(name string) } +// Wait 等待所有消息分发器关闭 +func (m *Manager[P, M]) Wait() { + m.w.Wait() + m.w.Add(1) + m.sys.Expel() + m.w.Wait() +} + // SetDispatcherClosedHandler 设置消息分发器关闭时的回调函数 func (m *Manager[P, M]) SetDispatcherClosedHandler(handler func(name string)) *Manager[P, M] { m.closedHandler = handler @@ -111,15 +122,17 @@ func (m *Manager[P, M]) BindProducer(p P, name string) { dispatcher, exist := m.dispatchers[name] if !exist { - dispatcher = NewDispatcher(m.size, name, m.handler).SetClosedHandler(func(dispatcher *Dispatcher[P, M]) { + m.w.Add(1) + dispatcher = NewDispatcher(m.size, name, m.handler).SetClosedHandler(func(dispatcher *Action[P, M]) { // 消息分发器关闭时,将会将其从管理器中移除 m.lock.Lock() - delete(m.dispatchers, dispatcher.name) - delete(m.member, dispatcher.name) + delete(m.dispatchers, dispatcher.Name()) + delete(m.member, dispatcher.Name()) m.lock.Unlock() if m.closedHandler != nil { - m.closedHandler(dispatcher.name) + m.closedHandler(dispatcher.Name()) } + m.w.Done() }).Start() m.dispatchers[name] = dispatcher defer func(m *Manager[P, M], name string) { @@ -135,18 +148,18 @@ func (m *Manager[P, M]) BindProducer(p P, name string) { // UnBindProducer 解绑生产者使用特定的消息分发器 func (m *Manager[P, M]) UnBindProducer(p P) { m.lock.Lock() - defer m.lock.Unlock() curr, exist := m.curr[p] + m.lock.Unlock() if !exist { return } - curr.SetProducerDoneHandler(p, func(p P, dispatcher *Dispatcher[P, M]) { + curr.SetProducerDoneHandler(p, func(p P, dispatcher *Action[P, M]) { m.lock.Lock() defer m.lock.Unlock() - delete(m.member[dispatcher.name], p) + delete(m.member[dispatcher.Name()], p) delete(m.curr, p) - if len(m.member[dispatcher.name]) == 0 { + if len(m.member[dispatcher.Name()]) == 0 { dispatcher.Expel() } }) diff --git a/server/message.go b/server/message.go index a8ea6b9..e86ebe1 100644 --- a/server/message.go +++ b/server/message.go @@ -102,6 +102,7 @@ func (slf *Message) reset() { slf.name = "" slf.t = 0 slf.marks = nil + slf.producer = "" } // MessageType 返回消息类型