diff --git a/server/constants.go b/server/constants.go index b5bc5c7..cc941c0 100644 --- a/server/constants.go +++ b/server/constants.go @@ -12,12 +12,14 @@ const ( ) const ( - DefaultAsyncPoolSize = 256 - DefaultWebsocketReadDeadline = 30 * time.Second - DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB - DefaultDispatcherBufferSize = 1024 * 16 - DefaultConnWriteBufferSize = 1024 * 1 - DefaultConnHubBufferSize = 1024 * 1 + DefaultAsyncPoolSize = 256 + DefaultWebsocketReadDeadline = 30 * time.Second + DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB + DefaultDispatcherBufferSize = 1024 * 16 + DefaultConnWriteBufferSize = 1024 * 1 + DefaultConnHubBufferSize = 1024 * 1 + DefaultLowMessageDuration = 100 * time.Millisecond + DefaultAsyncLowMessageDuration = time.Second ) func DefaultWebsocketUpgrader() *websocket.Upgrader { diff --git a/server/http_router.go b/server/http_router.go index d6efe09..779cced 100644 --- a/server/http_router.go +++ b/server/http_router.go @@ -27,7 +27,7 @@ func (slf *HttpRouter[Context]) handlesConvert(handlers []HandlerFunc[Context]) hc := slf.packer(ctx) var now = time.Now() handler(hc) - slf.srv.low(nil, now, time.Second, "HTTP ["+ctx.Request.Method+"] "+ctx.Request.RequestURI) + slf.srv.low(nil, now, slf.srv.asyncLowMessageDuration, true, "HTTP ["+ctx.Request.Method+"] "+ctx.Request.RequestURI) }) } return handles diff --git a/server/options.go b/server/options.go index 9497d33..1e11728 100644 --- a/server/options.go +++ b/server/options.go @@ -52,6 +52,26 @@ type runtime struct { websocketUpgrader *websocket.Upgrader // websocket 升级器 websocketConnInitializer func(writer http.ResponseWriter, request *http.Request, conn *websocket.Conn) error // websocket 连接初始化 dispatcherBufferSize int // 消息分发器缓冲区大小 + lowMessageDuration time.Duration // 慢消息时长 + asyncLowMessageDuration time.Duration // 异步慢消息时长 +} + +// WithLowMessageDuration 通过指定慢消息时长的方式创建服务器,当消息处理时间超过指定时长时,将会输出 WARN 类型的日志 +// - 默认值为 DefaultLowMessageDuration +// - 当 duration <= 0 时,表示关闭慢消息检测 +func WithLowMessageDuration(duration time.Duration) Option { + return func(srv *Server) { + srv.lowMessageDuration = duration + } +} + +// WithAsyncLowMessageDuration 通过指定异步消息的慢消息时长的方式创建服务器,当消息处理时间超过指定时长时,将会输出 WARN 类型的日志 +// - 默认值为 DefaultAsyncLowMessageDuration +// - 当 duration <= 0 时,表示关闭慢消息检测 +func WithAsyncLowMessageDuration(duration time.Duration) Option { + return func(srv *Server) { + srv.asyncLowMessageDuration = duration + } } // WithWebsocketConnInitializer 通过 websocket 连接初始化的方式创建服务器,当 initializer 返回错误时,服务器将不会处理该连接的后续逻辑 diff --git a/server/server.go b/server/server.go index 47d69ed..ea53720 100644 --- a/server/server.go +++ b/server/server.go @@ -32,9 +32,11 @@ func New(network Network, options ...Option) *Server { network.check() server := &Server{ runtime: &runtime{ - packetWarnSize: DefaultPacketWarnSize, - connWriteBufferSize: DefaultConnWriteBufferSize, - dispatcherBufferSize: DefaultDispatcherBufferSize, + packetWarnSize: DefaultPacketWarnSize, + connWriteBufferSize: DefaultConnWriteBufferSize, + dispatcherBufferSize: DefaultDispatcherBufferSize, + lowMessageDuration: DefaultLowMessageDuration, + asyncLowMessageDuration: DefaultAsyncLowMessageDuration, }, connMgr: &connMgr{}, option: &option{}, @@ -369,7 +371,13 @@ func (srv *Server) pushMessage(message *Message) { d.Put(message) } -func (srv *Server) low(message *Message, present time.Time, expect time.Duration, messageReplace ...string) { +func (srv *Server) low(message *Message, present time.Time, expect time.Duration, async bool, messageReplace ...string) { + switch { + case async && srv.asyncLowMessageDuration <= 0: + return + case !async && srv.lowMessageDuration <= 0: + return + } cost := time.Since(present) if cost > expect { if message == nil { @@ -431,7 +439,7 @@ func (srv *Server) dispatchMessage(dispatcherIns *dispatcher.Dispatcher[string, dispatcherIns.IncrCount(msg.producer, -1) } - srv.low(msg, present, time.Millisecond*100) + srv.low(msg, present, srv.lowMessageDuration, false) srv.messageCounter.Add(-1) if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) { @@ -470,7 +478,7 @@ func (srv *Server) dispatchMessage(dispatcherIns *dispatcher.Dispatcher[string, srv.OnMessageErrorEvent(msg, err) } super.Handle(cancel) - srv.low(msg, present, time.Second) + srv.low(msg, present, srv.asyncLowMessageDuration, true) srv.messageCounter.Add(-1) if atomic.CompareAndSwapUint32(&srv.closed, 0, 0) {