diff --git a/server/internal/v2/actor.go b/server/internal/v2/actor.go new file mode 100644 index 0000000..d70516f --- /dev/null +++ b/server/internal/v2/actor.go @@ -0,0 +1,3 @@ +package server + +type diff --git a/server/internal/v2/actor/actor.go b/server/internal/v2/actor/actor.go new file mode 100644 index 0000000..8acb236 --- /dev/null +++ b/server/internal/v2/actor/actor.go @@ -0,0 +1,7 @@ +package actor + +import "github.com/kercylan98/minotaur/server/internal/v2/dispatcher" + +type Actor[M any] struct { + *dispatcher.Dispatcher[M] +} diff --git a/server/internal/v2/balancer/balancer.go b/server/internal/v2/balancer/balancer.go new file mode 100644 index 0000000..c530e24 --- /dev/null +++ b/server/internal/v2/balancer/balancer.go @@ -0,0 +1,20 @@ +package balancer + +type Item[Id comparable] interface { + // Id 返回唯一标识 + Id() Id + + // Weight 返回权重 + Weight() int +} + +type Balancer[Id comparable, T Item[Id]] interface { + // Add 添加一个负载均衡目标 + Add(t T) + + // Remove 移除一个负载均衡目标 + Remove(t T) + + // Next 根据负载均衡策略选择下一个目标 + Next() T +} diff --git a/server/internal/v2/balancer/round_robin.go b/server/internal/v2/balancer/round_robin.go new file mode 100644 index 0000000..fa5b718 --- /dev/null +++ b/server/internal/v2/balancer/round_robin.go @@ -0,0 +1,46 @@ +package balancer + +import "sync" + +func NewRoundRobin[Id comparable, T Item[Id]]() *RoundRobin[Id, T] { + +} + +type RoundRobin[Id comparable, T Item[Id]] struct { + ref map[Id]int + items []T + rw sync.RWMutex + curr int +} + +func (r *RoundRobin[Id, T]) Add(t T) { + r.rw.Lock() + defer r.rw.Unlock() + _, exist := r.ref[t.Id()] + if exist { + return + } + r.ref[t.Id()] = len(r.items) + r.items = append(r.items, t) +} + +func (r *RoundRobin[Id, T]) Remove(t T) { + r.rw.Lock() + defer r.rw.Unlock() + index, exist := r.ref[t.Id()] + if !exist { + return + } + r.items = append(r.items[:index], r.items[index+1:]...) + delete(r.ref, t.Id()) +} + +func (r *RoundRobin[Id, T]) Next() T { + r.rw.RLock() + defer r.rw.RUnlock() + if r.curr >= len(r.items) { + r.curr = 0 + } + t := r.items[r.curr] + r.curr++ +} diff --git a/server/v2/conn.go b/server/internal/v2/conn.go similarity index 54% rename from server/v2/conn.go rename to server/internal/v2/conn.go index bfbb0f6..51bb9e9 100644 --- a/server/v2/conn.go +++ b/server/internal/v2/conn.go @@ -2,23 +2,25 @@ package server import ( "context" - "github.com/kercylan98/minotaur/server/v2/actor" + "github.com/kercylan98/minotaur/server/internal/v2/dispatcher" "net" ) +type ConnWriter func(packet Packet) error + type Conn interface { } -func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter, handler actor.MessageHandler[Packet]) Conn { +func newConn(ctx context.Context, c net.Conn, connWriter ConnWriter) Conn { return &conn{ conn: c, writer: connWriter, - actor: actor.NewActor[Packet](ctx, handler), + actor: dispatcher.NewActor[Packet](ctx, handler), } } type conn struct { conn net.Conn writer ConnWriter - actor *actor.Actor[Packet] + actor *dispatcher.Actor[Packet] } diff --git a/server/internal/v2/conn_context.go b/server/internal/v2/conn_context.go new file mode 100644 index 0000000..16b7d0f --- /dev/null +++ b/server/internal/v2/conn_context.go @@ -0,0 +1,4 @@ +package server + +type ConnContext interface { +} diff --git a/server/internal/v2/controller.go b/server/internal/v2/controller.go new file mode 100644 index 0000000..74ee17d --- /dev/null +++ b/server/internal/v2/controller.go @@ -0,0 +1,25 @@ +package server + +import "net" + +type Controller interface { + Run() error + Shutdown() error +} + +type controller struct { + *server +} + +func (s *controller) init(srv *server) *controller { + s.server = srv + return s +} + +func (s *controller) RegisterConn(conn net.Conn, writer ConnWriter) { + +} + +func (s *controller) UnRegisterConn() { + +} diff --git a/server/internal/v2/dispatcher/dispatcher.go b/server/internal/v2/dispatcher/dispatcher.go new file mode 100644 index 0000000..8ef6a02 --- /dev/null +++ b/server/internal/v2/dispatcher/dispatcher.go @@ -0,0 +1,108 @@ +package dispatcher + +import ( + "context" + "errors" + "github.com/kercylan98/minotaur/utils/buffer" + "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/super" + "sync" + "sync/atomic" +) + +// NewDispatcher 创建一个消息调度器 +func NewDispatcher[M any](bufferSize int, handler func(M)) *Dispatcher[M] { + d := &Dispatcher[M]{ + buf: buffer.NewRing[M](bufferSize), + bufCond: sync.NewCond(&sync.Mutex{}), + ctx: context.Background(), + } + + d.BindErrorHandler(func(err error) { + log.Error("dispatcher", log.Err(err)) + }) + + d.handler = func(m M) { + defer func() { + if err := super.RecoverTransform(recover()); err != nil { + d.errorHandler.Load().(func(error))(err) + } + }() + handler(m) + } + + return d +} + +// Dispatcher 并发安全且不阻塞的消息调度器 +type Dispatcher[M any] struct { + buf *buffer.Ring[M] + bufCond *sync.Cond + handler func(M) + closed bool + ctx context.Context + errorHandler atomic.Value +} + +// BindErrorHandler 绑定一个错误处理器到调度器中 +func (d *Dispatcher[M]) BindErrorHandler(handler func(error)) { + d.errorHandler.Store(handler) +} + +// BindContext 绑定一个上下文到调度器中 +func (d *Dispatcher[M]) BindContext(ctx context.Context) { + d.bufCond.L.Lock() + d.ctx = ctx + if _, canceled := d.ctx.Deadline(); canceled { + d.closed = true + d.bufCond.Signal() + d.bufCond.L.Unlock() + return + } + d.bufCond.L.Unlock() +} + +// Send 发送消息到调度器中等待处理 +func (d *Dispatcher[M]) Send(m M) error { + d.bufCond.L.Lock() + if d.closed { + d.bufCond.L.Unlock() + return errors.New("dispatcher closed") + } + d.buf.Write(m) + d.bufCond.Signal() + d.bufCond.L.Unlock() + return nil +} + +// Start 阻塞式启动调度器,调用后将开始处理消息 +func (d *Dispatcher[M]) Start() { + for { + select { + case <-d.ctx.Done(): + d.Stop() + default: + d.bufCond.L.Lock() + if d.buf.Len() == 0 { + if d.closed { + d.bufCond.L.Unlock() + return + } + d.bufCond.Wait() + } + messages := d.buf.ReadAll() + d.bufCond.L.Unlock() + for _, msg := range messages { + d.handler(msg) + } + } + } +} + +// Stop 停止调度器,调用后将不再接受新消息,但会处理完已有消息 +func (d *Dispatcher[M]) Stop() { + d.bufCond.L.Lock() + d.closed = true + d.bufCond.Signal() + d.bufCond.L.Unlock() +} diff --git a/server/v2/network.go b/server/internal/v2/network.go similarity index 63% rename from server/v2/network.go rename to server/internal/v2/network.go index 05ffee9..053563d 100644 --- a/server/v2/network.go +++ b/server/internal/v2/network.go @@ -5,7 +5,7 @@ import ( ) type Network interface { - OnSetup(ctx context.Context, event NetworkCore) error + OnSetup(ctx context.Context, controller Controller) error OnRun() error diff --git a/server/v2/network/http.go b/server/internal/v2/network/http.go similarity index 92% rename from server/v2/network/http.go rename to server/internal/v2/network/http.go index 0faeb33..88faa1d 100644 --- a/server/v2/network/http.go +++ b/server/internal/v2/network/http.go @@ -2,14 +2,14 @@ package network import ( "context" - "github.com/kercylan98/minotaur/server/v2" + "github.com/kercylan98/minotaur/server/internal/v2" "github.com/pkg/errors" "net" "net/http" "time" ) -func Http(addr string) server.Network { +func Http(addr string) server.server { return HttpWithHandler(addr, &HttpServe{ServeMux: http.NewServeMux()}) } diff --git a/server/v2/network/http_serve.go b/server/internal/v2/network/http_serve.go similarity index 100% rename from server/v2/network/http_serve.go rename to server/internal/v2/network/http_serve.go diff --git a/server/v2/network/websocket.go b/server/internal/v2/network/websocket.go similarity index 88% rename from server/v2/network/websocket.go rename to server/internal/v2/network/websocket.go index 39b2f03..f10c746 100644 --- a/server/v2/network/websocket.go +++ b/server/internal/v2/network/websocket.go @@ -3,13 +3,13 @@ package network import ( "context" "fmt" - "github.com/kercylan98/minotaur/server/v2" + "github.com/kercylan98/minotaur/server/internal/v2" "github.com/kercylan98/minotaur/utils/collection" "github.com/panjf2000/gnet/v2" "time" ) -func WebSocket(addr string, pattern ...string) server.Network { +func WebSocket(addr string, pattern ...string) server.server { ws := &websocketCore{ addr: addr, pattern: collection.FindFirstOrDefaultInSlice(pattern, "/"), diff --git a/server/v2/network/websocket_handler.go b/server/internal/v2/network/websocket_handler.go similarity index 96% rename from server/v2/network/websocket_handler.go rename to server/internal/v2/network/websocket_handler.go index 63efdea..c88e3dd 100644 --- a/server/v2/network/websocket_handler.go +++ b/server/internal/v2/network/websocket_handler.go @@ -4,7 +4,7 @@ import ( "errors" "github.com/gobwas/ws" "github.com/gobwas/ws/wsutil" - "github.com/kercylan98/minotaur/server/v2" + "github.com/kercylan98/minotaur/server/internal/v2" "github.com/kercylan98/minotaur/utils/log" "github.com/panjf2000/gnet/v2" "time" @@ -35,7 +35,7 @@ func (w *websocketHandler) OnShutdown(eng gnet.Engine) { func (w *websocketHandler) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { wrapper := newWebsocketWrapper(w.core.ctx, c) c.SetContext(wrapper) - w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.Packet) error { + w.core.core.OnConnectionOpened(wrapper.ctx, c, func(message server.server) error { return wsutil.WriteServerMessage(c, message.GetContext().(ws.OpCode), message.GetBytes()) }) return diff --git a/server/v2/network/wsbsocket_wrapper.go b/server/internal/v2/network/wsbsocket_wrapper.go similarity index 100% rename from server/v2/network/wsbsocket_wrapper.go rename to server/internal/v2/network/wsbsocket_wrapper.go diff --git a/server/v2/packet.go b/server/internal/v2/packet.go similarity index 100% rename from server/v2/packet.go rename to server/internal/v2/packet.go diff --git a/server/internal/v2/reactor/event.go b/server/internal/v2/reactor/event.go new file mode 100644 index 0000000..7145683 --- /dev/null +++ b/server/internal/v2/reactor/event.go @@ -0,0 +1,4 @@ +package reactor + +type Event struct { +} diff --git a/server/internal/v2/reactor/reactor.go b/server/internal/v2/reactor/reactor.go new file mode 100644 index 0000000..3b96fb7 --- /dev/null +++ b/server/internal/v2/reactor/reactor.go @@ -0,0 +1,9 @@ +package reactor + +type Reactor[P comparable] struct { + chs []chan P +} + +func (el *Reactor[P]) Send(producer P, event Event) { + +} diff --git a/server/v2/server.go b/server/internal/v2/server.go similarity index 59% rename from server/v2/server.go rename to server/internal/v2/server.go index 17852e4..3934b2c 100644 --- a/server/v2/server.go +++ b/server/internal/v2/server.go @@ -1,9 +1,6 @@ package server -import ( - "context" - "github.com/kercylan98/minotaur/utils/super" -) +import "golang.org/x/net/context" type Server interface { Run() error @@ -11,17 +8,18 @@ type Server interface { } type server struct { - *networkCore - ctx *super.CancelContext + *controller + ctx context.Context + cancel context.CancelFunc network Network } func NewServer(network Network) Server { srv := &server{ - ctx: super.WithCancelContext(context.Background()), network: network, } - srv.networkCore = new(networkCore).init(srv) + srv.ctx, srv.cancel = context.WithCancel(context.Background()) + srv.controller = new(controller).init(srv) return srv } @@ -30,14 +28,14 @@ func (s *server) Run() (err error) { return } - if err = s.network.OnRun(s.ctx); err != nil { + if err = s.network.OnRun(); err != nil { panic(err) } return } func (s *server) Shutdown() (err error) { - defer s.ctx.Cancel() + defer s.server.cancel() err = s.network.OnShutdown() return } diff --git a/server/internal/v2/server_test.go b/server/internal/v2/server_test.go new file mode 100644 index 0000000..74170d0 --- /dev/null +++ b/server/internal/v2/server_test.go @@ -0,0 +1,14 @@ +package server_test + +import ( + "github.com/kercylan98/minotaur/server/internal/v2" + "github.com/kercylan98/minotaur/server/internal/v2/network" + "testing" +) + +func TestNewServer(t *testing.T) { + srv := server.server.NewServer(network.WebSocket(":9999")) + if err := srv.Run(); err != nil { + panic(err) + } +} diff --git a/server/v2/actor/actor.go b/server/v2/actor/actor.go deleted file mode 100644 index bb97184..0000000 --- a/server/v2/actor/actor.go +++ /dev/null @@ -1,153 +0,0 @@ -package actor - -import ( - "context" - "github.com/kercylan98/minotaur/utils/buffer" - "github.com/kercylan98/minotaur/utils/super" - "sync" - "time" -) - -// MessageHandler 定义了处理消息的函数类型 -type MessageHandler[M any] func(message M) - -// NewActor 创建一个新的 Actor,并启动其消息处理循环 -func NewActor[M any](ctx context.Context, handler MessageHandler[M]) *Actor[M] { - a := newActor(ctx, handler) - a.counter = new(super.Counter[int]) - go a.run() - return a -} - -// newActor 创建一个新的 Actor -func newActor[M any](ctx context.Context, handler MessageHandler[M]) (actor *Actor[M]) { - a := &Actor[M]{ - buf: buffer.NewRing[M](1024), - handler: handler, - } - a.cond = sync.NewCond(&a.rw) - a.ctx, a.cancel = context.WithCancel(ctx) - return a -} - -// Actor 是一个消息驱动的并发实体 -type Actor[M any] struct { - idx int // Actor 在其父 Actor 中的索引 - ctx context.Context // Actor 的上下文 - cancel context.CancelFunc // 用于取消 Actor 的函数 - buf *buffer.Ring[M] // 用于缓存消息的环形缓冲区 - handler MessageHandler[M] // 处理消息的函数 - rw sync.RWMutex // 读写锁,用于保护 Actor 的并发访问 - cond *sync.Cond // 条件变量,用于触发消息处理流程 - counter *super.Counter[int] // 消息计数器,用于统计处理的消息数量 - dying bool // 标识 Actor 是否正在关闭中 - parent *Actor[M] // 父 Actor - subs []*Actor[M] // 子 Actor 切片 - gap []int // 用于记录已经关闭的子 Actor 的索引位置,以便复用 -} - -// run 启动 Actor 的消息处理循环 -func (a *Actor[M]) run() { - var ctx = a.ctx - var clearGap = time.NewTicker(time.Second * 30) - defer func(a *Actor[M], clearGap *time.Ticker) { - clearGap.Stop() - a.cancel() - a.parent.removeSub(a) - }(a, clearGap) - for { - select { - case <-a.ctx.Done(): - a.rw.Lock() - if ctx == a.ctx { - a.dying = true - } else { - ctx = a.ctx - } - a.rw.Unlock() - a.cond.Signal() - case <-clearGap.C: - a.rw.Lock() - for _, idx := range a.gap { - a.subs = append(a.subs[:idx], a.subs[idx+1:]...) - } - for idx, sub := range a.subs { - sub.idx = idx - } - a.gap = a.gap[:0] - a.rw.Unlock() - default: - a.rw.Lock() - if a.buf.IsEmpty() { - if a.dying && a.counter.Val() == 0 { - return - } - a.cond.Wait() - } - messages := a.buf.ReadAll() - a.rw.Unlock() - for _, message := range messages { - a.handler(message) - } - a.counter.Add(-len(messages)) - } - } -} - -// Reuse 重用 Actor,Actor 会重新激活 -func (a *Actor[M]) Reuse(ctx context.Context) { - before := a.cancel - defer before() - - a.rw.Lock() - a.ctx, a.cancel = context.WithCancel(ctx) - a.dying = false - for _, sub := range a.subs { - sub.Reuse(a.ctx) - } - a.rw.Unlock() - a.cond.Signal() -} - -// Send 发送消息 -func (a *Actor[M]) Send(message M) { - a.rw.Lock() - a.counter.Add(1) - a.buf.Write(message) - a.rw.Unlock() - a.cond.Signal() -} - -// Sub 派生一个子 Actor,该子 Actor 生命周期将继承父 Actor 的生命周期 -func (a *Actor[M]) Sub() { - a.rw.Lock() - defer a.rw.Unlock() - - sub := newActor(a.ctx, a.handler) - sub.counter = a.counter.Sub() - sub.parent = a - if len(a.gap) > 0 { - sub.idx = a.gap[0] - a.gap = a.gap[1:] - } else { - sub.idx = len(a.subs) - } - a.subs = append(a.subs, sub) - go sub.run() -} - -// removeSub 从父 Actor 中移除指定的子 Actor -func (a *Actor[M]) removeSub(sub *Actor[M]) { - if a == nil { - return - } - - a.rw.Lock() - defer a.rw.Unlock() - if sub.idx == len(a.subs)-1 { - a.subs = a.subs[:sub.idx] - return - } - a.subs[sub.idx] = nil - a.gap = append(a.gap, sub.idx) -} diff --git a/server/v2/message.go b/server/v2/message.go deleted file mode 100644 index c9487cd..0000000 --- a/server/v2/message.go +++ /dev/null @@ -1,4 +0,0 @@ -package server - -type message struct { -} diff --git a/server/v2/network_core.go b/server/v2/network_core.go deleted file mode 100644 index 46f398f..0000000 --- a/server/v2/network_core.go +++ /dev/null @@ -1,50 +0,0 @@ -package server - -import ( - "github.com/kercylan98/minotaur/utils/hub" - "golang.org/x/net/context" - "net" -) - -type ConnWriter func(message Packet) error - -type NetworkCore interface { - OnConnectionOpened(ctx context.Context, conn net.Conn, writer ConnWriter) - - OnConnectionClosed(conn Conn) - - OnReceivePacket(packet Packet) - - GeneratePacket(data []byte) Packet -} - -type networkCore struct { - *server - packetPool *hub.ObjectPool[*packet] -} - -func (ne *networkCore) init(srv *server) *networkCore { - ne.server = srv - ne.packetPool = hub.NewObjectPool(func() *packet { - return new(packet) - }, func(data *packet) { - data.reset() - }) - return ne -} - -func (ne *networkCore) OnConnectionOpened(ctx context.Context, conn net.Conn, writer ConnWriter) { - -} - -func (ne *networkCore) OnConnectionClosed(conn Conn) { - -} - -func (ne *networkCore) OnReceivePacket(packet Packet) { - -} - -func (ne *networkCore) GeneratePacket(data []byte) Packet { - return ne.packetPool.Get().init(data) -} diff --git a/server/v2/server_test.go b/server/v2/server_test.go deleted file mode 100644 index 0130dd6..0000000 --- a/server/v2/server_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package server_test - -import ( - "github.com/kercylan98/minotaur/server/v2" - "github.com/kercylan98/minotaur/server/v2/network" - "testing" -) - -func TestNewServer(t *testing.T) { - srv := server.NewServer(network.WebSocket(":9999")) - if err := srv.Run(); err != nil { - panic(err) - } -} diff --git a/utils/super/counter.go b/utils/super/counter.go index 2585945..a84f1b2 100644 --- a/utils/super/counter.go +++ b/utils/super/counter.go @@ -20,7 +20,7 @@ func (c *Counter[T]) Sub() *Counter[T] { func (c *Counter[T]) Add(delta T) { c.rw.Lock() c.v += delta - c.rw.RUnlock() + c.rw.Unlock() if c.p != nil { c.p.Add(delta) }