fix: 修复 server.WithDispatcherBufferSize 过小的情况下,在消息中发布新消息导致永久阻塞的问题

This commit is contained in:
kercylan98 2023-12-26 10:57:53 +08:00
parent 556d1cdc02
commit b39625c0cb
1 changed files with 63 additions and 22 deletions

View File

@ -1,65 +1,106 @@
package server package server
import ( import (
"context"
"github.com/alphadose/haxmap" "github.com/alphadose/haxmap"
"sync"
) )
var dispatcherUnique = struct{}{} var dispatcherUnique = struct{}{}
// generateDispatcher 生成消息分发器 // generateDispatcher 生成消息分发器
func generateDispatcher(size int, name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher { func generateDispatcher(size int, name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
return &dispatcher{ d := &dispatcher{
name: name, name: name,
buffer: make(chan *Message, size), buffer: make(chan *Message, size),
handler: handler, handler: handler,
uniques: haxmap.New[string, struct{}](), uniques: haxmap.New[string, struct{}](),
queueMutex: new(sync.Mutex),
} }
d.ctx, d.cancel = context.WithCancel(context.Background())
d.queueCond = sync.NewCond(d.queueMutex)
return d
} }
// dispatcher 消息分发器 // dispatcher 消息分发器
type dispatcher struct { type dispatcher struct {
name string name string
buffer chan *Message buffer chan *Message
uniques *haxmap.Map[string, struct{}] uniques *haxmap.Map[string, struct{}]
handler func(dispatcher *dispatcher, message *Message) handler func(dispatcher *dispatcher, message *Message)
ctx context.Context
cancel context.CancelFunc
queue []*Message
queueMutex *sync.Mutex
queueCond *sync.Cond
} }
func (slf *dispatcher) unique(name string) bool { func (d *dispatcher) unique(name string) bool {
_, loaded := slf.uniques.GetOrSet(name, dispatcherUnique) _, loaded := d.uniques.GetOrSet(name, dispatcherUnique)
return loaded return loaded
} }
func (slf *dispatcher) antiUnique(name string) { func (d *dispatcher) antiUnique(name string) {
slf.uniques.Del(name) d.uniques.Del(name)
} }
func (slf *dispatcher) start() { func (d *dispatcher) start() {
d.process()
for { for {
select { select {
case message, ok := <-slf.buffer: case message, ok := <-d.buffer:
if !ok { if !ok {
return return
} }
slf.handler(slf, message) d.handler(d, message)
} }
} }
} }
func (slf *dispatcher) put(message *Message) { func (d *dispatcher) process() {
slf.buffer <- message go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
d.queueMutex.Lock()
if len(d.queue) == 0 {
d.queueCond.Wait()
}
messages := make([]*Message, len(d.queue))
copy(messages, d.queue)
d.queue = d.queue[:0]
d.queueMutex.Unlock()
for _, message := range messages {
select {
case d.buffer <- message:
}
}
}
}
}(d.ctx)
} }
func (slf *dispatcher) close() { func (d *dispatcher) put(message *Message) {
close(slf.buffer) d.queueMutex.Lock()
d.queue = append(d.queue, message)
d.queueCond.Signal()
defer d.queueMutex.Unlock()
} }
func (slf *dispatcher) transfer(target *dispatcher) { func (d *dispatcher) close() {
close(d.buffer)
d.cancel()
}
func (d *dispatcher) transfer(target *dispatcher) {
if target == nil { if target == nil {
return return
} }
for { for {
select { select {
case message, ok := <-slf.buffer: case message, ok := <-d.buffer:
if !ok { if !ok {
return return
} }