diff --git a/server/internal/v2/conn.go b/server/internal/v2/conn.go index 282b764..b1c44c7 100644 --- a/server/internal/v2/conn.go +++ b/server/internal/v2/conn.go @@ -1,6 +1,7 @@ package server import ( + "go.uber.org/atomic" "net" ) @@ -41,17 +42,17 @@ func newConn(srv *server, c net.Conn, connWriter ConnWriter) *conn { type conn struct { server *server - conn net.Conn // 连接 - writer ConnWriter // 写入器 - actor string // Actor 名称 + conn net.Conn // 连接 + writer ConnWriter // 写入器 + actor atomic.String // Actor 名称 } func (c *conn) SetActor(actor string) { - c.actor = actor + c.actor.Store(actor) } func (c *conn) GetActor() string { - return c.actor + return c.actor.Load() } func (c *conn) WritePacket(packet Packet) error { @@ -72,7 +73,7 @@ func (c *conn) WriteContext(data []byte, context interface{}) error { } func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) { - if err := c.server.reactor.Dispatch(c.actor, SyncMessage(c.server, func(srv *server) { + if err := c.server.reactor.AutoDispatch(c.GetActor(), SyncMessage(c.server, func(srv *server) { handler(srv, c) })); err != nil { panic(err) @@ -80,7 +81,7 @@ func (c *conn) PushSyncMessage(handler func(srv Server, conn Conn)) { } func (c *conn) PushAsyncMessage(handler func(srv Server, conn Conn) error, callbacks ...func(srv Server, conn Conn, err error)) { - if err := c.server.reactor.Dispatch(c.actor, AsyncMessage(c.server, c.actor, func(srv *server) error { + if err := c.server.reactor.AutoDispatch(c.GetActor(), AsyncMessage(c.server, c.GetActor(), func(srv *server) error { return handler(srv, c) }, func(srv *server, err error) { for _, callback := range callbacks { diff --git a/server/internal/v2/message.go b/server/internal/v2/message.go index 6039c32..325343b 100644 --- a/server/internal/v2/message.go +++ b/server/internal/v2/message.go @@ -2,7 +2,6 @@ package server import ( "github.com/kercylan98/minotaur/server/internal/v2/queue" - "github.com/kercylan98/minotaur/server/internal/v2/reactor" "github.com/kercylan98/minotaur/utils/log/v2" "github.com/kercylan98/minotaur/utils/super" "runtime/debug" @@ -44,11 +43,7 @@ type asyncMessage struct { func (s *asyncMessage) Execute() { var q *queue.Queue[int, string, Message] var dispatch = func(ident string, message Message, beforeHandler ...func(queue *queue.Queue[int, string, Message], msg Message)) { - if ident == "" { - _ = s.srv.reactor.DispatchWithSystem(message, beforeHandler...) - } else { - _ = s.srv.reactor.Dispatch(ident, message, beforeHandler...) - } + _ = s.srv.reactor.AutoDispatch(ident, message, beforeHandler...) } dispatch( @@ -89,7 +84,7 @@ func (s *asyncMessage) Execute() { }) }), func(queue *queue.Queue[int, string, Message], msg Message) { - queue.WaitAdd(reactor.SysIdent, 1) + queue.WaitAdd(s.ident, 1) q = queue }, ) diff --git a/server/internal/v2/options.go b/server/internal/v2/options.go index 3d9e9e0..c214417 100644 --- a/server/internal/v2/options.go +++ b/server/internal/v2/options.go @@ -85,6 +85,7 @@ func (opt *Options) IsDebug() bool { func (opt *Options) WithLogger(logger *log.Logger) *Options { return opt.modifyOptionsValue(func(opt *Options) { opt.logger = logger + opt.server.reactor.SetLogger(opt.logger) }) } diff --git a/server/internal/v2/reactor/reactor.go b/server/internal/v2/reactor/reactor.go index f88ae1e..72a7879 100644 --- a/server/internal/v2/reactor/reactor.go +++ b/server/internal/v2/reactor/reactor.go @@ -67,6 +67,16 @@ type Reactor[M queue.Message] struct { errorHandler ErrorHandler[M] // 错误处理器 } +// SetLogger 设置日志记录器 +func (r *Reactor[M]) SetLogger(logger *log.Logger) { + r.logger.Store(logger) +} + +// GetLogger 获取日志记录器 +func (r *Reactor[M]) GetLogger() *log.Logger { + return r.logger.Load() +} + // process 消息处理 func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) { defer func(msg queue.MessageWrapper[int, string, M]) { @@ -74,20 +84,21 @@ func (r *Reactor[M]) process(msg queue.MessageWrapper[int, string, M]) { if r.errorHandler != nil { r.errorHandler(msg, err) } else { - r.logger.Load().Error("Reactor", log.Int("queue", msg.Queue().Id()), log.String("ident", msg.Ident()), log.Err(err)) + r.GetLogger().Error("Reactor", log.String("action", "process"), log.String("ident", msg.Ident()), log.Int("queue", msg.Queue().Id()), log.Err(err)) debug.PrintStack() } } }(msg) + r.handler(msg) } // AutoDispatch 自动分发,当 ident 为空字符串时,分发到系统级别的队列,否则分发到 ident 使用的队列 -func (r *Reactor[M]) AutoDispatch(ident string, msg M) error { +func (r *Reactor[M]) AutoDispatch(ident string, msg M, beforeHandler ...func(queue *queue.Queue[int, string, M], msg M)) error { if ident == str.None { - return r.DispatchWithSystem(msg) + return r.DispatchWithSystem(msg, beforeHandler...) } - return r.Dispatch(ident, msg) + return r.Dispatch(ident, msg, beforeHandler...) } // DispatchWithSystem 将消息分发到系统级别的队列 @@ -120,6 +131,9 @@ func (r *Reactor[M]) Dispatch(ident string, msg M, beforeHandler ...func(queue * if i, exist = r.location[ident]; !exist { next = r.lb.Next() r.location[ident] = next.Id() + r.logger.Load().Debug("Reactor", log.String("action", "bind"), log.String("ident", ident), log.Any("queue", next.Id())) + } else { + next = r.queues[i] } r.locationRW.Unlock() } else { @@ -133,16 +147,19 @@ func (r *Reactor[M]) Dispatch(ident string, msg M, beforeHandler ...func(queue * } // Run 启动 Reactor,运行系统级别的队列和多个 Socket 对应的队列 -func (r *Reactor[M]) Run() { +func (r *Reactor[M]) Run(callbacks ...func(queues []*queue.Queue[int, string, M])) { if !atomic.CompareAndSwapInt32(&r.state, statusNone, statusRunning) { return } r.queueRW.Lock() - r.runQueue(r.systemQueue) - for i := 0; i < len(r.queues); i++ { - r.runQueue(r.queues[i]) + queues := append([]*queue.Queue[int, string, M]{r.systemQueue}, r.queues...) + for _, q := range queues { + r.runQueue(q) } r.queueRW.Unlock() + for _, callback := range callbacks { + callback(queues) + } r.wg.Wait() } @@ -169,6 +186,7 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) { // 关闭时正在等待关闭完成,外部已加锁,无需再次加锁 r.noneLockDelQueue(q) r.cwg.Done() + r.logger.Load().Debug("Reactor", log.String("action", "close"), log.Any("queue", q.Id())) }) go q.Run() @@ -177,9 +195,21 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) { for m := range q.Read() { m(r.process, func(m queue.MessageWrapper[int, string, M], last bool) { if last { - r.queueRW.Lock() - defer r.queueRW.Unlock() - delete(r.location, m.Ident()) + r.locationRW.RLock() + mq, exist := r.location[m.Ident()] + r.locationRW.RUnlock() + if exist { + r.locationRW.Lock() + defer r.locationRW.Unlock() + mq, exist = r.location[m.Ident()] + if exist { + delete(r.location, m.Ident()) + r.queueRW.RLock() + mq := r.queues[mq] + r.queueRW.RUnlock() + r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.String("ident", m.Ident()), log.Any("queue", mq.Id())) + } + } } }) } diff --git a/server/internal/v2/reactor/reactor_test.go b/server/internal/v2/reactor/reactor_test.go index 8935ebd..91dc61a 100644 --- a/server/internal/v2/reactor/reactor_test.go +++ b/server/internal/v2/reactor/reactor_test.go @@ -3,8 +3,10 @@ package reactor_test import ( "github.com/kercylan98/minotaur/server/internal/v2/queue" "github.com/kercylan98/minotaur/server/internal/v2/reactor" + "github.com/kercylan98/minotaur/utils/log/v2" "github.com/kercylan98/minotaur/utils/random" "github.com/kercylan98/minotaur/utils/times" + "os" "testing" "time" ) @@ -16,6 +18,8 @@ func BenchmarkReactor_Dispatch(b *testing.B) { }) + r.SetLogger(log.NewLogger(log.NewHandler(os.Stdout, log.DefaultOptions().WithLevel(log.LevelInfo)))) + go r.Run() b.RunParallel(func(pb *testing.PB) { diff --git a/server/internal/v2/server.go b/server/internal/v2/server.go index d087c0d..9926f76 100644 --- a/server/internal/v2/server.go +++ b/server/internal/v2/server.go @@ -64,6 +64,7 @@ func NewServer(network Network, options ...*Options) Server { } }) srv.Options.init(srv).Apply(options...) + srv.reactor.SetLogger(srv.Options.GetLogger()) antsPool, err := ants.NewPool(ants.DefaultAntsPoolSize, ants.WithOptions(ants.Options{ ExpiryDuration: 10 * time.Second, Nonblocking: true, @@ -80,7 +81,15 @@ func NewServer(network Network, options ...*Options) Server { } func (s *server) Run() (err error) { - go s.reactor.Run() + var queueWait = make(chan struct{}) + go s.reactor.Run(func(queues []*queue.Queue[int, string, Message]) { + for _, q := range queues { + s.GetLogger().Debug("Reactor", log.String("action", "run"), log.Any("queue", q.Id())) + } + close(queueWait) + }) + <-queueWait + if err = s.network.OnSetup(s.ctx, s); err != nil { return } diff --git a/server/internal/v2/server_test.go b/server/internal/v2/server_test.go index 14e69f9..b5d404e 100644 --- a/server/internal/v2/server_test.go +++ b/server/internal/v2/server_test.go @@ -4,6 +4,7 @@ import ( "github.com/gobwas/ws" "github.com/kercylan98/minotaur/server/internal/v2" "github.com/kercylan98/minotaur/server/internal/v2/network" + "github.com/kercylan98/minotaur/utils/random" "github.com/kercylan98/minotaur/utils/times" "testing" "time" @@ -25,10 +26,20 @@ func TestNewServer(t *testing.T) { srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Day*3)) + var tm = make(map[string]bool) + srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) { + conn.SetActor("12321") if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil { t.Error(err) } + + conn.PushSyncMessage(func(srv server.Server, conn server.Conn) { + for i := 0; i < 10000000; i++ { + _ = tm["1"] + tm["1"] = random.Bool() + } + }) }) srv.RegisterConnectionReceivePacketEvent(func(srv server.Server, conn server.Conn, packet server.Packet) {