refactor: 优化及重构 server 包关于 WebSocket 的消息类型和消息分流部分内容
- 优化 server 包中 WebSocket 服务器默认响应的消息类型与发信方不同步的问题; - 移除 server.WithShunt 函数,调整为通过 server.Server.UseShunt 来动态分流渠道,例如可以将用户连接的渠道在用户自身渠道或游戏房间渠道来回切换;
This commit is contained in:
parent
8e94a6681e
commit
dc557a06d4
|
@ -106,9 +106,10 @@ func newBotConn(server *Server) *Conn {
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// Conn 服务器连接单次会话的包装
|
// Conn 服务器连接单次消息的包装
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
*connection
|
*connection
|
||||||
|
wst int
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +161,7 @@ func (slf *Conn) GetWebsocketRequest() *http.Request {
|
||||||
|
|
||||||
// IsBot 是否是机器人连接
|
// IsBot 是否是机器人连接
|
||||||
func (slf *Conn) IsBot() bool {
|
func (slf *Conn) IsBot() bool {
|
||||||
return slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil
|
return slf != nil && slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RemoteAddr 获取远程地址
|
// RemoteAddr 获取远程地址
|
||||||
|
@ -229,15 +230,15 @@ func (slf *Conn) IsWebsocket() bool {
|
||||||
return slf.server.network == NetworkWebsocket
|
return slf.server.network == NetworkWebsocket
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetWST 获取websocket消息类型
|
// GetWST 获取本次 websocket 消息类型
|
||||||
|
// - 默认将与发送类型相同
|
||||||
func (slf *Conn) GetWST() int {
|
func (slf *Conn) GetWST() int {
|
||||||
wst, _ := slf.ctx.Value(contextKeyWST).(int)
|
return slf.wst
|
||||||
return wst
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetWST 设置websocket消息类型
|
// SetWST 设置本次 websocket 消息类型
|
||||||
func (slf *Conn) SetWST(wst int) *Conn {
|
func (slf *Conn) SetWST(wst int) *Conn {
|
||||||
slf.ctx = context.WithValue(slf.ctx, contextKeyWST, wst)
|
slf.wst = wst
|
||||||
return slf
|
return slf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,7 +256,6 @@ func (slf *Conn) PushUniqueAsyncMessage(name string, caller func() error, callba
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write 向连接中写入数据
|
// Write 向连接中写入数据
|
||||||
// - messageType: websocket模式中指定消息类型
|
|
||||||
func (slf *Conn) Write(packet []byte, callback ...func(err error)) {
|
func (slf *Conn) Write(packet []byte, callback ...func(err error)) {
|
||||||
if slf.gw != nil {
|
if slf.gw != nil {
|
||||||
slf.gw(packet)
|
slf.gw(packet)
|
||||||
|
@ -356,9 +356,7 @@ func (slf *Conn) Close(err ...error) {
|
||||||
if slf.ticker != nil {
|
if slf.ticker != nil {
|
||||||
slf.ticker.Release()
|
slf.ticker.Release()
|
||||||
}
|
}
|
||||||
if slf.server.shuntMatcher != nil {
|
slf.server.releaseDispatcher(slf)
|
||||||
slf.server.releaseDispatcher(slf.server.shuntMatcher(slf))
|
|
||||||
}
|
|
||||||
slf.pool.Close()
|
slf.pool.Close()
|
||||||
slf.loop.Close()
|
slf.loop.Close()
|
||||||
slf.mu.Unlock()
|
slf.mu.Unlock()
|
||||||
|
|
|
@ -16,7 +16,3 @@ const (
|
||||||
DefaultWebsocketReadDeadline = 30 * time.Second
|
DefaultWebsocketReadDeadline = 30 * time.Second
|
||||||
DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB
|
DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
contextKeyWST = "_wst" // WebSocket 消息类型
|
|
||||||
)
|
|
||||||
|
|
|
@ -8,8 +8,9 @@ import (
|
||||||
var dispatcherUnique = struct{}{}
|
var dispatcherUnique = struct{}{}
|
||||||
|
|
||||||
// generateDispatcher 生成消息分发器
|
// generateDispatcher 生成消息分发器
|
||||||
func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
|
func generateDispatcher(name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
|
||||||
return &dispatcher{
|
return &dispatcher{
|
||||||
|
name: name,
|
||||||
buffer: buffer.NewUnboundedN[*Message](),
|
buffer: buffer.NewUnboundedN[*Message](),
|
||||||
handler: handler,
|
handler: handler,
|
||||||
uniques: haxmap.New[string, struct{}](),
|
uniques: haxmap.New[string, struct{}](),
|
||||||
|
@ -18,6 +19,7 @@ func generateDispatcher(handler func(dispatcher *dispatcher, message *Message))
|
||||||
|
|
||||||
// dispatcher 消息分发器
|
// dispatcher 消息分发器
|
||||||
type dispatcher struct {
|
type dispatcher struct {
|
||||||
|
name string
|
||||||
buffer *buffer.Unbounded[*Message]
|
buffer *buffer.Unbounded[*Message]
|
||||||
uniques *haxmap.Map[string, struct{}]
|
uniques *haxmap.Map[string, struct{}]
|
||||||
handler func(dispatcher *dispatcher, message *Message)
|
handler func(dispatcher *dispatcher, message *Message)
|
||||||
|
|
253
server/event.go
253
server/event.go
|
@ -15,74 +15,74 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type StartBeforeEventHandle func(srv *Server)
|
type StartBeforeEventHandler func(srv *Server)
|
||||||
type StartFinishEventHandle func(srv *Server)
|
type StartFinishEventHandler func(srv *Server)
|
||||||
type StopEventHandle func(srv *Server)
|
type StopEventHandler func(srv *Server)
|
||||||
type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte)
|
type ConnectionReceivePacketEventHandler func(srv *Server, conn *Conn, packet []byte)
|
||||||
type ConnectionOpenedEventHandle func(srv *Server, conn *Conn)
|
type ConnectionOpenedEventHandler func(srv *Server, conn *Conn)
|
||||||
type ConnectionClosedEventHandle func(srv *Server, conn *Conn, err any)
|
type ConnectionClosedEventHandler func(srv *Server, conn *Conn, err any)
|
||||||
type MessageErrorEventHandle func(srv *Server, message *Message, err error)
|
type MessageErrorEventHandler func(srv *Server, message *Message, err error)
|
||||||
type MessageLowExecEventHandle func(srv *Server, message *Message, cost time.Duration)
|
type MessageLowExecEventHandler func(srv *Server, message *Message, cost time.Duration)
|
||||||
type ConsoleCommandEventHandle func(srv *Server, command string, params ConsoleParams)
|
type ConsoleCommandEventHandler func(srv *Server, command string, params ConsoleParams)
|
||||||
type ConnectionOpenedAfterEventHandle func(srv *Server, conn *Conn)
|
type ConnectionOpenedAfterEventHandler func(srv *Server, conn *Conn)
|
||||||
type ConnectionWritePacketBeforeEventHandle func(srv *Server, conn *Conn, packet []byte) []byte
|
type ConnectionWritePacketBeforeEventHandler func(srv *Server, conn *Conn, packet []byte) []byte
|
||||||
type ShuntChannelCreatedEventHandle func(srv *Server, guid int64)
|
type ShuntChannelCreatedEventHandler func(srv *Server, guid int64)
|
||||||
type ShuntChannelClosedEventHandle func(srv *Server, guid int64)
|
type ShuntChannelClosedEventHandler func(srv *Server, guid int64)
|
||||||
type ConnectionPacketPreprocessEventHandle func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte))
|
type ConnectionPacketPreprocessEventHandler func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte))
|
||||||
type MessageExecBeforeEventHandle func(srv *Server, message *Message) bool
|
type MessageExecBeforeEventHandler func(srv *Server, message *Message) bool
|
||||||
type MessageReadyEventHandle func(srv *Server)
|
type MessageReadyEventHandler func(srv *Server)
|
||||||
|
|
||||||
func newEvent(srv *Server) *event {
|
func newEvent(srv *Server) *event {
|
||||||
return &event{
|
return &event{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
startBeforeEventHandles: slice.NewPriority[StartBeforeEventHandle](),
|
startBeforeEventHandlers: slice.NewPriority[StartBeforeEventHandler](),
|
||||||
startFinishEventHandles: slice.NewPriority[StartFinishEventHandle](),
|
startFinishEventHandlers: slice.NewPriority[StartFinishEventHandler](),
|
||||||
stopEventHandles: slice.NewPriority[StopEventHandle](),
|
stopEventHandlers: slice.NewPriority[StopEventHandler](),
|
||||||
connectionReceivePacketEventHandles: slice.NewPriority[ConnectionReceivePacketEventHandle](),
|
connectionReceivePacketEventHandlers: slice.NewPriority[ConnectionReceivePacketEventHandler](),
|
||||||
connectionOpenedEventHandles: slice.NewPriority[ConnectionOpenedEventHandle](),
|
connectionOpenedEventHandlers: slice.NewPriority[ConnectionOpenedEventHandler](),
|
||||||
connectionClosedEventHandles: slice.NewPriority[ConnectionClosedEventHandle](),
|
connectionClosedEventHandlers: slice.NewPriority[ConnectionClosedEventHandler](),
|
||||||
messageErrorEventHandles: slice.NewPriority[MessageErrorEventHandle](),
|
messageErrorEventHandlers: slice.NewPriority[MessageErrorEventHandler](),
|
||||||
messageLowExecEventHandles: slice.NewPriority[MessageLowExecEventHandle](),
|
messageLowExecEventHandlers: slice.NewPriority[MessageLowExecEventHandler](),
|
||||||
connectionOpenedAfterEventHandles: slice.NewPriority[ConnectionOpenedAfterEventHandle](),
|
connectionOpenedAfterEventHandlers: slice.NewPriority[ConnectionOpenedAfterEventHandler](),
|
||||||
connectionWritePacketBeforeHandles: slice.NewPriority[ConnectionWritePacketBeforeEventHandle](),
|
connectionWritePacketBeforeHandlers: slice.NewPriority[ConnectionWritePacketBeforeEventHandler](),
|
||||||
shuntChannelCreatedEventHandles: slice.NewPriority[ShuntChannelCreatedEventHandle](),
|
shuntChannelCreatedEventHandlers: slice.NewPriority[ShuntChannelCreatedEventHandler](),
|
||||||
shuntChannelClosedEventHandles: slice.NewPriority[ShuntChannelClosedEventHandle](),
|
shuntChannelClosedEventHandlers: slice.NewPriority[ShuntChannelClosedEventHandler](),
|
||||||
connectionPacketPreprocessEventHandles: slice.NewPriority[ConnectionPacketPreprocessEventHandle](),
|
connectionPacketPreprocessEventHandlers: slice.NewPriority[ConnectionPacketPreprocessEventHandler](),
|
||||||
messageExecBeforeEventHandles: slice.NewPriority[MessageExecBeforeEventHandle](),
|
messageExecBeforeEventHandlers: slice.NewPriority[MessageExecBeforeEventHandler](),
|
||||||
messageReadyEventHandles: slice.NewPriority[MessageReadyEventHandle](),
|
messageReadyEventHandlers: slice.NewPriority[MessageReadyEventHandler](),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type event struct {
|
type event struct {
|
||||||
*Server
|
*Server
|
||||||
startBeforeEventHandles *slice.Priority[StartBeforeEventHandle]
|
startBeforeEventHandlers *slice.Priority[StartBeforeEventHandler]
|
||||||
startFinishEventHandles *slice.Priority[StartFinishEventHandle]
|
startFinishEventHandlers *slice.Priority[StartFinishEventHandler]
|
||||||
stopEventHandles *slice.Priority[StopEventHandle]
|
stopEventHandlers *slice.Priority[StopEventHandler]
|
||||||
connectionReceivePacketEventHandles *slice.Priority[ConnectionReceivePacketEventHandle]
|
connectionReceivePacketEventHandlers *slice.Priority[ConnectionReceivePacketEventHandler]
|
||||||
connectionOpenedEventHandles *slice.Priority[ConnectionOpenedEventHandle]
|
connectionOpenedEventHandlers *slice.Priority[ConnectionOpenedEventHandler]
|
||||||
connectionClosedEventHandles *slice.Priority[ConnectionClosedEventHandle]
|
connectionClosedEventHandlers *slice.Priority[ConnectionClosedEventHandler]
|
||||||
messageErrorEventHandles *slice.Priority[MessageErrorEventHandle]
|
messageErrorEventHandlers *slice.Priority[MessageErrorEventHandler]
|
||||||
messageLowExecEventHandles *slice.Priority[MessageLowExecEventHandle]
|
messageLowExecEventHandlers *slice.Priority[MessageLowExecEventHandler]
|
||||||
connectionOpenedAfterEventHandles *slice.Priority[ConnectionOpenedAfterEventHandle]
|
connectionOpenedAfterEventHandlers *slice.Priority[ConnectionOpenedAfterEventHandler]
|
||||||
connectionWritePacketBeforeHandles *slice.Priority[ConnectionWritePacketBeforeEventHandle]
|
connectionWritePacketBeforeHandlers *slice.Priority[ConnectionWritePacketBeforeEventHandler]
|
||||||
shuntChannelCreatedEventHandles *slice.Priority[ShuntChannelCreatedEventHandle]
|
shuntChannelCreatedEventHandlers *slice.Priority[ShuntChannelCreatedEventHandler]
|
||||||
shuntChannelClosedEventHandles *slice.Priority[ShuntChannelClosedEventHandle]
|
shuntChannelClosedEventHandlers *slice.Priority[ShuntChannelClosedEventHandler]
|
||||||
connectionPacketPreprocessEventHandles *slice.Priority[ConnectionPacketPreprocessEventHandle]
|
connectionPacketPreprocessEventHandlers *slice.Priority[ConnectionPacketPreprocessEventHandler]
|
||||||
messageExecBeforeEventHandles *slice.Priority[MessageExecBeforeEventHandle]
|
messageExecBeforeEventHandlers *slice.Priority[MessageExecBeforeEventHandler]
|
||||||
messageReadyEventHandles *slice.Priority[MessageReadyEventHandle]
|
messageReadyEventHandlers *slice.Priority[MessageReadyEventHandler]
|
||||||
|
|
||||||
consoleCommandEventHandles map[string]*slice.Priority[ConsoleCommandEventHandle]
|
consoleCommandEventHandlers map[string]*slice.Priority[ConsoleCommandEventHandler]
|
||||||
consoleCommandEventHandleInitOnce sync.Once
|
consoleCommandEventHandlerInitOnce sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegStopEvent 服务器停止时将立即执行被注册的事件处理函数
|
// RegStopEvent 服务器停止时将立即执行被注册的事件处理函数
|
||||||
func (slf *event) RegStopEvent(handle StopEventHandle, priority ...int) {
|
func (slf *event) RegStopEvent(handler StopEventHandler, priority ...int) {
|
||||||
slf.stopEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.stopEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnStopEvent() {
|
func (slf *event) OnStopEvent() {
|
||||||
slf.stopEventHandles.RangeValue(func(index int, value StopEventHandle) bool {
|
slf.stopEventHandlers.RangeValue(func(index int, value StopEventHandler) bool {
|
||||||
value(slf.Server)
|
value(slf.Server)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -91,15 +91,15 @@ func (slf *event) OnStopEvent() {
|
||||||
// RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数
|
// RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数
|
||||||
// - 默认将注册 "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN" 指令作为关闭服务器的指令
|
// - 默认将注册 "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN" 指令作为关闭服务器的指令
|
||||||
// - 可通过注册默认指令进行默认行为的覆盖
|
// - 可通过注册默认指令进行默认行为的覆盖
|
||||||
func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEventHandle, priority ...int) {
|
func (slf *event) RegConsoleCommandEvent(command string, handler ConsoleCommandEventHandler, priority ...int) {
|
||||||
fd := int(os.Stdin.Fd())
|
fd := int(os.Stdin.Fd())
|
||||||
if !terminal.IsTerminal(fd) {
|
if !terminal.IsTerminal(fd) {
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("ignore", "system not terminal"))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("ignore", "system not terminal"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
slf.consoleCommandEventHandleInitOnce.Do(func() {
|
slf.consoleCommandEventHandlerInitOnce.Do(func() {
|
||||||
slf.consoleCommandEventHandles = map[string]*slice.Priority[ConsoleCommandEventHandle]{}
|
slf.consoleCommandEventHandlers = map[string]*slice.Priority[ConsoleCommandEventHandler]{}
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
var input string
|
var input string
|
||||||
|
@ -112,18 +112,18 @@ func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEv
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
})
|
})
|
||||||
list, exist := slf.consoleCommandEventHandles[command]
|
list, exist := slf.consoleCommandEventHandlers[command]
|
||||||
if !exist {
|
if !exist {
|
||||||
list = slice.NewPriority[ConsoleCommandEventHandle]()
|
list = slice.NewPriority[ConsoleCommandEventHandler]()
|
||||||
slf.consoleCommandEventHandles[command] = list
|
slf.consoleCommandEventHandlers[command] = list
|
||||||
}
|
}
|
||||||
list.Append(handle, slice.GetValue(priority, 0))
|
list.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) {
|
func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) {
|
||||||
slf.PushSystemMessage(func() {
|
slf.PushSystemMessage(func() {
|
||||||
handles, exist := slf.consoleCommandEventHandles[command]
|
handles, exist := slf.consoleCommandEventHandlers[command]
|
||||||
if !exist {
|
if !exist {
|
||||||
switch command {
|
switch command {
|
||||||
case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN":
|
case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN":
|
||||||
|
@ -142,7 +142,7 @@ func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) {
|
||||||
for key, value := range v {
|
for key, value := range v {
|
||||||
params[key] = value
|
params[key] = value
|
||||||
}
|
}
|
||||||
handles.RangeValue(func(index int, value ConsoleCommandEventHandle) bool {
|
handles.RangeValue(func(index int, value ConsoleCommandEventHandler) bool {
|
||||||
value(slf.Server, command, params)
|
value(slf.Server, command, params)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -151,9 +151,9 @@ func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数
|
// RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数
|
||||||
func (slf *event) RegStartBeforeEvent(handle StartBeforeEventHandle, priority ...int) {
|
func (slf *event) RegStartBeforeEvent(handler StartBeforeEventHandler, priority ...int) {
|
||||||
slf.startBeforeEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.startBeforeEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnStartBeforeEvent() {
|
func (slf *event) OnStartBeforeEvent() {
|
||||||
|
@ -163,7 +163,7 @@ func (slf *event) OnStartBeforeEvent() {
|
||||||
debug.PrintStack()
|
debug.PrintStack()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
slf.startBeforeEventHandles.RangeValue(func(index int, value StartBeforeEventHandle) bool {
|
slf.startBeforeEventHandlers.RangeValue(func(index int, value StartBeforeEventHandler) bool {
|
||||||
value(slf.Server)
|
value(slf.Server)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -171,14 +171,14 @@ func (slf *event) OnStartBeforeEvent() {
|
||||||
|
|
||||||
// RegStartFinishEvent 在服务器启动完成时将立刻执行被注册的事件处理函数
|
// RegStartFinishEvent 在服务器启动完成时将立刻执行被注册的事件处理函数
|
||||||
// - 需要注意该时刻服务器已经启动完成,但是还有可能未开始处理消息,客户端有可能无法连接,如果需要在消息处理器准备就绪后执行,请使用 RegMessageReadyEvent 函数
|
// - 需要注意该时刻服务器已经启动完成,但是还有可能未开始处理消息,客户端有可能无法连接,如果需要在消息处理器准备就绪后执行,请使用 RegMessageReadyEvent 函数
|
||||||
func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle, priority ...int) {
|
func (slf *event) RegStartFinishEvent(handler StartFinishEventHandler, priority ...int) {
|
||||||
slf.startFinishEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.startFinishEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnStartFinishEvent() {
|
func (slf *event) OnStartFinishEvent() {
|
||||||
slf.PushSystemMessage(func() {
|
slf.PushSystemMessage(func() {
|
||||||
slf.startFinishEventHandles.RangeValue(func(index int, value StartFinishEventHandle) bool {
|
slf.startFinishEventHandlers.RangeValue(func(index int, value StartFinishEventHandler) bool {
|
||||||
value(slf.Server)
|
value(slf.Server)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -192,18 +192,18 @@ func (slf *event) OnStartFinishEvent() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数
|
// RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数
|
||||||
func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle, priority ...int) {
|
func (slf *event) RegConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int) {
|
||||||
if slf.network == NetworkHttp {
|
if slf.network == NetworkHttp {
|
||||||
panic(ErrNetworkIncompatibleHttp)
|
panic(ErrNetworkIncompatibleHttp)
|
||||||
}
|
}
|
||||||
slf.connectionClosedEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.connectionClosedEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
|
func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
|
||||||
slf.PushSystemMessage(func() {
|
slf.PushSystemMessage(func() {
|
||||||
slf.Server.online.Delete(conn.GetID())
|
slf.Server.online.Delete(conn.GetID())
|
||||||
slf.connectionClosedEventHandles.RangeValue(func(index int, value ConnectionClosedEventHandle) bool {
|
slf.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool {
|
||||||
value(slf.Server, conn, err)
|
value(slf.Server, conn, err)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -211,51 +211,53 @@ func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数
|
// RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数
|
||||||
func (slf *event) RegConnectionOpenedEvent(handle ConnectionOpenedEventHandle, priority ...int) {
|
// - 该阶段的事件将会在系统消息中进行处理,不适合处理耗时操作
|
||||||
|
func (slf *event) RegConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int) {
|
||||||
if slf.network == NetworkHttp {
|
if slf.network == NetworkHttp {
|
||||||
panic(ErrNetworkIncompatibleHttp)
|
panic(ErrNetworkIncompatibleHttp)
|
||||||
}
|
}
|
||||||
slf.connectionOpenedEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.connectionOpenedEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnConnectionOpenedEvent(conn *Conn) {
|
func (slf *event) OnConnectionOpenedEvent(conn *Conn) {
|
||||||
slf.PushSystemMessage(func() {
|
slf.PushSystemMessage(func() {
|
||||||
slf.Server.online.Set(conn.GetID(), conn)
|
slf.Server.online.Set(conn.GetID(), conn)
|
||||||
slf.connectionOpenedEventHandles.RangeValue(func(index int, value ConnectionOpenedEventHandle) bool {
|
slf.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool {
|
||||||
value(slf.Server, conn)
|
value(slf.Server, conn)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
slf.OnConnectionOpenedAfterEvent(conn)
|
||||||
}, log.String("Event", "OnConnectionOpenedEvent"))
|
}, log.String("Event", "OnConnectionOpenedEvent"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数
|
// RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数
|
||||||
func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacketEventHandle, priority ...int) {
|
func (slf *event) RegConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) {
|
||||||
if slf.network == NetworkHttp {
|
if slf.network == NetworkHttp {
|
||||||
panic(ErrNetworkIncompatibleHttp)
|
panic(ErrNetworkIncompatibleHttp)
|
||||||
}
|
}
|
||||||
slf.connectionReceivePacketEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.connectionReceivePacketEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) {
|
func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) {
|
||||||
if slf.Server.runtime.packetWarnSize > 0 && len(packet) > slf.Server.runtime.packetWarnSize {
|
if slf.Server.runtime.packetWarnSize > 0 && len(packet) > slf.Server.runtime.packetWarnSize {
|
||||||
log.Warn("Server", log.String("OnConnectionReceivePacketEvent", fmt.Sprintf("packet size %d > %d", len(packet), slf.Server.runtime.packetWarnSize)))
|
log.Warn("Server", log.String("OnConnectionReceivePacketEvent", fmt.Sprintf("packet size %d > %d", len(packet), slf.Server.runtime.packetWarnSize)))
|
||||||
}
|
}
|
||||||
slf.connectionReceivePacketEventHandles.RangeValue(func(index int, value ConnectionReceivePacketEventHandle) bool {
|
slf.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool {
|
||||||
value(slf.Server, conn, packet)
|
value(slf.Server, conn, packet)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数
|
// RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数
|
||||||
func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle, priority ...int) {
|
func (slf *event) RegMessageErrorEvent(handler MessageErrorEventHandler, priority ...int) {
|
||||||
slf.messageErrorEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.messageErrorEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnMessageErrorEvent(message *Message, err error) {
|
func (slf *event) OnMessageErrorEvent(message *Message, err error) {
|
||||||
if slf.messageErrorEventHandles.Len() == 0 {
|
if slf.messageErrorEventHandlers.Len() == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -264,41 +266,42 @@ func (slf *event) OnMessageErrorEvent(message *Message, err error) {
|
||||||
debug.PrintStack()
|
debug.PrintStack()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
slf.messageErrorEventHandles.RangeValue(func(index int, value MessageErrorEventHandle) bool {
|
slf.messageErrorEventHandlers.RangeValue(func(index int, value MessageErrorEventHandler) bool {
|
||||||
value(slf.Server, message, err)
|
value(slf.Server, message, err)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数
|
// RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数
|
||||||
func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle, priority ...int) {
|
func (slf *event) RegMessageLowExecEvent(handler MessageLowExecEventHandler, priority ...int) {
|
||||||
slf.messageLowExecEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.messageLowExecEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) {
|
func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) {
|
||||||
if slf.messageLowExecEventHandles.Len() == 0 {
|
if slf.messageLowExecEventHandlers.Len() == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// 慢消息不再占用消息通道
|
// 慢消息不再占用消息通道
|
||||||
slf.messageLowExecEventHandles.RangeValue(func(index int, value MessageLowExecEventHandle) bool {
|
slf.messageLowExecEventHandlers.RangeValue(func(index int, value MessageLowExecEventHandler) bool {
|
||||||
value(slf.Server, message, cost)
|
value(slf.Server, message, cost)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数
|
// RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数
|
||||||
func (slf *event) RegConnectionOpenedAfterEvent(handle ConnectionOpenedAfterEventHandle, priority ...int) {
|
// - 该阶段事件将会转到对应消息分流渠道中进行处理
|
||||||
|
func (slf *event) RegConnectionOpenedAfterEvent(handler ConnectionOpenedAfterEventHandler, priority ...int) {
|
||||||
if slf.network == NetworkHttp {
|
if slf.network == NetworkHttp {
|
||||||
panic(ErrNetworkIncompatibleHttp)
|
panic(ErrNetworkIncompatibleHttp)
|
||||||
}
|
}
|
||||||
slf.connectionOpenedAfterEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.connectionOpenedAfterEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) {
|
func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) {
|
||||||
slf.PushSystemMessage(func() {
|
slf.PushShuntMessage(conn, func() {
|
||||||
slf.connectionOpenedAfterEventHandles.RangeValue(func(index int, value ConnectionOpenedAfterEventHandle) bool {
|
slf.connectionOpenedAfterEventHandlers.RangeValue(func(index int, value ConnectionOpenedAfterEventHandler) bool {
|
||||||
value(slf.Server, conn)
|
value(slf.Server, conn)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -306,20 +309,20 @@ func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegConnectionWritePacketBeforeEvent 在发送数据包前将立刻执行被注册的事件处理函数
|
// RegConnectionWritePacketBeforeEvent 在发送数据包前将立刻执行被注册的事件处理函数
|
||||||
func (slf *event) RegConnectionWritePacketBeforeEvent(handle ConnectionWritePacketBeforeEventHandle, priority ...int) {
|
func (slf *event) RegConnectionWritePacketBeforeEvent(handler ConnectionWritePacketBeforeEventHandler, priority ...int) {
|
||||||
if slf.network == NetworkHttp {
|
if slf.network == NetworkHttp {
|
||||||
panic(ErrNetworkIncompatibleHttp)
|
panic(ErrNetworkIncompatibleHttp)
|
||||||
}
|
}
|
||||||
slf.connectionWritePacketBeforeHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.connectionWritePacketBeforeHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet []byte) (newPacket []byte) {
|
func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet []byte) (newPacket []byte) {
|
||||||
if slf.connectionWritePacketBeforeHandles.Len() == 0 {
|
if slf.connectionWritePacketBeforeHandlers.Len() == 0 {
|
||||||
return packet
|
return packet
|
||||||
}
|
}
|
||||||
newPacket = packet
|
newPacket = packet
|
||||||
slf.connectionWritePacketBeforeHandles.RangeValue(func(index int, value ConnectionWritePacketBeforeEventHandle) bool {
|
slf.connectionWritePacketBeforeHandlers.RangeValue(func(index int, value ConnectionWritePacketBeforeEventHandler) bool {
|
||||||
newPacket = value(slf.Server, conn, newPacket)
|
newPacket = value(slf.Server, conn, newPacket)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -327,14 +330,14 @@ func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet []byte)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegShuntChannelCreatedEvent 在分流通道创建时将立刻执行被注册的事件处理函数
|
// RegShuntChannelCreatedEvent 在分流通道创建时将立刻执行被注册的事件处理函数
|
||||||
func (slf *event) RegShuntChannelCreatedEvent(handle ShuntChannelCreatedEventHandle, priority ...int) {
|
func (slf *event) RegShuntChannelCreatedEvent(handler ShuntChannelCreatedEventHandler, priority ...int) {
|
||||||
slf.shuntChannelCreatedEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.shuntChannelCreatedEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnShuntChannelCreatedEvent(guid int64) {
|
func (slf *event) OnShuntChannelCreatedEvent(guid int64) {
|
||||||
slf.PushSystemMessage(func() {
|
slf.PushSystemMessage(func() {
|
||||||
slf.shuntChannelCreatedEventHandles.RangeValue(func(index int, value ShuntChannelCreatedEventHandle) bool {
|
slf.shuntChannelCreatedEventHandlers.RangeValue(func(index int, value ShuntChannelCreatedEventHandler) bool {
|
||||||
value(slf.Server, guid)
|
value(slf.Server, guid)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -342,14 +345,14 @@ func (slf *event) OnShuntChannelCreatedEvent(guid int64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数
|
// RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数
|
||||||
func (slf *event) RegShuntChannelCloseEvent(handle ShuntChannelClosedEventHandle, priority ...int) {
|
func (slf *event) RegShuntChannelCloseEvent(handler ShuntChannelClosedEventHandler, priority ...int) {
|
||||||
slf.shuntChannelClosedEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.shuntChannelClosedEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnShuntChannelClosedEvent(guid int64) {
|
func (slf *event) OnShuntChannelClosedEvent(guid int64) {
|
||||||
slf.PushSystemMessage(func() {
|
slf.PushSystemMessage(func() {
|
||||||
slf.shuntChannelClosedEventHandles.RangeValue(func(index int, value ShuntChannelClosedEventHandle) bool {
|
slf.shuntChannelClosedEventHandlers.RangeValue(func(index int, value ShuntChannelClosedEventHandler) bool {
|
||||||
value(slf.Server, guid)
|
value(slf.Server, guid)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -364,17 +367,17 @@ func (slf *event) OnShuntChannelClosedEvent(guid int64) {
|
||||||
// 场景:
|
// 场景:
|
||||||
// - 数据包格式校验
|
// - 数据包格式校验
|
||||||
// - 数据包分包等情况处理
|
// - 数据包分包等情况处理
|
||||||
func (slf *event) RegConnectionPacketPreprocessEvent(handle ConnectionPacketPreprocessEventHandle, priority ...int) {
|
func (slf *event) RegConnectionPacketPreprocessEvent(handler ConnectionPacketPreprocessEventHandler, priority ...int) {
|
||||||
slf.connectionPacketPreprocessEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.connectionPacketPreprocessEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, usePacket func(newPacket []byte)) bool {
|
func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, usePacket func(newPacket []byte)) bool {
|
||||||
if slf.connectionPacketPreprocessEventHandles.Len() == 0 {
|
if slf.connectionPacketPreprocessEventHandlers.Len() == 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
var abort = false
|
var abort = false
|
||||||
slf.connectionPacketPreprocessEventHandles.RangeValue(func(index int, value ConnectionPacketPreprocessEventHandle) bool {
|
slf.connectionPacketPreprocessEventHandlers.RangeValue(func(index int, value ConnectionPacketPreprocessEventHandler) bool {
|
||||||
value(slf.Server, conn, packet, func() { abort = true }, usePacket)
|
value(slf.Server, conn, packet, func() { abort = true }, usePacket)
|
||||||
if abort {
|
if abort {
|
||||||
return false
|
return false
|
||||||
|
@ -388,13 +391,13 @@ func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, u
|
||||||
// - 当返回 true 时,将继续执行后续的消息处理函数,否则将不会执行后续的消息处理函数,并且该消息将被丢弃
|
// - 当返回 true 时,将继续执行后续的消息处理函数,否则将不会执行后续的消息处理函数,并且该消息将被丢弃
|
||||||
//
|
//
|
||||||
// 适用于限流等场景
|
// 适用于限流等场景
|
||||||
func (slf *event) RegMessageExecBeforeEvent(handle MessageExecBeforeEventHandle, priority ...int) {
|
func (slf *event) RegMessageExecBeforeEvent(handler MessageExecBeforeEventHandler, priority ...int) {
|
||||||
slf.messageExecBeforeEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.messageExecBeforeEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
|
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnMessageExecBeforeEvent(message *Message) bool {
|
func (slf *event) OnMessageExecBeforeEvent(message *Message) bool {
|
||||||
if slf.messageExecBeforeEventHandles.Len() == 0 {
|
if slf.messageExecBeforeEventHandlers.Len() == 0 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
var result = true
|
var result = true
|
||||||
|
@ -404,7 +407,7 @@ func (slf *event) OnMessageExecBeforeEvent(message *Message) bool {
|
||||||
debug.PrintStack()
|
debug.PrintStack()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
slf.messageExecBeforeEventHandles.RangeValue(func(index int, value MessageExecBeforeEventHandle) bool {
|
slf.messageExecBeforeEventHandlers.RangeValue(func(index int, value MessageExecBeforeEventHandler) bool {
|
||||||
result = value(slf.Server, message)
|
result = value(slf.Server, message)
|
||||||
return result
|
return result
|
||||||
})
|
})
|
||||||
|
@ -412,12 +415,12 @@ func (slf *event) OnMessageExecBeforeEvent(message *Message) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegMessageReadyEvent 在服务器消息处理器准备就绪时立即执行被注册的事件处理函数
|
// RegMessageReadyEvent 在服务器消息处理器准备就绪时立即执行被注册的事件处理函数
|
||||||
func (slf *event) RegMessageReadyEvent(handle MessageReadyEventHandle, priority ...int) {
|
func (slf *event) RegMessageReadyEvent(handler MessageReadyEventHandler, priority ...int) {
|
||||||
slf.messageReadyEventHandles.Append(handle, slice.GetValue(priority, 0))
|
slf.messageReadyEventHandlers.Append(handler, slice.GetValue(priority, 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *event) OnMessageReadyEvent() {
|
func (slf *event) OnMessageReadyEvent() {
|
||||||
if slf.messageReadyEventHandles.Len() == 0 {
|
if slf.messageReadyEventHandlers.Len() == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -426,7 +429,7 @@ func (slf *event) OnMessageReadyEvent() {
|
||||||
debug.PrintStack()
|
debug.PrintStack()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
slf.messageReadyEventHandles.RangeValue(func(index int, value MessageReadyEventHandle) bool {
|
slf.messageReadyEventHandlers.RangeValue(func(index int, value MessageReadyEventHandler) bool {
|
||||||
value(slf.Server)
|
value(slf.Server)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -436,7 +439,7 @@ func (slf *event) check() {
|
||||||
switch slf.network {
|
switch slf.network {
|
||||||
case NetworkHttp, NetworkGRPC, NetworkNone:
|
case NetworkHttp, NetworkGRPC, NetworkNone:
|
||||||
default:
|
default:
|
||||||
if slf.connectionReceivePacketEventHandles.Len() == 0 {
|
if slf.connectionReceivePacketEventHandlers.Len() == 0 {
|
||||||
log.Warn("Server", log.String("ConnectionReceivePacketEvent", "invalid server, no packets processed"))
|
log.Warn("Server", log.String("ConnectionReceivePacketEvent", "invalid server, no packets processed"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,9 @@ const (
|
||||||
|
|
||||||
// MessageTypeSystem 系统消息类型
|
// MessageTypeSystem 系统消息类型
|
||||||
MessageTypeSystem
|
MessageTypeSystem
|
||||||
|
|
||||||
|
// MessageTypeShunt 普通分流消息类型
|
||||||
|
MessageTypeShunt
|
||||||
)
|
)
|
||||||
|
|
||||||
var messageNames = map[MessageType]string{
|
var messageNames = map[MessageType]string{
|
||||||
|
@ -60,6 +63,7 @@ var messageNames = map[MessageType]string{
|
||||||
MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync",
|
MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync",
|
||||||
MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback",
|
MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback",
|
||||||
MessageTypeSystem: "MessageTypeSystem",
|
MessageTypeSystem: "MessageTypeSystem",
|
||||||
|
MessageTypeShunt: "MessageTypeShunt",
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -209,3 +213,9 @@ func (slf *Message) castToErrorMessage(err error, action MessageErrorAction, mar
|
||||||
slf.t, slf.err, slf.errAction, slf.marks = MessageTypeError, err, action, mark
|
slf.t, slf.err, slf.errAction, slf.marks = MessageTypeError, err, action, mark
|
||||||
return slf
|
return slf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// castToShuntMessage 将消息转换为分流消息
|
||||||
|
func (slf *Message) castToShuntMessage(conn *Conn, caller func(), mark ...log.Field) *Message {
|
||||||
|
slf.t, slf.conn, slf.ordinaryHandler, slf.marks = MessageTypeShunt, conn, caller, mark
|
||||||
|
return slf
|
||||||
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ func (slf *MultipleServer) Run() {
|
||||||
go func(address string, server *Server) {
|
go func(address string, server *Server) {
|
||||||
var lock sync.Mutex
|
var lock sync.Mutex
|
||||||
var startFinish bool
|
var startFinish bool
|
||||||
server.startFinishEventHandles.Append(func(srv *Server) {
|
server.startFinishEventHandlers.Append(func(srv *Server) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
defer lock.Unlock()
|
defer lock.Unlock()
|
||||||
if !startFinish {
|
if !startFinish {
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"github.com/kercylan98/minotaur/utils/log"
|
"github.com/kercylan98/minotaur/utils/log"
|
||||||
"github.com/kercylan98/minotaur/utils/timer"
|
"github.com/kercylan98/minotaur/utils/timer"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"runtime/debug"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -40,7 +39,6 @@ type runtime struct {
|
||||||
websocketCompression int // websocket压缩等级
|
websocketCompression int // websocket压缩等级
|
||||||
websocketWriteCompression bool // websocket写入压缩
|
websocketWriteCompression bool // websocket写入压缩
|
||||||
limitLife time.Duration // 限制最大生命周期
|
limitLife time.Duration // 限制最大生命周期
|
||||||
shuntMatcher func(conn *Conn) string // 分流匹配器
|
|
||||||
packetWarnSize int // 数据包大小警告
|
packetWarnSize int // 数据包大小警告
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,32 +56,6 @@ func WithPacketWarnSize(size int) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithShunt 通过连接数据包分流的方式创建服务器
|
|
||||||
// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况
|
|
||||||
// - shuntMatcher:用于匹配连接的函数,返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道
|
|
||||||
//
|
|
||||||
// 将被分流的消息类型(更多类型有待斟酌):
|
|
||||||
// - MessageTypePacket
|
|
||||||
//
|
|
||||||
// 注意事项:
|
|
||||||
// - 当分流匹配过程发生 panic 将会在系统通道内处理消息,并打印日志
|
|
||||||
func WithShunt(shuntMatcher func(conn *Conn) string) Option {
|
|
||||||
return func(srv *Server) {
|
|
||||||
if shuntMatcher == nil {
|
|
||||||
log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
srv.shuntMatcher = func(conn *Conn) string {
|
|
||||||
defer func() {
|
|
||||||
if err := recover(); err != nil {
|
|
||||||
log.Error("ShuntMatcher", log.String("State", "Panic"), log.Any("Error", err), log.String("Stack", string(debug.Stack())))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return shuntMatcher(conn)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithLimitLife 通过限制最大生命周期的方式创建服务器
|
// WithLimitLife 通过限制最大生命周期的方式创建服务器
|
||||||
// - 通常用于测试服务器,服务器将在到达最大生命周期时自动关闭
|
// - 通常用于测试服务器,服务器将在到达最大生命周期时自动关闭
|
||||||
func WithLimitLife(t time.Duration) Option {
|
func WithLimitLife(t time.Duration) Option {
|
||||||
|
|
140
server/server.go
140
server/server.go
|
@ -43,6 +43,8 @@ func New(network Network, options ...Option) *Server {
|
||||||
systemSignal: make(chan os.Signal, 1),
|
systemSignal: make(chan os.Signal, 1),
|
||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
dispatchers: make(map[string]*dispatcher),
|
dispatchers: make(map[string]*dispatcher),
|
||||||
|
dispatcherMember: map[string]map[string]*Conn{},
|
||||||
|
currDispatcher: map[string]*dispatcher{},
|
||||||
}
|
}
|
||||||
server.event = newEvent(server)
|
server.event = newEvent(server)
|
||||||
|
|
||||||
|
@ -91,6 +93,7 @@ type Server struct {
|
||||||
messagePool *concurrent.Pool[*Message] // 消息池
|
messagePool *concurrent.Pool[*Message] // 消息池
|
||||||
ctx context.Context // 上下文
|
ctx context.Context // 上下文
|
||||||
online *concurrent.BalanceMap[string, *Conn] // 在线连接
|
online *concurrent.BalanceMap[string, *Conn] // 在线连接
|
||||||
|
systemDispatcher *dispatcher // 系统消息分发器
|
||||||
network Network // 网络类型
|
network Network // 网络类型
|
||||||
addr string // 侦听地址
|
addr string // 侦听地址
|
||||||
systemSignal chan os.Signal // 系统信号
|
systemSignal chan os.Signal // 系统信号
|
||||||
|
@ -101,7 +104,9 @@ type Server struct {
|
||||||
isShutdown atomic.Bool // 是否已关闭
|
isShutdown atomic.Bool // 是否已关闭
|
||||||
messageCounter atomic.Int64 // 消息计数器
|
messageCounter atomic.Int64 // 消息计数器
|
||||||
isRunning bool // 是否正在运行
|
isRunning bool // 是否正在运行
|
||||||
dispatchers map[string]*dispatcher // 消息分发器
|
dispatchers map[string]*dispatcher // 消息分发器集合
|
||||||
|
dispatcherMember map[string]map[string]*Conn // 消息分发器包含的连接
|
||||||
|
currDispatcher map[string]*dispatcher // 当前连接所处消息分发器
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run 使用特定地址运行服务器
|
// Run 使用特定地址运行服务器
|
||||||
|
@ -125,6 +130,7 @@ func (slf *Server) Run(addr string) error {
|
||||||
}
|
}
|
||||||
slf.event.check()
|
slf.event.check()
|
||||||
slf.addr = addr
|
slf.addr = addr
|
||||||
|
slf.systemDispatcher = generateDispatcher(serverSystemDispatcher, slf.dispatchMessage)
|
||||||
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
|
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
|
||||||
var messageInitFinish = make(chan struct{}, 1)
|
var messageInitFinish = make(chan struct{}, 1)
|
||||||
var connectionInitHandle = func(callback func()) {
|
var connectionInitHandle = func(callback func()) {
|
||||||
|
@ -146,8 +152,7 @@ func (slf *Server) Run(addr string) error {
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
messageInitFinish <- struct{}{}
|
messageInitFinish <- struct{}{}
|
||||||
d, _ := slf.useDispatcher(serverSystemDispatcher)
|
slf.systemDispatcher.start()
|
||||||
d.start()
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,7 +205,6 @@ func (slf *Server) Run(addr string) error {
|
||||||
|
|
||||||
conn := newKcpConn(slf, session)
|
conn := newKcpConn(slf, session)
|
||||||
slf.OnConnectionOpenedEvent(conn)
|
slf.OnConnectionOpenedEvent(conn)
|
||||||
slf.OnConnectionOpenedAfterEvent(conn)
|
|
||||||
|
|
||||||
go func(conn *Conn) {
|
go func(conn *Conn) {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -568,30 +572,70 @@ func (slf *Server) GetMessageCount() int64 {
|
||||||
return slf.messageCounter.Load()
|
return slf.messageCounter.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
// useDispatcher 添加消息分发器
|
// UseShunt 切换连接所使用的消息分流渠道,当分流渠道 name 不存在时将会创建一个新的分流渠道,否则将会加入已存在的分流渠道
|
||||||
// - 该函数在分发器不重复的情况下将创建分发器,当分发器已存在将直接返回
|
// - 默认情况下,所有连接都使用系统通道进行消息分发,当指定消息分流渠道时,将会使用指定的消息分流渠道进行消息分发
|
||||||
func (slf *Server) useDispatcher(name string) (*dispatcher, bool) {
|
func (slf *Server) UseShunt(conn *Conn, name string) {
|
||||||
slf.dispatcherLock.Lock()
|
slf.dispatcherLock.Lock()
|
||||||
|
defer slf.dispatcherLock.Unlock()
|
||||||
d, exist := slf.dispatchers[name]
|
d, exist := slf.dispatchers[name]
|
||||||
if exist {
|
if !exist {
|
||||||
slf.dispatcherLock.Unlock()
|
d = generateDispatcher(name, slf.dispatchMessage)
|
||||||
return d, false
|
go d.start()
|
||||||
}
|
|
||||||
d = generateDispatcher(slf.dispatchMessage)
|
|
||||||
slf.dispatchers[name] = d
|
slf.dispatchers[name] = d
|
||||||
slf.dispatcherLock.Unlock()
|
}
|
||||||
return d, true
|
|
||||||
|
curr, exist := slf.currDispatcher[conn.GetID()]
|
||||||
|
if exist {
|
||||||
|
if curr.name == name {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(slf.dispatcherMember[curr.name], conn.GetID())
|
||||||
|
if len(slf.dispatcherMember[curr.name]) == 0 {
|
||||||
|
curr.close()
|
||||||
|
delete(slf.dispatchers, curr.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
member, exist := slf.dispatcherMember[name]
|
||||||
|
if !exist {
|
||||||
|
member = map[string]*Conn{}
|
||||||
|
slf.dispatcherMember[name] = member
|
||||||
|
}
|
||||||
|
|
||||||
|
member[conn.GetID()] = conn
|
||||||
|
}
|
||||||
|
|
||||||
|
// getConnDispatcher 获取连接所使用的消息分发器
|
||||||
|
func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher {
|
||||||
|
if conn == nil {
|
||||||
|
return slf.systemDispatcher
|
||||||
|
}
|
||||||
|
slf.dispatcherLock.RLock()
|
||||||
|
defer slf.dispatcherLock.RUnlock()
|
||||||
|
d, exist := slf.currDispatcher[conn.GetID()]
|
||||||
|
if exist {
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
return slf.systemDispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
// releaseDispatcher 关闭消息分发器
|
// releaseDispatcher 关闭消息分发器
|
||||||
func (slf *Server) releaseDispatcher(name string) {
|
func (slf *Server) releaseDispatcher(conn *Conn) {
|
||||||
slf.dispatcherLock.Lock()
|
if conn == nil {
|
||||||
d, exist := slf.dispatchers[name]
|
return
|
||||||
if exist {
|
}
|
||||||
delete(slf.dispatchers, name)
|
slf.dispatcherLock.Lock()
|
||||||
d.close()
|
defer slf.dispatcherLock.Unlock()
|
||||||
|
d, exist := slf.currDispatcher[conn.GetID()]
|
||||||
|
if exist {
|
||||||
|
delete(slf.dispatcherMember[d.name], conn.GetID())
|
||||||
|
if len(slf.dispatcherMember[d.name]) == 0 {
|
||||||
|
d.close()
|
||||||
|
delete(slf.dispatchers, d.name)
|
||||||
|
}
|
||||||
|
delete(slf.currDispatcher, conn.GetID())
|
||||||
}
|
}
|
||||||
slf.dispatcherLock.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求
|
// pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求
|
||||||
|
@ -602,20 +646,13 @@ func (slf *Server) pushMessage(message *Message) {
|
||||||
}
|
}
|
||||||
var dispatcher *dispatcher
|
var dispatcher *dispatcher
|
||||||
switch message.t {
|
switch message.t {
|
||||||
case MessageTypePacket:
|
case MessageTypePacket,
|
||||||
if slf.shuntMatcher == nil {
|
MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback,
|
||||||
dispatcher, _ = slf.useDispatcher(serverSystemDispatcher)
|
MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback,
|
||||||
break
|
MessageTypeShunt:
|
||||||
}
|
dispatcher = slf.getConnDispatcher(message.conn)
|
||||||
fallthrough
|
|
||||||
case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback, MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback:
|
|
||||||
var created bool
|
|
||||||
dispatcher, created = slf.useDispatcher(slf.shuntMatcher(message.conn))
|
|
||||||
if created {
|
|
||||||
go dispatcher.start()
|
|
||||||
}
|
|
||||||
case MessageTypeSystem, MessageTypeAsync, MessageTypeUniqueAsync, MessageTypeAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeError, MessageTypeTicker:
|
case MessageTypeSystem, MessageTypeAsync, MessageTypeUniqueAsync, MessageTypeAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeError, MessageTypeTicker:
|
||||||
dispatcher, _ = slf.useDispatcher(serverSystemDispatcher)
|
dispatcher = slf.systemDispatcher
|
||||||
}
|
}
|
||||||
if dispatcher == nil {
|
if dispatcher == nil {
|
||||||
return
|
return
|
||||||
|
@ -756,7 +793,7 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
|
||||||
}
|
}
|
||||||
case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeUniqueShuntAsyncCallback:
|
case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeUniqueShuntAsyncCallback:
|
||||||
msg.errHandler(msg.err)
|
msg.errHandler(msg.err)
|
||||||
case MessageTypeSystem:
|
case MessageTypeSystem, MessageTypeShunt:
|
||||||
msg.ordinaryHandler()
|
msg.ordinaryHandler()
|
||||||
default:
|
default:
|
||||||
log.Warn("Server", log.String("not support message type", msg.t.String()))
|
log.Warn("Server", log.String("not support message type", msg.t.String()))
|
||||||
|
@ -790,20 +827,12 @@ func (slf *Server) PushAsyncCallbackMessage(err error, callback func(err error),
|
||||||
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发
|
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发
|
||||||
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
|
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
|
||||||
func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) {
|
func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) {
|
||||||
if slf.shuntMatcher == nil {
|
|
||||||
slf.PushAsyncMessage(caller, callback)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...))
|
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...))
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
|
// PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
|
||||||
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发
|
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发
|
||||||
func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) {
|
func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) {
|
||||||
if slf.shuntMatcher == nil {
|
|
||||||
slf.PushAsyncCallbackMessage(err, callback)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...))
|
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -811,7 +840,7 @@ func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback
|
||||||
// - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息
|
// - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息
|
||||||
func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) {
|
func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) {
|
||||||
slf.pushMessage(slf.messagePool.Get().castToPacketMessage(
|
slf.pushMessage(slf.messagePool.Get().castToPacketMessage(
|
||||||
&Conn{ctx: context.WithValue(conn.ctx, contextKeyWST, wst), connection: conn.connection},
|
&Conn{wst: wst, connection: conn.connection},
|
||||||
packet,
|
packet,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
@ -828,13 +857,9 @@ func (slf *Server) PushTickerMessage(name string, caller func(), mark ...log.Fie
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushShuntTickerMessage 向特定分发器中推送 MessageTypeTicker 消息,消息执行与 MessageTypeTicker 一致
|
// PushShuntTickerMessage 向特定分发器中推送 MessageTypeTicker 消息,消息执行与 MessageTypeTicker 一致
|
||||||
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushTickerMessage 进行转发
|
// - 需要注意的是,当未指定 UseShunt 时,将会通过 PushTickerMessage 进行转发
|
||||||
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
|
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
|
||||||
func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func(), mark ...log.Field) {
|
func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func(), mark ...log.Field) {
|
||||||
if slf.shuntMatcher == nil {
|
|
||||||
slf.PushTickerMessage(name, caller)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...))
|
slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -850,23 +875,15 @@ func (slf *Server) PushUniqueAsyncCallbackMessage(unique string, err error, call
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushUniqueShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致
|
// PushUniqueShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致
|
||||||
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncMessage 进行转发
|
// - 需要注意的是,当未指定 UseShunt 时,将会通过系统分流渠道进行转发
|
||||||
// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息
|
// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息
|
||||||
func (slf *Server) PushUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) {
|
func (slf *Server) PushUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) {
|
||||||
if slf.shuntMatcher == nil {
|
|
||||||
slf.PushUniqueAsyncMessage(unique, caller, callback)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncMessage(conn, unique, caller, callback, mark...))
|
slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncMessage(conn, unique, caller, callback, mark...))
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushUniqueShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
|
// PushUniqueShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
|
||||||
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncCallbackMessage 进行转发
|
// - 需要注意的是,当未指定 UseShunt 时,将会通过系统分流渠道进行转发
|
||||||
func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, callback func(err error), mark ...log.Field) {
|
func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, callback func(err error), mark ...log.Field) {
|
||||||
if slf.shuntMatcher == nil {
|
|
||||||
slf.PushUniqueAsyncCallbackMessage(unique, err, callback)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncCallbackMessage(conn, unique, err, callback, mark...))
|
slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncCallbackMessage(conn, unique, err, callback, mark...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -878,3 +895,8 @@ func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string
|
||||||
func (slf *Server) PushErrorMessage(err error, errAction MessageErrorAction, mark ...log.Field) {
|
func (slf *Server) PushErrorMessage(err error, errAction MessageErrorAction, mark ...log.Field) {
|
||||||
slf.pushMessage(slf.messagePool.Get().castToErrorMessage(err, errAction, mark...))
|
slf.pushMessage(slf.messagePool.Get().castToErrorMessage(err, errAction, mark...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PushShuntMessage 向特定分发器中推送 MessageTypeShunt 消息,消息执行与 MessageTypeSystem 一致,不同的是将会在特定分发器中执行
|
||||||
|
func (slf *Server) PushShuntMessage(conn *Conn, caller func(), mark ...log.Field) {
|
||||||
|
slf.pushMessage(slf.messagePool.Get().castToShuntMessage(conn, caller, mark...))
|
||||||
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/kercylan98/minotaur/server"
|
"github.com/kercylan98/minotaur/server"
|
||||||
"github.com/kercylan98/minotaur/server/client"
|
"github.com/kercylan98/minotaur/server/client"
|
||||||
"github.com/kercylan98/minotaur/utils/log"
|
|
||||||
"github.com/kercylan98/minotaur/utils/times"
|
"github.com/kercylan98/minotaur/utils/times"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -30,13 +29,6 @@ func TestNew(t *testing.T) {
|
||||||
//}
|
//}
|
||||||
})
|
})
|
||||||
|
|
||||||
srv.RegStartFinishEvent(func(srv *server.Server) {
|
|
||||||
log.Warn("启动完成")
|
|
||||||
log.Error("启动完成")
|
|
||||||
log.Info("启动完成")
|
|
||||||
log.Debug("启动完成")
|
|
||||||
})
|
|
||||||
|
|
||||||
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
|
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
|
||||||
conn.Write(packet)
|
conn.Write(packet)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue