other: reactor 实现

This commit is contained in:
kercylan98 2024-03-31 13:11:37 +08:00 committed by kercylan
parent ef1bb321d7
commit 1408fdcff0
25 changed files with 554 additions and 243 deletions

View File

@ -2,23 +2,25 @@ package server
import (
"context"
"github.com/kercylan98/minotaur/server/v2/actor"
"github.com/kercylan98/minotaur/server/internal/v2/dispatcher"
"net"
)
type ConnWriter func(packet Packet) error
type Conn interface {
}
func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter, handler actor.MessageHandler[Packet]) Conn {
func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter) Conn {
return &conn{
conn: c,
writer: connWriter,
actor: actor.NewActor[Packet](ctx, handler),
actor: dispatcher.NewActor[Packet](ctx, handler),
}
}
type conn struct {
conn net.Conn
writer ConnWriter
actor *actor.Actor[Packet]
actor *dispatcher.Actor[Packet]
}

View File

@ -0,0 +1,4 @@
package server
type ConnContext interface {
}

View File

@ -0,0 +1,25 @@
package server
import "net"
type Controller interface {
Run() error
Shutdown() error
}
type controller struct {
*server
}
func (s *controller) init(srv *server) *controller {
s.server = srv
return s
}
func (s *controller) RegisterConn(conn net.Conn, writer ConnWriter) {
}
func (s *controller) UnRegisterConn() {
}

View File

@ -0,0 +1,80 @@
package loadbalancer
import (
"github.com/kercylan98/minotaur/utils/super"
"hash/fnv"
"sort"
"sync"
)
func NewConsistentHash(replicas int) *ConsistentHash {
return &ConsistentHash{
replicas: replicas,
keys: []int{},
hashMap: make(map[int]string),
mutex: sync.RWMutex{},
}
}
type ConsistentHash struct {
replicas int // 虚拟节点倍数
keys []int // 哈希环上的所有节点的哈希值
hashMap map[int]string // 哈希值到真实节点的映射
mutex sync.RWMutex // 用于保护数据结构
}
// Add 添加一个节点到哈希环
func (c *ConsistentHash) Add(node string) {
c.mutex.Lock()
defer c.mutex.Unlock()
for i := 0; i < c.replicas; i++ {
hash := c.hash(node + super.IntToString(i))
c.keys = append(c.keys, hash)
c.hashMap[hash] = node
}
sort.Ints(c.keys)
}
// Remove 从哈希环中移除一个节点
func (c *ConsistentHash) Remove(node string) {
c.mutex.Lock()
defer c.mutex.Unlock()
for i := 0; i < c.replicas; i++ {
hash := c.hash(node + super.IntToString(i))
delete(c.hashMap, hash)
// 从 keys 中移除节点的哈希值
for j, k := range c.keys {
if k == hash {
c.keys = append(c.keys[:j], c.keys[j+1:]...)
break
}
}
}
}
// Get 返回给定 key 所在的节点
func (c *ConsistentHash) Get(key string) string {
c.mutex.RLock()
defer c.mutex.RUnlock()
if len(c.keys) == 0 {
return ""
}
hash := c.hash(key)
// 顺时针找到第一个比 key 大的哈希值,即对应的节点
idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })
if idx == len(c.keys) {
idx = 0 // 如果 key 大于所有哈希值,则返回第一个节点
}
return c.hashMap[c.keys[idx]]
}
// hash 计算字符串的哈希值
func (c *ConsistentHash) hash(key string) int {
h := fnv.New32a()
_, _ = h.Write([]byte(key))
return int(h.Sum32())
}

View File

@ -0,0 +1,78 @@
package loadbalancer
import "sync"
func NewRoundRobin[Id comparable, T RoundRobinItem[Id]]() *RoundRobin[Id, T] {
return &RoundRobin[Id, T]{
head: nil,
curr: nil,
size: 0,
}
}
type roundRobinNode[Id comparable, T RoundRobinItem[Id]] struct {
Value T
Next *roundRobinNode[Id, T]
}
type RoundRobin[Id comparable, T RoundRobinItem[Id]] struct {
head *roundRobinNode[Id, T]
curr *roundRobinNode[Id, T]
size int
rw sync.RWMutex
}
func (r *RoundRobin[Id, T]) Add(t T) {
r.rw.Lock()
defer r.rw.Unlock()
newNode := &roundRobinNode[Id, T]{Value: t}
if r.head == nil {
r.head = newNode
r.curr = newNode
newNode.Next = newNode
} else {
newNode.Next = r.head.Next
r.head.Next = newNode
}
r.size++
}
func (r *RoundRobin[Id, T]) Remove(t T) {
r.rw.Lock()
defer r.rw.Unlock()
if r.head == nil {
return
}
prev := r.head
for i := 0; i < r.size; i++ {
if prev.Next.Value.Id() == t.Id() {
if prev.Next == r.curr {
r.curr = prev
}
prev.Next = prev.Next.Next
r.size--
if r.size == 0 {
r.head = nil
r.curr = nil
}
return
}
prev = prev.Next
}
}
func (r *RoundRobin[Id, T]) Next() (t T) {
r.rw.Lock()
defer r.rw.Unlock()
if r.curr == nil {
return
}
r.curr = r.curr.Next
return r.curr.Value
}

View File

@ -0,0 +1,6 @@
package loadbalancer
type RoundRobinItem[Id comparable] interface {
// Id 返回唯一标识
Id() Id
}

View File

@ -5,7 +5,7 @@ import (
)
type Network interface {
OnSetup(ctx context.Context, event NetworkCore) error
OnSetup(ctx context.Context, controller Controller) error
OnRun() error

View File

@ -2,14 +2,14 @@ package network
import (
"context"
"github.com/kercylan98/minotaur/server/v2"
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/pkg/errors"
"net"
"net/http"
"time"
)
func Http(addr string) server.Network {
func Http(addr string) server.server {
return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()})
}

View File

@ -3,13 +3,13 @@ package network
import (
"context"
"fmt"
"github.com/kercylan98/minotaur/server/v2"
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/utils/collection"
"github.com/panjf2000/gnet/v2"
"time"
)
func WebSocket(addr string, pattern ...string) server.Network {
func WebSocket(addr string, pattern ...string) server.server {
ws := &websocketCore{
addr: addr,
pattern: collection.FindFirstOrDefaultInSlice(pattern, "/"),

View File

@ -4,7 +4,7 @@ import (
"errors"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/kercylan98/minotaur/server/v2"
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/utils/log"
"github.com/panjf2000/gnet/v2"
"time"
@ -35,7 +35,7 @@ func (w *websocketHandler) OnShutdown(eng gnet.Engine) {
func (w *websocketHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
wrapper := newWebsocketWrapper(w.core.ctx, c)
c.SetContext(wrapper)
w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.Packet) error {
w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.server) error {
return wsutil.WriteServerMessage(c, message.GetContext().(ws.OpCode), message.GetBytes())
})
return

View File

@ -0,0 +1,7 @@
package reactor
type queueMessageHandler[M any] func(q *queue[M], msg M)
type MessageHandler[M any] func(msg M)
type ErrorHandler[M any] func(msg M, err error)

View File

@ -0,0 +1,71 @@
package reactor
import (
"errors"
"github.com/kercylan98/minotaur/utils/buffer"
"sync"
"sync/atomic"
)
func newQueue[M any](idx, chanSize, bufferSize int) *queue[M] {
q := &queue[M]{
c: make(chan M, chanSize),
buf: buffer.NewRing[M](bufferSize),
rw: sync.NewCond(&sync.Mutex{}),
}
q.QueueState = &QueueState[M]{
queue: q,
idx: idx,
status: QueueStatusNone,
}
return q
}
type queue[M any] struct {
*QueueState[M]
c chan M // 通道
buf *buffer.Ring[M] // 缓冲区
rw *sync.Cond // 读写锁
}
func (q *queue[M]) Id() int {
return q.idx
}
func (q *queue[M]) run() {
atomic.StoreInt32(&q.status, QueueStatusRunning)
defer func(q *queue[M]) {
atomic.StoreInt32(&q.status, QueueStatusClosed)
}(q)
for {
q.rw.L.Lock()
for q.buf.IsEmpty() {
if atomic.LoadInt32(&q.status) >= QueueStatusClosing {
q.rw.L.Unlock()
close(q.c)
return
}
q.rw.Wait()
}
items := q.buf.ReadAll()
q.rw.L.Unlock()
for _, item := range items {
q.c <- item
}
}
}
func (q *queue[M]) push(m M) error {
if atomic.LoadInt32(&q.status) != QueueStatusRunning {
return errors.New("queue status exception")
}
q.rw.L.Lock()
q.buf.Write(m)
q.rw.Signal()
q.rw.L.Unlock()
return nil
}
func (q *queue[M]) read() <-chan M {
return q.c
}

View File

@ -0,0 +1,38 @@
package reactor
import (
"sync/atomic"
)
const (
QueueStatusNone = iota - 1 // 队列未运行
QueueStatusRunning // 队列运行中
QueueStatusClosing // 队列关闭中
QueueStatusClosed // 队列已关闭
)
type QueueState[M any] struct {
queue *queue[M]
idx int // 队列索引
status int32 // 状态标志
}
// IsClosed 判断队列是否已关闭
func (q *QueueState[M]) IsClosed() bool {
return atomic.LoadInt32(&q.status) == QueueStatusClosed
}
// IsClosing 判断队列是否正在关闭
func (q *QueueState[M]) IsClosing() bool {
return atomic.LoadInt32(&q.status) == QueueStatusClosing
}
// IsRunning 判断队列是否正在运行
func (q *QueueState[M]) IsRunning() bool {
return atomic.LoadInt32(&q.status) == QueueStatusRunning
}
// Close 关闭队列
func (q *QueueState[M]) Close() {
atomic.CompareAndSwapInt32(&q.status, QueueStatusRunning, QueueStatusClosing)
}

View File

@ -0,0 +1,153 @@
package reactor
import (
"fmt"
"github.com/alphadose/haxmap"
"github.com/kercylan98/minotaur/server/internal/v2/loadbalancer"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
"log/slog"
"runtime"
"runtime/debug"
"sync"
"time"
)
// NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列
func NewReactor[M any](systemQueueSize, socketQueueSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] {
r := &Reactor[M]{
logger: log.Default().Logger,
systemQueue: newQueue[M](-1, systemQueueSize, 1024*16),
identifiers: haxmap.New[string, int](),
lb: loadbalancer.NewRoundRobin[int, *queue[M]](),
errorHandler: errorHandler,
socketQueueSize: socketQueueSize,
}
r.handler = func(q *queue[M], msg M) {
defer func(msg M) {
if err := super.RecoverTransform(recover()); err != nil {
defer func(msg M) {
if err = super.RecoverTransform(recover()); err != nil {
fmt.Println(err)
debug.PrintStack()
}
}(msg)
if r.errorHandler != nil {
r.errorHandler(msg, err)
} else {
fmt.Println(err)
debug.PrintStack()
}
}
}(msg)
var startedAt = time.Now()
if handler != nil {
handler(msg)
}
r.log(log.String("action", "handle"), log.Int("queue", q.Id()), log.Int64("cost/ns", time.Since(startedAt).Nanoseconds()))
}
return r
}
// Reactor 是一个消息反应器,管理系统级别的队列和多个 Socket 对应的队列
type Reactor[M any] struct {
logger *slog.Logger // 日志记录器
systemQueue *queue[M] // 系统级别的队列
socketQueueSize int // Socket 队列大小
queues []*queue[M] // Socket 使用的队列
identifiers *haxmap.Map[string, int] // 标识符到队列索引的映射
lb *loadbalancer.RoundRobin[int, *queue[M]] // 负载均衡器
wg sync.WaitGroup // 等待组
handler queueMessageHandler[M] // 消息处理器
errorHandler ErrorHandler[M] // 错误处理器
debug bool // 是否开启调试模式
}
// SetLogger 设置日志记录器
func (r *Reactor[M]) SetLogger(logger *slog.Logger) *Reactor[M] {
r.logger = logger
return r
}
// SetDebug 设置是否开启调试模式
func (r *Reactor[M]) SetDebug(debug bool) *Reactor[M] {
r.debug = debug
return r
}
// SystemDispatch 将消息分发到系统级别的队列
func (r *Reactor[M]) SystemDispatch(msg M) error {
return r.systemQueue.push(msg)
}
// Dispatch 将消息分发到 identifier 使用的队列,当 identifier 首次使用时,将会根据负载均衡策略选择一个队列
func (r *Reactor[M]) Dispatch(identifier string, msg M) error {
next := r.lb.Next()
if next == nil {
return r.Dispatch(identifier, msg)
}
idx, _ := r.identifiers.GetOrSet(identifier, next.Id())
q := r.queues[idx]
r.log(log.String("action", "dispatch"), log.String("identifier", identifier), log.Int("queue", q.Id()))
return q.push(msg)
}
// Run 启动 Reactor运行系统级别的队列和多个 Socket 对应的队列
func (r *Reactor[M]) Run() {
r.initQueue(r.systemQueue)
for i := 0; i < runtime.NumCPU(); i++ {
r.addQueue()
}
r.wg.Wait()
}
func (r *Reactor[M]) addQueue() {
r.log(log.String("action", "add queue"), log.Int("queue", len(r.queues)))
r.wg.Add(1)
q := newQueue[M](len(r.queues), r.socketQueueSize, 1024*8)
r.initQueue(q)
r.queues = append(r.queues, q)
}
func (r *Reactor[M]) removeQueue(q *queue[M]) {
idx := q.Id()
if idx < 0 || idx >= len(r.queues) || r.queues[idx] != q {
return
}
r.queues = append(r.queues[:idx], r.queues[idx+1:]...)
for i := idx; i < len(r.queues); i++ {
r.queues[i].idx = i
}
r.log(log.String("action", "remove queue"), log.Int("queue", len(r.queues)))
}
func (r *Reactor[M]) initQueue(q *queue[M]) {
r.wg.Add(1)
go func(r *Reactor[M], q *queue[M]) {
defer r.wg.Done()
go q.run()
if q.idx >= 0 {
r.lb.Add(q)
}
for m := range q.read() {
r.handler(q, m)
}
}(r, q)
r.log(log.String("action", "run queue"), log.Int("queue", q.Id()))
}
func (r *Reactor[M]) Close() {
queues := append(r.queues, r.systemQueue)
for _, q := range queues {
q.Close()
}
}
func (r *Reactor[M]) log(args ...any) {
if !r.debug {
return
}
r.logger.Debug("Reactor", args...)
}

View File

@ -0,0 +1,56 @@
package reactor_test
import (
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
"github.com/kercylan98/minotaur/utils/random"
"testing"
"time"
)
func BenchmarkReactor_Dispatch(b *testing.B) {
var r = reactor.NewReactor(1024*16, 1024, func(msg func()) {
msg()
}, func(msg func(), err error) {
b.Error(err)
}).SetDebug(false)
go r.Run()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if err := r.Dispatch(random.HostName(), func() {
}); err != nil {
}
}
})
}
func TestReactor_Dispatch(t *testing.T) {
var r = reactor.NewReactor(1024*16, 1024, func(msg func()) {
msg()
}, func(msg func(), err error) {
t.Error(err)
}).SetDebug(false)
go r.Run()
for i := 0; i < 10000; i++ {
go func() {
id := random.HostName()
for {
// 每秒 50 次
time.Sleep(time.Millisecond * 20)
if err := r.Dispatch(id, func() {
}); err != nil {
t.Error(err)
}
}
}()
}
time.Sleep(time.Second * 10)
}

View File

@ -1,9 +1,6 @@
package server
import (
"context"
"github.com/kercylan98/minotaur/utils/super"
)
import "golang.org/x/net/context"
type Server interface {
Run() error
@ -11,17 +8,18 @@ type Server interface {
}
type server struct {
*networkCore
ctx *super.CancelContext
*controller
ctx context.Context
cancel context.CancelFunc
network Network
}
func NewServer(network Network) Server {
srv := &server{
ctx: super.WithCancelContext(context.Background()),
network: network,
}
srv.networkCore = new(networkCore).init(srv)
srv.ctx, srv.cancel = context.WithCancel(context.Background())
srv.controller = new(controller).init(srv)
return srv
}
@ -30,14 +28,14 @@ func (s *server) Run() (err error) {
return
}
if err = s.network.OnRun(s.ctx); err != nil {
if err = s.network.OnRun(); err != nil {
panic(err)
}
return
}
func (s *server) Shutdown() (err error) {
defer s.ctx.Cancel()
defer s.server.cancel()
err = s.network.OnShutdown()
return
}

View File

@ -0,0 +1,14 @@
package server_test
import (
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/server/internal/v2/network"
"testing"
)
func TestNewServer(t *testing.T) {
srv := server.server.NewServer(network.WebSocket(":9999"))
if err := srv.Run(); err != nil {
panic(err)
}
}

View File

@ -1,153 +0,0 @@
package actor
import (
"context"
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/super"
"sync"
"time"
)
// MessageHandler 定义了处理消息的函数类型
type MessageHandler[M any] func(message M)
// NewActor 创建一个新的 Actor并启动其消息处理循环
func NewActor[M any](ctx context.Context, handler MessageHandler[M]) *Actor[M] {
a := newActor(ctx, handler)
a.counter = new(super.Counter[int])
go a.run()
return a
}
// newActor 创建一个新的 Actor
func newActor[M any](ctx context.Context, handler MessageHandler[M]) (actor *Actor[M]) {
a := &Actor[M]{
buf: buffer.NewRing[M](1024),
handler: handler,
}
a.cond = sync.NewCond(&a.rw)
a.ctx, a.cancel = context.WithCancel(ctx)
return a
}
// Actor 是一个消息驱动的并发实体
type Actor[M any] struct {
idx int // Actor 在其父 Actor 中的索引
ctx context.Context // Actor 的上下文
cancel context.CancelFunc // 用于取消 Actor 的函数
buf *buffer.Ring[M] // 用于缓存消息的环形缓冲区
handler MessageHandler[M] // 处理消息的函数
rw sync.RWMutex // 读写锁,用于保护 Actor 的并发访问
cond *sync.Cond // 条件变量,用于触发消息处理流程
counter *super.Counter[int] // 消息计数器,用于统计处理的消息数量
dying bool // 标识 Actor 是否正在关闭中
parent *Actor[M] // 父 Actor
subs []*Actor[M] // 子 Actor 切片
gap []int // 用于记录已经关闭的子 Actor 的索引位置,以便复用
}
// run 启动 Actor 的消息处理循环
func (a *Actor[M]) run() {
var ctx = a.ctx
var clearGap = time.NewTicker(time.Second * 30)
defer func(a *Actor[M], clearGap *time.Ticker) {
clearGap.Stop()
a.cancel()
a.parent.removeSub(a)
}(a, clearGap)
for {
select {
case <-a.ctx.Done():
a.rw.Lock()
if ctx == a.ctx {
a.dying = true
} else {
ctx = a.ctx
}
a.rw.Unlock()
a.cond.Signal()
case <-clearGap.C:
a.rw.Lock()
for _, idx := range a.gap {
a.subs = append(a.subs[:idx], a.subs[idx+1:]...)
}
for idx, sub := range a.subs {
sub.idx = idx
}
a.gap = a.gap[:0]
a.rw.Unlock()
default:
a.rw.Lock()
if a.buf.IsEmpty() {
if a.dying && a.counter.Val() == 0 {
return
}
a.cond.Wait()
}
messages := a.buf.ReadAll()
a.rw.Unlock()
for _, message := range messages {
a.handler(message)
}
a.counter.Add(-len(messages))
}
}
}
// Reuse 重用 ActorActor 会重新激活
func (a *Actor[M]) Reuse(ctx context.Context) {
before := a.cancel
defer before()
a.rw.Lock()
a.ctx, a.cancel = context.WithCancel(ctx)
a.dying = false
for _, sub := range a.subs {
sub.Reuse(a.ctx)
}
a.rw.Unlock()
a.cond.Signal()
}
// Send 发送消息
func (a *Actor[M]) Send(message M) {
a.rw.Lock()
a.counter.Add(1)
a.buf.Write(message)
a.rw.Unlock()
a.cond.Signal()
}
// Sub 派生一个子 Actor该子 Actor 生命周期将继承父 Actor 的生命周期
func (a *Actor[M]) Sub() {
a.rw.Lock()
defer a.rw.Unlock()
sub := newActor(a.ctx, a.handler)
sub.counter = a.counter.Sub()
sub.parent = a
if len(a.gap) > 0 {
sub.idx = a.gap[0]
a.gap = a.gap[1:]
} else {
sub.idx = len(a.subs)
}
a.subs = append(a.subs, sub)
go sub.run()
}
// removeSub 从父 Actor 中移除指定的子 Actor
func (a *Actor[M]) removeSub(sub *Actor[M]) {
if a == nil {
return
}
a.rw.Lock()
defer a.rw.Unlock()
if sub.idx == len(a.subs)-1 {
a.subs = a.subs[:sub.idx]
return
}
a.subs[sub.idx] = nil
a.gap = append(a.gap, sub.idx)
}

View File

@ -1,4 +0,0 @@
package server
type message struct {
}

View File

@ -1,50 +0,0 @@
package server
import (
"github.com/kercylan98/minotaur/utils/hub"
"golang.org/x/net/context"
"net"
)
type ConnWriter func(message Packet) error
type NetworkCore interface {
OnConnectionOpened(ctx context.Context, conn net.Conn, writer ConnWriter)
OnConnectionClosed(conn Conn)
OnReceivePacket(packet Packet)
GeneratePacket(data []byte) Packet
}
type networkCore struct {
*server
packetPool *hub.ObjectPool[*packet]
}
func (ne *networkCore) init(srv *server) *networkCore {
ne.server = srv
ne.packetPool = hub.NewObjectPool(func() *packet {
return new(packet)
}, func(data *packet) {
data.reset()
})
return ne
}
func (ne *networkCore) OnConnectionOpened(ctx context.Context, conn net.Conn, writer ConnWriter) {
}
func (ne *networkCore) OnConnectionClosed(conn Conn) {
}
func (ne *networkCore) OnReceivePacket(packet Packet) {
}
func (ne *networkCore) GeneratePacket(data []byte) Packet {
return ne.packetPool.Get().init(data)
}

View File

@ -1,14 +0,0 @@
package server_test
import (
"github.com/kercylan98/minotaur/server/v2"
"github.com/kercylan98/minotaur/server/v2/network"
"testing"
)
func TestNewServer(t *testing.T) {
srv := server.NewServer(network.WebSocket(":9999"))
if err := srv.Run(); err != nil {
panic(err)
}
}

View File

@ -20,7 +20,7 @@ func (c *Counter[T]) Sub() *Counter[T] {
func (c *Counter[T]) Add(delta T) {
c.rw.Lock()
c.v += delta
c.rw.RUnlock()
c.rw.Unlock()
if c.p != nil {
c.p.Add(delta)
}