diff --git a/server/internal/v2/events.go b/server/internal/v2/events.go
index f422656..338ea7e 100644
--- a/server/internal/v2/events.go
+++ b/server/internal/v2/events.go
@@ -1,12 +1,14 @@
 package server
 
 import (
-	"github.com/kercylan98/minotaur/utils/collection/listings"
 	"time"
+
+	"github.com/kercylan98/minotaur/utils/collection/listings"
 )
 
 type (
 	LaunchedEventHandler                func(srv Server, ip string, t time.Time)
+	ShutdownEventHandler                func(srv Server)
 	ConnectionOpenedEventHandler        func(srv Server, conn Conn)
 	ConnectionClosedEventHandler        func(srv Server, conn Conn, err error)
 	ConnectionReceivePacketEventHandler func(srv Server, conn Conn, packet Packet)
@@ -16,12 +18,20 @@ type Events interface {
 	// RegisterLaunchedEvent registers the server launch event, fired once the server has started
 	//  - the event runs in the system-level Actor; blocking in it delays server startup
 	RegisterLaunchedEvent(handler LaunchedEventHandler, priority ...int)
+
+	// RegisterShutdownEvent registers the server shutdown event, fired when the server is shutting down; the server only closes after the event has been handled
+	//  - the event runs in the system-level Actor; blocking in it delays server shutdown
+	//  - until the event has finished executing, everything on the server keeps running normally
+	RegisterShutdownEvent(handler ShutdownEventHandler, priority ...int)
+
 	// RegisterConnectionOpenedEvent registers the connection-opened event, fired once a new connection has been established
 	//  - the event runs in the system-level Actor; avoid blocking operations
 	RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int)
+
 	// RegisterConnectionClosedEvent registers the connection-closed event, fired after a connection is closed
 	//  - the event runs in the system-level Actor; avoid blocking operations
 	RegisterConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int)
+
 	// RegisterConnectionReceivePacketEvent registers the packet-received event, fired after a connection receives a packet
 	//  - the event runs in the connection's Actor; avoid blocking operations
 	RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int)
@@ -31,6 +41,7 @@ type events struct {
 	*server
 
 	launchedEventHandlers                listings.SyncPrioritySlice[LaunchedEventHandler]
+	shutdownEventHandlers                listings.SyncPrioritySlice[ShutdownEventHandler]
 	connectionOpenedEventHandlers        listings.SyncPrioritySlice[ConnectionOpenedEventHandler]
 	connectionClosedEventHandlers        listings.SyncPrioritySlice[ConnectionClosedEventHandler]
 	connectionReceivePacketEventHandlers listings.SyncPrioritySlice[ConnectionReceivePacketEventHandler]
@@ -92,3 +103,16 @@ func (s *events) onConnectionReceivePacket(conn Conn, packet Packet) {
 		})
 	}))
 }
+
+func (s *events) RegisterShutdownEvent(handler ShutdownEventHandler, priority ...int) {
+	s.shutdownEventHandlers.AppendByOptionalPriority(handler, priority...)
+}
+
+func (s *events) onShutdown() {
+	_ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
+		s.shutdownEventHandlers.RangeValue(func(index int, value ShutdownEventHandler) bool {
+			value(s.server)
+			return true
+		})
+	}))
+}
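The doc comment on `RegisterShutdownEvent` promises that the server keeps running normally until the handlers have finished. A minimal usage sketch, reusing the same in-repo imports as server_test.go below; `flushState` is a hypothetical persistence hook, not part of minotaur:

```go
package main

import (
	"github.com/kercylan98/minotaur/server/internal/v2"
	"github.com/kercylan98/minotaur/server/internal/v2/network"
)

// flushState is a hypothetical stand-in for work that must finish before teardown.
func flushState() {}

func main() {
	srv := server.NewServer(network.WebSocket(":9999"))
	// Runs in the system-level Actor; per the doc comment, the server
	// keeps operating normally until this handler returns.
	srv.RegisterShutdownEvent(func(srv server.Server) {
		flushState()
	}, 1) // the trailing priority is optional, as with the other Register* methods
	if err := srv.Run(); err != nil {
		panic(err)
	}
}
```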
diff --git a/server/internal/v2/notify.go b/server/internal/v2/notify.go
new file mode 100644
index 0000000..118a6a9
--- /dev/null
+++ b/server/internal/v2/notify.go
@@ -0,0 +1,53 @@
+package server
+
+import (
+	"math"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+)
+
+type notify struct {
+	server *server
+
+	systemSignal   chan os.Signal
+	lifeCycleLimit chan struct{}
+	lifeCycleTimer *time.Timer
+	lifeCycleTime  chan time.Duration
+}
+
+func (n *notify) init(srv *server) *notify {
+	n.server = srv
+	n.systemSignal = make(chan os.Signal, 1)
+	n.lifeCycleLimit = make(chan struct{}, 1)
+	n.lifeCycleTime = make(chan time.Duration, 1)
+	n.lifeCycleTimer = time.NewTimer(math.MaxInt64)
+	n.lifeCycleTimer.Stop()
+	signal.Notify(n.systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
+
+	return n
+}
+
+func (n *notify) run() {
+	defer func() {
+		if err := n.server.Shutdown(); err != nil {
+			panic(err)
+		}
+	}()
+	for {
+		select {
+		case <-n.systemSignal:
+			return
+		case <-n.lifeCycleLimit:
+			n.systemSignal <- syscall.SIGQUIT
+		case <-n.lifeCycleTimer.C:
+			n.systemSignal <- syscall.SIGQUIT
+		case d := <-n.lifeCycleTime:
+			n.lifeCycleTimer.Stop()
+			if d > 0 {
+				n.lifeCycleTimer.Reset(d)
+			}
+		}
+	}
+}
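The interesting part of `notify.run` is the disarmed-timer idiom: the timer is created with an effectively infinite duration, stopped immediately, and only re-armed when a duration arrives on `lifeCycleTime` — which is how the lifecycle limit can be changed at runtime. A standalone sketch of just that pattern; the names here are illustrative, not minotaur API:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	rearm := make(chan time.Duration, 1) // stands in for notify.lifeCycleTime
	timer := time.NewTimer(math.MaxInt64)
	timer.Stop() // start disarmed: timer.C stays silent until Reset

	rearm <- 50 * time.Millisecond // roughly what Options.active() does at launch

	for {
		select {
		case <-timer.C:
			fmt.Println("life cycle limit reached; shutting down")
			return
		case d := <-rearm:
			timer.Stop()
			if d > 0 {
				timer.Reset(d) // d <= 0 leaves the lifetime unbounded
			}
		}
	}
}
```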
diff --git a/server/internal/v2/options.go b/server/internal/v2/options.go
new file mode 100644
index 0000000..1fefbc3
--- /dev/null
+++ b/server/internal/v2/options.go
@@ -0,0 +1,172 @@
+package server
+
+import (
+	"sync"
+	"time"
+)
+
+func NewOptions() *Options {
+	return DefaultOptions()
+}
+
+func DefaultOptions() *Options {
+	return &Options{
+		ServerMessageChannelSize:       1024 * 4,
+		ActorMessageChannelSize:        1024,
+		ServerMessageBufferInitialSize: 1024,
+		ActorMessageBufferInitialSize:  1024,
+		MessageErrorHandler:            nil,
+		LifeCycleLimit:                 0,
+	}
+}
+
+type Options struct {
+	server                         *server
+	rw                             sync.RWMutex
+	ServerMessageChannelSize       int                                          // size of the server Actor's message-processing channel
+	ActorMessageChannelSize        int                                          // size of each Actor's message-processing channel
+	ServerMessageBufferInitialSize int                                          // initial size of the server Actor's message write buffer
+	ActorMessageBufferInitialSize  int                                          // initial size of each Actor's message write buffer
+	MessageErrorHandler            func(srv Server, message Message, err error) // message error handler
+	LifeCycleLimit                 time.Duration                                // lifetime cap; the server shuts down once it has been running this long after launch
+}
+
+func (opt *Options) init(srv *server) *Options {
+	opt.server = srv
+	return opt
+}
+
+func (opt *Options) Apply(options ...*Options) {
+	opt.rw.Lock()
+	defer opt.rw.Unlock()
+	for _, option := range options {
+		option.rw.RLock()
+
+		opt.ServerMessageChannelSize = option.ServerMessageChannelSize
+		opt.ActorMessageChannelSize = option.ActorMessageChannelSize
+		opt.ServerMessageBufferInitialSize = option.ServerMessageBufferInitialSize
+		opt.ActorMessageBufferInitialSize = option.ActorMessageBufferInitialSize
+		opt.MessageErrorHandler = option.MessageErrorHandler
+		opt.LifeCycleLimit = option.LifeCycleLimit
+
+		option.rw.RUnlock()
+	}
+	if opt.server != nil && !opt.server.state.LaunchedAt.IsZero() {
+		opt.active()
+	}
+}
+
+func (opt *Options) active() {
+	opt.server.notify.lifeCycleTime <- opt.getLifeCycleLimit()
+}
+
+// WithServerMessageChannelSize sets the size of the channel the server Actor uses to process messages. When the channel fills up because handlers block, new messages cannot be drained from the buffer in time, which increases memory consumption, but writes are never blocked
+func (opt *Options) WithServerMessageChannelSize(size int) *Options {
+	return opt.modifyOptionsValue(func(opt *Options) {
+		opt.ServerMessageChannelSize = size
+	})
+}
+
+func (opt *Options) getServerMessageChannelSize() int {
+	return getOptionsValue(opt, func(opt *Options) int {
+		return opt.ServerMessageChannelSize
+	})
+}
+
+// WithActorMessageChannelSize sets the size of the channel each Actor uses to process messages. When the channel fills up because handlers block, new messages cannot be drained from the buffer in time, which increases memory consumption, but writes are never blocked
+func (opt *Options) WithActorMessageChannelSize(size int) *Options {
+	return opt.modifyOptionsValue(func(opt *Options) {
+		opt.ActorMessageChannelSize = size
+	})
+}
+
+func (opt *Options) getActorMessageChannelSize() int {
+	return getOptionsValue(opt, func(opt *Options) int {
+		return opt.ActorMessageChannelSize
+	})
+}
+
+// WithServerMessageBufferInitialSize sets the initial size of the server Actor's buffer.Ring message ring buffer; a suitable value avoids frequent growth
+//  - the buffer doubles its current size each time it grows, so an oversized value may also drive memory consumption too high
+func (opt *Options) WithServerMessageBufferInitialSize(size int) *Options {
+	return opt.modifyOptionsValue(func(opt *Options) {
+		opt.ServerMessageBufferInitialSize = size
+	})
+}
+
+func (opt *Options) getServerMessageBufferInitialSize() int {
+	return getOptionsValue(opt, func(opt *Options) int {
+		return opt.ServerMessageBufferInitialSize
+	})
+}
+
+// WithActorMessageBufferInitialSize sets the initial size of each Actor's buffer.Ring message ring buffer; a suitable value avoids frequent growth
+//  - the buffer doubles its current size each time it grows, so an oversized value may also drive memory consumption too high
+func (opt *Options) WithActorMessageBufferInitialSize(size int) *Options {
+	return opt.modifyOptionsValue(func(opt *Options) {
+		opt.ActorMessageBufferInitialSize = size
+	})
+}
+
+func (opt *Options) getActorMessageBufferInitialSize() int {
+	return getOptionsValue(opt, func(opt *Options) int {
+		return opt.ActorMessageBufferInitialSize
+	})
+}
+
+// WithMessageErrorHandler sets the message error handler, invoked whenever message processing produces an error
+//  - if set at runtime, subsequent message errors are handled by the new handler
+func (opt *Options) WithMessageErrorHandler(handler func(srv Server, message Message, err error)) *Options {
+	return opt.modifyOptionsValue(func(opt *Options) {
+		opt.MessageErrorHandler = handler
+	})
+}
+
+func (opt *Options) getMessageErrorHandler() func(srv Server, message Message, err error) {
+	return getOptionsValue(opt, func(opt *Options) func(srv Server, message Message, err error) {
+		return opt.MessageErrorHandler
+	})
+}
+
+// WithLifeCycleLimit caps the server's lifetime; the server shuts down once it has been running this long after launch
+//  - a value <= 0 leaves the server's lifetime unlimited
+//  - may be called at runtime
+func (opt *Options) WithLifeCycleLimit(limit time.Duration) *Options {
+	return opt.modifyOptionsValue(func(opt *Options) {
+		opt.LifeCycleLimit = limit
+	})
+}
+
+// WithLifeCycleEnd sets an absolute end time for the server; the server shuts down once that time is reached
+//  - a zero end time, or one before the current time, leaves the server's lifetime unlimited
+//  - may be called at runtime
+func (opt *Options) WithLifeCycleEnd(end time.Time) *Options {
+	return opt.modifyOptionsValue(func(opt *Options) {
+		now := time.Now()
+		if end.Before(now) {
+			opt.LifeCycleLimit = 0
+			return
+		}
+		opt.LifeCycleLimit = end.Sub(now)
+	})
+}
+
+// getLifeCycleLimit returns the server lifetime cap
+func (opt *Options) getLifeCycleLimit() time.Duration {
+	return getOptionsValue(opt, func(opt *Options) time.Duration {
+		return opt.LifeCycleLimit
+	})
+}
+
+func (opt *Options) modifyOptionsValue(handler func(opt *Options)) *Options {
+	opt.rw.Lock()
+	handler(opt)
+	opt.rw.Unlock()
+	return opt
+}
+
+func getOptionsValue[V any](opt *Options, handler func(opt *Options) V) V {
+	opt.rw.RLock()
+	defer opt.rw.RUnlock()
+	return handler(opt)
+}
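Everything in `Options` funnels through `modifyOptionsValue`/`getOptionsValue` under the RWMutex, so the `With*` setters are safe to chain at construction time and, where documented, to call again at runtime. A hedged usage sketch built only from the constructors and setters added above; the handler body is illustrative:

```go
package main

import (
	"log"
	"time"

	"github.com/kercylan98/minotaur/server/internal/v2"
	"github.com/kercylan98/minotaur/server/internal/v2/network"
)

func main() {
	opts := server.NewOptions().
		WithServerMessageChannelSize(1024 * 8).
		WithActorMessageBufferInitialSize(2048).
		WithMessageErrorHandler(func(srv server.Server, msg server.Message, err error) {
			log.Println("message error:", err) // illustrative policy
		}).
		WithLifeCycleLimit(time.Hour) // <= 0 would leave the lifetime unlimited

	srv := server.NewServer(network.WebSocket(":9999"), opts)
	// The lifecycle setters are documented as runtime-safe: Apply
	// re-arms the notify timer once the server has launched.
	if err := srv.Run(); err != nil {
		panic(err)
	}
}
```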
diff --git a/server/internal/v2/pprof.go b/server/internal/v2/pprof.go
new file mode 100644
index 0000000..6379157
--- /dev/null
+++ b/server/internal/v2/pprof.go
@@ -0,0 +1,59 @@
+package server
+
+import (
+	"errors"
+	"fmt"
+	"net/http"
+	"net/http/pprof"
+	"sync"
+)
+
+var (
+	httpPProf      *http.Server // HTTP pprof server
+	httpPProfMutex sync.Mutex   // mutex guarding the HTTP pprof server
+)
+
+// EnableHttpPProf enables http pprof
+//  - may be called at runtime
+func EnableHttpPProf(addr, prefix string, errorHandler func(err error)) {
+	httpPProfMutex.Lock()
+	defer httpPProfMutex.Unlock()
+	if httpPProf != nil {
+		return
+	}
+
+	mux := http.NewServeMux()
+	mux.HandleFunc(fmt.Sprintf("GET %s/", prefix), pprof.Index)
+	mux.HandleFunc(fmt.Sprintf("GET %s/heap", prefix), pprof.Handler("heap").ServeHTTP)
+	mux.HandleFunc(fmt.Sprintf("GET %s/goroutine", prefix), pprof.Handler("goroutine").ServeHTTP)
+	mux.HandleFunc(fmt.Sprintf("GET %s/block", prefix), pprof.Handler("block").ServeHTTP)
+	mux.HandleFunc(fmt.Sprintf("GET %s/threadcreate", prefix), pprof.Handler("threadcreate").ServeHTTP)
+	mux.HandleFunc(fmt.Sprintf("GET %s/cmdline", prefix), pprof.Cmdline)
+	mux.HandleFunc(fmt.Sprintf("GET %s/profile", prefix), pprof.Profile)
+	mux.HandleFunc(fmt.Sprintf("GET %s/symbol", prefix), pprof.Symbol)
+	mux.HandleFunc(fmt.Sprintf("POST %s/symbol", prefix), pprof.Symbol)
+	mux.HandleFunc(fmt.Sprintf("GET %s/trace", prefix), pprof.Trace)
+	mux.HandleFunc(fmt.Sprintf("GET %s/mutex", prefix), pprof.Handler("mutex").ServeHTTP)
+	srv := &http.Server{Addr: addr, Handler: mux}
+	httpPProf = srv
+	go func(srv *http.Server, errHandler func(err error)) {
+		// http.ErrServerClosed is the expected result of DisableHttpPProf
+		// and should not be reported as a failure
+		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+			errHandler(err)
+		}
+	}(srv, errorHandler)
+}
+
+// DisableHttpPProf disables http pprof
+//  - may be called at runtime
+func DisableHttpPProf() {
+	httpPProfMutex.Lock()
+	defer httpPProfMutex.Unlock()
+	if httpPProf == nil {
+		return
+	}
+
+	_ = httpPProf.Close()
+	httpPProf = nil
+}
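Note that the mux patterns are method-qualified ("GET %s/..."), which http.ServeMux only supports from Go 1.22 onward. A usage sketch reusing the address and prefix from server_test.go below; logging instead of panicking is just one possible error policy:

```go
package main

import (
	"log"
	"time"

	"github.com/kercylan98/minotaur/server/internal/v2"
)

func main() {
	server.EnableHttpPProf(":9998", "/debug/pprof", func(err error) {
		log.Println("pprof server:", err) // illustrative error policy
	})

	// Profiles are now reachable with the standard tooling, e.g.:
	//   go tool pprof http://localhost:9998/debug/pprof/profile
	time.Sleep(30 * time.Second)

	server.DisableHttpPProf() // guarded by the mutex; a second call is a no-op
}
```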
diff --git a/server/internal/v2/reactor/reactor.go b/server/internal/v2/reactor/reactor.go
index c5656d2..b889e8e 100644
--- a/server/internal/v2/reactor/reactor.go
+++ b/server/internal/v2/reactor/reactor.go
@@ -14,24 +14,25 @@ import (
 )
 
 const (
-	StatusNone    = iota - 1 // event loop not running
-	StatusRunning            // event loop running
-	StatusClosing            // event loop closing
-	StatusClosed             // event loop closed
+	statusNone    = iota - 1 // event loop not running
+	statusRunning            // event loop running
+	statusClosing            // event loop closing
+	statusClosed             // event loop closed
 )
 
 var sysIdent = &identifiable{ident: "system"}
 
 // NewReactor creates a new Reactor instance, initializing the system-level queue and the queues used by individual Sockets
-func NewReactor[M any](systemQueueSize, queueSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] {
+func NewReactor[M any](systemQueueSize, queueSize, systemBufferSize, queueBufferSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] {
 	r := &Reactor[M]{
-		logger:       log.Default().Logger,
-		systemQueue:  newQueue[M](-1, systemQueueSize, 1024),
-		identifiers:  haxmap.New[string, *identifiable](),
-		lb:           loadbalancer.NewRoundRobin[int, *queue[M]](),
-		errorHandler: errorHandler,
-		queueSize:    queueSize,
-		state:        StatusNone,
+		logger:          log.Default().Logger,
+		systemQueue:     newQueue[M](-1, systemQueueSize, systemBufferSize),
+		identifiers:     haxmap.New[string, *identifiable](),
+		lb:              loadbalancer.NewRoundRobin[int, *queue[M]](),
+		errorHandler:    errorHandler,
+		queueSize:       queueSize,
+		queueBufferSize: queueBufferSize,
+		state:           statusNone,
 	}
 
 	defaultNum := runtime.NumCPU()
@@ -79,25 +80,20 @@ func NewReactor[M any](systemQueueSize, queueSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] {
 
 // Reactor is a message reactor that manages the system-level queue and the queues used by individual Sockets
 type Reactor[M any] struct {
-	logger       *slog.Logger                             // logger
-	state        int32                                    // state
-	systemQueue  *queue[M]                                // system-level queue
-	queueSize    int                                      // Socket queue size
-	queues       []*queue[M]                              // queues used by Sockets
-	queueRW      sync.RWMutex                             // queue read-write lock
-	identifiers  *haxmap.Map[string, *identifiable]       // maps identifiers to queue indexes and message counts
-	lb           *loadbalancer.RoundRobin[int, *queue[M]] // load balancer
-	wg           sync.WaitGroup                           // wait group
-	cwg          sync.WaitGroup                           // close wait group
-	handler      queueMessageHandler[M]                   // message handler
-	errorHandler ErrorHandler[M]                          // error handler
-	debug        bool                                     // whether debug mode is enabled
-}
-
-// SetLogger sets the logger
-func (r *Reactor[M]) SetLogger(logger *slog.Logger) *Reactor[M] {
-	r.logger = logger
-	return r
+	logger          *slog.Logger                             // logger
+	state           int32                                    // state
+	systemQueue     *queue[M]                                // system-level queue
+	queueSize       int                                      // queue channel size
+	queueBufferSize int                                      // queue buffer size
+	queues          []*queue[M]                              // all queues in use
+	queueRW         sync.RWMutex                             // queue read-write lock
+	identifiers     *haxmap.Map[string, *identifiable]       // maps identifiers to queue indexes and message counts
+	lb              *loadbalancer.RoundRobin[int, *queue[M]] // load balancer
+	wg              sync.WaitGroup                           // wait group
+	cwg             sync.WaitGroup                           // close wait group
+	handler         queueMessageHandler[M]                   // message handler
+	errorHandler    ErrorHandler[M]                          // error handler
+	debug           bool                                     // whether debug mode is enabled
 }
 
 // SetDebug toggles debug mode
@@ -116,7 +112,7 @@ func (r *Reactor[M]) AutoDispatch(ident string, msg M) error {
 
 // SystemDispatch dispatches a message to the system-level queue
 func (r *Reactor[M]) SystemDispatch(msg M) error {
-	if atomic.LoadInt32(&r.state) > StatusRunning {
+	if atomic.LoadInt32(&r.state) > statusRunning {
 		r.queueRW.RUnlock()
 		return fmt.Errorf("reactor closing or closed")
 	}
@@ -126,7 +122,7 @@ func (r *Reactor[M]) SystemDispatch(msg M) error {
 // Dispatch dispatches a message to the queue used by ident; the first time an ident is seen, a queue is chosen for it by the load-balancing strategy
 func (r *Reactor[M]) Dispatch(ident string, msg M) error {
 	r.queueRW.RLock()
-	if atomic.LoadInt32(&r.state) > StatusRunning {
+	if atomic.LoadInt32(&r.state) > statusRunning {
 		r.queueRW.RUnlock()
 		return fmt.Errorf("reactor closing or closed")
 	}
@@ -140,7 +136,7 @@ func (r *Reactor[M]) Dispatch(ident string, msg M) error {
 
 // Run starts the Reactor, running the system-level queue and the per-Socket queues
 func (r *Reactor[M]) Run() {
-	if !atomic.CompareAndSwapInt32(&r.state, StatusNone, StatusRunning) {
+	if !atomic.CompareAndSwapInt32(&r.state, statusNone, statusRunning) {
 		return
 	}
 	r.queueRW.Lock()
@@ -153,7 +149,7 @@ func (r *Reactor[M]) Run() {
 }
 
 func (r *Reactor[M]) noneLockAddQueue() {
-	q := newQueue[M](len(r.queues), r.queueSize, 1024*8)
+	q := newQueue[M](len(r.queues), r.queueSize, r.queueBufferSize)
 	r.lb.Add(q) // added to the load balancer before Run; messages may be received while the reactor is not yet running
 	r.queues = append(r.queues, q)
 }
@@ -187,7 +183,7 @@ func (r *Reactor[M]) runQueue(q *queue[M]) {
 }
 
 func (r *Reactor[M]) Close() {
-	if !atomic.CompareAndSwapInt32(&r.state, StatusRunning, StatusClosing) {
+	if !atomic.CompareAndSwapInt32(&r.state, statusRunning, statusClosing) {
 		return
 	}
 	r.queueRW.Lock()
@@ -196,6 +192,6 @@ func (r *Reactor[M]) Close() {
 	for _, q := range r.queues {
 		q.Close()
 	}
 	r.cwg.Wait()
-	atomic.StoreInt32(&r.state, StatusClosed)
+	atomic.StoreInt32(&r.state, statusClosed)
 	r.queueRW.Unlock()
 }
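`NewReactor` now takes the two buffer initial sizes alongside the channel sizes, replacing the previously hard-coded 1024 and 1024*8. A sketch of the widened constructor; the string message type and both handlers are illustrative, and the goroutine around `Run` assumes it blocks until `Close`, as the wait groups suggest:

```go
package main

import (
	"fmt"

	"github.com/kercylan98/minotaur/server/internal/v2/reactor"
)

func main() {
	r := reactor.NewReactor[string](
		1024*4, 1024, // system / per-queue channel sizes
		1024, 1024, // system / per-queue buffer initial sizes
		func(msg string) { fmt.Println("handle:", msg) },
		func(msg string, err error) { fmt.Println("failed:", msg, err) },
	)
	go r.Run()                        // assumption: Run blocks until Close
	_ = r.Dispatch("conn-1", "hello") // first use binds "conn-1" to a queue round-robin
	_ = r.SystemDispatch("system-level message")
	r.Close() // waits for the queues to drain via the close wait group
}
```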
diff --git a/server/internal/v2/server.go b/server/internal/v2/server.go
index c507f9d..864fa7a 100644
--- a/server/internal/v2/server.go
+++ b/server/internal/v2/server.go
@@ -1,13 +1,11 @@
 package server
 
 import (
+	"context"
+	"time"
+
 	"github.com/kercylan98/minotaur/server/internal/v2/reactor"
 	"github.com/kercylan98/minotaur/utils/network"
-	"golang.org/x/net/context"
-	"os"
-	"os/signal"
-	"syscall"
-	"time"
 )
 
 type Server interface {
@@ -26,24 +24,36 @@ type Server interface {
 type server struct {
 	*controller
 	*events
+	*Options
 	state   *State
+	notify  *notify
 	ctx     context.Context
 	cancel  context.CancelFunc
 	network Network
 	reactor *reactor.Reactor[Message]
 }
 
-func NewServer(network Network) Server {
+func NewServer(network Network, options ...*Options) Server {
 	srv := &server{
 		network: network,
+		Options: DefaultOptions(),
 	}
 	srv.ctx, srv.cancel = context.WithCancel(context.Background())
+	srv.notify = new(notify).init(srv)
 	srv.controller = new(controller).init(srv)
 	srv.events = new(events).init(srv)
-	srv.reactor = reactor.NewReactor[Message](1024*8, 1024, func(msg Message) {
-		msg.Execute()
-	}, nil)
 	srv.state = new(State).init(srv)
+	srv.reactor = reactor.NewReactor[Message](
+		srv.getServerMessageChannelSize(), srv.getActorMessageChannelSize(),
+		srv.getServerMessageBufferInitialSize(), srv.getActorMessageBufferInitialSize(),
+		func(msg Message) {
+			msg.Execute()
+		}, func(msg Message, err error) {
+			if handler := srv.getMessageErrorHandler(); handler != nil {
+				handler(srv, msg, err)
+			}
+		})
+	srv.Options.init(srv).Apply(options...)
 
 	return srv
 }
@@ -61,17 +71,14 @@ func (s *server) Run() (err error) {
 		}
 	}(s)
 
-	var systemSignal = make(chan os.Signal)
-	signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
-	<-systemSignal
-	if err := s.Shutdown(); err != nil {
-		panic(err)
-	}
+	s.Options.active()
+	s.notify.run()
 	return
}

 func (s *server) Shutdown() (err error) {
 	defer s.cancel()
+	s.events.onShutdown()
 	err = s.network.OnShutdown()
 	s.reactor.Close()
 	return
diff --git a/server/internal/v2/server_test.go b/server/internal/v2/server_test.go
index c4f940c..c959829 100644
--- a/server/internal/v2/server_test.go
+++ b/server/internal/v2/server_test.go
@@ -4,17 +4,26 @@ import (
 	"github.com/gobwas/ws"
 	"github.com/kercylan98/minotaur/server/internal/v2"
 	"github.com/kercylan98/minotaur/server/internal/v2/network"
+	"github.com/kercylan98/minotaur/utils/times"
 	"testing"
 	"time"
 )
 
 func TestNewServer(t *testing.T) {
-	srv := server.NewServer(network.WebSocket(":9999"))
+	server.EnableHttpPProf(":9998", "/debug/pprof", func(err error) {
+		panic(err)
+	})
+
+	srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Second*3))
 	srv.RegisterLaunchedEvent(func(srv server.Server, ip string, launchedAt time.Time) {
 		t.Log("launched", ip, launchedAt)
 	})
 
+	srv.RegisterShutdownEvent(func(srv server.Server) {
+		t.Log("shutdown")
+	})
+
 	srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) {
 		if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil {
 			t.Error(err)