feat: server 支持通过 WithLowMessageDuration、WithAsyncLowMessageDuration 函数设置慢消息阈值

This commit is contained in:
kercylan98 2024-01-12 18:43:47 +08:00
parent 4f2850b355
commit 4e1d075a05
4 changed files with 43 additions and 13 deletions

View File

@ -12,12 +12,14 @@ const (
) )
const ( const (
DefaultAsyncPoolSize = 256 DefaultAsyncPoolSize = 256
DefaultWebsocketReadDeadline = 30 * time.Second DefaultWebsocketReadDeadline = 30 * time.Second
DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB
DefaultDispatcherBufferSize = 1024 * 16 DefaultDispatcherBufferSize = 1024 * 16
DefaultConnWriteBufferSize = 1024 * 1 DefaultConnWriteBufferSize = 1024 * 1
DefaultConnHubBufferSize = 1024 * 1 DefaultConnHubBufferSize = 1024 * 1
DefaultLowMessageDuration = 100 * time.Millisecond
DefaultAsyncLowMessageDuration = time.Second
) )
func DefaultWebsocketUpgrader() *websocket.Upgrader { func DefaultWebsocketUpgrader() *websocket.Upgrader {

View File

@ -27,7 +27,7 @@ func (slf *HttpRouter[Context]) handlesConvert(handlers []HandlerFunc[Context])
hc := slf.packer(ctx) hc := slf.packer(ctx)
var now = time.Now() var now = time.Now()
handler(hc) handler(hc)
slf.srv.low(nil, now, time.Second, "HTTP ["+ctx.Request.Method+"] "+ctx.Request.RequestURI) slf.srv.low(nil, now, slf.srv.asyncLowMessageDuration, true, "HTTP ["+ctx.Request.Method+"] "+ctx.Request.RequestURI)
}) })
} }
return handles return handles

View File

@ -52,6 +52,26 @@ type runtime struct {
websocketUpgrader *websocket.Upgrader // websocket 升级器 websocketUpgrader *websocket.Upgrader // websocket 升级器
websocketConnInitializer func(writer http.ResponseWriter, request *http.Request, conn *websocket.Conn) error // websocket 连接初始化 websocketConnInitializer func(writer http.ResponseWriter, request *http.Request, conn *websocket.Conn) error // websocket 连接初始化
dispatcherBufferSize int // 消息分发器缓冲区大小 dispatcherBufferSize int // 消息分发器缓冲区大小
lowMessageDuration time.Duration // 慢消息时长
asyncLowMessageDuration time.Duration // 异步慢消息时长
}
// WithLowMessageDuration 通过指定慢消息时长的方式创建服务器,当消息处理时间超过指定时长时,将会输出 WARN 类型的日志
// - 默认值为 DefaultLowMessageDuration
// - 当 duration <= 0 时,表示关闭慢消息检测
func WithLowMessageDuration(duration time.Duration) Option {
return func(srv *Server) {
srv.lowMessageDuration = duration
}
}
// WithAsyncLowMessageDuration 通过指定异步消息的慢消息时长的方式创建服务器,当消息处理时间超过指定时长时,将会输出 WARN 类型的日志
// - 默认值为 DefaultAsyncLowMessageDuration
// - 当 duration <= 0 时,表示关闭慢消息检测
func WithAsyncLowMessageDuration(duration time.Duration) Option {
return func(srv *Server) {
srv.asyncLowMessageDuration = duration
}
} }
// WithWebsocketConnInitializer 通过 websocket 连接初始化的方式创建服务器,当 initializer 返回错误时,服务器将不会处理该连接的后续逻辑 // WithWebsocketConnInitializer 通过 websocket 连接初始化的方式创建服务器,当 initializer 返回错误时,服务器将不会处理该连接的后续逻辑

View File

@ -32,9 +32,11 @@ func New(network Network, options ...Option) *Server {
network.check() network.check()
server := &Server{ server := &Server{
runtime: &runtime{ runtime: &runtime{
packetWarnSize: DefaultPacketWarnSize, packetWarnSize: DefaultPacketWarnSize,
connWriteBufferSize: DefaultConnWriteBufferSize, connWriteBufferSize: DefaultConnWriteBufferSize,
dispatcherBufferSize: DefaultDispatcherBufferSize, dispatcherBufferSize: DefaultDispatcherBufferSize,
lowMessageDuration: DefaultLowMessageDuration,
asyncLowMessageDuration: DefaultAsyncLowMessageDuration,
}, },
connMgr: &connMgr{}, connMgr: &connMgr{},
option: &option{}, option: &option{},
@ -369,7 +371,13 @@ func (srv *Server) pushMessage(message *Message) {
d.Put(message) d.Put(message)
} }
func (srv *Server) low(message *Message, present time.Time, expect time.Duration, messageReplace ...string) { func (srv *Server) low(message *Message, present time.Time, expect time.Duration, async bool, messageReplace ...string) {
switch {
case async && srv.asyncLowMessageDuration <= 0:
return
case !async && srv.lowMessageDuration <= 0:
return
}
cost := time.Since(present) cost := time.Since(present)
if cost > expect { if cost > expect {
if message == nil { if message == nil {
@ -431,7 +439,7 @@ func (srv *Server) dispatchMessage(dispatcherIns *dispatcher.Dispatcher[string,
dispatcherIns.IncrCount(msg.producer, -1) dispatcherIns.IncrCount(msg.producer, -1)
} }
srv.low(msg, present, time.Millisecond*100) srv.low(msg, present, srv.lowMessageDuration, false)
srv.messageCounter.Add(-1) srv.messageCounter.Add(-1)
if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) { if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) {
@ -470,7 +478,7 @@ func (srv *Server) dispatchMessage(dispatcherIns *dispatcher.Dispatcher[string,
srv.OnMessageErrorEvent(msg, err) srv.OnMessageErrorEvent(msg, err)
} }
super.Handle(cancel) super.Handle(cancel)
srv.low(msg, present, time.Second) srv.low(msg, present, srv.asyncLowMessageDuration, true)
srv.messageCounter.Add(-1) srv.messageCounter.Add(-1)
if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) { if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) {