other: reactor 内存优化

This commit is contained in:
kercylan 2024-03-31 21:34:35 +08:00
parent 92f30cdadc
commit 5b0ea566d5
10 changed files with 167 additions and 150 deletions

View File

@ -1,20 +0,0 @@
package balancer
type Item[Id comparable] interface {
// Id 返回唯一标识
Id() Id
// Weight 返回权重
Weight() int
}
type Balancer[Id comparable, T Item[Id]] interface {
// Add 添加一个负载均衡目标
Add(t T)
// Remove 移除一个负载均衡目标
Remove(t T)
// Next 根据负载均衡策略选择下一个目标
Next() T
}

View File

@ -1,46 +0,0 @@
package balancer
import "sync"
func NewRoundRobin[Id comparable, T Item[Id]]() *RoundRobin[Id, T] {
}
type RoundRobin[Id comparable, T Item[Id]] struct {
ref map[Id]int
items []T
rw sync.RWMutex
curr int
}
func (r *RoundRobin[Id, T]) Add(t T) {
r.rw.Lock()
defer r.rw.Unlock()
_, exist := r.ref[t.Id()]
if exist {
return
}
r.ref[t.Id()] = len(r.items)
r.items = append(r.items, t)
}
func (r *RoundRobin[Id, T]) Remove(t T) {
r.rw.Lock()
defer r.rw.Unlock()
index, exist := r.ref[t.Id()]
if !exist {
return
}
r.items = append(r.items[:index], r.items[index+1:]...)
delete(r.ref, t.Id())
}
func (r *RoundRobin[Id, T]) Next() T {
r.rw.RLock()
defer r.rw.RUnlock()
if r.curr >= len(r.items) {
r.curr = 0
}
t := r.items[r.curr]
r.curr++
}

View File

@ -76,3 +76,23 @@ func (r *RoundRobin[Id, T]) Next() (t T) {
r.curr = r.curr.Next r.curr = r.curr.Next
return r.curr.Value return r.curr.Value
} }
func (r *RoundRobin[Id, T]) Refresh() {
r.rw.Lock()
defer r.rw.Unlock()
if r.head == nil {
return
}
curr := r.head
for i := 0; i < r.size; i++ {
if curr.Value.Id() == r.curr.Value.Id() {
r.curr = curr
return
}
curr = curr.Next
}
r.curr = r.head
}

View File

@ -1,6 +1,6 @@
package reactor package reactor
type queueMessageHandler[M any] func(q *queue[M], msg M) type queueMessageHandler[M any] func(q *queue[M], ident *identifiable, msg M)
type MessageHandler[M any] func(msg M) type MessageHandler[M any] func(msg M)

View File

@ -9,9 +9,9 @@ import (
func newQueue[M any](idx, chanSize, bufferSize int) *queue[M] { func newQueue[M any](idx, chanSize, bufferSize int) *queue[M] {
q := &queue[M]{ q := &queue[M]{
c: make(chan M, chanSize), c: make(chan queueMessage[M], chanSize),
buf: buffer.NewRing[M](bufferSize), buf: buffer.NewRing[queueMessage[M]](bufferSize),
rw: sync.NewCond(&sync.Mutex{}), cond: sync.NewCond(&sync.Mutex{}),
} }
q.QueueState = &QueueState[M]{ q.QueueState = &QueueState[M]{
queue: q, queue: q,
@ -23,49 +23,60 @@ func newQueue[M any](idx, chanSize, bufferSize int) *queue[M] {
type queue[M any] struct { type queue[M any] struct {
*QueueState[M] *QueueState[M]
c chan M // 通道 c chan queueMessage[M] // 通道
buf *buffer.Ring[M] // 缓冲区 buf *buffer.Ring[queueMessage[M]] // 缓冲区
rw *sync.Cond // 读写锁 cond *sync.Cond // 条件变量
closedHandler func(q *queue[M]) // 关闭处理函数
} }
func (q *queue[M]) Id() int { func (q *queue[M]) Id() int {
return q.idx return q.idx
} }
func (q *queue[M]) setClosedHandler(handler func(q *queue[M])) {
q.closedHandler = handler
}
func (q *queue[M]) run() { func (q *queue[M]) run() {
atomic.StoreInt32(&q.status, QueueStatusRunning) atomic.StoreInt32(&q.status, QueueStatusRunning)
defer func(q *queue[M]) { defer func(q *queue[M]) {
atomic.StoreInt32(&q.status, QueueStatusClosed) atomic.StoreInt32(&q.status, QueueStatusClosed)
if q.closedHandler != nil {
q.closedHandler(q)
}
}(q) }(q)
for { for {
q.rw.L.Lock() q.cond.L.Lock()
for q.buf.IsEmpty() { for q.buf.IsEmpty() {
if atomic.LoadInt32(&q.status) >= QueueStatusClosing { if atomic.LoadInt32(&q.status) >= QueueStatusClosing {
q.rw.L.Unlock() q.cond.L.Unlock()
close(q.c) close(q.c)
return return
} }
q.rw.Wait() q.cond.Wait()
} }
items := q.buf.ReadAll() items := q.buf.ReadAll()
q.rw.L.Unlock() q.cond.L.Unlock()
for _, item := range items { for _, item := range items {
q.c <- item q.c <- item
} }
} }
} }
func (q *queue[M]) push(m M) error { func (q *queue[M]) push(ident *identifiable, m M) error {
if atomic.LoadInt32(&q.status) != QueueStatusRunning { if atomic.LoadInt32(&q.status) > QueueStatusRunning {
return errors.New("queue status exception") return errors.New("queue status exception")
} }
q.rw.L.Lock() q.cond.L.Lock()
q.buf.Write(m) q.buf.Write(queueMessage[M]{
q.rw.Signal() ident: ident,
q.rw.L.Unlock() msg: m,
})
q.cond.Signal()
q.cond.L.Unlock()
return nil return nil
} }
func (q *queue[M]) read() <-chan M { func (q *queue[M]) read() <-chan queueMessage[M] {
return q.c return q.c
} }

View File

@ -0,0 +1,6 @@
package reactor
type queueMessage[M any] struct {
ident *identifiable
msg M
}

View File

@ -35,4 +35,5 @@ func (q *QueueState[M]) IsRunning() bool {
// Close 关闭队列 // Close 关闭队列
func (q *QueueState[M]) Close() { func (q *QueueState[M]) Close() {
atomic.CompareAndSwapInt32(&q.status, QueueStatusRunning, QueueStatusClosing) atomic.CompareAndSwapInt32(&q.status, QueueStatusRunning, QueueStatusClosing)
q.queue.cond.Broadcast()
} }

View File

@ -10,22 +10,43 @@ import (
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"sync" "sync"
"time" "sync/atomic"
) )
const (
StatusNone = iota - 1 // 事件循环未运行
StatusRunning // 事件循环运行中
StatusClosing // 事件循环关闭中
StatusClosed // 事件循环已关闭
)
var sysIdent = &identifiable{ident: "system"}
// NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列 // NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列
func NewReactor[M any](systemQueueSize, socketQueueSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] { func NewReactor[M any](systemQueueSize, queueSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] {
r := &Reactor[M]{ r := &Reactor[M]{
logger: log.Default().Logger, logger: log.Default().Logger,
systemQueue: newQueue[M](-1, systemQueueSize, 1024*16), systemQueue: newQueue[M](-1, systemQueueSize, 1024),
identifiers: haxmap.New[string, int](), identifiers: haxmap.New[string, *identifiable](),
lb: loadbalancer.NewRoundRobin[int, *queue[M]](), lb: loadbalancer.NewRoundRobin[int, *queue[M]](),
errorHandler: errorHandler, errorHandler: errorHandler,
socketQueueSize: socketQueueSize, queueSize: queueSize,
state: StatusNone,
} }
r.handler = func(q *queue[M], msg M) { defaultNum := runtime.NumCPU()
defer func(msg M) { if defaultNum < 1 {
defaultNum = 1
}
r.queueRW.Lock()
for i := 0; i < defaultNum; i++ {
r.noneLockAddQueue()
}
r.queueRW.Unlock()
r.handler = func(q *queue[M], ident *identifiable, msg M) {
defer func(ident *identifiable, msg M) {
if err := super.RecoverTransform(recover()); err != nil { if err := super.RecoverTransform(recover()); err != nil {
defer func(msg M) { defer func(msg M) {
if err = super.RecoverTransform(recover()); err != nil { if err = super.RecoverTransform(recover()); err != nil {
@ -40,12 +61,17 @@ func NewReactor[M any](systemQueueSize, socketQueueSize int, handler MessageHand
debug.PrintStack() debug.PrintStack()
} }
} }
}(msg)
var startedAt = time.Now() if atomic.AddInt64(&ident.n, -1) == 0 {
r.queueRW.Lock()
r.identifiers.Del(ident.ident)
r.queueRW.Unlock()
}
}(ident, msg)
if handler != nil { if handler != nil {
handler(msg) handler(msg)
} }
r.log(log.String("action", "handle"), log.Int("queue", q.Id()), log.Int64("cost/ns", time.Since(startedAt).Nanoseconds()))
} }
return r return r
@ -53,16 +79,19 @@ func NewReactor[M any](systemQueueSize, socketQueueSize int, handler MessageHand
// Reactor 是一个消息反应器,管理系统级别的队列和多个 Socket 对应的队列 // Reactor 是一个消息反应器,管理系统级别的队列和多个 Socket 对应的队列
type Reactor[M any] struct { type Reactor[M any] struct {
logger *slog.Logger // 日志记录器 logger *slog.Logger // 日志记录器
systemQueue *queue[M] // 系统级别的队列 state int32 // 状态
socketQueueSize int // Socket 队列大小 systemQueue *queue[M] // 系统级别的队列
queues []*queue[M] // Socket 使用的队列 queueSize int // Socket 队列大小
identifiers *haxmap.Map[string, int] // 标识符到队列索引的映射 queues []*queue[M] // Socket 使用的队列
lb *loadbalancer.RoundRobin[int, *queue[M]] // 负载均衡器 queueRW sync.RWMutex // 队列读写锁
wg sync.WaitGroup // 等待组 identifiers *haxmap.Map[string, *identifiable] // 标识符到队列索引的映射及消息计数
handler queueMessageHandler[M] // 消息处理器 lb *loadbalancer.RoundRobin[int, *queue[M]] // 负载均衡器
errorHandler ErrorHandler[M] // 错误处理器 wg sync.WaitGroup // 等待组
debug bool // 是否开启调试模式 cwg sync.WaitGroup // 关闭等待组
handler queueMessageHandler[M] // 消息处理器
errorHandler ErrorHandler[M] // 错误处理器
debug bool // 是否开启调试模式
} }
// SetLogger 设置日志记录器 // SetLogger 设置日志记录器
@ -79,39 +108,49 @@ func (r *Reactor[M]) SetDebug(debug bool) *Reactor[M] {
// SystemDispatch 将消息分发到系统级别的队列 // SystemDispatch 将消息分发到系统级别的队列
func (r *Reactor[M]) SystemDispatch(msg M) error { func (r *Reactor[M]) SystemDispatch(msg M) error {
return r.systemQueue.push(msg) if atomic.LoadInt32(&r.state) > StatusRunning {
r.queueRW.RUnlock()
return fmt.Errorf("reactor closing or closed")
}
return r.systemQueue.push(sysIdent, msg)
} }
// Dispatch 将消息分发到 identifier 使用的队列,当 identifier 首次使用时,将会根据负载均衡策略选择一个队列 // Dispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列
func (r *Reactor[M]) Dispatch(identifier string, msg M) error { func (r *Reactor[M]) Dispatch(ident string, msg M) error {
next := r.lb.Next() r.queueRW.RLock()
if next == nil { if atomic.LoadInt32(&r.state) > StatusRunning {
return r.Dispatch(identifier, msg) r.queueRW.RUnlock()
return fmt.Errorf("reactor closing or closed")
} }
idx, _ := r.identifiers.GetOrSet(identifier, next.Id()) next := r.lb.Next()
q := r.queues[idx] i, _ := r.identifiers.GetOrSet(ident, &identifiable{ident: ident})
r.log(log.String("action", "dispatch"), log.String("identifier", identifier), log.Int("queue", q.Id())) q := r.queues[next.Id()]
return q.push(msg) atomic.AddInt64(&i.n, 1)
r.queueRW.RUnlock()
return q.push(i, msg)
} }
// Run 启动 Reactor运行系统级别的队列和多个 Socket 对应的队列 // Run 启动 Reactor运行系统级别的队列和多个 Socket 对应的队列
func (r *Reactor[M]) Run() { func (r *Reactor[M]) Run() {
r.initQueue(r.systemQueue) if !atomic.CompareAndSwapInt32(&r.state, StatusNone, StatusRunning) {
for i := 0; i < runtime.NumCPU(); i++ { return
r.addQueue()
} }
r.queueRW.Lock()
r.runQueue(r.systemQueue)
for i := 0; i < len(r.queues); i++ {
r.runQueue(r.queues[i])
}
r.queueRW.Unlock()
r.wg.Wait() r.wg.Wait()
} }
func (r *Reactor[M]) addQueue() { func (r *Reactor[M]) noneLockAddQueue() {
r.log(log.String("action", "add queue"), log.Int("queue", len(r.queues))) q := newQueue[M](len(r.queues), r.queueSize, 1024*8)
r.wg.Add(1) r.lb.Add(q) // 运行前添加到负载均衡器,未运行时允许接收消息
q := newQueue[M](len(r.queues), r.socketQueueSize, 1024*8)
r.initQueue(q)
r.queues = append(r.queues, q) r.queues = append(r.queues, q)
} }
func (r *Reactor[M]) removeQueue(q *queue[M]) { func (r *Reactor[M]) noneLockDelQueue(q *queue[M]) {
idx := q.Id() idx := q.Id()
if idx < 0 || idx >= len(r.queues) || r.queues[idx] != q { if idx < 0 || idx >= len(r.queues) || r.queues[idx] != q {
return return
@ -120,34 +159,35 @@ func (r *Reactor[M]) removeQueue(q *queue[M]) {
for i := idx; i < len(r.queues); i++ { for i := idx; i < len(r.queues); i++ {
r.queues[i].idx = i r.queues[i].idx = i
} }
r.log(log.String("action", "remove queue"), log.Int("queue", len(r.queues)))
} }
func (r *Reactor[M]) initQueue(q *queue[M]) { func (r *Reactor[M]) runQueue(q *queue[M]) {
r.wg.Add(1) r.wg.Add(1)
q.setClosedHandler(func(q *queue[M]) {
// 关闭时正在等待关闭完成,外部已加锁,无需再次加锁
r.noneLockDelQueue(q)
r.cwg.Done()
})
go q.run()
go func(r *Reactor[M], q *queue[M]) { go func(r *Reactor[M], q *queue[M]) {
defer r.wg.Done() defer r.wg.Done()
go q.run()
if q.idx >= 0 {
r.lb.Add(q)
}
for m := range q.read() { for m := range q.read() {
r.handler(q, m) r.handler(q, m.ident, m.msg)
} }
}(r, q) }(r, q)
r.log(log.String("action", "run queue"), log.Int("queue", q.Id()))
} }
func (r *Reactor[M]) Close() { func (r *Reactor[M]) Close() {
queues := append(r.queues, r.systemQueue) if !atomic.CompareAndSwapInt32(&r.state, StatusRunning, StatusClosing) {
for _, q := range queues {
q.Close()
}
}
func (r *Reactor[M]) log(args ...any) {
if !r.debug {
return return
} }
r.logger.Debug("Reactor", args...) r.queueRW.Lock()
r.cwg.Add(len(r.queues) + 1)
for _, q := range append(r.queues, r.systemQueue) {
q.Close()
}
r.cwg.Wait()
atomic.StoreInt32(&r.state, StatusClosed)
r.queueRW.Unlock()
} }

View File

@ -3,12 +3,12 @@ package reactor_test
import ( import (
"github.com/kercylan98/minotaur/server/internal/v2/reactor" "github.com/kercylan98/minotaur/server/internal/v2/reactor"
"github.com/kercylan98/minotaur/utils/random" "github.com/kercylan98/minotaur/utils/random"
"github.com/kercylan98/minotaur/utils/times"
"testing" "testing"
"time" "time"
) )
func BenchmarkReactor_Dispatch(b *testing.B) { func BenchmarkReactor_Dispatch(b *testing.B) {
var r = reactor.NewReactor(1024*16, 1024, func(msg func()) { var r = reactor.NewReactor(1024*16, 1024, func(msg func()) {
msg() msg()
}, func(msg func(), err error) { }, func(msg func(), err error) {
@ -20,9 +20,8 @@ func BenchmarkReactor_Dispatch(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
for pb.Next() { for pb.Next() {
if err := r.Dispatch(random.HostName(), func() { if err := r.Dispatch(random.HostName(), func() {
}); err != nil { }); err != nil {
return
} }
} }
}) })
@ -33,7 +32,7 @@ func TestReactor_Dispatch(t *testing.T) {
msg() msg()
}, func(msg func(), err error) { }, func(msg func(), err error) {
t.Error(err) t.Error(err)
}).SetDebug(false) }).SetDebug(true)
go r.Run() go r.Run()
@ -41,16 +40,16 @@ func TestReactor_Dispatch(t *testing.T) {
go func() { go func() {
id := random.HostName() id := random.HostName()
for { for {
// 每秒 50 次
time.Sleep(time.Millisecond * 20) time.Sleep(time.Millisecond * 20)
if err := r.Dispatch(id, func() { if err := r.Dispatch(id, func() {
}); err != nil { }); err != nil {
t.Error(err) return
} }
} }
}() }()
} }
time.Sleep(time.Second * 10) time.Sleep(times.Second)
r.Close()
} }

View File

@ -0,0 +1,6 @@
package reactor
type identifiable struct {
ident string // 标识
n int64 // 消息数量
}