diff --git a/server/internal/v2/balancer/balancer.go b/server/internal/v2/balancer/balancer.go deleted file mode 100644 index c530e24..0000000 --- a/server/internal/v2/balancer/balancer.go +++ /dev/null @@ -1,20 +0,0 @@ -package balancer - -type Item[Id comparable] interface { - // Id 返回唯一标识 - Id() Id - - // Weight 返回权重 - Weight() int -} - -type Balancer[Id comparable, T Item[Id]] interface { - // Add 添加一个负载均衡目标 - Add(t T) - - // Remove 移除一个负载均衡目标 - Remove(t T) - - // Next 根据负载均衡策略选择下一个目标 - Next() T -} diff --git a/server/internal/v2/balancer/round_robin.go b/server/internal/v2/balancer/round_robin.go deleted file mode 100644 index fa5b718..0000000 --- a/server/internal/v2/balancer/round_robin.go +++ /dev/null @@ -1,46 +0,0 @@ -package balancer - -import "sync" - -func NewRoundRobin[Id comparable, T Item[Id]]() *RoundRobin[Id, T] { - -} - -type RoundRobin[Id comparable, T Item[Id]] struct { - ref map[Id]int - items []T - rw sync.RWMutex - curr int -} - -func (r *RoundRobin[Id, T]) Add(t T) { - r.rw.Lock() - defer r.rw.Unlock() - _, exist := r.ref[t.Id()] - if exist { - return - } - r.ref[t.Id()] = len(r.items) - r.items = append(r.items, t) -} - -func (r *RoundRobin[Id, T]) Remove(t T) { - r.rw.Lock() - defer r.rw.Unlock() - index, exist := r.ref[t.Id()] - if !exist { - return - } - r.items = append(r.items[:index], r.items[index+1:]...) - delete(r.ref, t.Id()) -} - -func (r *RoundRobin[Id, T]) Next() T { - r.rw.RLock() - defer r.rw.RUnlock() - if r.curr >= len(r.items) { - r.curr = 0 - } - t := r.items[r.curr] - r.curr++ -} diff --git a/server/internal/v2/loadbalancer/round_robin.go b/server/internal/v2/loadbalancer/round_robin.go index 40d946b..b37b24f 100644 --- a/server/internal/v2/loadbalancer/round_robin.go +++ b/server/internal/v2/loadbalancer/round_robin.go @@ -76,3 +76,23 @@ func (r *RoundRobin[Id, T]) Next() (t T) { r.curr = r.curr.Next return r.curr.Value } + +func (r *RoundRobin[Id, T]) Refresh() { + r.rw.Lock() + defer r.rw.Unlock() + + if r.head == nil { + return + } + + curr := r.head + for i := 0; i < r.size; i++ { + if curr.Value.Id() == r.curr.Value.Id() { + r.curr = curr + return + } + curr = curr.Next + } + + r.curr = r.head +} diff --git a/server/internal/v2/reactor/handlers.go b/server/internal/v2/reactor/handlers.go index e43bc67..59aa4a7 100644 --- a/server/internal/v2/reactor/handlers.go +++ b/server/internal/v2/reactor/handlers.go @@ -1,6 +1,6 @@ package reactor -type queueMessageHandler[M any] func(q *queue[M], msg M) +type queueMessageHandler[M any] func(q *queue[M], ident *identifiable, msg M) type MessageHandler[M any] func(msg M) diff --git a/server/internal/v2/reactor/queue.go b/server/internal/v2/reactor/queue.go index b34ea20..68d8d4b 100644 --- a/server/internal/v2/reactor/queue.go +++ b/server/internal/v2/reactor/queue.go @@ -9,9 +9,9 @@ import ( func newQueue[M any](idx, chanSize, bufferSize int) *queue[M] { q := &queue[M]{ - c: make(chan M, chanSize), - buf: buffer.NewRing[M](bufferSize), - rw: sync.NewCond(&sync.Mutex{}), + c: make(chan queueMessage[M], chanSize), + buf: buffer.NewRing[queueMessage[M]](bufferSize), + cond: sync.NewCond(&sync.Mutex{}), } q.QueueState = &QueueState[M]{ queue: q, @@ -23,49 +23,60 @@ func newQueue[M any](idx, chanSize, bufferSize int) *queue[M] { type queue[M any] struct { *QueueState[M] - c chan M // 通道 - buf *buffer.Ring[M] // 缓冲区 - rw *sync.Cond // 读写锁 + c chan queueMessage[M] // 通道 + buf *buffer.Ring[queueMessage[M]] // 缓冲区 + cond *sync.Cond // 条件变量 + closedHandler func(q *queue[M]) // 关闭处理函数 } func (q *queue[M]) Id() int { return q.idx } +func (q *queue[M]) setClosedHandler(handler func(q *queue[M])) { + q.closedHandler = handler +} + func (q *queue[M]) run() { atomic.StoreInt32(&q.status, QueueStatusRunning) defer func(q *queue[M]) { atomic.StoreInt32(&q.status, QueueStatusClosed) + if q.closedHandler != nil { + q.closedHandler(q) + } }(q) for { - q.rw.L.Lock() + q.cond.L.Lock() for q.buf.IsEmpty() { if atomic.LoadInt32(&q.status) >= QueueStatusClosing { - q.rw.L.Unlock() + q.cond.L.Unlock() close(q.c) return } - q.rw.Wait() + q.cond.Wait() } items := q.buf.ReadAll() - q.rw.L.Unlock() + q.cond.L.Unlock() for _, item := range items { q.c <- item } } } -func (q *queue[M]) push(m M) error { - if atomic.LoadInt32(&q.status) != QueueStatusRunning { +func (q *queue[M]) push(ident *identifiable, m M) error { + if atomic.LoadInt32(&q.status) > QueueStatusRunning { return errors.New("queue status exception") } - q.rw.L.Lock() - q.buf.Write(m) - q.rw.Signal() - q.rw.L.Unlock() + q.cond.L.Lock() + q.buf.Write(queueMessage[M]{ + ident: ident, + msg: m, + }) + q.cond.Signal() + q.cond.L.Unlock() return nil } -func (q *queue[M]) read() <-chan M { +func (q *queue[M]) read() <-chan queueMessage[M] { return q.c } diff --git a/server/internal/v2/reactor/queue_message.go b/server/internal/v2/reactor/queue_message.go new file mode 100644 index 0000000..930795b --- /dev/null +++ b/server/internal/v2/reactor/queue_message.go @@ -0,0 +1,6 @@ +package reactor + +type queueMessage[M any] struct { + ident *identifiable + msg M +} diff --git a/server/internal/v2/reactor/queue_state.go b/server/internal/v2/reactor/queue_state.go index 93de96a..72926d4 100644 --- a/server/internal/v2/reactor/queue_state.go +++ b/server/internal/v2/reactor/queue_state.go @@ -35,4 +35,5 @@ func (q *QueueState[M]) IsRunning() bool { // Close 关闭队列 func (q *QueueState[M]) Close() { atomic.CompareAndSwapInt32(&q.status, QueueStatusRunning, QueueStatusClosing) + q.queue.cond.Broadcast() } diff --git a/server/internal/v2/reactor/reactor.go b/server/internal/v2/reactor/reactor.go index ffbf4e2..92e893c 100644 --- a/server/internal/v2/reactor/reactor.go +++ b/server/internal/v2/reactor/reactor.go @@ -10,22 +10,43 @@ import ( "runtime" "runtime/debug" "sync" - "time" + "sync/atomic" ) +const ( + StatusNone = iota - 1 // 事件循环未运行 + StatusRunning // 事件循环运行中 + StatusClosing // 事件循环关闭中 + StatusClosed // 事件循环已关闭 +) + +var sysIdent = &identifiable{ident: "system"} + // NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列 -func NewReactor[M any](systemQueueSize, socketQueueSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] { +func NewReactor[M any](systemQueueSize, queueSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] { r := &Reactor[M]{ - logger: log.Default().Logger, - systemQueue: newQueue[M](-1, systemQueueSize, 1024*16), - identifiers: haxmap.New[string, int](), - lb: loadbalancer.NewRoundRobin[int, *queue[M]](), - errorHandler: errorHandler, - socketQueueSize: socketQueueSize, + logger: log.Default().Logger, + systemQueue: newQueue[M](-1, systemQueueSize, 1024), + identifiers: haxmap.New[string, *identifiable](), + lb: loadbalancer.NewRoundRobin[int, *queue[M]](), + errorHandler: errorHandler, + queueSize: queueSize, + state: StatusNone, } - r.handler = func(q *queue[M], msg M) { - defer func(msg M) { + defaultNum := runtime.NumCPU() + if defaultNum < 1 { + defaultNum = 1 + } + + r.queueRW.Lock() + for i := 0; i < defaultNum; i++ { + r.noneLockAddQueue() + } + r.queueRW.Unlock() + + r.handler = func(q *queue[M], ident *identifiable, msg M) { + defer func(ident *identifiable, msg M) { if err := super.RecoverTransform(recover()); err != nil { defer func(msg M) { if err = super.RecoverTransform(recover()); err != nil { @@ -40,12 +61,17 @@ func NewReactor[M any](systemQueueSize, socketQueueSize int, handler MessageHand debug.PrintStack() } } - }(msg) - var startedAt = time.Now() + + if atomic.AddInt64(&ident.n, -1) == 0 { + r.queueRW.Lock() + r.identifiers.Del(ident.ident) + r.queueRW.Unlock() + } + + }(ident, msg) if handler != nil { handler(msg) } - r.log(log.String("action", "handle"), log.Int("queue", q.Id()), log.Int64("cost/ns", time.Since(startedAt).Nanoseconds())) } return r @@ -53,16 +79,19 @@ func NewReactor[M any](systemQueueSize, socketQueueSize int, handler MessageHand // Reactor 是一个消息反应器,管理系统级别的队列和多个 Socket 对应的队列 type Reactor[M any] struct { - logger *slog.Logger // 日志记录器 - systemQueue *queue[M] // 系统级别的队列 - socketQueueSize int // Socket 队列大小 - queues []*queue[M] // Socket 使用的队列 - identifiers *haxmap.Map[string, int] // 标识符到队列索引的映射 - lb *loadbalancer.RoundRobin[int, *queue[M]] // 负载均衡器 - wg sync.WaitGroup // 等待组 - handler queueMessageHandler[M] // 消息处理器 - errorHandler ErrorHandler[M] // 错误处理器 - debug bool // 是否开启调试模式 + logger *slog.Logger // 日志记录器 + state int32 // 状态 + systemQueue *queue[M] // 系统级别的队列 + queueSize int // Socket 队列大小 + queues []*queue[M] // Socket 使用的队列 + queueRW sync.RWMutex // 队列读写锁 + identifiers *haxmap.Map[string, *identifiable] // 标识符到队列索引的映射及消息计数 + lb *loadbalancer.RoundRobin[int, *queue[M]] // 负载均衡器 + wg sync.WaitGroup // 等待组 + cwg sync.WaitGroup // 关闭等待组 + handler queueMessageHandler[M] // 消息处理器 + errorHandler ErrorHandler[M] // 错误处理器 + debug bool // 是否开启调试模式 } // SetLogger 设置日志记录器 @@ -79,39 +108,49 @@ func (r *Reactor[M]) SetDebug(debug bool) *Reactor[M] { // SystemDispatch 将消息分发到系统级别的队列 func (r *Reactor[M]) SystemDispatch(msg M) error { - return r.systemQueue.push(msg) + if atomic.LoadInt32(&r.state) > StatusRunning { + r.queueRW.RUnlock() + return fmt.Errorf("reactor closing or closed") + } + return r.systemQueue.push(sysIdent, msg) } -// Dispatch 将消息分发到 identifier 使用的队列,当 identifier 首次使用时,将会根据负载均衡策略选择一个队列 -func (r *Reactor[M]) Dispatch(identifier string, msg M) error { - next := r.lb.Next() - if next == nil { - return r.Dispatch(identifier, msg) +// Dispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列 +func (r *Reactor[M]) Dispatch(ident string, msg M) error { + r.queueRW.RLock() + if atomic.LoadInt32(&r.state) > StatusRunning { + r.queueRW.RUnlock() + return fmt.Errorf("reactor closing or closed") } - idx, _ := r.identifiers.GetOrSet(identifier, next.Id()) - q := r.queues[idx] - r.log(log.String("action", "dispatch"), log.String("identifier", identifier), log.Int("queue", q.Id())) - return q.push(msg) + next := r.lb.Next() + i, _ := r.identifiers.GetOrSet(ident, &identifiable{ident: ident}) + q := r.queues[next.Id()] + atomic.AddInt64(&i.n, 1) + r.queueRW.RUnlock() + return q.push(i, msg) } // Run 启动 Reactor,运行系统级别的队列和多个 Socket 对应的队列 func (r *Reactor[M]) Run() { - r.initQueue(r.systemQueue) - for i := 0; i < runtime.NumCPU(); i++ { - r.addQueue() + if !atomic.CompareAndSwapInt32(&r.state, StatusNone, StatusRunning) { + return } + r.queueRW.Lock() + r.runQueue(r.systemQueue) + for i := 0; i < len(r.queues); i++ { + r.runQueue(r.queues[i]) + } + r.queueRW.Unlock() r.wg.Wait() } -func (r *Reactor[M]) addQueue() { - r.log(log.String("action", "add queue"), log.Int("queue", len(r.queues))) - r.wg.Add(1) - q := newQueue[M](len(r.queues), r.socketQueueSize, 1024*8) - r.initQueue(q) +func (r *Reactor[M]) noneLockAddQueue() { + q := newQueue[M](len(r.queues), r.queueSize, 1024*8) + r.lb.Add(q) // 运行前添加到负载均衡器,未运行时允许接收消息 r.queues = append(r.queues, q) } -func (r *Reactor[M]) removeQueue(q *queue[M]) { +func (r *Reactor[M]) noneLockDelQueue(q *queue[M]) { idx := q.Id() if idx < 0 || idx >= len(r.queues) || r.queues[idx] != q { return @@ -120,34 +159,35 @@ func (r *Reactor[M]) removeQueue(q *queue[M]) { for i := idx; i < len(r.queues); i++ { r.queues[i].idx = i } - r.log(log.String("action", "remove queue"), log.Int("queue", len(r.queues))) } -func (r *Reactor[M]) initQueue(q *queue[M]) { +func (r *Reactor[M]) runQueue(q *queue[M]) { r.wg.Add(1) + q.setClosedHandler(func(q *queue[M]) { + // 关闭时正在等待关闭完成,外部已加锁,无需再次加锁 + r.noneLockDelQueue(q) + r.cwg.Done() + }) + go q.run() + go func(r *Reactor[M], q *queue[M]) { defer r.wg.Done() - go q.run() - if q.idx >= 0 { - r.lb.Add(q) - } for m := range q.read() { - r.handler(q, m) + r.handler(q, m.ident, m.msg) } }(r, q) - r.log(log.String("action", "run queue"), log.Int("queue", q.Id())) } func (r *Reactor[M]) Close() { - queues := append(r.queues, r.systemQueue) - for _, q := range queues { - q.Close() - } -} - -func (r *Reactor[M]) log(args ...any) { - if !r.debug { + if !atomic.CompareAndSwapInt32(&r.state, StatusRunning, StatusClosing) { return } - r.logger.Debug("Reactor", args...) + r.queueRW.Lock() + r.cwg.Add(len(r.queues) + 1) + for _, q := range append(r.queues, r.systemQueue) { + q.Close() + } + r.cwg.Wait() + atomic.StoreInt32(&r.state, StatusClosed) + r.queueRW.Unlock() } diff --git a/server/internal/v2/reactor/reactor_test.go b/server/internal/v2/reactor/reactor_test.go index 3f41865..7c37f93 100644 --- a/server/internal/v2/reactor/reactor_test.go +++ b/server/internal/v2/reactor/reactor_test.go @@ -3,12 +3,12 @@ package reactor_test import ( "github.com/kercylan98/minotaur/server/internal/v2/reactor" "github.com/kercylan98/minotaur/utils/random" + "github.com/kercylan98/minotaur/utils/times" "testing" "time" ) func BenchmarkReactor_Dispatch(b *testing.B) { - var r = reactor.NewReactor(1024*16, 1024, func(msg func()) { msg() }, func(msg func(), err error) { @@ -20,9 +20,8 @@ func BenchmarkReactor_Dispatch(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { if err := r.Dispatch(random.HostName(), func() { - }); err != nil { - + return } } }) @@ -33,7 +32,7 @@ func TestReactor_Dispatch(t *testing.T) { msg() }, func(msg func(), err error) { t.Error(err) - }).SetDebug(false) + }).SetDebug(true) go r.Run() @@ -41,16 +40,16 @@ func TestReactor_Dispatch(t *testing.T) { go func() { id := random.HostName() for { - // 每秒 50 次 time.Sleep(time.Millisecond * 20) if err := r.Dispatch(id, func() { }); err != nil { - t.Error(err) + return } } }() } - time.Sleep(time.Second * 10) + time.Sleep(times.Second) + r.Close() } diff --git a/server/internal/v2/reactor/ref.go b/server/internal/v2/reactor/ref.go new file mode 100644 index 0000000..848e6e3 --- /dev/null +++ b/server/internal/v2/reactor/ref.go @@ -0,0 +1,6 @@ +package reactor + +type identifiable struct { + ident string // 标识 + n int64 // 消息数量 +}