vRp.CD2g_test/server/internal/dispatcher/dispatcher.go

209 lines
6.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dispatcher
import (
"fmt"
"github.com/alphadose/haxmap"
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
"sync"
"sync/atomic"
)
var unique = struct{}{}
// Handler 消息处理器
type Handler[P Producer, M Message[P]] func(dispatcher *Dispatcher[P, M], message M)
// NewDispatcher 创建一个新的消息分发器 Dispatcher 实例
func NewDispatcher[P Producer, M Message[P]](bufferSize int, name string, handler Handler[P, M]) *Dispatcher[P, M] {
if bufferSize <= 0 || handler == nil {
panic(fmt.Errorf("bufferSize must be greater than 0 and handler must not be nil, but got bufferSize: %d, handler is nil: %v", bufferSize, handler == nil))
}
d := &Dispatcher[P, M]{
name: name,
buf: buffer.NewRingUnbounded[M](bufferSize),
handler: handler,
uniques: haxmap.New[string, struct{}](),
pmc: make(map[P]int64),
pmcF: make(map[P]func(p P, dispatcher *Action[P, M])),
abort: make(chan struct{}),
}
return d
}
// Dispatcher 用于服务器消息处理的消息分发器
//
// 这个消息分发器为并发安全的生产者和消费者模型,生产者可以是任意类型,消费者必须是 Message 接口的实现。
// 生产者可以通过 Put 方法并发安全地将消息放入消息分发器,消息执行过程不会阻塞到 Put 方法,同时允许在 Start 方法之前调用 Put 方法。
// 在执行 Start 方法后,消息分发器会阻塞地从消息缓冲区中读取消息,然后执行消息处理器,消息处理器的执行过程不会阻塞到消息的生产。
//
// 为了保证消息不丢失,内部采用了 buffer.RingUnbounded 作为缓冲区实现,并且消息分发器不提供 Close 方法。
// 如果需要关闭消息分发器,可以通过 Expel 方法设置驱逐计划,当消息分发器中没有任何消息时,将会被释放。
// 同时,也可以使用 UnExpel 方法取消驱逐计划。
//
// 为什么提供 Expel 和 UnExpel 方法:
// - 在连接断开时,当需要执行一系列消息处理时,如果直接关闭消息分发器,可能会导致消息丢失。所以提供了 Expel 方法,可以在消息处理完成后再关闭消息分发器。
// - 当消息还未处理完成时连接重连,如果没有取消驱逐计划,可能会导致消息分发器被关闭。所以提供了 UnExpel 方法,可以在连接重连后取消驱逐计划。
type Dispatcher[P Producer, M Message[P]] struct {
buf *buffer.RingUnbounded[M]
uniques *haxmap.Map[string, struct{}]
handler Handler[P, M]
expel bool
mc int64
pmc map[P]int64
pmcF map[P]func(p P, dispatcher *Action[P, M])
lock sync.RWMutex
name string
closedHandler atomic.Pointer[func(dispatcher *Action[P, M])]
abort chan struct{}
}
// SetProducerDoneHandler 设置特定生产者的所有消息处理完成时的回调函数
// - 如果 handler 为 nil则会删除该生产者的回调函数
func (d *Dispatcher[P, M]) SetProducerDoneHandler(p P, handler func(p P, dispatcher *Action[P, M])) *Dispatcher[P, M] {
d.lock.Lock()
if handler == nil {
delete(d.pmcF, p)
} else {
d.pmcF[p] = handler
if pmc := d.pmc[p]; pmc <= 0 {
func(producer P, handler func(p P, dispatcher *Action[P, M])) {
defer func(producer P) {
if err := super.RecoverTransform(recover()); err != nil {
log.Error("Dispatcher.ProducerDoneHandler", log.Any("producer", producer), log.Err(err))
}
}(p)
handler(p, &Action[P, M]{d: d, unlock: true})
}(p, handler)
}
}
d.lock.Unlock()
return d
}
// SetClosedHandler 设置消息分发器关闭时的回调函数
func (d *Dispatcher[P, M]) SetClosedHandler(handler func(dispatcher *Action[P, M])) *Dispatcher[P, M] {
d.closedHandler.Store(&handler)
return d
}
// Name 获取消息分发器名称
func (d *Dispatcher[P, M]) Name() string {
return d.name
}
// Unique 设置唯一消息键,返回是否已存在
func (d *Dispatcher[P, M]) Unique(name string) bool {
_, loaded := d.uniques.GetOrSet(name, unique)
return loaded
}
// AntiUnique 取消唯一消息键
func (d *Dispatcher[P, M]) AntiUnique(name string) {
d.uniques.Del(name)
}
// Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭
func (d *Dispatcher[P, M]) Expel() {
d.lock.Lock()
d.noLockExpel()
d.lock.Unlock()
}
func (d *Dispatcher[P, M]) noLockExpel() {
d.expel = true
if d.mc <= 0 {
close(d.abort)
}
}
// UnExpel 取消特定生产者的驱逐计划
func (d *Dispatcher[P, M]) UnExpel() {
d.lock.Lock()
d.noLockUnExpel()
d.lock.Unlock()
}
func (d *Dispatcher[P, M]) noLockUnExpel() {
d.expel = false
}
// IncrCount 主动增量设置特定生产者的消息计数,这在等待异步消息完成后再关闭消息分发器时非常有用
// - 如果 i 为负数,则会减少消息计数
func (d *Dispatcher[P, M]) IncrCount(producer P, i int64) {
d.lock.Lock()
defer d.lock.Unlock()
d.mc += i
pmc := d.pmc[producer] + i
d.pmc[producer] = pmc
if d.mc <= 0 {
if f := d.pmcF[producer]; f != nil && pmc <= 0 {
func(producer P) {
defer func(producer P) {
if err := super.RecoverTransform(recover()); err != nil {
log.Error("Dispatcher.ProducerDoneHandler", log.Any("producer", producer), log.Err(err))
}
}(producer)
f(producer, &Action[P, M]{d: d, unlock: true})
}(producer)
}
}
}
// Put 将消息放入分发器
func (d *Dispatcher[P, M]) Put(message M) {
d.lock.Lock()
d.mc++
d.pmc[message.GetProducer()]++
d.lock.Unlock()
d.buf.Write(message)
}
// Start 以非阻塞的方式开始进行消息分发,当消息分发器中没有任何消息并且处于驱逐计划 Expel 时,将会自动关闭
func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M] {
go func(d *Dispatcher[P, M]) {
process:
for {
select {
case <-d.abort:
d.buf.Close()
break process
case message := <-d.buf.Read():
// 先取出生产者信息,避免处理函数中将消息释放
p := message.GetProducer()
d.handler(d, message)
d.lock.Lock()
d.mc--
pmc := d.pmc[p] - 1
d.pmc[p] = pmc
if f := d.pmcF[p]; f != nil && pmc <= 0 {
func(producer P) {
defer func(producer P) {
if err := super.RecoverTransform(recover()); err != nil {
log.Error("Dispatcher.ProducerDoneHandler", log.Any("producer", producer), log.Err(err))
}
}(p)
f(p, &Action[P, M]{d: d, unlock: true})
}(p)
}
if d.mc <= 0 && d.expel {
d.buf.Close()
d.lock.Unlock()
break process
}
d.lock.Unlock()
}
}
if ch := d.closedHandler.Load(); ch != nil {
(*ch)(&Action[P, M]{d: d, unlock: true})
}
}(d)
return d
}
// Closed 判断消息分发器是否已关闭
func (d *Dispatcher[P, M]) Closed() bool {
return d.buf.Closed()
}