支持注册消息错误和慢执行事件

This commit is contained in:
kercylan98 2023-05-20 17:14:36 +08:00
parent abe9a7b57c
commit ce71a956e5
3 changed files with 43 additions and 10 deletions

View File

@ -5,6 +5,7 @@ import (
"github.com/kercylan98/minotaur/utils/runtimes"
"go.uber.org/zap"
"reflect"
"time"
)
type StartBeforeEventHandle func(srv *Server)
@ -14,6 +15,8 @@ type ConnectionReceiveWebsocketPacketEventHandle func(srv *Server, conn *Conn, p
type ConnectionOpenedEventHandle func(srv *Server, conn *Conn)
type ConnectionClosedEventHandle func(srv *Server, conn *Conn)
type ReceiveCrossPacketEventHandle func(srv *Server, senderServerId int64, packet []byte)
type MessageErrorEventHandle func(srv *Server, message *Message, err error)
type MessageLowExecEventHandle func(srv *Server, message *Message, cost time.Duration)
type event struct {
*Server
@ -24,6 +27,8 @@ type event struct {
connectionOpenedEventHandles []ConnectionOpenedEventHandle
connectionClosedEventHandles []ConnectionClosedEventHandle
receiveCrossPacketEventHandles []ReceiveCrossPacketEventHandle
messageErrorEventHandles []MessageErrorEventHandle
messageLowExecEventHandles []MessageLowExecEventHandle
}
// RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数
@ -128,6 +133,30 @@ func (slf *event) OnReceiveCrossPacketEvent(serverId int64, packet []byte) {
}
}
// RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数
func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle) {
slf.messageErrorEventHandles = append(slf.messageErrorEventHandles, handle)
log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String()))
}
func (slf *event) OnMessageErrorEvent(message *Message, err error) {
for _, handle := range slf.messageErrorEventHandles {
handle(slf.Server, message, err)
}
}
// RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数
func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle) {
slf.messageLowExecEventHandles = append(slf.messageLowExecEventHandles, handle)
log.Info("Server", zap.String("RegEvent", runtimes.CurrentRunningFuncName()), zap.String("handle", reflect.TypeOf(handle).String()))
}
func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) {
for _, handle := range slf.messageLowExecEventHandles {
handle(slf.Server, message, cost)
}
}
func (slf *event) check() {
switch slf.network {
case NetworkHttp, NetworkGRPC:

View File

@ -50,7 +50,7 @@ func (slf MessageErrorAction) String() string {
return messageErrorActionNames[slf]
}
type message struct {
type Message struct {
t MessageType
attrs []any
}

View File

@ -66,9 +66,9 @@ type Server struct {
closeChannel chan struct{} // 关闭信号
gServer *gNet // TCP或UDP模式下的服务器
messagePool *synchronization.Pool[*message] // 消息池
messagePool *synchronization.Pool[*Message] // 消息池
messagePoolSize int // 消息池大小
messageChannel map[int]chan *message // 消息管道
messageChannel map[int]chan *Message // 消息管道
initMessageChannel bool // 消息管道是否已经初始化
multiple bool // 是否为多服务器模式下运行
prod bool // 是否为生产模式
@ -101,18 +101,18 @@ func (slf *Server) Run(addr string) error {
if slf.messagePoolSize <= 0 {
slf.messagePoolSize = 100
}
slf.messagePool = synchronization.NewPool[*message](slf.messagePoolSize,
func() *message {
return &message{}
slf.messagePool = synchronization.NewPool[*Message](slf.messagePoolSize,
func() *Message {
return &Message{}
},
func(data *message) {
func(data *Message) {
data.t = 0
data.attrs = nil
},
)
slf.messageChannel = map[int]chan *message{}
slf.messageChannel = map[int]chan *Message{}
for i := 0; i < slf.core; i++ {
slf.messageChannel[i] = make(chan *message, 4096*1000)
slf.messageChannel[i] = make(chan *Message, 4096*1000)
}
if slf.network != NetworkHttp && slf.network != NetworkWebsocket {
slf.gServer = &gNet{Server: slf}
@ -420,15 +420,19 @@ func (slf *Server) PushCrossMessage(crossName string, serverId int64, packet []b
}
// dispatchMessage 消息分发
func (slf *Server) dispatchMessage(msg *message) {
func (slf *Server) dispatchMessage(msg *Message) {
present := time.Now()
defer func() {
if err := recover(); err != nil {
log.Error("Server", zap.String("MessageType", messageNames[msg.t]), zap.Any("MessageAttrs", msg.attrs), zap.Any("error", err))
if e, ok := err.(error); ok {
slf.OnMessageErrorEvent(msg, e)
}
}
if cost := time.Since(present); cost > time.Millisecond*100 {
log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.String("LowExecCost", cost.String()), zap.Any("MessageAttrs", msg.attrs))
slf.OnMessageLowExecEvent(msg, cost)
}
if !slf.isShutdown.Load() {