diff --git a/README.md b/README.md index 5bd76b5..4cdb30d 100644 --- a/README.md +++ b/README.md @@ -90,27 +90,35 @@ func main() { > Websocket地址: ws://127.0.0.1:9999 ### 分流服务器 -分流服务器可以将客户端分流到不同的分组上,每个分组中为串行处理,不同分组之间并行处理。 +分流服务器可以将消息分流到不同的分组上,每个分组中为串行处理,不同分组之间并行处理。 ```go package main import "github.com/kercylan98/minotaur/server" func main() { - srv := server.New(server.NetworkWebsocket, - server.WithShunt(func(conn *server.Conn) string { - return conn.GetData("roomId").(string) - }), - ) + srv := server.New(server.NetworkWebsocket) + srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) { + // 通过 user_id 进行分流,不同用户的消息将不会互相阻塞 + srv.UseShunt(conn, conn.Gata("user_id").(string)) + }) srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { - conn.Write(packet) + var roomId = "default" + switch string(packet) { + case "JoinRoom": + // 将用户所处的分流渠道切换到 roomId 渠道,此刻同一分流渠道的消息将会按队列顺序处理 + srv.UseShunt(conn, roomId) + case "LeaveRoom": + // 将用户所处分流切换为用户自身的分流渠道 + srv.UseShunt(conn, conn.Gata("user_id").(string)) + } }) if err := srv.Run(":9999"); err != nil { panic(err) } } ``` -> 该示例中假设各房间互不干涉,故通过`server.WithShunt`将连接通过`roomId`进行分组,提高并发处理能力。 +> 该示例中模拟了用户分流渠道在自身渠道和房间渠道切换的过程,通过`UseShunt`对连接分流渠道进行设置,提高并发处理能力。 ### 服务器死锁检测 `Minotaur`内置了服务器消息死锁检测功能,可通过`server.WithDeadlockDetect`进行开启。 @@ -145,7 +153,7 @@ package main import "github.com/kercylan98/minotaur/server" func main() { - srv := server.New(server.NetworkWebsocket, server.WithTicker(50, false)) + srv := server.New(server.NetworkWebsocket, server.WithTicker(50, 10, false)) if err := srv.Run(":9999"); err != nil { panic(err) } diff --git a/go.mod b/go.mod index 36c7a6d..42bcad4 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,9 @@ require ( github.com/gin-contrib/pprof v1.4.0 github.com/gin-gonic/gin v1.9.1 github.com/go-resty/resty/v2 v2.7.0 + github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 github.com/gorilla/websocket v1.5.0 github.com/json-iterator/go v1.1.12 - github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible github.com/panjf2000/ants/v2 v2.8.1 github.com/panjf2000/gnet v1.6.7 github.com/smartystreets/goconvey v1.8.1 @@ -23,6 +23,7 @@ require ( go.uber.org/zap v1.25.0 golang.org/x/crypto v0.14.0 google.golang.org/grpc v1.59.0 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) require ( @@ -36,16 +37,13 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect - github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/jonboulle/clockwork v0.3.0 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/klauspost/reedsolomon v1.11.8 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/leodido/go-urn v1.2.4 // indirect - github.com/lestrrat-go/strftime v1.0.6 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect @@ -69,6 +67,5 @@ require ( golang.org/x/text v0.13.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/protobuf v1.31.0 // indirect - gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index db67b8c..7841553 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7oFtbDZXC4dnY093M1kZx6k/95sen92gafbY= github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10= -github.com/alphadose/haxmap v1.2.0 h1:noGrAmCE+gNheZ4KpW+sYj9W5uMcO1UAjbAq9XBOAfM= -github.com/alphadose/haxmap v1.2.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM= github.com/alphadose/haxmap v1.3.0 h1:C/2LboOnPCZP27GmmSXOcwx360st0P8N0fTJ3voefKc= github.com/alphadose/haxmap v1.3.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -82,8 +80,6 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg= -github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= @@ -105,15 +101,9 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= -github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= -github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= -github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4= -github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA= -github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ= -github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -134,8 +124,6 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= -github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= diff --git a/server/conn.go b/server/conn.go index 1806047..1e8d784 100644 --- a/server/conn.go +++ b/server/conn.go @@ -106,9 +106,10 @@ func newBotConn(server *Server) *Conn { return c } -// Conn 服务器连接单次会话的包装 +// Conn 服务器连接单次消息的包装 type Conn struct { *connection + wst int ctx context.Context } @@ -160,7 +161,7 @@ func (slf *Conn) GetWebsocketRequest() *http.Request { // IsBot 是否是机器人连接 func (slf *Conn) IsBot() bool { - return slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil + return slf != nil && slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil } // RemoteAddr 获取远程地址 @@ -229,15 +230,15 @@ func (slf *Conn) IsWebsocket() bool { return slf.server.network == NetworkWebsocket } -// GetWST 获取websocket消息类型 +// GetWST 获取本次 websocket 消息类型 +// - 默认将与发送类型相同 func (slf *Conn) GetWST() int { - wst, _ := slf.ctx.Value(contextKeyWST).(int) - return wst + return slf.wst } -// SetWST 设置websocket消息类型 +// SetWST 设置本次 websocket 消息类型 func (slf *Conn) SetWST(wst int) *Conn { - slf.ctx = context.WithValue(slf.ctx, contextKeyWST, wst) + slf.wst = wst return slf } @@ -255,7 +256,6 @@ func (slf *Conn) PushUniqueAsyncMessage(name string, caller func() error, callba } // Write 向连接中写入数据 -// - messageType: websocket模式中指定消息类型 func (slf *Conn) Write(packet []byte, callback ...func(err error)) { if slf.gw != nil { slf.gw(packet) @@ -356,9 +356,7 @@ func (slf *Conn) Close(err ...error) { if slf.ticker != nil { slf.ticker.Release() } - if slf.server.shuntMatcher != nil { - slf.server.releaseDispatcher(slf.server.shuntMatcher(slf)) - } + slf.server.releaseDispatcher(slf) slf.pool.Close() slf.loop.Close() slf.mu.Unlock() diff --git a/server/constants.go b/server/constants.go index 888dc8b..55ff658 100644 --- a/server/constants.go +++ b/server/constants.go @@ -1,20 +1,9 @@ package server import ( - "github.com/kercylan98/minotaur/utils/log" "time" ) -type ( - RunMode = log.RunMode -) - -const ( - RunModeDev RunMode = log.RunModeDev - RunModeProd RunMode = log.RunModeProd - RunModeTest RunMode = log.RunModeTest -) - const ( serverMultipleMark = "Minotaur Multiple Server" serverMark = "Minotaur Server" @@ -27,7 +16,3 @@ const ( DefaultWebsocketReadDeadline = 30 * time.Second DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB ) - -const ( - contextKeyWST = "_wst" // WebSocket 消息类型 -) diff --git a/server/dispatcher.go b/server/dispatcher.go index e5639e3..40402d4 100644 --- a/server/dispatcher.go +++ b/server/dispatcher.go @@ -8,8 +8,9 @@ import ( var dispatcherUnique = struct{}{} // generateDispatcher 生成消息分发器 -func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) *dispatcher { +func generateDispatcher(name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher { return &dispatcher{ + name: name, buffer: buffer.NewUnboundedN[*Message](), handler: handler, uniques: haxmap.New[string, struct{}](), @@ -18,6 +19,7 @@ func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) // dispatcher 消息分发器 type dispatcher struct { + name string buffer *buffer.Unbounded[*Message] uniques *haxmap.Map[string, struct{}] handler func(dispatcher *dispatcher, message *Message) diff --git a/server/event.go b/server/event.go index c3c390d..6a0e6f8 100644 --- a/server/event.go +++ b/server/event.go @@ -15,74 +15,74 @@ import ( "time" ) -type StartBeforeEventHandle func(srv *Server) -type StartFinishEventHandle func(srv *Server) -type StopEventHandle func(srv *Server) -type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte) -type ConnectionOpenedEventHandle func(srv *Server, conn *Conn) -type ConnectionClosedEventHandle func(srv *Server, conn *Conn, err any) -type MessageErrorEventHandle func(srv *Server, message *Message, err error) -type MessageLowExecEventHandle func(srv *Server, message *Message, cost time.Duration) -type ConsoleCommandEventHandle func(srv *Server, command string, params ConsoleParams) -type ConnectionOpenedAfterEventHandle func(srv *Server, conn *Conn) -type ConnectionWritePacketBeforeEventHandle func(srv *Server, conn *Conn, packet []byte) []byte -type ShuntChannelCreatedEventHandle func(srv *Server, guid int64) -type ShuntChannelClosedEventHandle func(srv *Server, guid int64) -type ConnectionPacketPreprocessEventHandle func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte)) -type MessageExecBeforeEventHandle func(srv *Server, message *Message) bool -type MessageReadyEventHandle func(srv *Server) +type StartBeforeEventHandler func(srv *Server) +type StartFinishEventHandler func(srv *Server) +type StopEventHandler func(srv *Server) +type ConnectionReceivePacketEventHandler func(srv *Server, conn *Conn, packet []byte) +type ConnectionOpenedEventHandler func(srv *Server, conn *Conn) +type ConnectionClosedEventHandler func(srv *Server, conn *Conn, err any) +type MessageErrorEventHandler func(srv *Server, message *Message, err error) +type MessageLowExecEventHandler func(srv *Server, message *Message, cost time.Duration) +type ConsoleCommandEventHandler func(srv *Server, command string, params ConsoleParams) +type ConnectionOpenedAfterEventHandler func(srv *Server, conn *Conn) +type ConnectionWritePacketBeforeEventHandler func(srv *Server, conn *Conn, packet []byte) []byte +type ShuntChannelCreatedEventHandler func(srv *Server, guid int64) +type ShuntChannelClosedEventHandler func(srv *Server, guid int64) +type ConnectionPacketPreprocessEventHandler func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte)) +type MessageExecBeforeEventHandler func(srv *Server, message *Message) bool +type MessageReadyEventHandler func(srv *Server) func newEvent(srv *Server) *event { return &event{ - Server: srv, - startBeforeEventHandles: slice.NewPriority[StartBeforeEventHandle](), - startFinishEventHandles: slice.NewPriority[StartFinishEventHandle](), - stopEventHandles: slice.NewPriority[StopEventHandle](), - connectionReceivePacketEventHandles: slice.NewPriority[ConnectionReceivePacketEventHandle](), - connectionOpenedEventHandles: slice.NewPriority[ConnectionOpenedEventHandle](), - connectionClosedEventHandles: slice.NewPriority[ConnectionClosedEventHandle](), - messageErrorEventHandles: slice.NewPriority[MessageErrorEventHandle](), - messageLowExecEventHandles: slice.NewPriority[MessageLowExecEventHandle](), - connectionOpenedAfterEventHandles: slice.NewPriority[ConnectionOpenedAfterEventHandle](), - connectionWritePacketBeforeHandles: slice.NewPriority[ConnectionWritePacketBeforeEventHandle](), - shuntChannelCreatedEventHandles: slice.NewPriority[ShuntChannelCreatedEventHandle](), - shuntChannelClosedEventHandles: slice.NewPriority[ShuntChannelClosedEventHandle](), - connectionPacketPreprocessEventHandles: slice.NewPriority[ConnectionPacketPreprocessEventHandle](), - messageExecBeforeEventHandles: slice.NewPriority[MessageExecBeforeEventHandle](), - messageReadyEventHandles: slice.NewPriority[MessageReadyEventHandle](), + Server: srv, + startBeforeEventHandlers: slice.NewPriority[StartBeforeEventHandler](), + startFinishEventHandlers: slice.NewPriority[StartFinishEventHandler](), + stopEventHandlers: slice.NewPriority[StopEventHandler](), + connectionReceivePacketEventHandlers: slice.NewPriority[ConnectionReceivePacketEventHandler](), + connectionOpenedEventHandlers: slice.NewPriority[ConnectionOpenedEventHandler](), + connectionClosedEventHandlers: slice.NewPriority[ConnectionClosedEventHandler](), + messageErrorEventHandlers: slice.NewPriority[MessageErrorEventHandler](), + messageLowExecEventHandlers: slice.NewPriority[MessageLowExecEventHandler](), + connectionOpenedAfterEventHandlers: slice.NewPriority[ConnectionOpenedAfterEventHandler](), + connectionWritePacketBeforeHandlers: slice.NewPriority[ConnectionWritePacketBeforeEventHandler](), + shuntChannelCreatedEventHandlers: slice.NewPriority[ShuntChannelCreatedEventHandler](), + shuntChannelClosedEventHandlers: slice.NewPriority[ShuntChannelClosedEventHandler](), + connectionPacketPreprocessEventHandlers: slice.NewPriority[ConnectionPacketPreprocessEventHandler](), + messageExecBeforeEventHandlers: slice.NewPriority[MessageExecBeforeEventHandler](), + messageReadyEventHandlers: slice.NewPriority[MessageReadyEventHandler](), } } type event struct { *Server - startBeforeEventHandles *slice.Priority[StartBeforeEventHandle] - startFinishEventHandles *slice.Priority[StartFinishEventHandle] - stopEventHandles *slice.Priority[StopEventHandle] - connectionReceivePacketEventHandles *slice.Priority[ConnectionReceivePacketEventHandle] - connectionOpenedEventHandles *slice.Priority[ConnectionOpenedEventHandle] - connectionClosedEventHandles *slice.Priority[ConnectionClosedEventHandle] - messageErrorEventHandles *slice.Priority[MessageErrorEventHandle] - messageLowExecEventHandles *slice.Priority[MessageLowExecEventHandle] - connectionOpenedAfterEventHandles *slice.Priority[ConnectionOpenedAfterEventHandle] - connectionWritePacketBeforeHandles *slice.Priority[ConnectionWritePacketBeforeEventHandle] - shuntChannelCreatedEventHandles *slice.Priority[ShuntChannelCreatedEventHandle] - shuntChannelClosedEventHandles *slice.Priority[ShuntChannelClosedEventHandle] - connectionPacketPreprocessEventHandles *slice.Priority[ConnectionPacketPreprocessEventHandle] - messageExecBeforeEventHandles *slice.Priority[MessageExecBeforeEventHandle] - messageReadyEventHandles *slice.Priority[MessageReadyEventHandle] + startBeforeEventHandlers *slice.Priority[StartBeforeEventHandler] + startFinishEventHandlers *slice.Priority[StartFinishEventHandler] + stopEventHandlers *slice.Priority[StopEventHandler] + connectionReceivePacketEventHandlers *slice.Priority[ConnectionReceivePacketEventHandler] + connectionOpenedEventHandlers *slice.Priority[ConnectionOpenedEventHandler] + connectionClosedEventHandlers *slice.Priority[ConnectionClosedEventHandler] + messageErrorEventHandlers *slice.Priority[MessageErrorEventHandler] + messageLowExecEventHandlers *slice.Priority[MessageLowExecEventHandler] + connectionOpenedAfterEventHandlers *slice.Priority[ConnectionOpenedAfterEventHandler] + connectionWritePacketBeforeHandlers *slice.Priority[ConnectionWritePacketBeforeEventHandler] + shuntChannelCreatedEventHandlers *slice.Priority[ShuntChannelCreatedEventHandler] + shuntChannelClosedEventHandlers *slice.Priority[ShuntChannelClosedEventHandler] + connectionPacketPreprocessEventHandlers *slice.Priority[ConnectionPacketPreprocessEventHandler] + messageExecBeforeEventHandlers *slice.Priority[MessageExecBeforeEventHandler] + messageReadyEventHandlers *slice.Priority[MessageReadyEventHandler] - consoleCommandEventHandles map[string]*slice.Priority[ConsoleCommandEventHandle] - consoleCommandEventHandleInitOnce sync.Once + consoleCommandEventHandlers map[string]*slice.Priority[ConsoleCommandEventHandler] + consoleCommandEventHandlerInitOnce sync.Once } // RegStopEvent 服务器停止时将立即执行被注册的事件处理函数 -func (slf *event) RegStopEvent(handle StopEventHandle, priority ...int) { - slf.stopEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegStopEvent(handler StopEventHandler, priority ...int) { + slf.stopEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnStopEvent() { - slf.stopEventHandles.RangeValue(func(index int, value StopEventHandle) bool { + slf.stopEventHandlers.RangeValue(func(index int, value StopEventHandler) bool { value(slf.Server) return true }) @@ -91,15 +91,15 @@ func (slf *event) OnStopEvent() { // RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数 // - 默认将注册 "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN" 指令作为关闭服务器的指令 // - 可通过注册默认指令进行默认行为的覆盖 -func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEventHandle, priority ...int) { +func (slf *event) RegConsoleCommandEvent(command string, handler ConsoleCommandEventHandler, priority ...int) { fd := int(os.Stdin.Fd()) if !terminal.IsTerminal(fd) { log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("ignore", "system not terminal")) return } - slf.consoleCommandEventHandleInitOnce.Do(func() { - slf.consoleCommandEventHandles = map[string]*slice.Priority[ConsoleCommandEventHandle]{} + slf.consoleCommandEventHandlerInitOnce.Do(func() { + slf.consoleCommandEventHandlers = map[string]*slice.Priority[ConsoleCommandEventHandler]{} go func() { for { var input string @@ -112,18 +112,18 @@ func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEv } }() }) - list, exist := slf.consoleCommandEventHandles[command] + list, exist := slf.consoleCommandEventHandlers[command] if !exist { - list = slice.NewPriority[ConsoleCommandEventHandle]() - slf.consoleCommandEventHandles[command] = list + list = slice.NewPriority[ConsoleCommandEventHandler]() + slf.consoleCommandEventHandlers[command] = list } - list.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + list.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) { slf.PushSystemMessage(func() { - handles, exist := slf.consoleCommandEventHandles[command] + handles, exist := slf.consoleCommandEventHandlers[command] if !exist { switch command { case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN": @@ -142,7 +142,7 @@ func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) { for key, value := range v { params[key] = value } - handles.RangeValue(func(index int, value ConsoleCommandEventHandle) bool { + handles.RangeValue(func(index int, value ConsoleCommandEventHandler) bool { value(slf.Server, command, params) return true }) @@ -151,9 +151,9 @@ func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) { } // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 -func (slf *event) RegStartBeforeEvent(handle StartBeforeEventHandle, priority ...int) { - slf.startBeforeEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegStartBeforeEvent(handler StartBeforeEventHandler, priority ...int) { + slf.startBeforeEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnStartBeforeEvent() { @@ -163,7 +163,7 @@ func (slf *event) OnStartBeforeEvent() { debug.PrintStack() } }() - slf.startBeforeEventHandles.RangeValue(func(index int, value StartBeforeEventHandle) bool { + slf.startBeforeEventHandlers.RangeValue(func(index int, value StartBeforeEventHandler) bool { value(slf.Server) return true }) @@ -171,14 +171,14 @@ func (slf *event) OnStartBeforeEvent() { // RegStartFinishEvent 在服务器启动完成时将立刻执行被注册的事件处理函数 // - 需要注意该时刻服务器已经启动完成,但是还有可能未开始处理消息,客户端有可能无法连接,如果需要在消息处理器准备就绪后执行,请使用 RegMessageReadyEvent 函数 -func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle, priority ...int) { - slf.startFinishEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegStartFinishEvent(handler StartFinishEventHandler, priority ...int) { + slf.startFinishEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnStartFinishEvent() { slf.PushSystemMessage(func() { - slf.startFinishEventHandles.RangeValue(func(index int, value StartFinishEventHandle) bool { + slf.startFinishEventHandlers.RangeValue(func(index int, value StartFinishEventHandler) bool { value(slf.Server) return true }) @@ -192,18 +192,18 @@ func (slf *event) OnStartFinishEvent() { } // RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle, priority ...int) { +func (slf *event) RegConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int) { if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - slf.connectionClosedEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + slf.connectionClosedEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { slf.PushSystemMessage(func() { slf.Server.online.Delete(conn.GetID()) - slf.connectionClosedEventHandles.RangeValue(func(index int, value ConnectionClosedEventHandle) bool { + slf.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool { value(slf.Server, conn, err) return true }) @@ -211,51 +211,53 @@ func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { } // RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionOpenedEvent(handle ConnectionOpenedEventHandle, priority ...int) { +// - 该阶段的事件将会在系统消息中进行处理,不适合处理耗时操作 +func (slf *event) RegConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int) { if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - slf.connectionOpenedEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + slf.connectionOpenedEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionOpenedEvent(conn *Conn) { slf.PushSystemMessage(func() { slf.Server.online.Set(conn.GetID(), conn) - slf.connectionOpenedEventHandles.RangeValue(func(index int, value ConnectionOpenedEventHandle) bool { + slf.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool { value(slf.Server, conn) return true }) + slf.OnConnectionOpenedAfterEvent(conn) }, log.String("Event", "OnConnectionOpenedEvent")) } // RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacketEventHandle, priority ...int) { +func (slf *event) RegConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) { if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - slf.connectionReceivePacketEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + slf.connectionReceivePacketEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) { if slf.Server.runtime.packetWarnSize > 0 && len(packet) > slf.Server.runtime.packetWarnSize { log.Warn("Server", log.String("OnConnectionReceivePacketEvent", fmt.Sprintf("packet size %d > %d", len(packet), slf.Server.runtime.packetWarnSize))) } - slf.connectionReceivePacketEventHandles.RangeValue(func(index int, value ConnectionReceivePacketEventHandle) bool { + slf.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool { value(slf.Server, conn, packet) return true }) } // RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数 -func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle, priority ...int) { - slf.messageErrorEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegMessageErrorEvent(handler MessageErrorEventHandler, priority ...int) { + slf.messageErrorEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnMessageErrorEvent(message *Message, err error) { - if slf.messageErrorEventHandles.Len() == 0 { + if slf.messageErrorEventHandlers.Len() == 0 { return } defer func() { @@ -264,41 +266,42 @@ func (slf *event) OnMessageErrorEvent(message *Message, err error) { debug.PrintStack() } }() - slf.messageErrorEventHandles.RangeValue(func(index int, value MessageErrorEventHandle) bool { + slf.messageErrorEventHandlers.RangeValue(func(index int, value MessageErrorEventHandler) bool { value(slf.Server, message, err) return true }) } // RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数 -func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle, priority ...int) { - slf.messageLowExecEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegMessageLowExecEvent(handler MessageLowExecEventHandler, priority ...int) { + slf.messageLowExecEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) { - if slf.messageLowExecEventHandles.Len() == 0 { + if slf.messageLowExecEventHandlers.Len() == 0 { return } // 慢消息不再占用消息通道 - slf.messageLowExecEventHandles.RangeValue(func(index int, value MessageLowExecEventHandle) bool { + slf.messageLowExecEventHandlers.RangeValue(func(index int, value MessageLowExecEventHandler) bool { value(slf.Server, message, cost) return true }) } // RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionOpenedAfterEvent(handle ConnectionOpenedAfterEventHandle, priority ...int) { +// - 该阶段事件将会转到对应消息分流渠道中进行处理 +func (slf *event) RegConnectionOpenedAfterEvent(handler ConnectionOpenedAfterEventHandler, priority ...int) { if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - slf.connectionOpenedAfterEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + slf.connectionOpenedAfterEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) { - slf.PushSystemMessage(func() { - slf.connectionOpenedAfterEventHandles.RangeValue(func(index int, value ConnectionOpenedAfterEventHandle) bool { + slf.PushShuntMessage(conn, func() { + slf.connectionOpenedAfterEventHandlers.RangeValue(func(index int, value ConnectionOpenedAfterEventHandler) bool { value(slf.Server, conn) return true }) @@ -306,20 +309,20 @@ func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) { } // RegConnectionWritePacketBeforeEvent 在发送数据包前将立刻执行被注册的事件处理函数 -func (slf *event) RegConnectionWritePacketBeforeEvent(handle ConnectionWritePacketBeforeEventHandle, priority ...int) { +func (slf *event) RegConnectionWritePacketBeforeEvent(handler ConnectionWritePacketBeforeEventHandler, priority ...int) { if slf.network == NetworkHttp { panic(ErrNetworkIncompatibleHttp) } - slf.connectionWritePacketBeforeHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) + slf.connectionWritePacketBeforeHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet []byte) (newPacket []byte) { - if slf.connectionWritePacketBeforeHandles.Len() == 0 { + if slf.connectionWritePacketBeforeHandlers.Len() == 0 { return packet } newPacket = packet - slf.connectionWritePacketBeforeHandles.RangeValue(func(index int, value ConnectionWritePacketBeforeEventHandle) bool { + slf.connectionWritePacketBeforeHandlers.RangeValue(func(index int, value ConnectionWritePacketBeforeEventHandler) bool { newPacket = value(slf.Server, conn, newPacket) return true }) @@ -327,14 +330,14 @@ func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet []byte) } // RegShuntChannelCreatedEvent 在分流通道创建时将立刻执行被注册的事件处理函数 -func (slf *event) RegShuntChannelCreatedEvent(handle ShuntChannelCreatedEventHandle, priority ...int) { - slf.shuntChannelCreatedEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegShuntChannelCreatedEvent(handler ShuntChannelCreatedEventHandler, priority ...int) { + slf.shuntChannelCreatedEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnShuntChannelCreatedEvent(guid int64) { slf.PushSystemMessage(func() { - slf.shuntChannelCreatedEventHandles.RangeValue(func(index int, value ShuntChannelCreatedEventHandle) bool { + slf.shuntChannelCreatedEventHandlers.RangeValue(func(index int, value ShuntChannelCreatedEventHandler) bool { value(slf.Server, guid) return true }) @@ -342,14 +345,14 @@ func (slf *event) OnShuntChannelCreatedEvent(guid int64) { } // RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数 -func (slf *event) RegShuntChannelCloseEvent(handle ShuntChannelClosedEventHandle, priority ...int) { - slf.shuntChannelClosedEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegShuntChannelCloseEvent(handler ShuntChannelClosedEventHandler, priority ...int) { + slf.shuntChannelClosedEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnShuntChannelClosedEvent(guid int64) { slf.PushSystemMessage(func() { - slf.shuntChannelClosedEventHandles.RangeValue(func(index int, value ShuntChannelClosedEventHandle) bool { + slf.shuntChannelClosedEventHandlers.RangeValue(func(index int, value ShuntChannelClosedEventHandler) bool { value(slf.Server, guid) return true }) @@ -364,17 +367,17 @@ func (slf *event) OnShuntChannelClosedEvent(guid int64) { // 场景: // - 数据包格式校验 // - 数据包分包等情况处理 -func (slf *event) RegConnectionPacketPreprocessEvent(handle ConnectionPacketPreprocessEventHandle, priority ...int) { - slf.connectionPacketPreprocessEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegConnectionPacketPreprocessEvent(handler ConnectionPacketPreprocessEventHandler, priority ...int) { + slf.connectionPacketPreprocessEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, usePacket func(newPacket []byte)) bool { - if slf.connectionPacketPreprocessEventHandles.Len() == 0 { + if slf.connectionPacketPreprocessEventHandlers.Len() == 0 { return false } var abort = false - slf.connectionPacketPreprocessEventHandles.RangeValue(func(index int, value ConnectionPacketPreprocessEventHandle) bool { + slf.connectionPacketPreprocessEventHandlers.RangeValue(func(index int, value ConnectionPacketPreprocessEventHandler) bool { value(slf.Server, conn, packet, func() { abort = true }, usePacket) if abort { return false @@ -388,13 +391,13 @@ func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, u // - 当返回 true 时,将继续执行后续的消息处理函数,否则将不会执行后续的消息处理函数,并且该消息将被丢弃 // // 适用于限流等场景 -func (slf *event) RegMessageExecBeforeEvent(handle MessageExecBeforeEventHandle, priority ...int) { - slf.messageExecBeforeEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) +func (slf *event) RegMessageExecBeforeEvent(handler MessageExecBeforeEventHandler, priority ...int) { + slf.messageExecBeforeEventHandlers.Append(handler, slice.GetValue(priority, 0)) + log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String())) } func (slf *event) OnMessageExecBeforeEvent(message *Message) bool { - if slf.messageExecBeforeEventHandles.Len() == 0 { + if slf.messageExecBeforeEventHandlers.Len() == 0 { return true } var result = true @@ -404,7 +407,7 @@ func (slf *event) OnMessageExecBeforeEvent(message *Message) bool { debug.PrintStack() } }() - slf.messageExecBeforeEventHandles.RangeValue(func(index int, value MessageExecBeforeEventHandle) bool { + slf.messageExecBeforeEventHandlers.RangeValue(func(index int, value MessageExecBeforeEventHandler) bool { result = value(slf.Server, message) return result }) @@ -412,12 +415,12 @@ func (slf *event) OnMessageExecBeforeEvent(message *Message) bool { } // RegMessageReadyEvent 在服务器消息处理器准备就绪时立即执行被注册的事件处理函数 -func (slf *event) RegMessageReadyEvent(handle MessageReadyEventHandle, priority ...int) { - slf.messageReadyEventHandles.Append(handle, slice.GetValue(priority, 0)) +func (slf *event) RegMessageReadyEvent(handler MessageReadyEventHandler, priority ...int) { + slf.messageReadyEventHandlers.Append(handler, slice.GetValue(priority, 0)) } func (slf *event) OnMessageReadyEvent() { - if slf.messageReadyEventHandles.Len() == 0 { + if slf.messageReadyEventHandlers.Len() == 0 { return } defer func() { @@ -426,7 +429,7 @@ func (slf *event) OnMessageReadyEvent() { debug.PrintStack() } }() - slf.messageReadyEventHandles.RangeValue(func(index int, value MessageReadyEventHandle) bool { + slf.messageReadyEventHandlers.RangeValue(func(index int, value MessageReadyEventHandler) bool { value(slf.Server) return true }) @@ -436,7 +439,7 @@ func (slf *event) check() { switch slf.network { case NetworkHttp, NetworkGRPC, NetworkNone: default: - if slf.connectionReceivePacketEventHandles.Len() == 0 { + if slf.connectionReceivePacketEventHandlers.Len() == 0 { log.Warn("Server", log.String("ConnectionReceivePacketEvent", "invalid server, no packets processed")) } } diff --git a/server/internal/logger/ants.go b/server/internal/logger/ants.go new file mode 100644 index 0000000..72d6875 --- /dev/null +++ b/server/internal/logger/ants.go @@ -0,0 +1,10 @@ +package logger + +import "github.com/kercylan98/minotaur/utils/log" + +type Ants struct { +} + +func (slf *Ants) Printf(format string, args ...interface{}) { + log.Warn(format, log.Any("args", args)) +} diff --git a/server/internal/logger/gnet.go b/server/internal/logger/gnet.go new file mode 100644 index 0000000..c169e49 --- /dev/null +++ b/server/internal/logger/gnet.go @@ -0,0 +1,29 @@ +package logger + +import ( + "fmt" + "github.com/kercylan98/minotaur/utils/log" +) + +type GNet struct { +} + +func (slf *GNet) Debugf(format string, args ...interface{}) { + log.Debug(fmt.Sprintf(format, args...)) +} + +func (slf *GNet) Infof(format string, args ...interface{}) { + log.Info(fmt.Sprintf(format, args...)) +} + +func (slf *GNet) Warnf(format string, args ...interface{}) { + log.Warn(fmt.Sprintf(format, args...)) +} + +func (slf *GNet) Errorf(format string, args ...interface{}) { + log.Error(fmt.Sprintf(format, args...)) +} + +func (slf *GNet) Fatalf(format string, args ...interface{}) { + log.Fatal(fmt.Sprintf(format, args...)) +} diff --git a/server/message.go b/server/message.go index 69c5082..500ace4 100644 --- a/server/message.go +++ b/server/message.go @@ -44,6 +44,9 @@ const ( // MessageTypeSystem 系统消息类型 MessageTypeSystem + + // MessageTypeShunt 普通分流消息类型 + MessageTypeShunt ) var messageNames = map[MessageType]string{ @@ -60,6 +63,7 @@ var messageNames = map[MessageType]string{ MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync", MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback", MessageTypeSystem: "MessageTypeSystem", + MessageTypeShunt: "MessageTypeShunt", } const ( @@ -209,3 +213,9 @@ func (slf *Message) castToErrorMessage(err error, action MessageErrorAction, mar slf.t, slf.err, slf.errAction, slf.marks = MessageTypeError, err, action, mark return slf } + +// castToShuntMessage 将消息转换为分流消息 +func (slf *Message) castToShuntMessage(conn *Conn, caller func(), mark ...log.Field) *Message { + slf.t, slf.conn, slf.ordinaryHandler, slf.marks = MessageTypeShunt, conn, caller, mark + return slf +} diff --git a/server/multiple.go b/server/multiple.go index e4ec259..602a0b7 100644 --- a/server/multiple.go +++ b/server/multiple.go @@ -40,7 +40,7 @@ func (slf *MultipleServer) Run() { go func(address string, server *Server) { var lock sync.Mutex var startFinish bool - server.startFinishEventHandles.Append(func(srv *Server) { + server.startFinishEventHandlers.Append(func(srv *Server) { lock.Lock() defer lock.Unlock() if !startFinish { diff --git a/server/options.go b/server/options.go index 53a1def..8374a61 100644 --- a/server/options.go +++ b/server/options.go @@ -5,7 +5,6 @@ import ( "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/timer" "google.golang.org/grpc" - "runtime/debug" "time" ) @@ -29,19 +28,18 @@ type option struct { } type runtime struct { - deadlockDetect time.Duration // 是否开启死锁检测 - supportMessageTypes map[int]bool // websocket模式下支持的消息类型 - certFile, keyFile string // TLS文件 - messagePoolSize int // 消息池大小 - ticker *timer.Ticker // 定时器 - tickerAutonomy bool // 定时器是否独立运行 - connTickerSize int // 连接定时器大小 - websocketReadDeadline time.Duration // websocket连接超时时间 - websocketCompression int // websocket压缩等级 - websocketWriteCompression bool // websocket写入压缩 - limitLife time.Duration // 限制最大生命周期 - shuntMatcher func(conn *Conn) string // 分流匹配器 - packetWarnSize int // 数据包大小警告 + deadlockDetect time.Duration // 是否开启死锁检测 + supportMessageTypes map[int]bool // websocket模式下支持的消息类型 + certFile, keyFile string // TLS文件 + messagePoolSize int // 消息池大小 + ticker *timer.Ticker // 定时器 + tickerAutonomy bool // 定时器是否独立运行 + connTickerSize int // 连接定时器大小 + websocketReadDeadline time.Duration // websocket连接超时时间 + websocketCompression int // websocket压缩等级 + websocketWriteCompression bool // websocket写入压缩 + limitLife time.Duration // 限制最大生命周期 + packetWarnSize int // 数据包大小警告 } // WithPacketWarnSize 通过数据包大小警告的方式创建服务器,当数据包大小超过指定大小时,将会输出 WARN 类型的日志 @@ -58,32 +56,6 @@ func WithPacketWarnSize(size int) Option { } } -// WithShunt 通过连接数据包分流的方式创建服务器 -// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况 -// - shuntMatcher:用于匹配连接的函数,返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道 -// -// 将被分流的消息类型(更多类型有待斟酌): -// - MessageTypePacket -// -// 注意事项: -// - 当分流匹配过程发生 panic 将会在系统通道内处理消息,并打印日志 -func WithShunt(shuntMatcher func(conn *Conn) string) Option { - return func(srv *Server) { - if shuntMatcher == nil { - log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil")) - return - } - srv.shuntMatcher = func(conn *Conn) string { - defer func() { - if err := recover(); err != nil { - log.Error("ShuntMatcher", log.String("State", "Panic"), log.Any("Error", err), log.String("Stack", string(debug.Stack()))) - } - }() - return shuntMatcher(conn) - } - } -} - // WithLimitLife 通过限制最大生命周期的方式创建服务器 // - 通常用于测试服务器,服务器将在到达最大生命周期时自动关闭 func WithLimitLife(t time.Duration) Option { @@ -158,6 +130,8 @@ func WithWebsocketReadDeadline(t time.Duration) Option { } // WithTicker 通过定时器创建服务器,为服务器添加定时器功能 +// - size:服务器定时器时间轮大小 +// - connSize:服务器连接定时器时间轮大小 // - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题) func WithTicker(size, connSize int, autonomy bool) Option { return func(srv *Server) { @@ -195,14 +169,6 @@ func WithGRPCServerOptions(options ...grpc.ServerOption) Option { } } -// WithRunMode 通过特定模式运行服务器 -// - 默认为 RunModeDev -func WithRunMode(mode RunMode) Option { - return func(srv *Server) { - srv.runMode = mode - } -} - // WithWebsocketMessageType 设置仅支持特定类型的Websocket消息 func WithWebsocketMessageType(messageTypes ...int) Option { return func(srv *Server) { diff --git a/server/server.go b/server/server.go index 9626d6d..47fb883 100644 --- a/server/server.go +++ b/server/server.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" + "github.com/kercylan98/minotaur/server/internal/logger" "github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/network" @@ -14,7 +15,6 @@ import ( "github.com/kercylan98/minotaur/utils/timer" "github.com/panjf2000/ants/v2" "github.com/panjf2000/gnet" - "github.com/panjf2000/gnet/pkg/logging" "github.com/xtaci/kcp-go/v5" "google.golang.org/grpc" "net" @@ -36,13 +36,15 @@ func New(network Network, options ...Option) *Server { messagePoolSize: DefaultMessageBufferSize, packetWarnSize: DefaultPacketWarnSize, }, - option: &option{}, - network: network, - online: concurrent.NewBalanceMap[string, *Conn](), - closeChannel: make(chan struct{}, 1), - systemSignal: make(chan os.Signal, 1), - ctx: context.Background(), - dispatchers: make(map[string]*dispatcher), + option: &option{}, + network: network, + online: concurrent.NewBalanceMap[string, *Conn](), + closeChannel: make(chan struct{}, 1), + systemSignal: make(chan os.Signal, 1), + ctx: context.Background(), + dispatchers: make(map[string]*dispatcher), + dispatcherMember: map[string]map[string]*Conn{}, + currDispatcher: map[string]*dispatcher{}, } server.event = newEvent(server) @@ -67,7 +69,7 @@ func New(network Network, options ...Option) *Server { server.antsPoolSize = DefaultAsyncPoolSize } var err error - server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(log.GetLogger())) + server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(new(logger.Ants))) if err != nil { panic(err) } @@ -91,9 +93,9 @@ type Server struct { messagePool *concurrent.Pool[*Message] // 消息池 ctx context.Context // 上下文 online *concurrent.BalanceMap[string, *Conn] // 在线连接 + systemDispatcher *dispatcher // 系统消息分发器 network Network // 网络类型 addr string // 侦听地址 - runMode RunMode // 运行模式 systemSignal chan os.Signal // 系统信号 closeChannel chan struct{} // 关闭信号 multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 @@ -102,7 +104,9 @@ type Server struct { isShutdown atomic.Bool // 是否已关闭 messageCounter atomic.Int64 // 消息计数器 isRunning bool // 是否正在运行 - dispatchers map[string]*dispatcher // 消息分发器 + dispatchers map[string]*dispatcher // 消息分发器集合 + dispatcherMember map[string]map[string]*Conn // 消息分发器包含的连接 + currDispatcher map[string]*dispatcher // 当前连接所处消息分发器 } // Run 使用特定地址运行服务器 @@ -126,6 +130,7 @@ func (slf *Server) Run(addr string) error { } slf.event.check() slf.addr = addr + slf.systemDispatcher = generateDispatcher(serverSystemDispatcher, slf.dispatchMessage) var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr) var messageInitFinish = make(chan struct{}, 1) var connectionInitHandle = func(callback func()) { @@ -147,8 +152,7 @@ func (slf *Server) Run(addr string) error { } go func() { messageInitFinish <- struct{}{} - d, _ := slf.useDispatcher(serverSystemDispatcher) - d.start() + slf.systemDispatcher.start() }() } @@ -177,8 +181,7 @@ func (slf *Server) Run(addr string) error { slf.isRunning = true slf.OnStartBeforeEvent() if err := gnet.Serve(slf.gServer, protoAddr, - gnet.WithLogger(log.GetLogger()), - gnet.WithLogLevel(super.If(slf.runMode == RunModeProd, logging.ErrorLevel, logging.DebugLevel)), + gnet.WithLogger(new(logger.GNet)), gnet.WithTicker(true), gnet.WithMulticore(true), ); err != nil { @@ -202,7 +205,6 @@ func (slf *Server) Run(addr string) error { conn := newKcpConn(slf, session) slf.OnConnectionOpenedEvent(conn) - slf.OnConnectionOpenedAfterEvent(conn) go func(conn *Conn) { defer func() { @@ -230,18 +232,19 @@ func (slf *Server) Run(addr string) error { } }) case NetworkHttp: - switch slf.runMode { - case RunModeDev: - gin.SetMode(gin.DebugMode) - case RunModeTest: - gin.SetMode(gin.TestMode) - case RunModeProd: - gin.SetMode(gin.ReleaseMode) - } go func() { slf.isRunning = true slf.OnStartBeforeEvent() slf.httpServer.Addr = slf.addr + gin.SetMode(gin.ReleaseMode) + slf.ginServer.Use(func(c *gin.Context) { + t := time.Now() + c.Next() + log.Info("Server", log.String("type", "http"), + log.String("method", c.Request.Method), log.Int("status", c.Writer.Status()), + log.String("ip", c.ClientIP()), log.String("path", c.Request.URL.Path), + log.Duration("cost", time.Since(t))) + }) go connectionInitHandle(nil) if len(slf.certFile)+len(slf.keyFile) > 0 { if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil { @@ -569,30 +572,70 @@ func (slf *Server) GetMessageCount() int64 { return slf.messageCounter.Load() } -// useDispatcher 添加消息分发器 -// - 该函数在分发器不重复的情况下将创建分发器,当分发器已存在将直接返回 -func (slf *Server) useDispatcher(name string) (*dispatcher, bool) { +// UseShunt 切换连接所使用的消息分流渠道,当分流渠道 name 不存在时将会创建一个新的分流渠道,否则将会加入已存在的分流渠道 +// - 默认情况下,所有连接都使用系统通道进行消息分发,当指定消息分流渠道时,将会使用指定的消息分流渠道进行消息分发 +func (slf *Server) UseShunt(conn *Conn, name string) { slf.dispatcherLock.Lock() + defer slf.dispatcherLock.Unlock() d, exist := slf.dispatchers[name] - if exist { - slf.dispatcherLock.Unlock() - return d, false + if !exist { + d = generateDispatcher(name, slf.dispatchMessage) + go d.start() + slf.dispatchers[name] = d } - d = generateDispatcher(slf.dispatchMessage) - slf.dispatchers[name] = d - slf.dispatcherLock.Unlock() - return d, true + + curr, exist := slf.currDispatcher[conn.GetID()] + if exist { + if curr.name == name { + return + } + + delete(slf.dispatcherMember[curr.name], conn.GetID()) + if len(slf.dispatcherMember[curr.name]) == 0 { + curr.close() + delete(slf.dispatchers, curr.name) + } + } + + member, exist := slf.dispatcherMember[name] + if !exist { + member = map[string]*Conn{} + slf.dispatcherMember[name] = member + } + + member[conn.GetID()] = conn +} + +// getConnDispatcher 获取连接所使用的消息分发器 +func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher { + if conn == nil { + return slf.systemDispatcher + } + slf.dispatcherLock.RLock() + defer slf.dispatcherLock.RUnlock() + d, exist := slf.currDispatcher[conn.GetID()] + if exist { + return d + } + return slf.systemDispatcher } // releaseDispatcher 关闭消息分发器 -func (slf *Server) releaseDispatcher(name string) { - slf.dispatcherLock.Lock() - d, exist := slf.dispatchers[name] - if exist { - delete(slf.dispatchers, name) - d.close() +func (slf *Server) releaseDispatcher(conn *Conn) { + if conn == nil { + return + } + slf.dispatcherLock.Lock() + defer slf.dispatcherLock.Unlock() + d, exist := slf.currDispatcher[conn.GetID()] + if exist { + delete(slf.dispatcherMember[d.name], conn.GetID()) + if len(slf.dispatcherMember[d.name]) == 0 { + d.close() + delete(slf.dispatchers, d.name) + } + delete(slf.currDispatcher, conn.GetID()) } - slf.dispatcherLock.Unlock() } // pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求 @@ -603,20 +646,13 @@ func (slf *Server) pushMessage(message *Message) { } var dispatcher *dispatcher switch message.t { - case MessageTypePacket: - if slf.shuntMatcher == nil { - dispatcher, _ = slf.useDispatcher(serverSystemDispatcher) - break - } - fallthrough - case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback, MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback: - var created bool - dispatcher, created = slf.useDispatcher(slf.shuntMatcher(message.conn)) - if created { - go dispatcher.start() - } + case MessageTypePacket, + MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback, + MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback, + MessageTypeShunt: + dispatcher = slf.getConnDispatcher(message.conn) case MessageTypeSystem, MessageTypeAsync, MessageTypeUniqueAsync, MessageTypeAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeError, MessageTypeTicker: - dispatcher, _ = slf.useDispatcher(serverSystemDispatcher) + dispatcher = slf.systemDispatcher } if dispatcher == nil { return @@ -757,7 +793,7 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { } case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeUniqueShuntAsyncCallback: msg.errHandler(msg.err) - case MessageTypeSystem: + case MessageTypeSystem, MessageTypeShunt: msg.ordinaryHandler() default: log.Warn("Server", log.String("not support message type", msg.t.String())) @@ -791,20 +827,12 @@ func (slf *Server) PushAsyncCallbackMessage(err error, callback func(err error), // - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发 // - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) { - if slf.shuntMatcher == nil { - slf.PushAsyncMessage(caller, callback) - return - } slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...)) } // PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 // - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发 func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) { - if slf.shuntMatcher == nil { - slf.PushAsyncCallbackMessage(err, callback) - return - } slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...)) } @@ -812,7 +840,7 @@ func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback // - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息 func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) { slf.pushMessage(slf.messagePool.Get().castToPacketMessage( - &Conn{ctx: context.WithValue(conn.ctx, contextKeyWST, wst), connection: conn.connection}, + &Conn{wst: wst, connection: conn.connection}, packet, )) } @@ -829,13 +857,9 @@ func (slf *Server) PushTickerMessage(name string, caller func(), mark ...log.Fie } // PushShuntTickerMessage 向特定分发器中推送 MessageTypeTicker 消息,消息执行与 MessageTypeTicker 一致 -// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushTickerMessage 进行转发 +// - 需要注意的是,当未指定 UseShunt 时,将会通过 PushTickerMessage 进行转发 // - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func(), mark ...log.Field) { - if slf.shuntMatcher == nil { - slf.PushTickerMessage(name, caller) - return - } slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...)) } @@ -851,23 +875,15 @@ func (slf *Server) PushUniqueAsyncCallbackMessage(unique string, err error, call } // PushUniqueShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致 -// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncMessage 进行转发 +// - 需要注意的是,当未指定 UseShunt 时,将会通过系统分流渠道进行转发 // - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息 func (slf *Server) PushUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) { - if slf.shuntMatcher == nil { - slf.PushUniqueAsyncMessage(unique, caller, callback) - return - } slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncMessage(conn, unique, caller, callback, mark...)) } // PushUniqueShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 -// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncCallbackMessage 进行转发 +// - 需要注意的是,当未指定 UseShunt 时,将会通过系统分流渠道进行转发 func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, callback func(err error), mark ...log.Field) { - if slf.shuntMatcher == nil { - slf.PushUniqueAsyncCallbackMessage(unique, err, callback) - return - } slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncCallbackMessage(conn, unique, err, callback, mark...)) } @@ -879,3 +895,8 @@ func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string func (slf *Server) PushErrorMessage(err error, errAction MessageErrorAction, mark ...log.Field) { slf.pushMessage(slf.messagePool.Get().castToErrorMessage(err, errAction, mark...)) } + +// PushShuntMessage 向特定分发器中推送 MessageTypeShunt 消息,消息执行与 MessageTypeSystem 一致,不同的是将会在特定分发器中执行 +func (slf *Server) PushShuntMessage(conn *Conn, caller func(), mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToShuntMessage(conn, caller, mark...)) +} diff --git a/utils/buffer/unbounded_benchmark_test.go b/utils/buffer/unbounded_benchmark_test.go new file mode 100644 index 0000000..3d9b2bb --- /dev/null +++ b/utils/buffer/unbounded_benchmark_test.go @@ -0,0 +1,36 @@ +package buffer_test + +import ( + "github.com/kercylan98/minotaur/utils/buffer" + "testing" +) + +func BenchmarkUnboundedBuffer(b *testing.B) { + ub := buffer.NewUnboundedN[int]() + + b.Run("Put", func(b *testing.B) { + for i := 0; i < b.N; i++ { + ub.Put(i) + } + }) + + b.Run("Load", func(b *testing.B) { + for i := 0; i < b.N; i++ { + ub.Load() + } + }) + + // 先填充数据以防止 Get 被阻塞 + for i := 0; i < b.N; i++ { + ub.Put(i) + } + + b.Run("Get", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + ub.Put(i) + <-ub.Get() + ub.Load() + } + }) +} diff --git a/utils/file/file.go b/utils/file/file.go index 7f6ae98..879ccb4 100644 --- a/utils/file/file.go +++ b/utils/file/file.go @@ -163,6 +163,14 @@ func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(str _ = file.Close() }() + fileSize, err := file.Seek(0, io.SeekEnd) + if err != nil { + return offset, err + } + if offset-1 >= fileSize { + return fileSize + 1, nil + } + chunks := FindLineChunksByOffset(file, offset, chunkSize) var end int64 var endMutex sync.Mutex diff --git a/utils/huge/int.go b/utils/huge/int.go index ed3993a..68db831 100644 --- a/utils/huge/int.go +++ b/utils/huge/int.go @@ -1,6 +1,9 @@ package huge -import "math/big" +import ( + "github.com/kercylan98/minotaur/utils/generic" + "math/big" +) var ( IntNegativeOne = NewInt(-1) // 默认初始化的-1值Int,应当将其当作常量使用 @@ -12,19 +15,25 @@ var ( IntTenThousand = NewInt(10000) // 默认初始化的10000值Int,应当将其当作常量使用 ) -type IntRestrain interface { - uint | uint8 | uint16 | uint32 | uint64 | int | int8 | int16 | int32 | int64 -} - type Int big.Int -func NewInt[T IntRestrain](x T, exp ...T) *Int { - num := int64(x) - i := big.NewInt(num) - for _, t := range exp { - i = i.Exp(i, big.NewInt(int64(t)), nil) +// NewInt 创建一个 Int +func NewInt[T generic.Number](x T) *Int { + return (*Int)(big.NewInt(int64(x))) +} + +// NewIntByString 通过字符串创建一个 Int +// - 如果字符串不是一个合法的数字,则返回 0 +func NewIntByString(i string) *Int { + v, suc := new(big.Int).SetString(i, 10) + if !suc { + return IntZero.Copy() } - return (*Int)(i) + return (*Int)(v) +} + +func applyIntOperation[T generic.Number](v *Int, i T, op func(*big.Int, *big.Int) *big.Int) *Int { + return (*Int)(op(v.ToBigint(), NewInt(i).ToBigint())) } func (slf *Int) Copy() *Int { @@ -86,6 +95,7 @@ func (slf *Int) ToBigint() *big.Int { return (*big.Int)(slf) } +// Cmp 比较,当 slf > i 时返回 1,当 slf < i 时返回 -1,当 slf == i 时返回 0 func (slf *Int) Cmp(i *Int) int { return slf.ToBigint().Cmp(i.ToBigint()) } @@ -286,3 +296,286 @@ func (slf *Int) SubUint64(i uint64) *Int { x := slf.ToBigint() return (*Int)(x.Sub(x, NewInt(i).ToBigint())) } + +func (slf *Int) Div(i *Int) *Int { + x := slf.ToBigint() + return (*Int)(x.Div(x, i.ToBigint())) +} + +func (slf *Int) DivInt(i int) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Div) +} + +func (slf *Int) DivInt8(i int8) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Div) +} + +func (slf *Int) DivInt16(i int16) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Div) +} + +func (slf *Int) DivInt32(i int32) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Div) +} + +func (slf *Int) DivInt64(i int64) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Div) +} + +func (slf *Int) DivUint(i uint) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Div) +} + +func (slf *Int) DivUint8(i uint8) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Div) +} + +func (slf *Int) DivUint16(i uint16) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Div) +} + +func (slf *Int) DivUint32(i uint32) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Div) +} + +func (slf *Int) DivUint64(i uint64) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Div) +} + +func (slf *Int) Mod(i *Int) *Int { + x := slf.ToBigint() + return (*Int)(x.Mod(x, i.ToBigint())) +} + +func (slf *Int) ModInt(i int) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod) +} + +func (slf *Int) ModInt8(i int8) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod) +} + +func (slf *Int) ModInt16(i int16) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod) +} + +func (slf *Int) ModInt32(i int32) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod) +} + +func (slf *Int) ModInt64(i int64) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod) +} + +func (slf *Int) ModUint(i uint) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod) +} + +func (slf *Int) ModUint8(i uint8) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod) +} + +func (slf *Int) ModUint16(i uint16) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod) +} + +func (slf *Int) ModUint32(i uint32) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod) +} + +func (slf *Int) ModUint64(i uint64) *Int { + return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod) +} + +func (slf *Int) Pow(i *Int) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, i.ToBigint(), nil)) +} + +func (slf *Int) PowInt(i int) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil)) +} + +func (slf *Int) PowInt8(i int8) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil)) +} + +func (slf *Int) PowInt16(i int16) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil)) +} + +func (slf *Int) PowInt32(i int32) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil)) +} + +func (slf *Int) PowInt64(i int64) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil)) +} + +func (slf *Int) PowUint(i uint) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil)) +} + +func (slf *Int) PowUint8(i uint8) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil)) +} + +func (slf *Int) PowUint16(i uint16) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil)) +} + +func (slf *Int) PowUint32(i uint32) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil)) +} + +func (slf *Int) PowUint64(i uint64) *Int { + x := slf.ToBigint() + return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil)) +} + +// Lsh 左移 +func (slf *Int) Lsh(i int) *Int { + x := slf.ToBigint() + return (*Int)(x.Lsh(x, uint(i))) +} + +// Rsh 右移 +func (slf *Int) Rsh(i int) *Int { + x := slf.ToBigint() + return (*Int)(x.Rsh(x, uint(i))) +} + +// And 与 +func (slf *Int) And(i *Int) *Int { + x := slf.ToBigint() + return (*Int)(x.And(x, i.ToBigint())) +} + +// AndNot 与非 +func (slf *Int) AndNot(i *Int) *Int { + x := slf.ToBigint() + return (*Int)(x.AndNot(x, i.ToBigint())) +} + +// Or 或 +func (slf *Int) Or(i *Int) *Int { + x := slf.ToBigint() + return (*Int)(x.Or(x, i.ToBigint())) +} + +// Xor 异或 +func (slf *Int) Xor(i *Int) *Int { + x := slf.ToBigint() + return (*Int)(x.Xor(x, i.ToBigint())) +} + +// Not 非 +func (slf *Int) Not() *Int { + x := slf.ToBigint() + return (*Int)(x.Not(x)) +} + +// Sqrt 平方根 +func (slf *Int) Sqrt() *Int { + x := slf.ToBigint() + return (*Int)(x.Sqrt(x)) +} + +// GCD 最大公约数 +func (slf *Int) GCD(i *Int) *Int { + x := slf.ToBigint() + return (*Int)(x.GCD(nil, nil, x, i.ToBigint())) +} + +// LCM 最小公倍数 +func (slf *Int) LCM(i *Int) *Int { + sb := slf.ToBigint() + ib := i.ToBigint() + gcd := new(big.Int).GCD(nil, nil, sb, ib) + absProduct := new(big.Int).Mul(sb, ib).Abs(new(big.Int).Mul(sb, ib)) + lcm := new(big.Int).Div(absProduct, gcd) + return (*Int)(lcm) +} + +// ModInverse 模反元素 +func (slf *Int) ModInverse(i *Int) *Int { + x := slf.ToBigint() + return (*Int)(x.ModInverse(x, i.ToBigint())) +} + +// ModSqrt 模平方根 +func (slf *Int) ModSqrt(i *Int) *Int { + x := slf.ToBigint() + return (*Int)(x.ModSqrt(x, i.ToBigint())) +} + +// BitLen 二进制长度 +func (slf *Int) BitLen() int { + return slf.ToBigint().BitLen() +} + +// Bit 二进制位 +func (slf *Int) Bit(i int) uint { + return slf.ToBigint().Bit(i) +} + +// SetBit 设置二进制位 +func (slf *Int) SetBit(i int, v uint) *Int { + x := slf.ToBigint() + return (*Int)(x.SetBit(x, i, v)) +} + +// Neg 返回数字的相反数 +func (slf *Int) Neg() *Int { + x := slf.ToBigint() + return (*Int)(x.Neg(x)) +} + +// Abs 返回数字的绝对值 +func (slf *Int) Abs() *Int { + x := slf.ToBigint() + return (*Int)(x.Abs(x)) +} + +// Sign 返回数字的符号 +// - 1:正数 +// - 0:零 +// - -1:负数 +func (slf *Int) Sign() int { + return slf.ToBigint().Sign() +} + +// IsPositive 是否为正数 +func (slf *Int) IsPositive() bool { + return slf.Sign() > 0 +} + +// IsNegative 是否为负数 +func (slf *Int) IsNegative() bool { + return slf.Sign() < 0 +} + +// IsEven 是否为偶数 +func (slf *Int) IsEven() bool { + return slf.ToBigint().Bit(0) == 0 +} + +// IsOdd 是否为奇数 +func (slf *Int) IsOdd() bool { + return slf.ToBigint().Bit(0) == 1 +} + +// ProportionalCalc 比例计算,该函数会再 formula 返回值的基础上除以 proportional +// - formula 为计算公式,该公式的参数为调用该函数的 Int 的拷贝 +func (slf *Int) ProportionalCalc(proportional *Int, formula func(v *Int) *Int) *Int { + return formula(slf.Copy()).Div(proportional) +} diff --git a/utils/log/core.go b/utils/log/core.go deleted file mode 100644 index 5d8965e..0000000 --- a/utils/log/core.go +++ /dev/null @@ -1,5 +0,0 @@ -package log - -import "go.uber.org/zap/zapcore" - -type Core = zapcore.Core diff --git a/utils/log/encoder.go b/utils/log/encoder.go index c7d5570..218cc49 100644 --- a/utils/log/encoder.go +++ b/utils/log/encoder.go @@ -1,26 +1,39 @@ package log import ( + "go.uber.org/zap" "go.uber.org/zap/zapcore" - "time" + "gopkg.in/natefinch/lumberjack.v2" ) -type Encoder = zapcore.Encoder - -// NewEncoder 创建一个 Minotaur 默认使用的编码器 -func NewEncoder() Encoder { - return zapcore.NewConsoleEncoder(zapcore.EncoderConfig{ - MessageKey: "msg", - LevelKey: "level", - EncodeLevel: zapcore.CapitalLevelEncoder, - TimeKey: "ts", - EncodeTime: func(t time.Time, enc zapcore.PrimitiveArrayEncoder) { - enc.AppendString(t.Format(time.DateTime)) - }, - CallerKey: "file", - EncodeCaller: zapcore.ShortCallerEncoder, - EncodeDuration: func(d time.Duration, enc zapcore.PrimitiveArrayEncoder) { - enc.AppendInt64(int64(d) / 1000000) - }, - }) +type Encoder struct { + e zapcore.Encoder + cores []Core + conf *Config +} + +func (slf *Encoder) Split(config *lumberjack.Logger) *Encoder { + slf.cores = append(slf.cores, zapcore.NewCore(slf.e, zapcore.AddSync(config), zapcore.DebugLevel)) + return slf +} + +func (slf *Encoder) AddCore(ws WriteSyncer, enab LevelEnabler) *Encoder { + slf.cores = append(slf.cores, zapcore.NewCore(slf.e, ws, enab)) + return slf +} + +func (slf *Encoder) Build(options ...LoggerOption) *Minotaur { + l, err := slf.conf.Build() + if err != nil { + panic(err) + } + options = append([]LoggerOption{zap.AddCaller(), zap.AddCallerSkip(1)}, options...) + options = append(options, zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return zapcore.NewTee(append(slf.cores, core)...) + })) + l = l.WithOptions(options...) + return &Minotaur{ + Logger: l, + Sugared: l.Sugar(), + } } diff --git a/utils/log/level.go b/utils/log/level.go index b1dac5b..fd7a0ae 100644 --- a/utils/log/level.go +++ b/utils/log/level.go @@ -1,13 +1,6 @@ package log -import ( - "github.com/kercylan98/minotaur/utils/hash" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -type Level = zapcore.Level -type LevelEnablerFunc = zap.LevelEnablerFunc +import "go.uber.org/zap/zapcore" const ( // DebugLevel 调试级别日志通常非常庞大,并且通常在生产中被禁用 @@ -25,86 +18,3 @@ const ( // FatalLevel 记录一条消息,然后调用 os.Exit(1) FatalLevel Level = zapcore.FatalLevel ) - -var ( - levels = []Level{DebugLevel, InfoLevel, WarnLevel, ErrorLevel, DPanicLevel, PanicLevel, FatalLevel} - defaultLevelPartition = map[Level]func() LevelEnablerFunc{ - DebugLevel: DebugLevelPartition, - InfoLevel: InfoLevelPartition, - WarnLevel: WarnLevelPartition, - ErrorLevel: ErrorLevelPartition, - DPanicLevel: DPanicLevelPartition, - PanicLevel: PanicLevelPartition, - FatalLevel: FatalLevelPartition, - } -) - -// Levels 返回所有日志级别 -func Levels() []Level { - return levels -} - -// MultiLevelPartition 返回一个 LevelEnablerFunc,该函数在指定的多个级别时返回 true -// - 该函数被用于划分不同级别的日志输出 -func MultiLevelPartition(levels ...Level) LevelEnablerFunc { - var levelMap = hash.ToIterator(levels) - return func(level zapcore.Level) bool { - return hash.Exist(levelMap, level) - } -} - -// DebugLevelPartition 返回一个 LevelEnablerFunc,该函数在 DebugLevel 时返回 true -// - 该函数被用于划分不同级别的日志输出 -func DebugLevelPartition() LevelEnablerFunc { - return func(level zapcore.Level) bool { - return level == DebugLevel - } -} - -// InfoLevelPartition 返回一个 LevelEnablerFunc,该函数在 InfoLevel 时返回 true -// - 该函数被用于划分不同级别的日志输出 -func InfoLevelPartition() LevelEnablerFunc { - return func(level zapcore.Level) bool { - return level == InfoLevel - } -} - -// WarnLevelPartition 返回一个 LevelEnablerFunc,该函数在 WarnLevel 时返回 true -// - 该函数被用于划分不同级别的日志输出 -func WarnLevelPartition() LevelEnablerFunc { - return func(level zapcore.Level) bool { - return level == WarnLevel - } -} - -// ErrorLevelPartition 返回一个 LevelEnablerFunc,该函数在 ErrorLevel 时返回 true -// - 该函数被用于划分不同级别的日志输出 -func ErrorLevelPartition() LevelEnablerFunc { - return func(level zapcore.Level) bool { - return level == ErrorLevel - } -} - -// DPanicLevelPartition 返回一个 LevelEnablerFunc,该函数在 DPanicLevel 时返回 true -// - 该函数被用于划分不同级别的日志输出 -func DPanicLevelPartition() LevelEnablerFunc { - return func(level zapcore.Level) bool { - return level == DPanicLevel - } -} - -// PanicLevelPartition 返回一个 LevelEnablerFunc,该函数在 PanicLevel 时返回 true -// - 该函数被用于划分不同级别的日志输出 -func PanicLevelPartition() LevelEnablerFunc { - return func(level zapcore.Level) bool { - return level == PanicLevel - } -} - -// FatalLevelPartition 返回一个 LevelEnablerFunc,该函数在 FatalLevel 时返回 true -// - 该函数被用于划分不同级别的日志输出 -func FatalLevelPartition() LevelEnablerFunc { - return func(level zapcore.Level) bool { - return level == FatalLevel - } -} diff --git a/utils/log/log.go b/utils/log/log.go deleted file mode 100644 index 2226cdc..0000000 --- a/utils/log/log.go +++ /dev/null @@ -1,170 +0,0 @@ -package log - -import ( - "fmt" - "github.com/kercylan98/minotaur/utils/str" - "github.com/kercylan98/minotaur/utils/times" - rotateLogs "github.com/lestrrat-go/file-rotatelogs" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "os" - "path/filepath" - "strings" -) - -// NewLog 创建一个日志记录器 -func NewLog(options ...Option) *Log { - log := &Log{ - filename: func(level Level) string { - return fmt.Sprintf("%s.log", level.String()) - }, - rotateFilename: func(level Level) string { - return strings.Join([]string{level.String(), "%Y%m%d.log"}, ".") - }, - levelPartition: defaultLevelPartition, - } - - for _, option := range options { - option(log) - } - - if len(log.rotateOptions) == 0 { - log.rotateOptions = []rotateLogs.Option{ - rotateLogs.WithMaxAge(times.Week), - rotateLogs.WithRotationTime(times.Day), - } - } - - if len(log.cores) == 0 { - var encoder = NewEncoder() - - switch log.mode { - case RunModeDev: - var partition LevelEnablerFunc = func(lvl Level) bool { - return true - } - log.cores = append(log.cores, zapcore.NewCore(encoder, os.Stdout, partition)) - case RunModeTest, RunModeProd: - if log.mode == RunModeTest { - infoRotate, err := rotateLogs.New( - filepath.Join(log.rotateLogDir, log.rotateFilename(InfoLevel)), - append([]rotateLogs.Option{rotateLogs.WithLinkName(filepath.Join(log.logDir, log.filename(InfoLevel)))}, log.rotateOptions...)..., - ) - if err != nil { - panic(err) - } - errRotate, err := rotateLogs.New( - filepath.Join(log.rotateLogDir, log.rotateFilename(ErrorLevel)), - append([]rotateLogs.Option{rotateLogs.WithLinkName(filepath.Join(log.logDir, log.filename(ErrorLevel)))}, log.rotateOptions...)..., - ) - if err != nil { - panic(err) - } - if log.logDir != str.None { - log.cores = append(log.cores, zapcore.NewCore(encoder, zapcore.AddSync(infoRotate), LevelEnablerFunc(func(lvl Level) bool { return lvl < ErrorLevel }))) - log.cores = append(log.cores, zapcore.NewCore(encoder, zapcore.AddSync(errRotate), LevelEnablerFunc(func(lvl Level) bool { return lvl >= ErrorLevel }))) - log.cores = append(log.cores, zapcore.NewCore(encoder, os.Stdout, LevelEnablerFunc(func(lvl Level) bool { return lvl < ErrorLevel }))) - log.cores = append(log.cores, zapcore.NewCore(encoder, os.Stdout, LevelEnablerFunc(func(lvl Level) bool { return lvl >= ErrorLevel }))) - } - } else { - infoRotate, err := rotateLogs.New( - filepath.Join(log.rotateLogDir, log.rotateFilename(InfoLevel)), - append([]rotateLogs.Option{rotateLogs.WithLinkName(filepath.Join(log.logDir, log.filename(InfoLevel)))}, log.rotateOptions...)..., - ) - if err != nil { - panic(err) - } - errRotate, err := rotateLogs.New( - filepath.Join(log.rotateLogDir, log.rotateFilename(ErrorLevel)), - append([]rotateLogs.Option{rotateLogs.WithLinkName(filepath.Join(log.logDir, log.filename(ErrorLevel)))}, log.rotateOptions...)..., - ) - if err != nil { - panic(err) - } - if log.logDir != str.None { - log.cores = append(log.cores, zapcore.NewCore(encoder, zapcore.AddSync(infoRotate), LevelEnablerFunc(func(lvl Level) bool { return lvl == InfoLevel }))) - log.cores = append(log.cores, zapcore.NewCore(encoder, zapcore.AddSync(errRotate), LevelEnablerFunc(func(lvl Level) bool { return lvl >= ErrorLevel }))) - } - } - } - } - - log.zap = zap.New(zapcore.NewTee(log.cores...), zap.AddCaller(), zap.AddCallerSkip(2)) - log.sugar = log.zap.Sugar() - return log -} - -type Log struct { - zap *zap.Logger - sugar *zap.SugaredLogger - filename func(level Level) string - rotateFilename func(level Level) string - rotateOptions []rotateLogs.Option - levelPartition map[Level]func() LevelEnablerFunc - cores []Core - mode RunMode - logDir string - rotateLogDir string -} - -func (slf *Log) Debugf(format string, args ...interface{}) { - slf.sugar.Debugf(format, args...) -} - -func (slf *Log) Infof(format string, args ...interface{}) { - slf.sugar.Infof(format, args...) -} - -func (slf *Log) Warnf(format string, args ...interface{}) { - slf.sugar.Warnf(format, args...) -} - -func (slf *Log) Errorf(format string, args ...interface{}) { - slf.sugar.Errorf(format, args...) -} - -func (slf *Log) Fatalf(format string, args ...interface{}) { - slf.sugar.Fatalf(format, args...) -} - -func (slf *Log) Printf(format string, args ...interface{}) { - slf.sugar.Infof(format, args...) -} - -// Debug 在 DebugLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 -func (slf *Log) Debug(msg string, fields ...Field) { - slf.zap.Debug(msg, fields...) -} - -// Info 在 InfoLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 -func (slf *Log) Info(msg string, fields ...Field) { - slf.zap.Info(msg, fields...) -} - -// Warn 在 WarnLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 -func (slf *Log) Warn(msg string, fields ...Field) { - slf.zap.Warn(msg, fields...) -} - -// Error 在 ErrorLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 -func (slf *Log) Error(msg string, fields ...Field) { - slf.zap.Error(msg, fields...) -} - -// DPanic 在 DPanicLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 -// - 如果记录器处于开发模式,它就会出现 panic(DPanic 的意思是“development panic”)。这对于捕获可恢复但不应该发生的错误很有用 -func (slf *Log) DPanic(msg string, fields ...Field) { - slf.zap.DPanic(msg, fields...) -} - -// Panic 在 PanicLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 -// - 即使禁用了 PanicLevel 的日志记录,记录器也会出现 panic -func (slf *Log) Panic(msg string, fields ...Field) { - slf.zap.Panic(msg, fields...) -} - -// Fatal 在 FatalLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 -// - 然后记录器调用 os.Exit(1),即使 FatalLevel 的日志记录被禁用 -func (slf *Log) Fatal(msg string, fields ...Field) { - slf.zap.Fatal(msg, fields...) -} diff --git a/utils/log/logger.go b/utils/log/logger.go index a0a1c96..2b35dfe 100644 --- a/utils/log/logger.go +++ b/utils/log/logger.go @@ -1,16 +1,22 @@ package log -import ( - "github.com/panjf2000/ants/v2" - "github.com/panjf2000/gnet/pkg/logging" -) +var logger Logger -var logger Logger = NewLog() +func init() { + logger = Default().Build() +} + +// SetLogger 设置日志记录器 +func SetLogger(l Logger) { + if m, ok := l.(*Minotaur); ok && m != nil { + _ = m.Sync() + _ = m.Sugared.Sync() + } + logger = l +} // Logger 适用于 Minotaur 的日志接口 type Logger interface { - ants.Logger - logging.Logger // Debug 在 DebugLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 Debug(msg string, fields ...Field) // Info 在 InfoLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 @@ -67,13 +73,3 @@ func Panic(msg string, fields ...Field) { func Fatal(msg string, fields ...Field) { logger.Fatal(msg, fields...) } - -// SetLogger 设置日志记录器 -func SetLogger(log Logger) { - logger = log -} - -// GetLogger 获取日志记录器 -func GetLogger() Logger { - return logger -} diff --git a/utils/log/minotaur.go b/utils/log/minotaur.go new file mode 100644 index 0000000..73efa10 --- /dev/null +++ b/utils/log/minotaur.go @@ -0,0 +1,8 @@ +package log + +import "go.uber.org/zap" + +type Minotaur struct { + *zap.Logger + Sugared *zap.SugaredLogger +} diff --git a/utils/log/options.go b/utils/log/options.go index c00297c..37dd5a6 100644 --- a/utils/log/options.go +++ b/utils/log/options.go @@ -1,52 +1,239 @@ package log import ( - rotateLogs "github.com/lestrrat-go/file-rotatelogs" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "io" + "time" ) -type Option func(log *Log) +type ( + Config = zap.Config + Level = zapcore.Level + LevelEncoder = zapcore.LevelEncoder + TimeEncoder = zapcore.TimeEncoder + DurationEncoder = zapcore.DurationEncoder + CallerEncoder = zapcore.CallerEncoder + NameEncoder = zapcore.NameEncoder + ReflectedEncoder = zapcore.ReflectedEncoder + WriteSyncer = zapcore.WriteSyncer + LoggerOption = zap.Option + Core = zapcore.Core + LevelEnabler = zapcore.LevelEnabler + Option func(config *Config) +) -// WithRunMode 设置运行模式 -// - 默认的运行模式为: RunModeDev -// - 当 handle 不为空时,将会调用 handle(),并将返回值添加到日志记录器中,同时将会抑制默认的日志记录器 -func WithRunMode(mode RunMode, handle func() Core) Option { - return func(log *Log) { - log.mode = mode - if handle != nil { - log.cores = append(log.cores, handle()) +func Default(opts ...Option) *Encoder { + config := &zap.Config{ + Level: zap.NewAtomicLevelAt(zap.DebugLevel), + Development: true, + DisableStacktrace: true, + Sampling: &zap.SamplingConfig{ + Initial: 100, + Thereafter: 100, + }, + EncoderConfig: zapcore.EncoderConfig{ + TimeKey: "Time", + LevelKey: "Level", + NameKey: "Name", + CallerKey: "Caller", + MessageKey: "Msg", + StacktraceKey: "Stack", + EncodeLevel: zapcore.CapitalLevelEncoder, + EncodeTime: func(t time.Time, encoder zapcore.PrimitiveArrayEncoder) { + encoder.AppendString(t.Format(time.DateTime)) + }, + EncodeDuration: zapcore.StringDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + }, + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + } + + // 应用选项 + for _, opt := range opts { + opt(config) + } + + if len(config.Encoding) == 0 { + if config.Development { + config.Encoding = "console" + } else { + config.Encoding = "json" } } + + var encoder = new(Encoder) + encoder.conf = config + switch config.Encoding { + case "console": + encoder.e = zapcore.NewConsoleEncoder(config.EncoderConfig) + case "json": + encoder.e = zapcore.NewJSONEncoder(config.EncoderConfig) + default: + panic("unknown encoding") + } + + return encoder } -// WithFilename 设置日志文件名 -// - 默认的日志文件名为: {level}.log -func WithFilename(filename func(level Level) string) Option { - return func(log *Log) { - log.filename = filename +// WithLevel 设置日志级别 +func WithLevel(level Level) Option { + return func(config *Config) { + config.Level.SetLevel(level) } } -// WithRotateFilename 设置日志分割文件名 -// - 默认的日志分割文件名为: {level}.%Y%m%d.log -func WithRotateFilename(filename func(level Level) string) Option { - return func(log *Log) { - log.rotateFilename = filename +// WithDevelopment 设置是否为开发模式 +func WithDevelopment(development bool) Option { + return func(config *Config) { + config.Development = development } } -// WithRotateOption 设置日志分割选项 -// - 默认的日志分割选项为: WithMaxAge(7天), WithRotationTime(1天) -func WithRotateOption(options ...rotateLogs.Option) Option { - return func(log *Log) { - log.rotateOptions = options +// WithDisableCaller 设置是否禁用调用者 +func WithDisableCaller(disableCaller bool) Option { + return func(config *Config) { + config.DisableCaller = disableCaller } } -// WithLogDir 设置日志文件夹 -// - 默认情况下不会设置日志文件夹,日志将不会被文件存储 -func WithLogDir(logDir, rotateLogDir string) Option { - return func(log *Log) { - log.logDir = logDir - log.rotateLogDir = rotateLogDir +// WithDisableStacktrace 设置是否禁用堆栈跟踪 +func WithDisableStacktrace(disableStacktrace bool) Option { + return func(config *Config) { + config.DisableStacktrace = disableStacktrace + } +} + +// WithSampling 设置采样策略 +func WithSampling(sampling *zap.SamplingConfig) Option { + return func(config *Config) { + config.Sampling = sampling + } +} + +// WithEncoding 设置编码器 +func WithEncoding(encoding string) Option { + return func(config *Config) { + config.Encoding = encoding + } +} + +// WithEncoderMessageKey 设置消息键 +func WithEncoderMessageKey(encoderMessageKey string) Option { + return func(config *Config) { + config.EncoderConfig.MessageKey = encoderMessageKey + } +} + +// WithEncoderLevelKey 设置级别键 +func WithEncoderLevelKey(encoderLevelKey string) Option { + return func(config *Config) { + config.EncoderConfig.LevelKey = encoderLevelKey + } +} + +// WithEncoderTimeKey 设置时间键 +func WithEncoderTimeKey(encoderTimeKey string) Option { + return func(config *Config) { + config.EncoderConfig.TimeKey = encoderTimeKey + } +} + +// WithEncoderNameKey 设置名称键 +func WithEncoderNameKey(encoderNameKey string) Option { + return func(config *Config) { + config.EncoderConfig.NameKey = encoderNameKey + } +} + +// WithEncoderCallerKey 设置调用者键 +func WithEncoderCallerKey(encoderCallerKey string) Option { + return func(config *Config) { + config.EncoderConfig.CallerKey = encoderCallerKey + } +} + +// WithEncoderFunctionKey 设置函数键 +func WithEncoderFunctionKey(encoderFunctionKey string) Option { + return func(config *Config) { + config.EncoderConfig.FunctionKey = encoderFunctionKey + } +} + +// WithEncoderStacktraceKey 设置堆栈跟踪键 +func WithEncoderStacktraceKey(encoderStacktraceKey string) Option { + return func(config *Config) { + config.EncoderConfig.StacktraceKey = encoderStacktraceKey + } +} + +// WithEncoderLineEnding 设置行尾 +func WithEncoderLineEnding(encoderLineEnding string) Option { + return func(config *Config) { + config.EncoderConfig.LineEnding = encoderLineEnding + } +} + +// WithEncoderLevel 设置级别编码器 +func WithEncoderLevel(encoderLevel LevelEncoder) Option { + return func(config *Config) { + config.EncoderConfig.EncodeLevel = encoderLevel + } +} + +// WithEncoderTime 设置时间编码器 +func WithEncoderTime(encoderTime TimeEncoder) Option { + return func(config *Config) { + config.EncoderConfig.EncodeTime = encoderTime + } +} + +// WithEncoderDuration 设置持续时间编码器 +func WithEncoderDuration(encoderDuration DurationEncoder) Option { + return func(config *Config) { + config.EncoderConfig.EncodeDuration = encoderDuration + } +} + +// WithEncoderCaller 设置调用者编码器 +func WithEncoderCaller(encoderCaller CallerEncoder) Option { + return func(config *Config) { + config.EncoderConfig.EncodeCaller = encoderCaller + } +} + +// WithEncoderName 设置名称编码器 +func WithEncoderName(encoderName NameEncoder) Option { + return func(config *Config) { + config.EncoderConfig.EncodeName = encoderName + } +} + +// WithEncoderNewReflectedEncoder 设置反射编码器 +func WithEncoderNewReflectedEncoder(encoderNewReflectedEncoder func(io.Writer) ReflectedEncoder) Option { + return func(config *Config) { + config.EncoderConfig.NewReflectedEncoder = encoderNewReflectedEncoder + } +} + +// WithOutputPaths 设置输出路径 +func WithOutputPaths(outputPaths ...string) Option { + return func(config *Config) { + config.OutputPaths = outputPaths + } +} + +// WithErrorOutputPaths 设置错误输出路径 +func WithErrorOutputPaths(errorOutputPaths ...string) Option { + return func(config *Config) { + config.ErrorOutputPaths = errorOutputPaths + } +} + +// WithInitialFields 设置初始字段 +func WithInitialFields(initialFields map[string]interface{}) Option { + return func(config *Config) { + config.InitialFields = initialFields } } diff --git a/utils/log/run_mode.go b/utils/log/run_mode.go deleted file mode 100644 index b49a7d4..0000000 --- a/utils/log/run_mode.go +++ /dev/null @@ -1,16 +0,0 @@ -package log - -const ( - // RunModeDev 开发模式是默认的运行模式,同时也是最基础的运行模式 - // - 开发模式下,将会输出所有级别的日志到控制台 - // - 默认不再输出日志到文件 - RunModeDev RunMode = iota - // RunModeTest 测试模式是一种特殊的运行模式,用于测试 - // - 测试模式下,将会输出所有级别的日志到控制台和文件 - RunModeTest - // RunModeProd 生产模式是一种特殊的运行模式,用于生产 - // - 生产模式下,将会输出 InfoLevel 及以上级别的日志到控制台和文件 - RunModeProd -) - -type RunMode uint8 diff --git a/utils/log/survey/survey.go b/utils/log/survey/survey.go index 8fff7c5..a87f91e 100644 --- a/utils/log/survey/survey.go +++ b/utils/log/survey/survey.go @@ -156,8 +156,11 @@ func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R)) func IncrementAnalyze(filePath string, handle func(analyzer *Analyzer, record R)) func() (*Report, error) { var analyzer = new(Analyzer) var offset int64 + var m = new(sync.Mutex) return func() (*Report, error) { var err error + m.Lock() + defer m.Unlock() offset, err = file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { handle(analyzer, R(s)) }, offset) diff --git a/utils/sole/unique.go b/utils/sole/unique.go new file mode 100644 index 0000000..1c90ce1 --- /dev/null +++ b/utils/sole/unique.go @@ -0,0 +1,45 @@ +package sole + +import ( + "strconv" + "sync/atomic" +) + +var autoIncrementUint32 uint32 = 0 +var autoIncrementUint64 uint64 = 0 +var autoIncrementInt32 int32 = 0 +var autoIncrementInt64 int64 = 0 +var autoIncrementInt uint64 = 0 +var autoIncrementString uint64 = 0 + +// AutoIncrementUint32 获取一个自增的 uint32 值 +func AutoIncrementUint32() uint32 { + return atomic.AddUint32(&autoIncrementUint32, 1) - 1 +} + +// AutoIncrementUint64 获取一个自增的 uint64 值 +func AutoIncrementUint64() uint64 { + return atomic.AddUint64(&autoIncrementUint64, 1) - 1 +} + +// AutoIncrementInt32 获取一个自增的 int32 值 +func AutoIncrementInt32() int32 { + return atomic.AddInt32(&autoIncrementInt32, 1) - 1 +} + +// AutoIncrementInt64 获取一个自增的 int64 值 +func AutoIncrementInt64() int64 { + return atomic.AddInt64(&autoIncrementInt64, 1) - 1 +} + +// AutoIncrementInt 获取一个自增的 int 值 +func AutoIncrementInt() int { + num := atomic.AddUint64(&autoIncrementInt, 1) + result := num % (1 << (strconv.IntSize - 1)) + return int(result) +} + +// AutoIncrementString 获取一个自增的字符串 +func AutoIncrementString() string { + return strconv.FormatUint(atomic.AddUint64(&autoIncrementString, 1)-1, 10) +} diff --git a/utils/super/super.go b/utils/super/super.go index 048df86..adcdbbf 100644 --- a/utils/super/super.go +++ b/utils/super/super.go @@ -6,6 +6,11 @@ import ( ) var launchTime = time.Now() +var pid int + +func init() { + pid = os.Getpid() +} // LaunchTime 获取程序启动时间 func LaunchTime() time.Time { @@ -16,3 +21,8 @@ func LaunchTime() time.Time { func Hostname() string { return os.Getenv("HOSTNAME") } + +// PID 获取进程 PID +func PID() int { + return pid +} diff --git a/utils/super/version.go b/utils/super/version.go new file mode 100644 index 0000000..4a15b34 --- /dev/null +++ b/utils/super/version.go @@ -0,0 +1,57 @@ +package super + +import ( + "regexp" + "strings" +) + +// OldVersion 检查 version2 对于 version1 来说是不是旧版本 +func OldVersion(version1, version2 string) bool { + return CompareVersion(version1, version2) == 1 +} + +// CompareVersion 返回一个整数,用于表示两个版本号的比较结果: +// - 如果 version1 大于 version2,它将返回 1 +// - 如果 version1 小于 version2,它将返回 -1 +// - 如果 version1 和 version2 相等,它将返回 0 +func CompareVersion(version1, version2 string) int { + reg, _ := regexp.Compile("[^0-9.]+") + processedVersion1 := processVersion(reg.ReplaceAllString(version1, "")) + processedVersion2 := processVersion(reg.ReplaceAllString(version2, "")) + + n, m := len(processedVersion1), len(processedVersion2) + i, j := 0, 0 + for i < n || j < m { + x := 0 + for ; i < n && processedVersion1[i] != '.'; i++ { + x = x*10 + int(processedVersion1[i]-'0') + } + i++ // skip the dot + y := 0 + for ; j < m && processedVersion2[j] != '.'; j++ { + y = y*10 + int(processedVersion2[j]-'0') + } + j++ // skip the dot + if x > y { + return 1 + } + if x < y { + return -1 + } + } + return 0 +} + +func processVersion(version string) string { + // 移除首尾可能存在的非数字字符 + reg, _ := regexp.Compile("^[^.0-9]+|[^.0-9]+$") + version = reg.ReplaceAllString(version, "") + // 确保不出现连续的点 + version = strings.ReplaceAll(version, "..", ".") + // 移除每一部分起始的 0 + versionParts := strings.Split(version, ".") + for idx, part := range versionParts { + versionParts[idx] = strings.TrimLeft(part, "0") + } + return strings.Join(versionParts, ".") +} diff --git a/utils/super/version_test.go b/utils/super/version_test.go new file mode 100644 index 0000000..326acf1 --- /dev/null +++ b/utils/super/version_test.go @@ -0,0 +1,13 @@ +package super_test + +import ( + "github.com/kercylan98/minotaur/utils/super" + "testing" +) + +func TestCompareVersion(t *testing.T) { + t.Log(super.CompareVersion("1", "2"), -1) + t.Log(super.CompareVersion("1", "vv2"), -1) + t.Log(super.CompareVersion("1", "vv2.3.1"), -1) + t.Log(super.CompareVersion("11", "vv2.3.1"), 1) +}