diff --git a/server/conn.go b/server/conn.go index 1806047..1e8d784 100644 --- a/server/conn.go +++ b/server/conn.go @@ -106,9 +106,10 @@ func newBotConn(server *Server) *Conn { return c } -// Conn 服务器连接单次会话的包装 +// Conn 服务器连接单次消息的包装 type Conn struct { *connection + wst int ctx context.Context } @@ -160,7 +161,7 @@ func (slf *Conn) GetWebsocketRequest() *http.Request { // IsBot 是否是机器人连接 func (slf *Conn) IsBot() bool { - return slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil + return slf != nil && slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil } // RemoteAddr 获取远程地址 @@ -229,15 +230,15 @@ func (slf *Conn) IsWebsocket() bool { return slf.server.network == NetworkWebsocket } -// GetWST 获取websocket消息类型 +// GetWST 获取本次 websocket 消息类型 +// - 默认将与发送类型相同 func (slf *Conn) GetWST() int { - wst, _ := slf.ctx.Value(contextKeyWST).(int) - return wst + return slf.wst } -// SetWST 设置websocket消息类型 +// SetWST 设置本次 websocket 消息类型 func (slf *Conn) SetWST(wst int) *Conn { - slf.ctx = context.WithValue(slf.ctx, contextKeyWST, wst) + slf.wst = wst return slf } @@ -255,7 +256,6 @@ func (slf *Conn) PushUniqueAsyncMessage(name string, caller func() error, callba } // Write 向连接中写入数据 -// - messageType: websocket模式中指定消息类型 func (slf *Conn) Write(packet []byte, callback ...func(err error)) { if slf.gw != nil { slf.gw(packet) @@ -356,9 +356,7 @@ func (slf *Conn) Close(err ...error) { if slf.ticker != nil { slf.ticker.Release() } - if slf.server.shuntMatcher != nil { - slf.server.releaseDispatcher(slf.server.shuntMatcher(slf)) - } + slf.server.releaseDispatcher(slf) slf.pool.Close() slf.loop.Close() slf.mu.Unlock() diff --git a/server/constants.go b/server/constants.go index 014eb51..55ff658 100644 --- a/server/constants.go +++ b/server/constants.go @@ -16,7 +16,3 @@ const ( DefaultWebsocketReadDeadline = 30 * time.Second DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB ) - -const ( - contextKeyWST = "_wst" // WebSocket 消息类型 -) diff --git a/server/dispatcher.go b/server/dispatcher.go index e5639e3..40402d4 100644 --- a/server/dispatcher.go +++ b/server/dispatcher.go @@ -8,8 +8,9 @@ import ( var dispatcherUnique = struct{}{} // generateDispatcher 生成消息分发器 -func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) *dispatcher { +func generateDispatcher(name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher { return &dispatcher{ + name: name, buffer: buffer.NewUnboundedN[*Message](), handler: handler, uniques: haxmap.New[string, struct{}](), @@ -18,6 +19,7 @@ func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) // dispatcher 消息分发器 type dispatcher struct { + name string buffer *buffer.Unbounded[*Message] uniques *haxmap.Map[string, struct{}] handler func(dispatcher *dispatcher, message *Message) diff --git a/server/event.go b/server/event.go index c3c390d..6a0e6f8 100644 --- a/server/event.go +++ b/server/event.go @@ -15,74 +15,74 @@ import ( "time" ) -type StartBeforeEventHandle func(srv *Server) -type StartFinishEventHandle func(srv *Server) -type StopEventHandle func(srv *Server) -type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte) -type ConnectionOpenedEventHandle func(srv *Server, conn *Conn) -type ConnectionClosedEventHandle func(srv *Server, conn *Conn, err any) -type MessageErrorEventHandle func(srv *Server, message *Message, err error) -type MessageLowExecEventHandle func(srv *Server, message *Message, cost time.Duration) -type ConsoleCommandEventHandle func(srv *Server, command string, params ConsoleParams) -type ConnectionOpenedAfterEventHandle func(srv *Server, conn *Conn) -type ConnectionWritePacketBeforeEventHandle func(srv *Server, conn *Conn, packet []byte) []byte -type ShuntChannelCreatedEventHandle func(srv *Server, guid int64) -type ShuntChannelClosedEventHandle func(srv *Server, guid int64) -type ConnectionPacketPreprocessEventHandle func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte)) -type MessageExecBeforeEventHandle func(srv *Server, message *Message) bool -type MessageReadyEventHandle func(srv *Server) +type StartBeforeEventHandler func(srv *Server) +type StartFinishEventHandler func(srv *Server) +type StopEventHandler func(srv *Server) +type ConnectionReceivePacketEventHandler func(srv *Server, conn *Conn, packet []byte) +type ConnectionOpenedEventHandler func(srv *Server, conn *Conn) +type ConnectionClosedEventHandler func(srv *Server, conn *Conn, err any) +type MessageErrorEventHandler func(srv *Server, message *Message, err error) +type MessageLowExecEventHandler func(srv *Server, message *Message, cost time.Duration) +type ConsoleCommandEventHandler func(srv *Server, command string, params ConsoleParams) +type ConnectionOpenedAfterEventHandler func(srv *Server, conn *Conn) +type ConnectionWritePacketBeforeEventHandler func(srv *Server, conn *Conn, packet []byte) []byte +type ShuntChannelCreatedEventHandler func(srv *Server, guid int64) +type ShuntChannelClosedEventHandler func(srv *Server, guid int64) +type ConnectionPacketPreprocessEventHandler func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte)) +type MessageExecBeforeEventHandler func(srv *Server, message *Message) bool +type MessageReadyEventHandler func(srv *Server) func newEvent(srv *Server) *event { return &event{ - Server: srv, - startBeforeEventHandles: slice.NewPriority[StartBeforeEventHandle](), - startFinishEventHandles: slice.NewPriority[StartFinishEventHandle](), - stopEventHandles: slice.NewPriority[StopEventHandle](), - connectionReceivePacketEventHandles: slice.NewPriority[ConnectionReceivePacketEventHandle](), - connectionOpenedEventHandles: slice.NewPriority[ConnectionOpenedEventHandle](), - connectionClosedEventHandles: slice.NewPriority[ConnectionClosedEventHandle](), - messageErrorEventHandles: slice.NewPriority[MessageErrorEventHandle](), - messageLowExecEventHandles: slice.NewPriority[MessageLowExecEventHandle](), - connectionOpenedAfterEventHandles: slice.NewPriority[ConnectionOpenedAfterEventHandle](), - connectionWritePacketBeforeHandles: slice.NewPriority[ConnectionWritePacketBeforeEventHandle](), - shuntChannelCreatedEventHandles: slice.NewPriority[ShuntChannelCreatedEventHandle](), - shuntChannelClosedEventHandles: slice.NewPriority[ShuntChannelClosedEventHandle](), - connectionPacketPreprocessEventHandles: slice.NewPriority[ConnectionPacketPreprocessEventHandle](), - messageExecBeforeEventHandles: slice.NewPriority[MessageExecBeforeEventHandle](), - messageReadyEventHandles: slice.NewPriority[MessageReadyEventHandle](), + Server: srv, + startBeforeEventHandlers: slice.NewPriority[StartBeforeEventHandler](), + startFinishEventHandlers: slice.NewPriority[StartFinishEventHandler](), + stopEventHandlers: slice.NewPriority[StopEventHandler](), + connectionReceivePacketEventHandlers: slice.NewPriority[ConnectionReceivePacketEventHandler](), + connectionOpenedEventHandlers: slice.NewPriority[ConnectionOpenedEventHandler](), + connectionClosedEventHandlers: slice.NewPriority[ConnectionClosedEventHandler](), + messageErrorEventHandlers: slice.NewPriority[MessageErrorEventHandler](), + messageLowExecEventHandlers: slice.NewPriority[MessageLowExecEventHandler](), + connectionOpenedAfterEventHandlers: slice.NewPriority[ConnectionOpenedAfterEventHandler](), + connectionWritePacketBeforeHandlers: slice.NewPriority[ConnectionWritePacketBeforeEventHandler](), + shuntChannelCreatedEventHandlers: slice.NewPriority[ShuntChannelCreatedEventHandler](), + shuntChannelClosedEventHandlers: slice.NewPriority[ShuntChannelClosedEventHandler](), + connectionPacketPreprocessEventHandlers: slice.NewPriority[ConnectionPacketPreprocessEventHandler](), + messageExecBeforeEventHandlers: slice.NewPriority[MessageExecBeforeEventHandler](), + messageReadyEventHandlers: slice.NewPriority[MessageReadyEventHandler](), } } type event struct { *Server - startBeforeEventHandles *slice.Priority[StartBeforeEventHandle] - startFinishEventHandles *slice.Priority[StartFinishEventHandle] - stopEventHandles *slice.Priority[StopEventHandle] - connectionReceivePacketEventHandles *slice.Priority[ConnectionReceivePacketEventHandle] - connectionOpenedEventHandles *slice.Priority[ConnectionOpenedEventHandle] - connectionClosedEventHandles *slice.Priority[ConnectionClosedEventHandle] - messageErrorEventHandles *slice.Priority[MessageErrorEventHandle] - messageLowExecEventHandles *slice.Priority[MessageLowExecEventHandle] - connectionOpenedAfterEventHandles *slice.Priority[ConnectionOpenedAfterEventHandle] - connectionWritePacketBeforeHandles *slice.Priority[ConnectionWritePacketBeforeEventHandle] - shuntChannelCreatedEventHandles *slice.Priority[ShuntChannelCreatedEventHandle] - shuntChannelClosedEventHandles *slice.Priority[ShuntChannelClosedEventHandle] - connectionPacketPreprocessEventHandles *slice.Priority[ConnectionPacketPreprocessEventHandle] - messageExecBeforeEventHandles *slice.Priority[MessageExecBeforeEventHandle] - messageReadyEventHandles *slice.Priority[MessageReadyEventHandle] + startBeforeEventHandlers *slice.Priority[StartBeforeEventHandler] + startFinishEventHandlers *slice.Priority[StartFinishEventHandler] + stopEventHandlers *slice.Priority[StopEventHandler] + connectionReceivePacketEventHandlers *slice.Priority[ConnectionReceivePacketEventHandler] + connectionOpenedEventHandlers *slice.Priority[ConnectionOpenedEventHandler] + connectionClosedEventHandlers *slice.Priority[ConnectionClosedEventHandler] + messageErrorEventHandlers *slice.Priority[MessageErrorEventHandler] + messageLowExecEventHandlers *slice.Priority[MessageLowExecEventHandler] + connectionOpenedAfterEventHandlers *slice.Priority[ConnectionOpenedAfterEventHandler] + connectionWritePacketBeforeHandlers *slice.Priority[ConnectionWritePacketBeforeEventHandler] + shuntChannelCreatedEventHandlers *slice.Priority[ShuntChannelCreatedEventHandler] + shuntChannelClosedEventHandlers *slice.Priority[ShuntChannelClosedEventHandler] + connectionPacketPreprocessEventHandlers *slice.Priority[ConnectionPacketPreprocessEventHandler] + messageExecBeforeEventHandlers *slice.Priority[MessageExecBeforeEventHandler] + messageReadyEventHandlers *slice.Priority[MessageReadyEventHandler] - consoleCommandEventHandles map[string]*slice.Priority[ConsoleCommandEventHandle] - consoleCommandEventHandleInitOnce sync.Once + consoleCommandEventHandlers map[string]*slice.Priority[ConsoleCommandEventHandler] + consoleCommandEventHandlerInitOnce sync.Once } // RegStopEvent 服务器停止时将立即执行被注册的事件处理函数 -func (slf *event) RegStopEvent(handle StopEventHandle, priority ...int) { - slf.stopEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegStopEvent(handler StopEventHandler, priority ...int) { + slf.stopEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnStopEvent() { - slf.stopEventHandles.RangeValue(func(index int, value StopEventHandle) bool { + slf.stopEventHandlers.RangeValue(func(index int, value StopEventHandler) bool { value(slf.Server) return true }) @@ -91,15 +91,15 @@ func (slf *event) OnStopEvent() { // RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数 // - 默认将注册 "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN" 指令作为关闭服务器的指令 // - 可通过注册默认指令进行默认行为的覆盖 -func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEventHandle, priority ...int) { +func (slf *event) RegConsoleCommandEvent(command string, handler ConsoleCommandEventHandler, priority ...int) { fd := int(os.Stdin.Fd()) if !terminal.IsTerminal(fd) { log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("ignore", "system not terminal")) return } - slf.consoleCommandEventHandleInitOnce.Do(func() { - slf.consoleCommandEventHandles = map[string]*slice.Priority[ConsoleCommandEventHandle]{} + slf.consoleCommandEventHandlerInitOnce.Do(func() { + slf.consoleCommandEventHandlers = map[string]*slice.Priority[ConsoleCommandEventHandler]{} go func() { for { var input string @@ -112,18 +112,18 @@ func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEv } }() }) - list, exist := slf.consoleCommandEventHandles[command] + list, exist := slf.consoleCommandEventHandlers[command] if !exist { - list = slice.NewPriority[ConsoleCommandEventHandle]() - slf.consoleCommandEventHandles[command] = list + list = slice.NewPriority[ConsoleCommandEventHandler]() + slf.consoleCommandEventHandlers[command] = list } - list.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + list.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) { slf.PushSystemMessage(func() { - handles, exist := slf.consoleCommandEventHandles[command] + handles, exist := slf.consoleCommandEventHandlers[command] if !exist { switch command { case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN": @@ -142,7 +142,7 @@ func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) { for key, value := range v { params[key] = value } - handles.RangeValue(func(index int, value ConsoleCommandEventHandle) bool { + handles.RangeValue(func(index int, value ConsoleCommandEventHandler) bool { value(slf.Server, command, params) return true }) @@ -151,9 +151,9 @@ func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) { } // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 -func (slf *event) RegStartBeforeEvent(handle StartBeforeEventHandle, priority ...int) { - slf.startBeforeEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegStartBeforeEvent(handler StartBeforeEventHandler, priority ...int) { + slf.startBeforeEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnStartBeforeEvent() { @@ -163,7 +163,7 @@ func (slf *event) OnStartBeforeEvent() { debug.PrintStack() } }() - slf.startBeforeEventHandles.RangeValue(func(index int, value StartBeforeEventHandle) bool { + slf.startBeforeEventHandlers.RangeValue(func(index int, value StartBeforeEventHandler) bool { value(slf.Server) return true }) @@ -171,14 +171,14 @@ func (slf *event) OnStartBeforeEvent() { // RegStartFinishEvent 在服务器启动完成时将立刻执行被注册的事件处理函数 // - 需要注意该时刻服务器已经启动完成,但是还有可能未开始处理消息,客户端有可能无法连接,如果需要在消息处理器准备就绪后执行,请使用 RegMessageReadyEvent 函数 -func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle, priority ...int) { - slf.startFinishEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegStartFinishEvent(handler StartFinishEventHandler, priority ...int) { + slf.startFinishEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnStartFinishEvent() { slf.PushSystemMessage(func() { - slf.startFinishEventHandles.RangeValue(func(index int, value StartFinishEventHandle) bool { + slf.startFinishEventHandlers.RangeValue(func(index int, value StartFinishEventHandler) bool { value(slf.Server) return true }) @@ -192,18 +192,18 @@ func (slf *event) OnStartFinishEvent() { } // RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle, priority ...int) { +func (slf *event) RegConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int) { if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - slf.connectionClosedEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + slf.connectionClosedEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { slf.PushSystemMessage(func() { slf.Server.online.Delete(conn.GetID()) - slf.connectionClosedEventHandles.RangeValue(func(index int, value ConnectionClosedEventHandle) bool { + slf.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool { value(slf.Server, conn, err) return true }) @@ -211,51 +211,53 @@ func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { } // RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionOpenedEvent(handle ConnectionOpenedEventHandle, priority ...int) { +// - 该阶段的事件将会在系统消息中进行处理,不适合处理耗时操作 +func (slf *event) RegConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int) { if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - slf.connectionOpenedEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + slf.connectionOpenedEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionOpenedEvent(conn *Conn) { slf.PushSystemMessage(func() { slf.Server.online.Set(conn.GetID(), conn) - slf.connectionOpenedEventHandles.RangeValue(func(index int, value ConnectionOpenedEventHandle) bool { + slf.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool { value(slf.Server, conn) return true }) + slf.OnConnectionOpenedAfterEvent(conn) }, log.String("Event", "OnConnectionOpenedEvent")) } // RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacketEventHandle, priority ...int) { +func (slf *event) RegConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) { if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - slf.connectionReceivePacketEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + slf.connectionReceivePacketEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) { if slf.Server.runtime.packetWarnSize > 0 && len(packet) > slf.Server.runtime.packetWarnSize { log.Warn("Server", log.String("OnConnectionReceivePacketEvent", fmt.Sprintf("packet size %d > %d", len(packet), slf.Server.runtime.packetWarnSize))) } - slf.connectionReceivePacketEventHandles.RangeValue(func(index int, value ConnectionReceivePacketEventHandle) bool { + slf.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool { value(slf.Server, conn, packet) return true }) } // RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数 -func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle, priority ...int) { - slf.messageErrorEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegMessageErrorEvent(handler MessageErrorEventHandler, priority ...int) { + slf.messageErrorEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnMessageErrorEvent(message *Message, err error) { - if slf.messageErrorEventHandles.Len() == 0 { + if slf.messageErrorEventHandlers.Len() == 0 { return } defer func() { @@ -264,41 +266,42 @@ func (slf *event) OnMessageErrorEvent(message *Message, err error) { debug.PrintStack() } }() - slf.messageErrorEventHandles.RangeValue(func(index int, value MessageErrorEventHandle) bool { + slf.messageErrorEventHandlers.RangeValue(func(index int, value MessageErrorEventHandler) bool { value(slf.Server, message, err) return true }) } // RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数 -func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle, priority ...int) { - slf.messageLowExecEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegMessageLowExecEvent(handler MessageLowExecEventHandler, priority ...int) { + slf.messageLowExecEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) { - if slf.messageLowExecEventHandles.Len() == 0 { + if slf.messageLowExecEventHandlers.Len() == 0 { return } // 慢消息不再占用消息通道 - slf.messageLowExecEventHandles.RangeValue(func(index int, value MessageLowExecEventHandle) bool { + slf.messageLowExecEventHandlers.RangeValue(func(index int, value MessageLowExecEventHandler) bool { value(slf.Server, message, cost) return true }) } // RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionOpenedAfterEvent(handle ConnectionOpenedAfterEventHandle, priority ...int) { +// - 该阶段事件将会转到对应消息分流渠道中进行处理 +func (slf *event) RegConnectionOpenedAfterEvent(handler ConnectionOpenedAfterEventHandler, priority ...int) { if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - slf.connectionOpenedAfterEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + slf.connectionOpenedAfterEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) { - slf.PushSystemMessage(func() { - slf.connectionOpenedAfterEventHandles.RangeValue(func(index int, value ConnectionOpenedAfterEventHandle) bool { + slf.PushShuntMessage(conn, func() { + slf.connectionOpenedAfterEventHandlers.RangeValue(func(index int, value ConnectionOpenedAfterEventHandler) bool { value(slf.Server, conn) return true }) @@ -306,20 +309,20 @@ func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) { } // RegConnectionWritePacketBeforeEvent 在发送数据包前将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionWritePacketBeforeEvent(handle ConnectionWritePacketBeforeEventHandle, priority ...int) { +func (slf *event) RegConnectionWritePacketBeforeEvent(handler ConnectionWritePacketBeforeEventHandler, priority ...int) { if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - slf.connectionWritePacketBeforeHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + slf.connectionWritePacketBeforeHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet []byte) (newPacket []byte) { - if slf.connectionWritePacketBeforeHandles.Len() == 0 { + if slf.connectionWritePacketBeforeHandlers.Len() == 0 { return packet } newPacket = packet - slf.connectionWritePacketBeforeHandles.RangeValue(func(index int, value ConnectionWritePacketBeforeEventHandle) bool { + slf.connectionWritePacketBeforeHandlers.RangeValue(func(index int, value ConnectionWritePacketBeforeEventHandler) bool { newPacket = value(slf.Server, conn, newPacket) return true }) @@ -327,14 +330,14 @@ func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet []byte) } // RegShuntChannelCreatedEvent 在分流通道创建时将立刻执行被注册的事件处理函数 -func (slf *event) RegShuntChannelCreatedEvent(handle ShuntChannelCreatedEventHandle, priority ...int) { - slf.shuntChannelCreatedEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegShuntChannelCreatedEvent(handler ShuntChannelCreatedEventHandler, priority ...int) { + slf.shuntChannelCreatedEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnShuntChannelCreatedEvent(guid int64) { slf.PushSystemMessage(func() { - slf.shuntChannelCreatedEventHandles.RangeValue(func(index int, value ShuntChannelCreatedEventHandle) bool { + slf.shuntChannelCreatedEventHandlers.RangeValue(func(index int, value ShuntChannelCreatedEventHandler) bool { value(slf.Server, guid) return true }) @@ -342,14 +345,14 @@ func (slf *event) OnShuntChannelCreatedEvent(guid int64) { } // RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数 -func (slf *event) RegShuntChannelCloseEvent(handle ShuntChannelClosedEventHandle, priority ...int) { - slf.shuntChannelClosedEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegShuntChannelCloseEvent(handler ShuntChannelClosedEventHandler, priority ...int) { + slf.shuntChannelClosedEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnShuntChannelClosedEvent(guid int64) { slf.PushSystemMessage(func() { - slf.shuntChannelClosedEventHandles.RangeValue(func(index int, value ShuntChannelClosedEventHandle) bool { + slf.shuntChannelClosedEventHandlers.RangeValue(func(index int, value ShuntChannelClosedEventHandler) bool { value(slf.Server, guid) return true }) @@ -364,17 +367,17 @@ func (slf *event) OnShuntChannelClosedEvent(guid int64) { // 场景: // - 数据包格式校验 // - 数据包分包等情况处理 -func (slf *event) RegConnectionPacketPreprocessEvent(handle ConnectionPacketPreprocessEventHandle, priority ...int) { - slf.connectionPacketPreprocessEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegConnectionPacketPreprocessEvent(handler ConnectionPacketPreprocessEventHandler, priority ...int) { + slf.connectionPacketPreprocessEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, usePacket func(newPacket []byte)) bool { - if slf.connectionPacketPreprocessEventHandles.Len() == 0 { + if slf.connectionPacketPreprocessEventHandlers.Len() == 0 { return false } var abort = false - slf.connectionPacketPreprocessEventHandles.RangeValue(func(index int, value ConnectionPacketPreprocessEventHandle) bool { + slf.connectionPacketPreprocessEventHandlers.RangeValue(func(index int, value ConnectionPacketPreprocessEventHandler) bool { value(slf.Server, conn, packet, func() { abort = true }, usePacket) if abort { return false @@ -388,13 +391,13 @@ func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, u // - 当返回 true 时,将继续执行后续的消息处理函数,否则将不会执行后续的消息处理函数,并且该消息将被丢弃 // // 适用于限流等场景 -func (slf *event) RegMessageExecBeforeEvent(handle MessageExecBeforeEventHandle, priority ...int) { - slf.messageExecBeforeEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegMessageExecBeforeEvent(handler MessageExecBeforeEventHandler, priority ...int) { + slf.messageExecBeforeEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnMessageExecBeforeEvent(message *Message) bool { - if slf.messageExecBeforeEventHandles.Len() == 0 { + if slf.messageExecBeforeEventHandlers.Len() == 0 { return true } var result = true @@ -404,7 +407,7 @@ func (slf *event) OnMessageExecBeforeEvent(message *Message) bool { debug.PrintStack() } }() - slf.messageExecBeforeEventHandles.RangeValue(func(index int, value MessageExecBeforeEventHandle) bool { + slf.messageExecBeforeEventHandlers.RangeValue(func(index int, value MessageExecBeforeEventHandler) bool { result = value(slf.Server, message) return result }) @@ -412,12 +415,12 @@ func (slf *event) OnMessageExecBeforeEvent(message *Message) bool { } // RegMessageReadyEvent 在服务器消息处理器准备就绪时立即执行被注册的事件处理函数 -func (slf *event) RegMessageReadyEvent(handle MessageReadyEventHandle, priority ...int) { - slf.messageReadyEventHandles.Append(handle, slice.GetValue(priority, 0)) +func (slf *event) RegMessageReadyEvent(handler MessageReadyEventHandler, priority ...int) { + slf.messageReadyEventHandlers.Append(handler, slice.GetValue(priority, 0)) } func (slf *event) OnMessageReadyEvent() { - if slf.messageReadyEventHandles.Len() == 0 { + if slf.messageReadyEventHandlers.Len() == 0 { return } defer func() { @@ -426,7 +429,7 @@ func (slf *event) OnMessageReadyEvent() { debug.PrintStack() } }() - slf.messageReadyEventHandles.RangeValue(func(index int, value MessageReadyEventHandle) bool { + slf.messageReadyEventHandlers.RangeValue(func(index int, value MessageReadyEventHandler) bool { value(slf.Server) return true }) @@ -436,7 +439,7 @@ func (slf *event) check() { switch slf.network { case NetworkHttp, NetworkGRPC, NetworkNone: default: - if slf.connectionReceivePacketEventHandles.Len() == 0 { + if slf.connectionReceivePacketEventHandlers.Len() == 0 { log.Warn("Server", log.String("ConnectionReceivePacketEvent", "invalid server, no packets processed")) } } diff --git a/server/message.go b/server/message.go index 69c5082..500ace4 100644 --- a/server/message.go +++ b/server/message.go @@ -44,6 +44,9 @@ const ( // MessageTypeSystem 系统消息类型 MessageTypeSystem + + // MessageTypeShunt 普通分流消息类型 + MessageTypeShunt ) var messageNames = map[MessageType]string{ @@ -60,6 +63,7 @@ var messageNames = map[MessageType]string{ MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync", MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback", MessageTypeSystem: "MessageTypeSystem", + MessageTypeShunt: "MessageTypeShunt", } const ( @@ -209,3 +213,9 @@ func (slf *Message) castToErrorMessage(err error, action MessageErrorAction, mar slf.t, slf.err, slf.errAction, slf.marks = MessageTypeError, err, action, mark return slf } + +// castToShuntMessage 将消息转换为分流消息 +func (slf *Message) castToShuntMessage(conn *Conn, caller func(), mark ...log.Field) *Message { + slf.t, slf.conn, slf.ordinaryHandler, slf.marks = MessageTypeShunt, conn, caller, mark + return slf +} diff --git a/server/multiple.go b/server/multiple.go index e4ec259..602a0b7 100644 --- a/server/multiple.go +++ b/server/multiple.go @@ -40,7 +40,7 @@ func (slf *MultipleServer) Run() { go func(address string, server *Server) { var lock sync.Mutex var startFinish bool - server.startFinishEventHandles.Append(func(srv *Server) { + server.startFinishEventHandlers.Append(func(srv *Server) { lock.Lock() defer lock.Unlock() if !startFinish { diff --git a/server/options.go b/server/options.go index 68a88ff..8374a61 100644 --- a/server/options.go +++ b/server/options.go @@ -5,7 +5,6 @@ import ( "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/timer" "google.golang.org/grpc" - "runtime/debug" "time" ) @@ -29,19 +28,18 @@ type option struct { } type runtime struct { - deadlockDetect time.Duration // 是否开启死锁检测 - supportMessageTypes map[int]bool // websocket模式下支持的消息类型 - certFile, keyFile string // TLS文件 - messagePoolSize int // 消息池大小 - ticker *timer.Ticker // 定时器 - tickerAutonomy bool // 定时器是否独立运行 - connTickerSize int // 连接定时器大小 - websocketReadDeadline time.Duration // websocket连接超时时间 - websocketCompression int // websocket压缩等级 - websocketWriteCompression bool // websocket写入压缩 - limitLife time.Duration // 限制最大生命周期 - shuntMatcher func(conn *Conn) string // 分流匹配器 - packetWarnSize int // 数据包大小警告 + deadlockDetect time.Duration // 是否开启死锁检测 + supportMessageTypes map[int]bool // websocket模式下支持的消息类型 + certFile, keyFile string // TLS文件 + messagePoolSize int // 消息池大小 + ticker *timer.Ticker // 定时器 + tickerAutonomy bool // 定时器是否独立运行 + connTickerSize int // 连接定时器大小 + websocketReadDeadline time.Duration // websocket连接超时时间 + websocketCompression int // websocket压缩等级 + websocketWriteCompression bool // websocket写入压缩 + limitLife time.Duration // 限制最大生命周期 + packetWarnSize int // 数据包大小警告 } // WithPacketWarnSize 通过数据包大小警告的方式创建服务器,当数据包大小超过指定大小时,将会输出 WARN 类型的日志 @@ -58,32 +56,6 @@ func WithPacketWarnSize(size int) Option { } } -// WithShunt 通过连接数据包分流的方式创建服务器 -// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况 -// - shuntMatcher:用于匹配连接的函数,返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道 -// -// 将被分流的消息类型(更多类型有待斟酌): -// - MessageTypePacket -// -// 注意事项: -// - 当分流匹配过程发生 panic 将会在系统通道内处理消息,并打印日志 -func WithShunt(shuntMatcher func(conn *Conn) string) Option { - return func(srv *Server) { - if shuntMatcher == nil { - log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil")) - return - } - srv.shuntMatcher = func(conn *Conn) string { - defer func() { - if err := recover(); err != nil { - log.Error("ShuntMatcher", log.String("State", "Panic"), log.Any("Error", err), log.String("Stack", string(debug.Stack()))) - } - }() - return shuntMatcher(conn) - } - } -} - // WithLimitLife 通过限制最大生命周期的方式创建服务器 // - 通常用于测试服务器,服务器将在到达最大生命周期时自动关闭 func WithLimitLife(t time.Duration) Option { diff --git a/server/server.go b/server/server.go index 7492347..47fb883 100644 --- a/server/server.go +++ b/server/server.go @@ -36,13 +36,15 @@ func New(network Network, options ...Option) *Server { messagePoolSize: DefaultMessageBufferSize, packetWarnSize: DefaultPacketWarnSize, }, - option: &option{}, - network: network, - online: concurrent.NewBalanceMap[string, *Conn](), - closeChannel: make(chan struct{}, 1), - systemSignal: make(chan os.Signal, 1), - ctx: context.Background(), - dispatchers: make(map[string]*dispatcher), + option: &option{}, + network: network, + online: concurrent.NewBalanceMap[string, *Conn](), + closeChannel: make(chan struct{}, 1), + systemSignal: make(chan os.Signal, 1), + ctx: context.Background(), + dispatchers: make(map[string]*dispatcher), + dispatcherMember: map[string]map[string]*Conn{}, + currDispatcher: map[string]*dispatcher{}, } server.event = newEvent(server) @@ -91,6 +93,7 @@ type Server struct { messagePool *concurrent.Pool[*Message] // 消息池 ctx context.Context // 上下文 online *concurrent.BalanceMap[string, *Conn] // 在线连接 + systemDispatcher *dispatcher // 系统消息分发器 network Network // 网络类型 addr string // 侦听地址 systemSignal chan os.Signal // 系统信号 @@ -101,7 +104,9 @@ type Server struct { isShutdown atomic.Bool // 是否已关闭 messageCounter atomic.Int64 // 消息计数器 isRunning bool // 是否正在运行 - dispatchers map[string]*dispatcher // 消息分发器 + dispatchers map[string]*dispatcher // 消息分发器集合 + dispatcherMember map[string]map[string]*Conn // 消息分发器包含的连接 + currDispatcher map[string]*dispatcher // 当前连接所处消息分发器 } // Run 使用特定地址运行服务器 @@ -125,6 +130,7 @@ func (slf *Server) Run(addr string) error { } slf.event.check() slf.addr = addr + slf.systemDispatcher = generateDispatcher(serverSystemDispatcher, slf.dispatchMessage) var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr) var messageInitFinish = make(chan struct{}, 1) var connectionInitHandle = func(callback func()) { @@ -146,8 +152,7 @@ func (slf *Server) Run(addr string) error { } go func() { messageInitFinish <- struct{}{} - d, _ := slf.useDispatcher(serverSystemDispatcher) - d.start() + slf.systemDispatcher.start() }() } @@ -200,7 +205,6 @@ func (slf *Server) Run(addr string) error { conn := newKcpConn(slf, session) slf.OnConnectionOpenedEvent(conn) - slf.OnConnectionOpenedAfterEvent(conn) go func(conn *Conn) { defer func() { @@ -568,30 +572,70 @@ func (slf *Server) GetMessageCount() int64 { return slf.messageCounter.Load() } -// useDispatcher 添加消息分发器 -// - 该函数在分发器不重复的情况下将创建分发器,当分发器已存在将直接返回 -func (slf *Server) useDispatcher(name string) (*dispatcher, bool) { +// UseShunt 切换连接所使用的消息分流渠道,当分流渠道 name 不存在时将会创建一个新的分流渠道,否则将会加入已存在的分流渠道 +// - 默认情况下,所有连接都使用系统通道进行消息分发,当指定消息分流渠道时,将会使用指定的消息分流渠道进行消息分发 +func (slf *Server) UseShunt(conn *Conn, name string) { slf.dispatcherLock.Lock() + defer slf.dispatcherLock.Unlock() d, exist := slf.dispatchers[name] - if exist { - slf.dispatcherLock.Unlock() - return d, false + if !exist { + d = generateDispatcher(name, slf.dispatchMessage) + go d.start() + slf.dispatchers[name] = d } - d = generateDispatcher(slf.dispatchMessage) - slf.dispatchers[name] = d - slf.dispatcherLock.Unlock() - return d, true + + curr, exist := slf.currDispatcher[conn.GetID()] + if exist { + if curr.name == name { + return + } + + delete(slf.dispatcherMember[curr.name], conn.GetID()) + if len(slf.dispatcherMember[curr.name]) == 0 { + curr.close() + delete(slf.dispatchers, curr.name) + } + } + + member, exist := slf.dispatcherMember[name] + if !exist { + member = map[string]*Conn{} + slf.dispatcherMember[name] = member + } + + member[conn.GetID()] = conn +} + +// getConnDispatcher 获取连接所使用的消息分发器 +func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher { + if conn == nil { + return slf.systemDispatcher + } + slf.dispatcherLock.RLock() + defer slf.dispatcherLock.RUnlock() + d, exist := slf.currDispatcher[conn.GetID()] + if exist { + return d + } + return slf.systemDispatcher } // releaseDispatcher 关闭消息分发器 -func (slf *Server) releaseDispatcher(name string) { - slf.dispatcherLock.Lock() - d, exist := slf.dispatchers[name] - if exist { - delete(slf.dispatchers, name) - d.close() +func (slf *Server) releaseDispatcher(conn *Conn) { + if conn == nil { + return + } + slf.dispatcherLock.Lock() + defer slf.dispatcherLock.Unlock() + d, exist := slf.currDispatcher[conn.GetID()] + if exist { + delete(slf.dispatcherMember[d.name], conn.GetID()) + if len(slf.dispatcherMember[d.name]) == 0 { + d.close() + delete(slf.dispatchers, d.name) + } + delete(slf.currDispatcher, conn.GetID()) } - slf.dispatcherLock.Unlock() } // pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求 @@ -602,20 +646,13 @@ func (slf *Server) pushMessage(message *Message) { } var dispatcher *dispatcher switch message.t { - case MessageTypePacket: - if slf.shuntMatcher == nil { - dispatcher, _ = slf.useDispatcher(serverSystemDispatcher) - break - } - fallthrough - case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback, MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback: - var created bool - dispatcher, created = slf.useDispatcher(slf.shuntMatcher(message.conn)) - if created { - go dispatcher.start() - } + case MessageTypePacket, + MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback, + MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback, + MessageTypeShunt: + dispatcher = slf.getConnDispatcher(message.conn) case MessageTypeSystem, MessageTypeAsync, MessageTypeUniqueAsync, MessageTypeAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeError, MessageTypeTicker: - dispatcher, _ = slf.useDispatcher(serverSystemDispatcher) + dispatcher = slf.systemDispatcher } if dispatcher == nil { return @@ -756,7 +793,7 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { } case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeUniqueShuntAsyncCallback: msg.errHandler(msg.err) - case MessageTypeSystem: + case MessageTypeSystem, MessageTypeShunt: msg.ordinaryHandler() default: log.Warn("Server", log.String("not support message type", msg.t.String())) @@ -790,20 +827,12 @@ func (slf *Server) PushAsyncCallbackMessage(err error, callback func(err error), // - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发 // - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) { - if slf.shuntMatcher == nil { - slf.PushAsyncMessage(caller, callback) - return - } slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...)) } // PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 // - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发 func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) { - if slf.shuntMatcher == nil { - slf.PushAsyncCallbackMessage(err, callback) - return - } slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...)) } @@ -811,7 +840,7 @@ func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback // - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息 func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) { slf.pushMessage(slf.messagePool.Get().castToPacketMessage( - &Conn{ctx: context.WithValue(conn.ctx, contextKeyWST, wst), connection: conn.connection}, + &Conn{wst: wst, connection: conn.connection}, packet, )) } @@ -828,13 +857,9 @@ func (slf *Server) PushTickerMessage(name string, caller func(), mark ...log.Fie } // PushShuntTickerMessage 向特定分发器中推送 MessageTypeTicker 消息,消息执行与 MessageTypeTicker 一致 -// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushTickerMessage 进行转发 +// - 需要注意的是,当未指定 UseShunt 时,将会通过 PushTickerMessage 进行转发 // - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func(), mark ...log.Field) { - if slf.shuntMatcher == nil { - slf.PushTickerMessage(name, caller) - return - } slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...)) } @@ -850,23 +875,15 @@ func (slf *Server) PushUniqueAsyncCallbackMessage(unique string, err error, call } // PushUniqueShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致 -// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncMessage 进行转发 +// - 需要注意的是,当未指定 UseShunt 时,将会通过系统分流渠道进行转发 // - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息 func (slf *Server) PushUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) { - if slf.shuntMatcher == nil { - slf.PushUniqueAsyncMessage(unique, caller, callback) - return - } slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncMessage(conn, unique, caller, callback, mark...)) } // PushUniqueShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 -// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncCallbackMessage 进行转发 +// - 需要注意的是,当未指定 UseShunt 时,将会通过系统分流渠道进行转发 func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, callback func(err error), mark ...log.Field) { - if slf.shuntMatcher == nil { - slf.PushUniqueAsyncCallbackMessage(unique, err, callback) - return - } slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncCallbackMessage(conn, unique, err, callback, mark...)) } @@ -878,3 +895,8 @@ func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string func (slf *Server) PushErrorMessage(err error, errAction MessageErrorAction, mark ...log.Field) { slf.pushMessage(slf.messagePool.Get().castToErrorMessage(err, errAction, mark...)) } + +// PushShuntMessage 向特定分发器中推送 MessageTypeShunt 消息,消息执行与 MessageTypeSystem 一致,不同的是将会在特定分发器中执行 +func (slf *Server) PushShuntMessage(conn *Conn, caller func(), mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToShuntMessage(conn, caller, mark...)) +} diff --git a/server/server_test.go b/server/server_test.go index d6dca80..855280e 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/kercylan98/minotaur/server" "github.com/kercylan98/minotaur/server/client" - "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/times" "testing" "time" @@ -30,13 +29,6 @@ func TestNew(t *testing.T) { //} }) - srv.RegStartFinishEvent(func(srv *server.Server) { - log.Warn("启动完成") - log.Error("启动完成") - log.Info("启动完成") - log.Debug("启动完成") - }) - srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { conn.Write(packet) })