diff --git a/server/errors.go b/server/errors.go index ed59d27..4a83065 100644 --- a/server/errors.go +++ b/server/errors.go @@ -18,8 +18,8 @@ var ( ErrWebsocketIllegalMessageType = errors.New("illegal message type") ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register") ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register") - ErrOnlySupportSocket = errors.New("only supports Socket programming") ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server") + ErrNoSupportMonitor = errors.New("the server does not support GetMonitor, please use the WithMonitor option to create the server") ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server") ErrUnregisteredCrossName = errors.New("unregistered cross name, please use the WithCross option to create the server") ) diff --git a/server/monitor.go b/server/monitor.go new file mode 100644 index 0000000..c09b11c --- /dev/null +++ b/server/monitor.go @@ -0,0 +1,383 @@ +package server + +import ( + "encoding/json" + "github.com/kercylan98/minotaur/utils/timer" + "sync" + "time" +) + +func newMonitor() *monitor { + m := &monitor{ + ticker: timer.GetTicker(10), + } + m.ticker.Loop("tick", timer.Instantly, time.Second, timer.Forever, m.tick) + return m +} + +type Monitor interface { + MessageTotal() int64 + PacketMessageTotal() int64 + ErrorMessageTotal() int64 + CrossMessageTotal() int64 + TickerMessageTotal() int64 + MessageSecond() int64 + PacketMessageSecond() int64 + ErrorMessageSecond() int64 + CrossMessageSecond() int64 + TickerMessageSecond() int64 + MessageCost() time.Duration + PacketMessageCost() time.Duration + ErrorMessageCost() time.Duration + CrossMessageCost() time.Duration + TickerMessageCost() time.Duration + MessageDoneAvg() time.Duration + PacketMessageDoneAvg() time.Duration + ErrorMessageDoneAvg() time.Duration + CrossMessageDoneAvg() time.Duration + TickerMessageDoneAvg() time.Duration + MessageQPS() int64 + PacketMessageQPS() int64 + ErrorMessageQPS() int64 + CrossMessageQPS() int64 + TickerMessageQPS() int64 + MessageTopQPS() int64 + PacketMessageTopQPS() int64 + ErrorMessageTopQPS() int64 + CrossMessageTopQPS() int64 + TickerMessageTopQPS() int64 +} + +type monitor struct { + rwMutex sync.RWMutex + ticker *timer.Ticker + + messageTotal int64 // 正在执行的消息总数 + packetMessageTotal int64 // 正在执行的玩家消息总数 + errorMessageTotal int64 // 正在执行的错误消息总数 + crossMessageTotal int64 // 正在执行的跨服消息总数 + tickerMessageTotal int64 // 正在执行的定时器消息总数 + messageSecond int64 // 一秒内执行的消息并发量 + packetMessageSecond int64 // 一秒内玩家消息并发量 + errorMessageSecond int64 // 一秒内错误消息并发量 + crossMessageSecond int64 // 一秒内跨服消息并发量 + tickerMessageSecond int64 // 一秒内定时器消息并发量 + messageCost time.Duration // 一秒内执行的消息总耗时 + packetMessageCost time.Duration // 一秒内玩家消息总耗时 + errorMessageCost time.Duration // 一秒内错误消息总耗时 + crossMessageCost time.Duration // 一秒内跨服消息总耗时 + tickerMessageCost time.Duration // 一秒内定时器消息总耗时 + messageDoneAvg time.Duration // 一秒内执行的消息平均响应时间 + packetMessageDoneAvg time.Duration // 一秒内玩家消息平均响应时间 + errorMessageDoneAvg time.Duration // 一秒内错误消息平均响应时间 + crossMessageDoneAvg time.Duration // 一秒内跨服消息平均响应时间 + tickerMessageDoneAvg time.Duration // 一秒内定时器消息平均响应时间 + messageQPS int64 // 一秒内执行的消息QPS + packetMessageQPS int64 // 一秒内玩家消息QPS + errorMessageQPS int64 // 一秒内错误消息QPS + crossMessageQPS int64 // 一秒内跨服消息QPS + tickerMessageQPS int64 // 一秒内定时器消息QPS + messageTopQPS int64 // 执行的消息最高QPS + packetMessageTopQPS int64 // 玩家消息最高QPS + errorMessageTopQPS int64 // 错误消息最高QPS + crossMessageTopQPS int64 // 跨服消息最高QPS + tickerMessageTopQPS int64 // 定时器消息最高QPS +} + +func (slf *monitor) String() string { + m := slf.Map() + bytes, _ := json.Marshal(m) + return string(bytes) +} + +func (slf *monitor) Map() map[string]any { + var m = make(map[string]any) + m["messageTotal"] = slf.messageTotal + m["packetMessageTotal"] = slf.packetMessageTotal + m["errorMessageTotal"] = slf.errorMessageTotal + m["crossMessageTotal"] = slf.crossMessageTotal + m["tickerMessageTotal"] = slf.tickerMessageTotal + m["messageSecond"] = slf.messageSecond + m["packetMessageSecond"] = slf.packetMessageSecond + m["errorMessageSecond"] = slf.errorMessageSecond + m["crossMessageSecond"] = slf.crossMessageSecond + m["tickerMessageSecond"] = slf.tickerMessageSecond + m["messageCost"] = slf.messageCost + m["packetMessageCost"] = slf.packetMessageCost + m["errorMessageCost"] = slf.errorMessageCost + m["crossMessageCost"] = slf.crossMessageCost + m["tickerMessageCost"] = slf.tickerMessageCost + m["messageDoneAvg"] = slf.messageDoneAvg + m["packetMessageDoneAvg"] = slf.packetMessageDoneAvg + m["errorMessageDoneAvg"] = slf.errorMessageDoneAvg + m["crossMessageDoneAvg"] = slf.crossMessageDoneAvg + m["tickerMessageDoneAvg"] = slf.tickerMessageDoneAvg + m["messageQPS"] = slf.messageQPS + m["packetMessageQPS"] = slf.packetMessageQPS + m["errorMessageQPS"] = slf.errorMessageQPS + m["crossMessageQPS"] = slf.crossMessageQPS + m["tickerMessageQPS"] = slf.tickerMessageQPS + m["messageTopQPS"] = slf.messageTopQPS + m["packetMessageTopQPS"] = slf.packetMessageTopQPS + m["errorMessageTopQPS"] = slf.errorMessageTopQPS + m["crossMessageTopQPS"] = slf.crossMessageTopQPS + m["tickerMessageTopQPS"] = slf.tickerMessageTopQPS + return m +} + +func (slf *monitor) tick() { + slf.rwMutex.Lock() + + // 秒平均响应时间 + if nanoseconds := slf.messageCost.Nanoseconds(); nanoseconds == 0 { + slf.messageDoneAvg = 0 + } else { + slf.messageDoneAvg = time.Duration(nanoseconds / slf.messageSecond) + } + if nanoseconds := slf.packetMessageCost.Nanoseconds(); nanoseconds == 0 { + slf.packetMessageDoneAvg = 0 + } else { + slf.packetMessageDoneAvg = time.Duration(nanoseconds / slf.packetMessageSecond) + } + if nanoseconds := slf.errorMessageCost.Nanoseconds(); nanoseconds == 0 { + slf.errorMessageDoneAvg = 0 + } else { + slf.errorMessageDoneAvg = time.Duration(nanoseconds / slf.errorMessageSecond) + } + if nanoseconds := slf.crossMessageCost.Nanoseconds(); nanoseconds == 0 { + slf.crossMessageDoneAvg = 0 + } else { + slf.crossMessageDoneAvg = time.Duration(nanoseconds / slf.crossMessageSecond) + } + if nanoseconds := slf.tickerMessageCost.Nanoseconds(); nanoseconds == 0 { + slf.tickerMessageDoneAvg = 0 + } else { + slf.tickerMessageDoneAvg = time.Duration(nanoseconds / slf.tickerMessageSecond) + } + + // 秒 QPS + if slf.messageSecond == 0 { + slf.messageQPS = 0 + } else { + slf.messageQPS = slf.messageSecond / slf.messageDoneAvg.Nanoseconds() + } + if slf.packetMessageSecond == 0 { + slf.packetMessageQPS = 0 + } else { + slf.packetMessageQPS = slf.packetMessageSecond / slf.packetMessageDoneAvg.Nanoseconds() + } + if slf.errorMessageSecond == 0 { + slf.errorMessageQPS = 0 + } else { + slf.errorMessageQPS = slf.errorMessageSecond / slf.errorMessageDoneAvg.Nanoseconds() + } + if slf.crossMessageSecond == 0 { + slf.crossMessageQPS = 0 + } else { + slf.crossMessageQPS = slf.crossMessageSecond / slf.crossMessageDoneAvg.Nanoseconds() + } + if slf.tickerMessageSecond == 0 { + slf.tickerMessageQPS = 0 + } else { + slf.tickerMessageQPS = slf.tickerMessageSecond / slf.tickerMessageDoneAvg.Nanoseconds() + } + + // Top QPS + if slf.messageQPS > slf.messageTopQPS { + slf.messageTopQPS = slf.messageQPS + } + if slf.packetMessageQPS > slf.packetMessageTopQPS { + slf.packetMessageTopQPS = slf.packetMessageQPS + } + if slf.errorMessageQPS > slf.errorMessageTopQPS { + slf.errorMessageTopQPS = slf.errorMessageQPS + } + if slf.crossMessageQPS > slf.crossMessageTopQPS { + slf.crossMessageTopQPS = slf.crossMessageQPS + } + if slf.tickerMessageQPS > slf.tickerMessageTopQPS { + slf.tickerMessageTopQPS = slf.tickerMessageQPS + } + + slf.messageSecond = 0 + slf.packetMessageSecond = 0 + slf.errorMessageSecond = 0 + slf.crossMessageSecond = 0 + slf.tickerMessageSecond = 0 + slf.messageCost = 0 + slf.packetMessageCost = 0 + slf.errorMessageCost = 0 + slf.crossMessageCost = 0 + slf.tickerMessageCost = 0 + slf.rwMutex.Unlock() +} + +func (slf *monitor) messageRun(msg *Message) { + slf.rwMutex.Lock() + defer slf.rwMutex.Unlock() + switch msg.t { + case MessageTypePacket: + slf.packetMessageTotal++ + slf.packetMessageSecond++ + case MessageTypeError: + slf.errorMessageTotal++ + slf.errorMessageSecond++ + case MessageTypeCross: + slf.crossMessageTotal++ + slf.crossMessageSecond++ + case MessageTypeTicker: + slf.tickerMessageTotal++ + slf.tickerMessageSecond++ + default: + return + } + slf.messageTotal++ + slf.messageSecond++ +} + +func (slf *monitor) messageDone(msg *Message, cost time.Duration) { + slf.rwMutex.Lock() + defer slf.rwMutex.Unlock() + switch msg.t { + case MessageTypePacket: + slf.packetMessageTotal-- + slf.packetMessageCost += cost + case MessageTypeError: + slf.errorMessageTotal-- + slf.errorMessageCost += cost + case MessageTypeCross: + slf.crossMessageTotal-- + slf.crossMessageCost += cost + case MessageTypeTicker: + slf.tickerMessageTotal-- + slf.tickerMessageCost += cost + default: + return + } + slf.messageTotal-- + slf.messageCost += cost +} + +func (slf *monitor) close() { + slf.ticker.Release() +} + +func (slf *monitor) MessageTotal() int64 { + return slf.messageTotal +} + +func (slf *monitor) PacketMessageTotal() int64 { + return slf.packetMessageTotal +} + +func (slf *monitor) ErrorMessageTotal() int64 { + return slf.errorMessageTotal +} + +func (slf *monitor) CrossMessageTotal() int64 { + return slf.crossMessageTotal +} + +func (slf *monitor) TickerMessageTotal() int64 { + return slf.tickerMessageTotal +} + +func (slf *monitor) MessageSecond() int64 { + return slf.messageSecond +} + +func (slf *monitor) PacketMessageSecond() int64 { + return slf.packetMessageSecond +} + +func (slf *monitor) ErrorMessageSecond() int64 { + return slf.errorMessageSecond +} + +func (slf *monitor) CrossMessageSecond() int64 { + return slf.crossMessageSecond +} + +func (slf *monitor) TickerMessageSecond() int64 { + return slf.tickerMessageSecond +} + +func (slf *monitor) MessageCost() time.Duration { + return slf.messageCost +} + +func (slf *monitor) PacketMessageCost() time.Duration { + return slf.packetMessageCost +} + +func (slf *monitor) ErrorMessageCost() time.Duration { + return slf.errorMessageCost +} + +func (slf *monitor) CrossMessageCost() time.Duration { + return slf.crossMessageCost +} + +func (slf *monitor) TickerMessageCost() time.Duration { + return slf.tickerMessageCost +} + +func (slf *monitor) MessageDoneAvg() time.Duration { + return slf.messageDoneAvg +} + +func (slf *monitor) PacketMessageDoneAvg() time.Duration { + return slf.packetMessageDoneAvg +} + +func (slf *monitor) ErrorMessageDoneAvg() time.Duration { + return slf.errorMessageDoneAvg +} + +func (slf *monitor) CrossMessageDoneAvg() time.Duration { + return slf.crossMessageDoneAvg +} + +func (slf *monitor) TickerMessageDoneAvg() time.Duration { + return slf.tickerMessageDoneAvg +} + +func (slf *monitor) MessageQPS() int64 { + return slf.messageQPS +} + +func (slf *monitor) PacketMessageQPS() int64 { + return slf.packetMessageQPS +} + +func (slf *monitor) ErrorMessageQPS() int64 { + return slf.errorMessageQPS +} + +func (slf *monitor) CrossMessageQPS() int64 { + return slf.crossMessageQPS +} + +func (slf *monitor) TickerMessageQPS() int64 { + return slf.tickerMessageQPS +} + +func (slf *monitor) MessageTopQPS() int64 { + return slf.messageTopQPS +} + +func (slf *monitor) PacketMessageTopQPS() int64 { + return slf.packetMessageTopQPS +} + +func (slf *monitor) ErrorMessageTopQPS() int64 { + return slf.errorMessageTopQPS +} + +func (slf *monitor) CrossMessageTopQPS() int64 { + return slf.crossMessageTopQPS +} + +func (slf *monitor) TickerMessageTopQPS() int64 { + return slf.tickerMessageTopQPS +} diff --git a/server/options.go b/server/options.go index 440fa3f..1dd7d26 100644 --- a/server/options.go +++ b/server/options.go @@ -23,6 +23,16 @@ const ( type Option func(srv *Server) +// WithMonitor 通过监控的方式创建服务器 +// - 需要注意在消息被转为异步处理时会导致部分指标不可信 +func WithMonitor() Option { + return func(srv *Server) { + if srv.monitor == nil { + srv.monitor = newMonitor() + } + } +} + // WithTicker 通过定时器创建服务器,为服务器添加定时器功能 // - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题) // - 多核与分流情况下需要考虑是否有必要 autonomy diff --git a/server/server.go b/server/server.go index f5a86d0..c1e327b 100644 --- a/server/server.go +++ b/server/server.go @@ -52,6 +52,7 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { *event // 事件 + monitor *monitor // 监控 cross map[string]Cross // 跨服 id int64 // 服务器id network Network // 网络类型 @@ -330,6 +331,14 @@ func (slf *Server) Ticker() *timer.Ticker { return slf.ticker } +// GetMonitor 获取服务器监控信息 +func (slf *Server) GetMonitor() Monitor { + if slf.monitor == nil { + panic(ErrNoSupportMonitor) + } + return slf.monitor +} + // Shutdown 停止运行服务器 func (slf *Server) Shutdown(err error, stack ...string) { slf.isShutdown.Store(true) @@ -363,6 +372,10 @@ func (slf *Server) Shutdown(err error, stack ...string) { log.Error("Server", zap.Error(shutdownErr)) } } + if slf.monitor != nil { + slf.monitor.close() + slf.monitor = nil + } if err != nil { var s string @@ -430,15 +443,22 @@ func (slf *Server) dispatchMessage(msg *Message) { } } - if cost := time.Since(present); cost > time.Millisecond*100 { + cost := time.Since(present) + if cost > time.Millisecond*100 { log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.String("LowExecCost", cost.String()), zap.Any("MessageAttrs", msg.attrs)) slf.OnMessageLowExecEvent(msg, cost) } if !slf.isShutdown.Load() { + if slf.monitor != nil { + slf.monitor.messageDone(msg, cost) + } slf.messagePool.Release(msg) } }() + if slf.monitor != nil { + slf.monitor.messageRun(msg) + } switch msg.t { case MessageTypePacket: if slf.network == NetworkWebsocket {