diff --git a/server/event.go b/server/event.go index 7343f73..569e085 100644 --- a/server/event.go +++ b/server/event.go @@ -5,6 +5,7 @@ import ( "github.com/kercylan98/minotaur/utils/runtimes" "go.uber.org/zap" "reflect" + "time" ) type StartBeforeEventHandle func(srv *Server) @@ -14,6 +15,8 @@ type ConnectionReceiveWebsocketPacketEventHandle func(srv *Server, conn *Conn, p type ConnectionOpenedEventHandle func(srv *Server, conn *Conn) type ConnectionClosedEventHandle func(srv *Server, conn *Conn) type ReceiveCrossPacketEventHandle func(srv *Server, senderServerId int64, packet []byte) +type MessageErrorEventHandle func(srv *Server, message *Message, err error) +type MessageLowExecEventHandle func(srv *Server, message *Message, cost time.Duration) type event struct { *Server @@ -24,6 +27,8 @@ type event struct { connectionOpenedEventHandles []ConnectionOpenedEventHandle connectionClosedEventHandles []ConnectionClosedEventHandle receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle + messageErrorEventHandles []MessageErrorEventHandle + messageLowExecEventHandles []MessageLowExecEventHandle } // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 @@ -128,6 +133,30 @@ func (slf *event) OnReceiveCrossPacketEvent(serverId int64, packet []byte) { } } +// RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数 +func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle) { + slf.messageErrorEventHandles = append(slf.messageErrorEventHandles, handle) + log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String())) +} + +func (slf *event) OnMessageErrorEvent(message *Message, err error) { + for _, handle := range slf.messageErrorEventHandles { + handle(slf.Server, message, err) + } +} + +// RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数 +func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle) { + slf.messageLowExecEventHandles = append(slf.messageLowExecEventHandles, handle) + log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String())) +} + +func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) { + for _, handle := range slf.messageLowExecEventHandles { + handle(slf.Server, message, cost) + } +} + func (slf *event) check() { switch slf.network { case NetworkHttp, NetworkGRPC: diff --git a/server/message.go b/server/message.go index b77f554..7a6a173 100644 --- a/server/message.go +++ b/server/message.go @@ -50,7 +50,7 @@ func (slf MessageErrorAction) String() string { return messageErrorActionNames[slf] } -type message struct { +type Message struct { t MessageType attrs []any } diff --git a/server/server.go b/server/server.go index f275183..8fcc2f1 100644 --- a/server/server.go +++ b/server/server.go @@ -66,9 +66,9 @@ type Server struct { closeChannel chan struct{} // 关闭信号 gServer *gNet // TCP或UDP模式下的服务器 - messagePool *synchronization.Pool[*message] // 消息池 + messagePool *synchronization.Pool[*Message] // 消息池 messagePoolSize int // 消息池大小 - messageChannel map[int]chan *message // 消息管道 + messageChannel map[int]chan *Message // 消息管道 initMessageChannel bool // 消息管道是否已经初始化 multiple bool // 是否为多服务器模式下运行 prod bool // 是否为生产模式 @@ -101,18 +101,18 @@ func (slf *Server) Run(addr string) error { if slf.messagePoolSize <= 0 { slf.messagePoolSize = 100 } - slf.messagePool = synchronization.NewPool[*message](slf.messagePoolSize, - func() *message { - return &message{} + slf.messagePool = synchronization.NewPool[*Message](slf.messagePoolSize, + func() *Message { + return &Message{} }, - func(data *message) { + func(data *Message) { data.t = 0 data.attrs = nil }, ) - slf.messageChannel = map[int]chan *message{} + slf.messageChannel = map[int]chan *Message{} for i := 0; i < slf.core; i++ { - slf.messageChannel[i] = make(chan *message, 4096*1000) + slf.messageChannel[i] = make(chan *Message, 4096*1000) } if slf.network != NetworkHttp && slf.network != NetworkWebsocket { slf.gServer = &gNet{Server: slf} @@ -420,15 +420,19 @@ func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []b } // dispatchMessage 消息分发 -func (slf *Server) dispatchMessage(msg *message) { +func (slf *Server) dispatchMessage(msg *Message) { present := time.Now() defer func() { if err := recover(); err != nil { log.Error("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("MessageAttrs", msg.attrs), zap.Any("error", err)) + if e, ok := err.(error); ok { + slf.OnMessageErrorEvent(msg, e) + } } if cost := time.Since(present); cost > time.Millisecond*100 { log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.String("LowExecCost", cost.String()), zap.Any("MessageAttrs", msg.attrs)) + slf.OnMessageLowExecEvent(msg, cost) } if !slf.isShutdown.Load() {