other: 新版 server 完善通知、事件

This commit is contained in:
kercylan98 2024-04-01 20:03:20 +08:00
parent ffc3421b29
commit 49b8efd9b2
7 changed files with 371 additions and 54 deletions

View File

@ -1,12 +1,14 @@
package server
import (
"github.com/kercylan98/minotaur/utils/collection/listings"
"time"
"github.com/kercylan98/minotaur/utils/collection/listings"
)
type (
LaunchedEventHandler func(srv Server, ip string, t time.Time)
ShutdownEventHandler func(srv Server)
ConnectionOpenedEventHandler func(srv Server, conn Conn)
ConnectionClosedEventHandler func(srv Server, conn Conn, err error)
ConnectionReceivePacketEventHandler func(srv Server, conn Conn, packet Packet)
@ -16,12 +18,20 @@ type Events interface {
// RegisterLaunchedEvent 注册服务器启动事件,当服务器启动后将会触发该事件
// - 该事件将在系统级 Actor 中运行,该事件中阻塞会导致服务器启动延迟
RegisterLaunchedEvent(handler LaunchedEventHandler, priority ...int)
// RegisterShutdownEvent 注册服务器关闭事件,当服务器关闭时将会触发该事件,当该事件处理完毕后服务器将关闭
// - 该事件将在系统级 Actor 中运行,该事件中阻塞会导致服务器关闭延迟
// - 该事件未执行完毕前,服务器的一切均正常运行
RegisterShutdownEvent(handler ShutdownEventHandler, priority ...int)
// RegisterConnectionOpenedEvent 注册连接打开事件,当新连接创建完毕时将会触发该事件
// - 该事件将在系统级 Actor 中运行,不应执行阻塞操作
RegisterConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int)
// RegisterConnectionClosedEvent 注册连接关闭事件,当连接关闭后将会触发该事件
// - 该事件将在系统级 Actor 中运行,不应执行阻塞操作
RegisterConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int)
// RegisterConnectionReceivePacketEvent 注册连接接收数据包事件,当连接接收到数据包后将会触发该事件
// - 该事件将在连接的 Actor 中运行,不应执行阻塞操作
RegisterConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int)
@ -31,6 +41,7 @@ type events struct {
*server
launchedEventHandlers listings.SyncPrioritySlice[LaunchedEventHandler]
shutdownEventHandlers listings.SyncPrioritySlice[ShutdownEventHandler]
connectionOpenedEventHandlers listings.SyncPrioritySlice[ConnectionOpenedEventHandler]
connectionClosedEventHandlers listings.SyncPrioritySlice[ConnectionClosedEventHandler]
connectionReceivePacketEventHandlers listings.SyncPrioritySlice[ConnectionReceivePacketEventHandler]
@ -92,3 +103,16 @@ func (s *events) onConnectionReceivePacket(conn Conn, packet Packet) {
})
}))
}
func (s *events) RegisterShutdownEvent(handler ShutdownEventHandler, priority ...int) {
s.shutdownEventHandlers.AppendByOptionalPriority(handler, priority...)
}
func (s *events) onShutdown() {
_ = s.server.reactor.SystemDispatch(NativeMessage(s.server, func(srv *server) {
s.shutdownEventHandlers.RangeValue(func(index int, value ShutdownEventHandler) bool {
value(s.server)
return true
})
}))
}

View File

@ -0,0 +1,53 @@
package server
import (
"math"
"os"
"os/signal"
"syscall"
"time"
)
type notify struct {
server *server
systemSignal chan os.Signal
lifeCycleLimit chan struct{}
lifeCycleTimer *time.Timer
lifeCycleTime chan time.Duration
}
func (n *notify) init(srv *server) *notify {
n.server = srv
n.systemSignal = make(chan os.Signal, 1)
n.lifeCycleLimit = make(chan struct{}, 1)
n.lifeCycleTime = make(chan time.Duration, 1)
n.lifeCycleTimer = time.NewTimer(math.MaxInt64)
n.lifeCycleTimer.Stop()
signal.Notify(n.systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
return n
}
func (n *notify) run() {
defer func() {
if err := n.server.Shutdown(); err != nil {
panic(err)
}
}()
for {
select {
case <-n.systemSignal:
return
case <-n.lifeCycleLimit:
n.systemSignal <- syscall.SIGQUIT
case <-n.lifeCycleTimer.C:
n.systemSignal <- syscall.SIGQUIT
case d := <-n.lifeCycleTime:
n.lifeCycleTimer.Stop()
if d > 0 {
n.lifeCycleTimer.Reset(d)
}
}
}
}

View File

@ -0,0 +1,172 @@
package server
import (
"sync"
"time"
)
func NewOptions() *Options {
return DefaultOptions()
}
func DefaultOptions() *Options {
return &Options{
ServerMessageChannelSize: 1024 * 4,
ActorMessageChannelSize: 1024,
ServerMessageBufferInitialSize: 1024,
ActorMessageBufferInitialSize: 1024,
MessageErrorHandler: nil,
LifeCycleLimit: 0,
}
}
type Options struct {
server *server
rw sync.RWMutex
ServerMessageChannelSize int // 服务器 Actor 消息处理管道大小
ActorMessageChannelSize int // Actor 消息处理管道大小
ServerMessageBufferInitialSize int // 服务器 Actor 消息写入缓冲区初始化大小
ActorMessageBufferInitialSize int // Actor 消息写入缓冲区初始化大小
MessageErrorHandler func(srv Server, message Message, err error) // 消息错误处理器
LifeCycleLimit time.Duration // 服务器生命周期上限,在服务器启动后达到生命周期上限将关闭服务器
}
func (opt *Options) init(srv *server) *Options {
opt.server = srv
return opt
}
func (opt *Options) Apply(options ...*Options) {
opt.rw.Lock()
defer opt.rw.Unlock()
for _, option := range options {
option.rw.RLock()
opt.ServerMessageChannelSize = option.ServerMessageChannelSize
opt.ActorMessageChannelSize = option.ActorMessageChannelSize
opt.ServerMessageBufferInitialSize = option.ServerMessageBufferInitialSize
opt.ActorMessageBufferInitialSize = option.ActorMessageBufferInitialSize
opt.MessageErrorHandler = option.MessageErrorHandler
opt.LifeCycleLimit = option.LifeCycleLimit
option.rw.RUnlock()
}
if opt.server != nil && !opt.server.state.LaunchedAt.IsZero() {
opt.active()
}
}
func (opt *Options) active() {
opt.server.notify.lifeCycleTime <- opt.getLifeCycleLimit()
}
// WithServerMessageChannelSize 设置服务器 Actor 用于处理消息的管道大小,当管道由于逻辑阻塞而导致满载时,会导致新消息无法及时从缓冲区拿出,从而增加内存的消耗,但是并不会影响消息的写入
func (opt *Options) WithServerMessageChannelSize(size int) *Options {
return opt.modifyOptionsValue(func(opt *Options) {
opt.ServerMessageChannelSize = size
})
}
func (opt *Options) getServerMessageChannelSize() int {
return getOptionsValue(opt, func(opt *Options) int {
return opt.ServerMessageChannelSize
})
}
// WithActorMessageChannelSize 设置 Actor 用于处理消息的管道大小,当管道由于逻辑阻塞而导致满载时,会导致新消息无法及时从缓冲区拿出,从而增加内存的消耗,但是并不会影响消息的写入
func (opt *Options) WithActorMessageChannelSize(size int) *Options {
return opt.modifyOptionsValue(func(opt *Options) {
opt.ActorMessageChannelSize = size
})
}
func (opt *Options) getActorMessageChannelSize() int {
return getOptionsValue(opt, func(opt *Options) int {
return opt.ActorMessageChannelSize
})
}
// WithServerMessageBufferInitialSize 设置服务器 Actor 消息环形缓冲区 buffer.Ring 的初始大小,适当的值可以避免频繁扩容
// - 由于扩容是按照当前大小的 2 倍进行扩容,过大的值也可能会导致内存消耗过高
func (opt *Options) WithServerMessageBufferInitialSize(size int) *Options {
return opt.modifyOptionsValue(func(opt *Options) {
opt.ServerMessageBufferInitialSize = size
})
}
func (opt *Options) getServerMessageBufferInitialSize() int {
return getOptionsValue(opt, func(opt *Options) int {
return opt.ServerMessageBufferInitialSize
})
}
// WithActorMessageBufferInitialSize 设置 Actor 消息环形缓冲区 buffer.Ring 的初始大小,适当的值可以避免频繁扩容
// - 由于扩容是按照当前大小的 2 倍进行扩容,过大的值也可能会导致内存消耗过高
func (opt *Options) WithActorMessageBufferInitialSize(size int) *Options {
return opt.modifyOptionsValue(func(opt *Options) {
opt.ActorMessageBufferInitialSize = size
})
}
func (opt *Options) getActorMessageBufferInitialSize() int {
return getOptionsValue(opt, func(opt *Options) int {
return opt.ActorMessageBufferInitialSize
})
}
// WithMessageErrorHandler 设置消息错误处理器,当消息处理出现错误时,会调用该处理器进行处理
// - 如果在运行时设置,后续消息错误将会使用新的 handler 进行处理
func (opt *Options) WithMessageErrorHandler(handler func(srv Server, message Message, err error)) *Options {
return opt.modifyOptionsValue(func(opt *Options) {
opt.MessageErrorHandler = handler
})
}
func (opt *Options) getMessageErrorHandler() func(srv Server, message Message, err error) {
return getOptionsValue(opt, func(opt *Options) func(srv Server, message Message, err error) {
return opt.MessageErrorHandler
})
}
// WithLifeCycleLimit 设置服务器生命周期上限,在服务器启动后达到生命周期上限将关闭服务器
// - 如果设置为 <= 0 的值,将不限制服务器生命周期
// - 该函数支持运行时设置
func (opt *Options) WithLifeCycleLimit(limit time.Duration) *Options {
return opt.modifyOptionsValue(func(opt *Options) {
opt.LifeCycleLimit = limit
})
}
// WithLifeCycleEnd 设置服务器生命周期终点,在服务器达到该时间后将关闭服务器
// - 如果设置 end 为零值或小于当前时间的值,将不限制服务器生命周期
// - 该函数支持运行时设置
func (opt *Options) WithLifeCycleEnd(end time.Time) *Options {
return opt.modifyOptionsValue(func(opt *Options) {
now := time.Now()
if end.Before(now) {
opt.LifeCycleLimit = 0
return
}
opt.LifeCycleLimit = end.Sub(now)
})
}
// getLifeCycleLimit 获取服务器生命周期上限
func (opt *Options) getLifeCycleLimit() time.Duration {
return getOptionsValue(opt, func(opt *Options) time.Duration {
return opt.LifeCycleLimit
})
}
func (opt *Options) modifyOptionsValue(handler func(opt *Options)) *Options {
opt.rw.Lock()
handler(opt)
opt.rw.Unlock()
return opt
}
func getOptionsValue[V any](opt *Options, handler func(opt *Options) V) V {
opt.rw.RLock()
defer opt.rw.RUnlock()
return handler(opt)
}

View File

@ -0,0 +1,56 @@
package server
import (
"fmt"
"net/http"
"net/http/pprof"
"sync"
)
var (
httpPProf *http.Server // HTTP PProf 服务器
httpPProfMutex sync.Mutex // HTTP PProf 服务器互斥锁
)
// EnableHttpPProf 设置启用 http pprof
// - 该函数支持运行时调用
func EnableHttpPProf(addr, prefix string, errorHandler func(err error)) {
httpPProfMutex.Lock()
defer httpPProfMutex.Unlock()
if httpPProf != nil {
return
}
mux := http.NewServeMux()
mux.HandleFunc(fmt.Sprintf("GET %s/", prefix), pprof.Index)
mux.HandleFunc(fmt.Sprintf("GET %s/heap", prefix), pprof.Handler("heap").ServeHTTP)
mux.HandleFunc(fmt.Sprintf("GET %s/goroutine", prefix), pprof.Handler("goroutine").ServeHTTP)
mux.HandleFunc(fmt.Sprintf("GET %s/block", prefix), pprof.Handler("block").ServeHTTP)
mux.HandleFunc(fmt.Sprintf("GET %s/threadcreate", prefix), pprof.Handler("threadcreate").ServeHTTP)
mux.HandleFunc(fmt.Sprintf("GET %s/cmdline", prefix), pprof.Cmdline)
mux.HandleFunc(fmt.Sprintf("GET %s/profile", prefix), pprof.Profile)
mux.HandleFunc(fmt.Sprintf("GET %s/symbol", prefix), pprof.Symbol)
mux.HandleFunc(fmt.Sprintf("POST %s/symbol", prefix), pprof.Symbol)
mux.HandleFunc(fmt.Sprintf("GET %s/trace", prefix), pprof.Trace)
mux.HandleFunc(fmt.Sprintf("GET %s/mutex", prefix), pprof.Handler("mutex").ServeHTTP)
srv := &http.Server{Addr: addr, Handler: mux}
httpPProf = srv
go func(srv *http.Server, errHandler func(err error)) {
if err := srv.ListenAndServe(); err != nil {
errorHandler(err)
}
}(srv, errorHandler)
}
// DisableHttpPProf 设置禁用 http pprof
// - 该函数支持运行时调用
func DisableHttpPProf() {
httpPProfMutex.Lock()
defer httpPProfMutex.Unlock()
if httpPProf == nil {
return
}
_ = httpPProf.Close()
httpPProf = nil
}

View File

@ -14,24 +14,25 @@ import (
)
const (
StatusNone = iota - 1 // 事件循环未运行
StatusRunning // 事件循环运行中
StatusClosing // 事件循环关闭中
StatusClosed // 事件循环已关闭
statusNone = iota - 1 // 事件循环未运行
statusRunning // 事件循环运行中
statusClosing // 事件循环关闭中
statusClosed // 事件循环已关闭
)
var sysIdent = &identifiable{ident: "system"}
// NewReactor 创建一个新的 Reactor 实例,初始化系统级别的队列和多个 Socket 对应的队列
func NewReactor[M any](systemQueueSize, queueSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] {
func NewReactor[M any](systemQueueSize, queueSize, systemBufferSize, queueBufferSize int, handler MessageHandler[M], errorHandler ErrorHandler[M]) *Reactor[M] {
r := &Reactor[M]{
logger: log.Default().Logger,
systemQueue: newQueue[M](-1, systemQueueSize, 1024),
identifiers: haxmap.New[string, *identifiable](),
lb: loadbalancer.NewRoundRobin[int, *queue[M]](),
errorHandler: errorHandler,
queueSize: queueSize,
state: StatusNone,
logger: log.Default().Logger,
systemQueue: newQueue[M](-1, systemQueueSize, systemBufferSize),
identifiers: haxmap.New[string, *identifiable](),
lb: loadbalancer.NewRoundRobin[int, *queue[M]](),
errorHandler: errorHandler,
queueSize: queueSize,
queueBufferSize: queueBufferSize,
state: statusNone,
}
defaultNum := runtime.NumCPU()
@ -79,25 +80,20 @@ func NewReactor[M any](systemQueueSize, queueSize int, handler MessageHandler[M]
// Reactor 是一个消息反应器,管理系统级别的队列和多个 Socket 对应的队列
type Reactor[M any] struct {
logger *slog.Logger // 日志记录器
state int32 // 状态
systemQueue *queue[M] // 系统级别的队列
queueSize int // Socket 队列大小
queues []*queue[M] // Socket 使用的队列
queueRW sync.RWMutex // 队列读写锁
identifiers *haxmap.Map[string, *identifiable] // 标识符到队列索引的映射及消息计数
lb *loadbalancer.RoundRobin[int, *queue[M]] // 负载均衡器
wg sync.WaitGroup // 等待组
cwg sync.WaitGroup // 关闭等待组
handler queueMessageHandler[M] // 消息处理器
errorHandler ErrorHandler[M] // 错误处理器
debug bool // 是否开启调试模式
}
// SetLogger 设置日志记录器
func (r *Reactor[M]) SetLogger(logger *slog.Logger) *Reactor[M] {
r.logger = logger
return r
logger *slog.Logger // 日志记录器
state int32 // 状态
systemQueue *queue[M] // 系统级别的队列
queueSize int // 队列管道大小
queueBufferSize int // 队列缓冲区大小
queues []*queue[M] // 所有使用的队列
queueRW sync.RWMutex // 队列读写锁
identifiers *haxmap.Map[string, *identifiable] // 标识符到队列索引的映射及消息计数
lb *loadbalancer.RoundRobin[int, *queue[M]] // 负载均衡器
wg sync.WaitGroup // 等待组
cwg sync.WaitGroup // 关闭等待组
handler queueMessageHandler[M] // 消息处理器
errorHandler ErrorHandler[M] // 错误处理器
debug bool // 是否开启调试模式
}
// SetDebug 设置是否开启调试模式
@ -116,7 +112,7 @@ func (r *Reactor[M]) AutoDispatch(ident string, msg M) error {
// SystemDispatch 将消息分发到系统级别的队列
func (r *Reactor[M]) SystemDispatch(msg M) error {
if atomic.LoadInt32(&r.state) > StatusRunning {
if atomic.LoadInt32(&r.state) > statusRunning {
r.queueRW.RUnlock()
return fmt.Errorf("reactor closing or closed")
}
@ -126,7 +122,7 @@ func (r *Reactor[M]) SystemDispatch(msg M) error {
// Dispatch 将消息分发到 ident 使用的队列,当 ident 首次使用时,将会根据负载均衡策略选择一个队列
func (r *Reactor[M]) Dispatch(ident string, msg M) error {
r.queueRW.RLock()
if atomic.LoadInt32(&r.state) > StatusRunning {
if atomic.LoadInt32(&r.state) > statusRunning {
r.queueRW.RUnlock()
return fmt.Errorf("reactor closing or closed")
}
@ -140,7 +136,7 @@ func (r *Reactor[M]) Dispatch(ident string, msg M) error {
// Run 启动 Reactor运行系统级别的队列和多个 Socket 对应的队列
func (r *Reactor[M]) Run() {
if !atomic.CompareAndSwapInt32(&r.state, StatusNone, StatusRunning) {
if !atomic.CompareAndSwapInt32(&r.state, statusNone, statusRunning) {
return
}
r.queueRW.Lock()
@ -153,7 +149,7 @@ func (r *Reactor[M]) Run() {
}
func (r *Reactor[M]) noneLockAddQueue() {
q := newQueue[M](len(r.queues), r.queueSize, 1024*8)
q := newQueue[M](len(r.queues), r.queueSize, r.queueBufferSize)
r.lb.Add(q) // 运行前添加到负载均衡器,未运行时允许接收消息
r.queues = append(r.queues, q)
}
@ -187,7 +183,7 @@ func (r *Reactor[M]) runQueue(q *queue[M]) {
}
func (r *Reactor[M]) Close() {
if !atomic.CompareAndSwapInt32(&r.state, StatusRunning, StatusClosing) {
if !atomic.CompareAndSwapInt32(&r.state, statusRunning, statusClosing) {
return
}
r.queueRW.Lock()
@ -196,6 +192,6 @@ func (r *Reactor[M]) Close() {
q.Close()
}
r.cwg.Wait()
atomic.StoreInt32(&r.state, StatusClosed)
atomic.StoreInt32(&r.state, statusClosed)
r.queueRW.Unlock()
}

View File

@ -1,13 +1,11 @@
package server
import (
"context"
"time"
"github.com/kercylan98/minotaur/server/internal/v2/reactor"
"github.com/kercylan98/minotaur/utils/network"
"golang.org/x/net/context"
"os"
"os/signal"
"syscall"
"time"
)
type Server interface {
@ -26,24 +24,36 @@ type Server interface {
type server struct {
*controller
*events
*Options
state *State
notify *notify
ctx context.Context
cancel context.CancelFunc
network Network
reactor *reactor.Reactor[Message]
}
func NewServer(network Network) Server {
func NewServer(network Network, options ...*Options) Server {
srv := &server{
network: network,
Options: DefaultOptions(),
}
srv.ctx, srv.cancel = context.WithCancel(context.Background())
srv.notify = new(notify).init(srv)
srv.controller = new(controller).init(srv)
srv.events = new(events).init(srv)
srv.reactor = reactor.NewReactor[Message](1024*8, 1024, func(msg Message) {
msg.Execute()
}, nil)
srv.state = new(State).init(srv)
srv.reactor = reactor.NewReactor[Message](
srv.getServerMessageChannelSize(), srv.getActorMessageChannelSize(),
srv.getServerMessageBufferInitialSize(), srv.getActorMessageBufferInitialSize(),
func(msg Message) {
msg.Execute()
}, func(msg Message, err error) {
if handler := srv.getMessageErrorHandler(); handler != nil {
handler(srv, msg, err)
}
})
srv.Options.init(srv).Apply(options...)
return srv
}
@ -61,17 +71,14 @@ func (s *server) Run() (err error) {
}
}(s)
var systemSignal = make(chan os.Signal)
signal.Notify(systemSignal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
<-systemSignal
if err := s.Shutdown(); err != nil {
panic(err)
}
s.Options.active()
s.notify.run()
return
}
func (s *server) Shutdown() (err error) {
defer s.cancel()
s.events.onShutdown()
err = s.network.OnShutdown()
s.reactor.Close()
return

View File

@ -4,17 +4,26 @@ import (
"github.com/gobwas/ws"
"github.com/kercylan98/minotaur/server/internal/v2"
"github.com/kercylan98/minotaur/server/internal/v2/network"
"github.com/kercylan98/minotaur/utils/times"
"testing"
"time"
)
func TestNewServer(t *testing.T) {
srv := server.NewServer(network.WebSocket(":9999"))
server.EnableHttpPProf(":9998", "/debug/pprof", func(err error) {
panic(err)
})
srv := server.NewServer(network.WebSocket(":9999"), server.NewOptions().WithLifeCycleLimit(times.Second*3))
srv.RegisterLaunchedEvent(func(srv server.Server, ip string, launchedAt time.Time) {
t.Log("launched", ip, launchedAt)
})
srv.RegisterShutdownEvent(func(srv server.Server) {
t.Log("shutdown")
})
srv.RegisterConnectionOpenedEvent(func(srv server.Server, conn server.Conn) {
if err := conn.WritePacket(server.NewPacket([]byte("hello")).SetContext(ws.OpText)); err != nil {
t.Error(err)