监控实现
This commit is contained in:
parent
a3941fc529
commit
3c0190eb27
|
@ -18,8 +18,8 @@ var (
|
|||
ErrWebsocketIllegalMessageType = errors.New("illegal message type")
|
||||
ErrPleaseUseWebsocketHandle = errors.New("in Websocket mode, please use the RegConnectionReceiveWebsocketPacketEvent function to register")
|
||||
ErrPleaseUseOrdinaryPacketHandle = errors.New("non Websocket mode, please use the RegConnectionReceivePacketEvent function to register")
|
||||
ErrOnlySupportSocket = errors.New("only supports Socket programming")
|
||||
ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server")
|
||||
ErrNoSupportMonitor = errors.New("the server does not support GetMonitor, please use the WithMonitor option to create the server")
|
||||
ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server")
|
||||
ErrUnregisteredCrossName = errors.New("unregistered cross name, please use the WithCross option to create the server")
|
||||
)
|
||||
|
|
|
@ -0,0 +1,383 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/kercylan98/minotaur/utils/timer"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func newMonitor() *monitor {
|
||||
m := &monitor{
|
||||
ticker: timer.GetTicker(10),
|
||||
}
|
||||
m.ticker.Loop("tick", timer.Instantly, time.Second, timer.Forever, m.tick)
|
||||
return m
|
||||
}
|
||||
|
||||
type Monitor interface {
|
||||
MessageTotal() int64
|
||||
PacketMessageTotal() int64
|
||||
ErrorMessageTotal() int64
|
||||
CrossMessageTotal() int64
|
||||
TickerMessageTotal() int64
|
||||
MessageSecond() int64
|
||||
PacketMessageSecond() int64
|
||||
ErrorMessageSecond() int64
|
||||
CrossMessageSecond() int64
|
||||
TickerMessageSecond() int64
|
||||
MessageCost() time.Duration
|
||||
PacketMessageCost() time.Duration
|
||||
ErrorMessageCost() time.Duration
|
||||
CrossMessageCost() time.Duration
|
||||
TickerMessageCost() time.Duration
|
||||
MessageDoneAvg() time.Duration
|
||||
PacketMessageDoneAvg() time.Duration
|
||||
ErrorMessageDoneAvg() time.Duration
|
||||
CrossMessageDoneAvg() time.Duration
|
||||
TickerMessageDoneAvg() time.Duration
|
||||
MessageQPS() int64
|
||||
PacketMessageQPS() int64
|
||||
ErrorMessageQPS() int64
|
||||
CrossMessageQPS() int64
|
||||
TickerMessageQPS() int64
|
||||
MessageTopQPS() int64
|
||||
PacketMessageTopQPS() int64
|
||||
ErrorMessageTopQPS() int64
|
||||
CrossMessageTopQPS() int64
|
||||
TickerMessageTopQPS() int64
|
||||
}
|
||||
|
||||
type monitor struct {
|
||||
rwMutex sync.RWMutex
|
||||
ticker *timer.Ticker
|
||||
|
||||
messageTotal int64 // 正在执行的消息总数
|
||||
packetMessageTotal int64 // 正在执行的玩家消息总数
|
||||
errorMessageTotal int64 // 正在执行的错误消息总数
|
||||
crossMessageTotal int64 // 正在执行的跨服消息总数
|
||||
tickerMessageTotal int64 // 正在执行的定时器消息总数
|
||||
messageSecond int64 // 一秒内执行的消息并发量
|
||||
packetMessageSecond int64 // 一秒内玩家消息并发量
|
||||
errorMessageSecond int64 // 一秒内错误消息并发量
|
||||
crossMessageSecond int64 // 一秒内跨服消息并发量
|
||||
tickerMessageSecond int64 // 一秒内定时器消息并发量
|
||||
messageCost time.Duration // 一秒内执行的消息总耗时
|
||||
packetMessageCost time.Duration // 一秒内玩家消息总耗时
|
||||
errorMessageCost time.Duration // 一秒内错误消息总耗时
|
||||
crossMessageCost time.Duration // 一秒内跨服消息总耗时
|
||||
tickerMessageCost time.Duration // 一秒内定时器消息总耗时
|
||||
messageDoneAvg time.Duration // 一秒内执行的消息平均响应时间
|
||||
packetMessageDoneAvg time.Duration // 一秒内玩家消息平均响应时间
|
||||
errorMessageDoneAvg time.Duration // 一秒内错误消息平均响应时间
|
||||
crossMessageDoneAvg time.Duration // 一秒内跨服消息平均响应时间
|
||||
tickerMessageDoneAvg time.Duration // 一秒内定时器消息平均响应时间
|
||||
messageQPS int64 // 一秒内执行的消息QPS
|
||||
packetMessageQPS int64 // 一秒内玩家消息QPS
|
||||
errorMessageQPS int64 // 一秒内错误消息QPS
|
||||
crossMessageQPS int64 // 一秒内跨服消息QPS
|
||||
tickerMessageQPS int64 // 一秒内定时器消息QPS
|
||||
messageTopQPS int64 // 执行的消息最高QPS
|
||||
packetMessageTopQPS int64 // 玩家消息最高QPS
|
||||
errorMessageTopQPS int64 // 错误消息最高QPS
|
||||
crossMessageTopQPS int64 // 跨服消息最高QPS
|
||||
tickerMessageTopQPS int64 // 定时器消息最高QPS
|
||||
}
|
||||
|
||||
func (slf *monitor) String() string {
|
||||
m := slf.Map()
|
||||
bytes, _ := json.Marshal(m)
|
||||
return string(bytes)
|
||||
}
|
||||
|
||||
func (slf *monitor) Map() map[string]any {
|
||||
var m = make(map[string]any)
|
||||
m["messageTotal"] = slf.messageTotal
|
||||
m["packetMessageTotal"] = slf.packetMessageTotal
|
||||
m["errorMessageTotal"] = slf.errorMessageTotal
|
||||
m["crossMessageTotal"] = slf.crossMessageTotal
|
||||
m["tickerMessageTotal"] = slf.tickerMessageTotal
|
||||
m["messageSecond"] = slf.messageSecond
|
||||
m["packetMessageSecond"] = slf.packetMessageSecond
|
||||
m["errorMessageSecond"] = slf.errorMessageSecond
|
||||
m["crossMessageSecond"] = slf.crossMessageSecond
|
||||
m["tickerMessageSecond"] = slf.tickerMessageSecond
|
||||
m["messageCost"] = slf.messageCost
|
||||
m["packetMessageCost"] = slf.packetMessageCost
|
||||
m["errorMessageCost"] = slf.errorMessageCost
|
||||
m["crossMessageCost"] = slf.crossMessageCost
|
||||
m["tickerMessageCost"] = slf.tickerMessageCost
|
||||
m["messageDoneAvg"] = slf.messageDoneAvg
|
||||
m["packetMessageDoneAvg"] = slf.packetMessageDoneAvg
|
||||
m["errorMessageDoneAvg"] = slf.errorMessageDoneAvg
|
||||
m["crossMessageDoneAvg"] = slf.crossMessageDoneAvg
|
||||
m["tickerMessageDoneAvg"] = slf.tickerMessageDoneAvg
|
||||
m["messageQPS"] = slf.messageQPS
|
||||
m["packetMessageQPS"] = slf.packetMessageQPS
|
||||
m["errorMessageQPS"] = slf.errorMessageQPS
|
||||
m["crossMessageQPS"] = slf.crossMessageQPS
|
||||
m["tickerMessageQPS"] = slf.tickerMessageQPS
|
||||
m["messageTopQPS"] = slf.messageTopQPS
|
||||
m["packetMessageTopQPS"] = slf.packetMessageTopQPS
|
||||
m["errorMessageTopQPS"] = slf.errorMessageTopQPS
|
||||
m["crossMessageTopQPS"] = slf.crossMessageTopQPS
|
||||
m["tickerMessageTopQPS"] = slf.tickerMessageTopQPS
|
||||
return m
|
||||
}
|
||||
|
||||
func (slf *monitor) tick() {
|
||||
slf.rwMutex.Lock()
|
||||
|
||||
// 秒平均响应时间
|
||||
if nanoseconds := slf.messageCost.Nanoseconds(); nanoseconds == 0 {
|
||||
slf.messageDoneAvg = 0
|
||||
} else {
|
||||
slf.messageDoneAvg = time.Duration(nanoseconds / slf.messageSecond)
|
||||
}
|
||||
if nanoseconds := slf.packetMessageCost.Nanoseconds(); nanoseconds == 0 {
|
||||
slf.packetMessageDoneAvg = 0
|
||||
} else {
|
||||
slf.packetMessageDoneAvg = time.Duration(nanoseconds / slf.packetMessageSecond)
|
||||
}
|
||||
if nanoseconds := slf.errorMessageCost.Nanoseconds(); nanoseconds == 0 {
|
||||
slf.errorMessageDoneAvg = 0
|
||||
} else {
|
||||
slf.errorMessageDoneAvg = time.Duration(nanoseconds / slf.errorMessageSecond)
|
||||
}
|
||||
if nanoseconds := slf.crossMessageCost.Nanoseconds(); nanoseconds == 0 {
|
||||
slf.crossMessageDoneAvg = 0
|
||||
} else {
|
||||
slf.crossMessageDoneAvg = time.Duration(nanoseconds / slf.crossMessageSecond)
|
||||
}
|
||||
if nanoseconds := slf.tickerMessageCost.Nanoseconds(); nanoseconds == 0 {
|
||||
slf.tickerMessageDoneAvg = 0
|
||||
} else {
|
||||
slf.tickerMessageDoneAvg = time.Duration(nanoseconds / slf.tickerMessageSecond)
|
||||
}
|
||||
|
||||
// 秒 QPS
|
||||
if slf.messageSecond == 0 {
|
||||
slf.messageQPS = 0
|
||||
} else {
|
||||
slf.messageQPS = slf.messageSecond / slf.messageDoneAvg.Nanoseconds()
|
||||
}
|
||||
if slf.packetMessageSecond == 0 {
|
||||
slf.packetMessageQPS = 0
|
||||
} else {
|
||||
slf.packetMessageQPS = slf.packetMessageSecond / slf.packetMessageDoneAvg.Nanoseconds()
|
||||
}
|
||||
if slf.errorMessageSecond == 0 {
|
||||
slf.errorMessageQPS = 0
|
||||
} else {
|
||||
slf.errorMessageQPS = slf.errorMessageSecond / slf.errorMessageDoneAvg.Nanoseconds()
|
||||
}
|
||||
if slf.crossMessageSecond == 0 {
|
||||
slf.crossMessageQPS = 0
|
||||
} else {
|
||||
slf.crossMessageQPS = slf.crossMessageSecond / slf.crossMessageDoneAvg.Nanoseconds()
|
||||
}
|
||||
if slf.tickerMessageSecond == 0 {
|
||||
slf.tickerMessageQPS = 0
|
||||
} else {
|
||||
slf.tickerMessageQPS = slf.tickerMessageSecond / slf.tickerMessageDoneAvg.Nanoseconds()
|
||||
}
|
||||
|
||||
// Top QPS
|
||||
if slf.messageQPS > slf.messageTopQPS {
|
||||
slf.messageTopQPS = slf.messageQPS
|
||||
}
|
||||
if slf.packetMessageQPS > slf.packetMessageTopQPS {
|
||||
slf.packetMessageTopQPS = slf.packetMessageQPS
|
||||
}
|
||||
if slf.errorMessageQPS > slf.errorMessageTopQPS {
|
||||
slf.errorMessageTopQPS = slf.errorMessageQPS
|
||||
}
|
||||
if slf.crossMessageQPS > slf.crossMessageTopQPS {
|
||||
slf.crossMessageTopQPS = slf.crossMessageQPS
|
||||
}
|
||||
if slf.tickerMessageQPS > slf.tickerMessageTopQPS {
|
||||
slf.tickerMessageTopQPS = slf.tickerMessageQPS
|
||||
}
|
||||
|
||||
slf.messageSecond = 0
|
||||
slf.packetMessageSecond = 0
|
||||
slf.errorMessageSecond = 0
|
||||
slf.crossMessageSecond = 0
|
||||
slf.tickerMessageSecond = 0
|
||||
slf.messageCost = 0
|
||||
slf.packetMessageCost = 0
|
||||
slf.errorMessageCost = 0
|
||||
slf.crossMessageCost = 0
|
||||
slf.tickerMessageCost = 0
|
||||
slf.rwMutex.Unlock()
|
||||
}
|
||||
|
||||
func (slf *monitor) messageRun(msg *Message) {
|
||||
slf.rwMutex.Lock()
|
||||
defer slf.rwMutex.Unlock()
|
||||
switch msg.t {
|
||||
case MessageTypePacket:
|
||||
slf.packetMessageTotal++
|
||||
slf.packetMessageSecond++
|
||||
case MessageTypeError:
|
||||
slf.errorMessageTotal++
|
||||
slf.errorMessageSecond++
|
||||
case MessageTypeCross:
|
||||
slf.crossMessageTotal++
|
||||
slf.crossMessageSecond++
|
||||
case MessageTypeTicker:
|
||||
slf.tickerMessageTotal++
|
||||
slf.tickerMessageSecond++
|
||||
default:
|
||||
return
|
||||
}
|
||||
slf.messageTotal++
|
||||
slf.messageSecond++
|
||||
}
|
||||
|
||||
func (slf *monitor) messageDone(msg *Message, cost time.Duration) {
|
||||
slf.rwMutex.Lock()
|
||||
defer slf.rwMutex.Unlock()
|
||||
switch msg.t {
|
||||
case MessageTypePacket:
|
||||
slf.packetMessageTotal--
|
||||
slf.packetMessageCost += cost
|
||||
case MessageTypeError:
|
||||
slf.errorMessageTotal--
|
||||
slf.errorMessageCost += cost
|
||||
case MessageTypeCross:
|
||||
slf.crossMessageTotal--
|
||||
slf.crossMessageCost += cost
|
||||
case MessageTypeTicker:
|
||||
slf.tickerMessageTotal--
|
||||
slf.tickerMessageCost += cost
|
||||
default:
|
||||
return
|
||||
}
|
||||
slf.messageTotal--
|
||||
slf.messageCost += cost
|
||||
}
|
||||
|
||||
func (slf *monitor) close() {
|
||||
slf.ticker.Release()
|
||||
}
|
||||
|
||||
func (slf *monitor) MessageTotal() int64 {
|
||||
return slf.messageTotal
|
||||
}
|
||||
|
||||
func (slf *monitor) PacketMessageTotal() int64 {
|
||||
return slf.packetMessageTotal
|
||||
}
|
||||
|
||||
func (slf *monitor) ErrorMessageTotal() int64 {
|
||||
return slf.errorMessageTotal
|
||||
}
|
||||
|
||||
func (slf *monitor) CrossMessageTotal() int64 {
|
||||
return slf.crossMessageTotal
|
||||
}
|
||||
|
||||
func (slf *monitor) TickerMessageTotal() int64 {
|
||||
return slf.tickerMessageTotal
|
||||
}
|
||||
|
||||
func (slf *monitor) MessageSecond() int64 {
|
||||
return slf.messageSecond
|
||||
}
|
||||
|
||||
func (slf *monitor) PacketMessageSecond() int64 {
|
||||
return slf.packetMessageSecond
|
||||
}
|
||||
|
||||
func (slf *monitor) ErrorMessageSecond() int64 {
|
||||
return slf.errorMessageSecond
|
||||
}
|
||||
|
||||
func (slf *monitor) CrossMessageSecond() int64 {
|
||||
return slf.crossMessageSecond
|
||||
}
|
||||
|
||||
func (slf *monitor) TickerMessageSecond() int64 {
|
||||
return slf.tickerMessageSecond
|
||||
}
|
||||
|
||||
func (slf *monitor) MessageCost() time.Duration {
|
||||
return slf.messageCost
|
||||
}
|
||||
|
||||
func (slf *monitor) PacketMessageCost() time.Duration {
|
||||
return slf.packetMessageCost
|
||||
}
|
||||
|
||||
func (slf *monitor) ErrorMessageCost() time.Duration {
|
||||
return slf.errorMessageCost
|
||||
}
|
||||
|
||||
func (slf *monitor) CrossMessageCost() time.Duration {
|
||||
return slf.crossMessageCost
|
||||
}
|
||||
|
||||
func (slf *monitor) TickerMessageCost() time.Duration {
|
||||
return slf.tickerMessageCost
|
||||
}
|
||||
|
||||
func (slf *monitor) MessageDoneAvg() time.Duration {
|
||||
return slf.messageDoneAvg
|
||||
}
|
||||
|
||||
func (slf *monitor) PacketMessageDoneAvg() time.Duration {
|
||||
return slf.packetMessageDoneAvg
|
||||
}
|
||||
|
||||
func (slf *monitor) ErrorMessageDoneAvg() time.Duration {
|
||||
return slf.errorMessageDoneAvg
|
||||
}
|
||||
|
||||
func (slf *monitor) CrossMessageDoneAvg() time.Duration {
|
||||
return slf.crossMessageDoneAvg
|
||||
}
|
||||
|
||||
func (slf *monitor) TickerMessageDoneAvg() time.Duration {
|
||||
return slf.tickerMessageDoneAvg
|
||||
}
|
||||
|
||||
func (slf *monitor) MessageQPS() int64 {
|
||||
return slf.messageQPS
|
||||
}
|
||||
|
||||
func (slf *monitor) PacketMessageQPS() int64 {
|
||||
return slf.packetMessageQPS
|
||||
}
|
||||
|
||||
func (slf *monitor) ErrorMessageQPS() int64 {
|
||||
return slf.errorMessageQPS
|
||||
}
|
||||
|
||||
func (slf *monitor) CrossMessageQPS() int64 {
|
||||
return slf.crossMessageQPS
|
||||
}
|
||||
|
||||
func (slf *monitor) TickerMessageQPS() int64 {
|
||||
return slf.tickerMessageQPS
|
||||
}
|
||||
|
||||
func (slf *monitor) MessageTopQPS() int64 {
|
||||
return slf.messageTopQPS
|
||||
}
|
||||
|
||||
func (slf *monitor) PacketMessageTopQPS() int64 {
|
||||
return slf.packetMessageTopQPS
|
||||
}
|
||||
|
||||
func (slf *monitor) ErrorMessageTopQPS() int64 {
|
||||
return slf.errorMessageTopQPS
|
||||
}
|
||||
|
||||
func (slf *monitor) CrossMessageTopQPS() int64 {
|
||||
return slf.crossMessageTopQPS
|
||||
}
|
||||
|
||||
func (slf *monitor) TickerMessageTopQPS() int64 {
|
||||
return slf.tickerMessageTopQPS
|
||||
}
|
|
@ -23,6 +23,16 @@ const (
|
|||
|
||||
type Option func(srv *Server)
|
||||
|
||||
// WithMonitor 通过监控的方式创建服务器
|
||||
// - 需要注意在消息被转为异步处理时会导致部分指标不可信
|
||||
func WithMonitor() Option {
|
||||
return func(srv *Server) {
|
||||
if srv.monitor == nil {
|
||||
srv.monitor = newMonitor()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithTicker 通过定时器创建服务器,为服务器添加定时器功能
|
||||
// - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题)
|
||||
// - 多核与分流情况下需要考虑是否有必要 autonomy
|
||||
|
|
|
@ -52,6 +52,7 @@ func New(network Network, options ...Option) *Server {
|
|||
// Server 网络服务器
|
||||
type Server struct {
|
||||
*event // 事件
|
||||
monitor *monitor // 监控
|
||||
cross map[string]Cross // 跨服
|
||||
id int64 // 服务器id
|
||||
network Network // 网络类型
|
||||
|
@ -330,6 +331,14 @@ func (slf *Server) Ticker() *timer.Ticker {
|
|||
return slf.ticker
|
||||
}
|
||||
|
||||
// GetMonitor 获取服务器监控信息
|
||||
func (slf *Server) GetMonitor() Monitor {
|
||||
if slf.monitor == nil {
|
||||
panic(ErrNoSupportMonitor)
|
||||
}
|
||||
return slf.monitor
|
||||
}
|
||||
|
||||
// Shutdown 停止运行服务器
|
||||
func (slf *Server) Shutdown(err error, stack ...string) {
|
||||
slf.isShutdown.Store(true)
|
||||
|
@ -363,6 +372,10 @@ func (slf *Server) Shutdown(err error, stack ...string) {
|
|||
log.Error("Server", zap.Error(shutdownErr))
|
||||
}
|
||||
}
|
||||
if slf.monitor != nil {
|
||||
slf.monitor.close()
|
||||
slf.monitor = nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
var s string
|
||||
|
@ -430,15 +443,22 @@ func (slf *Server) dispatchMessage(msg *Message) {
|
|||
}
|
||||
}
|
||||
|
||||
if cost := time.Since(present); cost > time.Millisecond*100 {
|
||||
cost := time.Since(present)
|
||||
if cost > time.Millisecond*100 {
|
||||
log.Warn("Server", zap.String("MessageType", messageNames[msg.t]), zap.String("LowExecCost", cost.String()), zap.Any("MessageAttrs", msg.attrs))
|
||||
slf.OnMessageLowExecEvent(msg, cost)
|
||||
}
|
||||
|
||||
if !slf.isShutdown.Load() {
|
||||
if slf.monitor != nil {
|
||||
slf.monitor.messageDone(msg, cost)
|
||||
}
|
||||
slf.messagePool.Release(msg)
|
||||
}
|
||||
}()
|
||||
if slf.monitor != nil {
|
||||
slf.monitor.messageRun(msg)
|
||||
}
|
||||
switch msg.t {
|
||||
case MessageTypePacket:
|
||||
if slf.network == NetworkWebsocket {
|
||||
|
|
Loading…
Reference in New Issue