feat: 新版 server 包 websocket 基础实现、actor 模型实现

This commit is contained in:
kercylan98 2024-03-31 13:11:37 +08:00
parent ef1bb321d7
commit 92c42800f1
24 changed files with 262 additions and 243 deletions

View File

@ -0,0 +1,3 @@
package server
type

View File

@ -0,0 +1,7 @@
package actor
import "github.com/kercylan98/minotaur/server/internal/v2/dispatcher"
type Actor[M any] struct {
*dispatcher.Dispatcher[M]
}

View File

@ -0,0 +1,20 @@
package balancer
type Item[Id comparable] interface {
// Id 返回唯一标识
Id() Id
// Weight 返回权重
Weight() int
}
type Balancer[Id comparable, T Item[Id]] interface {
// Add 添加一个负载均衡目标
Add(t T)
// Remove 移除一个负载均衡目标
Remove(t T)
// Next 根据负载均衡策略选择下一个目标
Next() T
}

View File

@ -0,0 +1,46 @@
package balancer
import "sync"
func NewRoundRobin[Id comparable, T Item[Id]]() *RoundRobin[Id, T] {
}
type RoundRobin[Id comparable, T Item[Id]] struct {
ref map[Id]int
items []T
rw sync.RWMutex
curr int
}
func (r *RoundRobin[Id, T]) Add(t T) {
r.rw.Lock()
defer r.rw.Unlock()
_, exist := r.ref[t.Id()]
if exist {
return
}
r.ref[t.Id()] = len(r.items)
r.items = append(r.items, t)
}
func (r *RoundRobin[Id, T]) Remove(t T) {
r.rw.Lock()
defer r.rw.Unlock()
index, exist := r.ref[t.Id()]
if !exist {
return
}
r.items = append(r.items[:index], r.items[index+1:]...)
delete(r.ref, t.Id())
}
func (r *RoundRobin[Id, T]) Next() T {
r.rw.RLock()
defer r.rw.RUnlock()
if r.curr >= len(r.items) {
r.curr = 0
}
t := r.items[r.curr]
r.curr++
}

View File

@ -2,23 +2,25 @@ package server
import ( import (
"context" "context"
"github.com/kercylan98/minotaur/server/v2/actor" "github.com/kercylan98/minotaur/server/internal/v2/dispatcher"
"net" "net"
) )
type ConnWriter func(packet Packet) error
type Conn interface { type Conn interface {
} }
func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter, handler actor.MessageHandler[Packet]) Conn { func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter) Conn {
return &conn{ return &conn{
conn: c, conn: c,
writer: connWriter, writer: connWriter,
actor: actor.NewActor[Packet](ctx, handler), actor: dispatcher.NewActor[Packet](ctx, handler),
} }
} }
type conn struct { type conn struct {
conn net.Conn conn net.Conn
writer ConnWriter writer ConnWriter
actor *actor.Actor[Packet] actor *dispatcher.Actor[Packet]
} }

View File

@ -0,0 +1,4 @@
package server
type ConnContext interface {
}

View File

@ -0,0 +1,25 @@
package server
import "net"
type Controller interface {
Run() error
Shutdown() error
}
type controller struct {
*server
}
func (s *controller) init(srv *server) *controller {
s.server = srv
return s
}
func (s *controller) RegisterConn(conn net.Conn, writer ConnWriter) {
}
func (s *controller) UnRegisterConn() {
}

View File

@ -0,0 +1,108 @@
package dispatcher
import (
"context"
"errors"
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
"sync"
"sync/atomic"
)
// NewDispatcher 创建一个消息调度器
func NewDispatcher[M any](bufferSize int, handler func(M)) *Dispatcher[M] {
d := &Dispatcher[M]{
buf: buffer.NewRing[M](bufferSize),
bufCond: sync.NewCond(&sync.Mutex{}),
ctx: context.Background(),
}
d.BindErrorHandler(func(err error) {
log.Error("dispatcher", log.Err(err))
})
d.handler = func(m M) {
defer func() {
if err := super.RecoverTransform(recover()); err != nil {
d.errorHandler.Load().(func(error))(err)
}
}()
handler(m)
}
return d
}
// Dispatcher 并发安全且不阻塞的消息调度器
type Dispatcher[M any] struct {
buf *buffer.Ring[M]
bufCond *sync.Cond
handler func(M)
closed bool
ctx context.Context
errorHandler atomic.Value
}
// BindErrorHandler 绑定一个错误处理器到调度器中
func (d *Dispatcher[M]) BindErrorHandler(handler func(error)) {
d.errorHandler.Store(handler)
}
// BindContext 绑定一个上下文到调度器中
func (d *Dispatcher[M]) BindContext(ctx context.Context) {
d.bufCond.L.Lock()
d.ctx = ctx
if _, canceled := d.ctx.Deadline(); canceled {
d.closed = true
d.bufCond.Signal()
d.bufCond.L.Unlock()
return
}
d.bufCond.L.Unlock()
}
// Send 发送消息到调度器中等待处理
func (d *Dispatcher[M]) Send(m M) error {
d.bufCond.L.Lock()
if d.closed {
d.bufCond.L.Unlock()
return errors.New("dispatcher closed")
}
d.buf.Write(m)
d.bufCond.Signal()
d.bufCond.L.Unlock()
return nil
}
// Start 阻塞式启动调度器,调用后将开始处理消息
func (d *Dispatcher[M]) Start() {
for {
select {
case <-d.ctx.Done():
d.Stop()
default:
d.bufCond.L.Lock()
if d.buf.Len() == 0 {
if d.closed {
d.bufCond.L.Unlock()
return
}
d.bufCond.Wait()
}
messages := d.buf.ReadAll()
d.bufCond.L.Unlock()
for _, msg := range messages {
d.handler(msg)
}
}
}
}
// Stop 停止调度器,调用后将不再接受新消息,但会处理完已有消息
func (d *Dispatcher[M]) Stop() {
d.bufCond.L.Lock()
d.closed = true
d.bufCond.Signal()
d.bufCond.L.Unlock()
}

View File

@ -5,7 +5,7 @@ import (
) )
type Network interface { type Network interface {
OnSetup(ctx context.Context, event NetworkCore) error OnSetup(ctx context.Context, controller Controller) error
OnRun() error OnRun() error

View File

@ -2,14 +2,14 @@ package network
import ( import (
"context" "context"
"github.com/kercylan98/minotaur/server/v2" "github.com/kercylan98/minotaur/server/internal/v2"
"github.com/pkg/errors" "github.com/pkg/errors"
"net" "net"
"net/http" "net/http"
"time" "time"
) )
func Http(addr string) server.Network { func Http(addr string) server.server {
return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()}) return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()})
} }

View File

@ -3,13 +3,13 @@ package network
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/kercylan98/minotaur/server/v2" "github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/utils/collection" "github.com/kercylan98/minotaur/utils/collection"
"github.com/panjf2000/gnet/v2" "github.com/panjf2000/gnet/v2"
"time" "time"
) )
func WebSocket(addr string, pattern ...string) server.Network { func WebSocket(addr string, pattern ...string) server.server {
ws := &websocketCore{ ws := &websocketCore{
addr: addr, addr: addr,
pattern: collection.FindFirstOrDefaultInSlice(pattern, "/"), pattern: collection.FindFirstOrDefaultInSlice(pattern, "/"),

View File

@ -4,7 +4,7 @@ import (
"errors" "errors"
"github.com/gobwas/ws" "github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil" "github.com/gobwas/ws/wsutil"
"github.com/kercylan98/minotaur/server/v2" "github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/log"
"github.com/panjf2000/gnet/v2" "github.com/panjf2000/gnet/v2"
"time" "time"
@ -35,7 +35,7 @@ func (w *websocketHandler) OnShutdown(eng gnet.Engine) {
func (w *websocketHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { func (w *websocketHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
wrapper := newWebsocketWrapper(w.core.ctx, c) wrapper := newWebsocketWrapper(w.core.ctx, c)
c.SetContext(wrapper) c.SetContext(wrapper)
w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.Packet) error { w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.server) error {
return wsutil.WriteServerMessage(c, message.GetContext().(ws.OpCode), message.GetBytes()) return wsutil.WriteServerMessage(c, message.GetContext().(ws.OpCode), message.GetBytes())
}) })
return return

View File

@ -0,0 +1,4 @@
package reactor
type Event struct {
}

View File

@ -0,0 +1,9 @@
package reactor
type Reactor[P comparable] struct {
chs []chan P
}
func (el *Reactor[P]) Send(producer P, event Event) {
}

View File

@ -1,9 +1,6 @@
package server package server
import ( import "golang.org/x/net/context"
"context"
"github.com/kercylan98/minotaur/utils/super"
)
type Server interface { type Server interface {
Run() error Run() error
@ -11,17 +8,18 @@ type Server interface {
} }
type server struct { type server struct {
*networkCore *controller
ctx *super.CancelContext ctx context.Context
cancel context.CancelFunc
network Network network Network
} }
func NewServer(network Network) Server { func NewServer(network Network) Server {
srv := &server{ srv := &server{
ctx: super.WithCancelContext(context.Background()),
network: network, network: network,
} }
srv.networkCore = new(networkCore).init(srv) srv.ctx, srv.cancel = context.WithCancel(context.Background())
srv.controller = new(controller).init(srv)
return srv return srv
} }
@ -30,14 +28,14 @@ func (s *server) Run() (err error) {
return return
} }
if err = s.network.OnRun(s.ctx); err != nil { if err = s.network.OnRun(); err != nil {
panic(err) panic(err)
} }
return return
} }
func (s *server) Shutdown() (err error) { func (s *server) Shutdown() (err error) {
defer s.ctx.Cancel() defer s.server.cancel()
err = s.network.OnShutdown() err = s.network.OnShutdown()
return return
} }

View File

@ -0,0 +1,14 @@
package server_test
import (
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/server/internal/v2/network"
"testing"
)
func TestNewServer(t *testing.T) {
srv := server.server.NewServer(network.WebSocket(":9999"))
if err := srv.Run(); err != nil {
panic(err)
}
}

View File

@ -1,153 +0,0 @@
package actor
import (
"context"
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/super"
"sync"
"time"
)
// MessageHandler 定义了处理消息的函数类型
type MessageHandler[M any] func(message M)
// NewActor 创建一个新的 Actor并启动其消息处理循环
func NewActor[M any](ctx context.Context, handler MessageHandler[M]) *Actor[M] {
a := newActor(ctx, handler)
a.counter = new(super.Counter[int])
go a.run()
return a
}
// newActor 创建一个新的 Actor
func newActor[M any](ctx context.Context, handler MessageHandler[M]) (actor *Actor[M]) {
a := &Actor[M]{
buf: buffer.NewRing[M](1024),
handler: handler,
}
a.cond = sync.NewCond(&a.rw)
a.ctx, a.cancel = context.WithCancel(ctx)
return a
}
// Actor 是一个消息驱动的并发实体
type Actor[M any] struct {
idx int // Actor 在其父 Actor 中的索引
ctx context.Context // Actor 的上下文
cancel context.CancelFunc // 用于取消 Actor 的函数
buf *buffer.Ring[M] // 用于缓存消息的环形缓冲区
handler MessageHandler[M] // 处理消息的函数
rw sync.RWMutex // 读写锁,用于保护 Actor 的并发访问
cond *sync.Cond // 条件变量,用于触发消息处理流程
counter *super.Counter[int] // 消息计数器,用于统计处理的消息数量
dying bool // 标识 Actor 是否正在关闭中
parent *Actor[M] // 父 Actor
subs []*Actor[M] // 子 Actor 切片
gap []int // 用于记录已经关闭的子 Actor 的索引位置,以便复用
}
// run 启动 Actor 的消息处理循环
func (a *Actor[M]) run() {
var ctx = a.ctx
var clearGap = time.NewTicker(time.Second * 30)
defer func(a *Actor[M], clearGap *time.Ticker) {
clearGap.Stop()
a.cancel()
a.parent.removeSub(a)
}(a, clearGap)
for {
select {
case <-a.ctx.Done():
a.rw.Lock()
if ctx == a.ctx {
a.dying = true
} else {
ctx = a.ctx
}
a.rw.Unlock()
a.cond.Signal()
case <-clearGap.C:
a.rw.Lock()
for _, idx := range a.gap {
a.subs = append(a.subs[:idx], a.subs[idx+1:]...)
}
for idx, sub := range a.subs {
sub.idx = idx
}
a.gap = a.gap[:0]
a.rw.Unlock()
default:
a.rw.Lock()
if a.buf.IsEmpty() {
if a.dying && a.counter.Val() == 0 {
return
}
a.cond.Wait()
}
messages := a.buf.ReadAll()
a.rw.Unlock()
for _, message := range messages {
a.handler(message)
}
a.counter.Add(-len(messages))
}
}
}
// Reuse 重用 ActorActor 会重新激活
func (a *Actor[M]) Reuse(ctx context.Context) {
before := a.cancel
defer before()
a.rw.Lock()
a.ctx, a.cancel = context.WithCancel(ctx)
a.dying = false
for _, sub := range a.subs {
sub.Reuse(a.ctx)
}
a.rw.Unlock()
a.cond.Signal()
}
// Send 发送消息
func (a *Actor[M]) Send(message M) {
a.rw.Lock()
a.counter.Add(1)
a.buf.Write(message)
a.rw.Unlock()
a.cond.Signal()
}
// Sub 派生一个子 Actor该子 Actor 生命周期将继承父 Actor 的生命周期
func (a *Actor[M]) Sub() {
a.rw.Lock()
defer a.rw.Unlock()
sub := newActor(a.ctx, a.handler)
sub.counter = a.counter.Sub()
sub.parent = a
if len(a.gap) > 0 {
sub.idx = a.gap[0]
a.gap = a.gap[1:]
} else {
sub.idx = len(a.subs)
}
a.subs = append(a.subs, sub)
go sub.run()
}
// removeSub 从父 Actor 中移除指定的子 Actor
func (a *Actor[M]) removeSub(sub *Actor[M]) {
if a == nil {
return
}
a.rw.Lock()
defer a.rw.Unlock()
if sub.idx == len(a.subs)-1 {
a.subs = a.subs[:sub.idx]
return
}
a.subs[sub.idx] = nil
a.gap = append(a.gap, sub.idx)
}

View File

@ -1,4 +0,0 @@
package server
type message struct {
}

View File

@ -1,50 +0,0 @@
package server
import (
"github.com/kercylan98/minotaur/utils/hub"
"golang.org/x/net/context"
"net"
)
type ConnWriter func(message Packet) error
type NetworkCore interface {
OnConnectionOpened(ctx context.Context, conn net.Conn, writer ConnWriter)
OnConnectionClosed(conn Conn)
OnReceivePacket(packet Packet)
GeneratePacket(data []byte) Packet
}
type networkCore struct {
*server
packetPool *hub.ObjectPool[*packet]
}
func (ne *networkCore) init(srv *server) *networkCore {
ne.server = srv
ne.packetPool = hub.NewObjectPool(func() *packet {
return new(packet)
}, func(data *packet) {
data.reset()
})
return ne
}
func (ne *networkCore) OnConnectionOpened(ctx context.Context, conn net.Conn, writer ConnWriter) {
}
func (ne *networkCore) OnConnectionClosed(conn Conn) {
}
func (ne *networkCore) OnReceivePacket(packet Packet) {
}
func (ne *networkCore) GeneratePacket(data []byte) Packet {
return ne.packetPool.Get().init(data)
}

View File

@ -1,14 +0,0 @@
package server_test
import (
"github.com/kercylan98/minotaur/server/v2"
"github.com/kercylan98/minotaur/server/v2/network"
"testing"
)
func TestNewServer(t *testing.T) {
srv := server.NewServer(network.WebSocket(":9999"))
if err := srv.Run(); err != nil {
panic(err)
}
}

View File

@ -20,7 +20,7 @@ func (c *Counter[T]) Sub() *Counter[T] {
func (c *Counter[T]) Add(delta T) { func (c *Counter[T]) Add(delta T) {
c.rw.Lock() c.rw.Lock()
c.v += delta c.v += delta
c.rw.RUnlock() c.rw.Unlock()
if c.p != nil { if c.p != nil {
c.p.Add(delta) c.p.Add(delta)
} }