diff --git a/server/internal/v2/actor.go b/server/internal/v2/actor.go new file mode 100644 index 0000000..d70516f --- /dev/null +++ b/server/internal/v2/actor.go @@ -0,0 +1,3 @@ +package server + +type diff --git a/server/internal/v2/actor/actor.go b/server/internal/v2/actor/actor.go new file mode 100644 index 0000000..8acb236 --- /dev/null +++ b/server/internal/v2/actor/actor.go @@ -0,0 +1,7 @@ +package actor + +import "github.com/kercylan98/minotaur/server/internal/v2/dispatcher" + +type Actor[M any] struct { + *dispatcher.Dispatcher[M] +} diff --git a/server/internal/v2/balancer/balancer.go b/server/internal/v2/balancer/balancer.go new file mode 100644 index 0000000..c530e24 --- /dev/null +++ b/server/internal/v2/balancer/balancer.go @@ -0,0 +1,20 @@ +package balancer + +type Item[Id comparable] interface { + // Id 返回唯一标识 + Id() Id + + // Weight 返回权重 + Weight() int +} + +type Balancer[Id comparable, T Item[Id]] interface { + // Add 添加一个负载均衡目标 + Add(t T) + + // Remove 移除一个负载均衡目标 + Remove(t T) + + // Next 根据负载均衡策略选择下一个目标 + Next() T +} diff --git a/server/internal/v2/balancer/round_robin.go b/server/internal/v2/balancer/round_robin.go new file mode 100644 index 0000000..fa5b718 --- /dev/null +++ b/server/internal/v2/balancer/round_robin.go @@ -0,0 +1,46 @@ +package balancer + +import "sync" + +func NewRoundRobin[Id comparable, T Item[Id]]() *RoundRobin[Id, T] { + +} + +type RoundRobin[Id comparable, T Item[Id]] struct { + ref map[Id]int + items []T + rw sync.RWMutex + curr int +} + +func (r *RoundRobin[Id, T]) Add(t T) { + r.rw.Lock() + defer r.rw.Unlock() + _, exist := r.ref[t.Id()] + if exist { + return + } + r.ref[t.Id()] = len(r.items) + r.items = append(r.items, t) +} + +func (r *RoundRobin[Id, T]) Remove(t T) { + r.rw.Lock() + defer r.rw.Unlock() + index, exist := r.ref[t.Id()] + if !exist { + return + } + r.items = append(r.items[:index], r.items[index+1:]...) + delete(r.ref, t.Id()) +} + +func (r *RoundRobin[Id, T]) Next() T { + r.rw.RLock() + defer r.rw.RUnlock() + if r.curr >= len(r.items) { + r.curr = 0 + } + t := r.items[r.curr] + r.curr++ +} diff --git a/server/internal/v2/dispatcher/dispatcher.go b/server/internal/v2/dispatcher/dispatcher.go new file mode 100644 index 0000000..8ef6a02 --- /dev/null +++ b/server/internal/v2/dispatcher/dispatcher.go @@ -0,0 +1,108 @@ +package dispatcher + +import ( + "context" + "errors" + "github.com/kercylan98/minotaur/utils/buffer" + "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/super" + "sync" + "sync/atomic" +) + +// NewDispatcher 创建一个消息调度器 +func NewDispatcher[M any](bufferSize int, handler func(M)) *Dispatcher[M] { + d := &Dispatcher[M]{ + buf: buffer.NewRing[M](bufferSize), + bufCond: sync.NewCond(&sync.Mutex{}), + ctx: context.Background(), + } + + d.BindErrorHandler(func(err error) { + log.Error("dispatcher", log.Err(err)) + }) + + d.handler = func(m M) { + defer func() { + if err := super.RecoverTransform(recover()); err != nil { + d.errorHandler.Load().(func(error))(err) + } + }() + handler(m) + } + + return d +} + +// Dispatcher 并发安全且不阻塞的消息调度器 +type Dispatcher[M any] struct { + buf *buffer.Ring[M] + bufCond *sync.Cond + handler func(M) + closed bool + ctx context.Context + errorHandler atomic.Value +} + +// BindErrorHandler 绑定一个错误处理器到调度器中 +func (d *Dispatcher[M]) BindErrorHandler(handler func(error)) { + d.errorHandler.Store(handler) +} + +// BindContext 绑定一个上下文到调度器中 +func (d *Dispatcher[M]) BindContext(ctx context.Context) { + d.bufCond.L.Lock() + d.ctx = ctx + if _, canceled := d.ctx.Deadline(); canceled { + d.closed = true + d.bufCond.Signal() + d.bufCond.L.Unlock() + return + } + d.bufCond.L.Unlock() +} + +// Send 发送消息到调度器中等待处理 +func (d *Dispatcher[M]) Send(m M) error { + d.bufCond.L.Lock() + if d.closed { + d.bufCond.L.Unlock() + return errors.New("dispatcher closed") + } + d.buf.Write(m) + d.bufCond.Signal() + d.bufCond.L.Unlock() + return nil +} + +// Start 阻塞式启动调度器,调用后将开始处理消息 +func (d *Dispatcher[M]) Start() { + for { + select { + case <-d.ctx.Done(): + d.Stop() + default: + d.bufCond.L.Lock() + if d.buf.Len() == 0 { + if d.closed { + d.bufCond.L.Unlock() + return + } + d.bufCond.Wait() + } + messages := d.buf.ReadAll() + d.bufCond.L.Unlock() + for _, msg := range messages { + d.handler(msg) + } + } + } +} + +// Stop 停止调度器,调用后将不再接受新消息,但会处理完已有消息 +func (d *Dispatcher[M]) Stop() { + d.bufCond.L.Lock() + d.closed = true + d.bufCond.Signal() + d.bufCond.L.Unlock() +} diff --git a/server/internal/v2/reactor/event.go b/server/internal/v2/reactor/event.go new file mode 100644 index 0000000..7145683 --- /dev/null +++ b/server/internal/v2/reactor/event.go @@ -0,0 +1,4 @@ +package reactor + +type Event struct { +}