Merge branch 'develop'

This commit is contained in:
kercylan98 2023-12-01 16:46:30 +08:00
commit 63d961e774
29 changed files with 1075 additions and 670 deletions

View File

@ -90,27 +90,35 @@ func main() {
> Websocket地址: ws://127.0.0.1:9999 > Websocket地址: ws://127.0.0.1:9999
### 分流服务器 ### 分流服务器
分流服务器可以将客户端分流到不同的分组上,每个分组中为串行处理,不同分组之间并行处理。 分流服务器可以将消息分流到不同的分组上,每个分组中为串行处理,不同分组之间并行处理。
```go ```go
package main package main
import "github.com/kercylan98/minotaur/server" import "github.com/kercylan98/minotaur/server"
func main() { func main() {
srv := server.New(server.NetworkWebsocket, srv := server.New(server.NetworkWebsocket)
server.WithShunt(func(conn *server.Conn) string { srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) {
return conn.GetData("roomId").(string) // 通过 user_id 进行分流,不同用户的消息将不会互相阻塞
}), srv.UseShunt(conn, conn.Gata("user_id").(string))
) })
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
conn.Write(packet) var roomId = "default"
switch string(packet) {
case "JoinRoom":
// 将用户所处的分流渠道切换到 roomId 渠道,此刻同一分流渠道的消息将会按队列顺序处理
srv.UseShunt(conn, roomId)
case "LeaveRoom":
// 将用户所处分流切换为用户自身的分流渠道
srv.UseShunt(conn, conn.Gata("user_id").(string))
}
}) })
if err := srv.Run(":9999"); err != nil { if err := srv.Run(":9999"); err != nil {
panic(err) panic(err)
} }
} }
``` ```
> 该示例中假设各房间互不干涉,故通过`server.WithShunt`将连接通过`roomId`进行分组,提高并发处理能力。 > 该示例中模拟了用户分流渠道在自身渠道和房间渠道切换的过程,通过`UseShunt`对连接分流渠道进行设置,提高并发处理能力。
### 服务器死锁检测 ### 服务器死锁检测
`Minotaur`内置了服务器消息死锁检测功能,可通过`server.WithDeadlockDetect`进行开启。 `Minotaur`内置了服务器消息死锁检测功能,可通过`server.WithDeadlockDetect`进行开启。
@ -145,7 +153,7 @@ package main
import "github.com/kercylan98/minotaur/server" import "github.com/kercylan98/minotaur/server"
func main() { func main() {
srv := server.New(server.NetworkWebsocket, server.WithTicker(50, false)) srv := server.New(server.NetworkWebsocket, server.WithTicker(50, 10, false))
if err := srv.Run(":9999"); err != nil { if err := srv.Run(":9999"); err != nil {
panic(err) panic(err)
} }

9
go.mod
View File

@ -8,9 +8,9 @@ require (
github.com/gin-contrib/pprof v1.4.0 github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
github.com/go-resty/resty/v2 v2.7.0 github.com/go-resty/resty/v2 v2.7.0
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.5.0
github.com/json-iterator/go v1.1.12 github.com/json-iterator/go v1.1.12
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/panjf2000/ants/v2 v2.8.1 github.com/panjf2000/ants/v2 v2.8.1
github.com/panjf2000/gnet v1.6.7 github.com/panjf2000/gnet v1.6.7
github.com/smartystreets/goconvey v1.8.1 github.com/smartystreets/goconvey v1.8.1
@ -23,6 +23,7 @@ require (
go.uber.org/zap v1.25.0 go.uber.org/zap v1.25.0
golang.org/x/crypto v0.14.0 golang.org/x/crypto v0.14.0
google.golang.org/grpc v1.59.0 google.golang.org/grpc v1.59.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
) )
require ( require (
@ -36,16 +37,13 @@ require (
github.com/goccy/go-json v0.10.2 // indirect github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/klauspost/reedsolomon v1.11.8 // indirect github.com/klauspost/reedsolomon v1.11.8 // indirect
github.com/kr/pretty v0.3.1 // indirect github.com/kr/pretty v0.3.1 // indirect
github.com/leodido/go-urn v1.2.4 // indirect github.com/leodido/go-urn v1.2.4 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect
@ -69,6 +67,5 @@ require (
golang.org/x/text v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/protobuf v1.31.0 // indirect google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )

16
go.sum
View File

@ -2,8 +2,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7oFtbDZXC4dnY093M1kZx6k/95sen92gafbY= github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7oFtbDZXC4dnY093M1kZx6k/95sen92gafbY=
github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10= github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10=
github.com/alphadose/haxmap v1.2.0 h1:noGrAmCE+gNheZ4KpW+sYj9W5uMcO1UAjbAq9XBOAfM=
github.com/alphadose/haxmap v1.2.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
github.com/alphadose/haxmap v1.3.0 h1:C/2LboOnPCZP27GmmSXOcwx360st0P8N0fTJ3voefKc= github.com/alphadose/haxmap v1.3.0 h1:C/2LboOnPCZP27GmmSXOcwx360st0P8N0fTJ3voefKc=
github.com/alphadose/haxmap v1.3.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM= github.com/alphadose/haxmap v1.3.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
@ -82,8 +80,6 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg=
github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
@ -105,15 +101,9 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4=
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA=
github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ=
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
@ -134,8 +124,6 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=

View File

@ -106,9 +106,10 @@ func newBotConn(server *Server) *Conn {
return c return c
} }
// Conn 服务器连接单次会话的包装 // Conn 服务器连接单次消息的包装
type Conn struct { type Conn struct {
*connection *connection
wst int
ctx context.Context ctx context.Context
} }
@ -160,7 +161,7 @@ func (slf *Conn) GetWebsocketRequest() *http.Request {
// IsBot 是否是机器人连接 // IsBot 是否是机器人连接
func (slf *Conn) IsBot() bool { func (slf *Conn) IsBot() bool {
return slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil return slf != nil && slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil
} }
// RemoteAddr 获取远程地址 // RemoteAddr 获取远程地址
@ -229,15 +230,15 @@ func (slf *Conn) IsWebsocket() bool {
return slf.server.network == NetworkWebsocket return slf.server.network == NetworkWebsocket
} }
// GetWST 获取websocket消息类型 // GetWST 获取本次 websocket 消息类型
// - 默认将与发送类型相同
func (slf *Conn) GetWST() int { func (slf *Conn) GetWST() int {
wst, _ := slf.ctx.Value(contextKeyWST).(int) return slf.wst
return wst
} }
// SetWST 设置websocket消息类型 // SetWST 设置本次 websocket 消息类型
func (slf *Conn) SetWST(wst int) *Conn { func (slf *Conn) SetWST(wst int) *Conn {
slf.ctx = context.WithValue(slf.ctx, contextKeyWST, wst) slf.wst = wst
return slf return slf
} }
@ -255,7 +256,6 @@ func (slf *Conn) PushUniqueAsyncMessage(name string, caller func() error, callba
} }
// Write 向连接中写入数据 // Write 向连接中写入数据
// - messageType: websocket模式中指定消息类型
func (slf *Conn) Write(packet []byte, callback ...func(err error)) { func (slf *Conn) Write(packet []byte, callback ...func(err error)) {
if slf.gw != nil { if slf.gw != nil {
slf.gw(packet) slf.gw(packet)
@ -356,9 +356,7 @@ func (slf *Conn) Close(err ...error) {
if slf.ticker != nil { if slf.ticker != nil {
slf.ticker.Release() slf.ticker.Release()
} }
if slf.server.shuntMatcher != nil { slf.server.releaseDispatcher(slf)
slf.server.releaseDispatcher(slf.server.shuntMatcher(slf))
}
slf.pool.Close() slf.pool.Close()
slf.loop.Close() slf.loop.Close()
slf.mu.Unlock() slf.mu.Unlock()

View File

@ -1,20 +1,9 @@
package server package server
import ( import (
"github.com/kercylan98/minotaur/utils/log"
"time" "time"
) )
type (
RunMode = log.RunMode
)
const (
RunModeDev RunMode = log.RunModeDev
RunModeProd RunMode = log.RunModeProd
RunModeTest RunMode = log.RunModeTest
)
const ( const (
serverMultipleMark = "Minotaur Multiple Server" serverMultipleMark = "Minotaur Multiple Server"
serverMark = "Minotaur Server" serverMark = "Minotaur Server"
@ -27,7 +16,3 @@ const (
DefaultWebsocketReadDeadline = 30 * time.Second DefaultWebsocketReadDeadline = 30 * time.Second
DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB
) )
const (
contextKeyWST = "_wst" // WebSocket 消息类型
)

View File

@ -8,8 +8,9 @@ import (
var dispatcherUnique = struct{}{} var dispatcherUnique = struct{}{}
// generateDispatcher 生成消息分发器 // generateDispatcher 生成消息分发器
func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) *dispatcher { func generateDispatcher(name string, handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
return &dispatcher{ return &dispatcher{
name: name,
buffer: buffer.NewUnboundedN[*Message](), buffer: buffer.NewUnboundedN[*Message](),
handler: handler, handler: handler,
uniques: haxmap.New[string, struct{}](), uniques: haxmap.New[string, struct{}](),
@ -18,6 +19,7 @@ func generateDispatcher(handler func(dispatcher *dispatcher, message *Message))
// dispatcher 消息分发器 // dispatcher 消息分发器
type dispatcher struct { type dispatcher struct {
name string
buffer *buffer.Unbounded[*Message] buffer *buffer.Unbounded[*Message]
uniques *haxmap.Map[string, struct{}] uniques *haxmap.Map[string, struct{}]
handler func(dispatcher *dispatcher, message *Message) handler func(dispatcher *dispatcher, message *Message)

View File

@ -15,74 +15,74 @@ import (
"time" "time"
) )
type StartBeforeEventHandle func(srv *Server) type StartBeforeEventHandler func(srv *Server)
type StartFinishEventHandle func(srv *Server) type StartFinishEventHandler func(srv *Server)
type StopEventHandle func(srv *Server) type StopEventHandler func(srv *Server)
type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte) type ConnectionReceivePacketEventHandler func(srv *Server, conn *Conn, packet []byte)
type ConnectionOpenedEventHandle func(srv *Server, conn *Conn) type ConnectionOpenedEventHandler func(srv *Server, conn *Conn)
type ConnectionClosedEventHandle func(srv *Server, conn *Conn, err any) type ConnectionClosedEventHandler func(srv *Server, conn *Conn, err any)
type MessageErrorEventHandle func(srv *Server, message *Message, err error) type MessageErrorEventHandler func(srv *Server, message *Message, err error)
type MessageLowExecEventHandle func(srv *Server, message *Message, cost time.Duration) type MessageLowExecEventHandler func(srv *Server, message *Message, cost time.Duration)
type ConsoleCommandEventHandle func(srv *Server, command string, params ConsoleParams) type ConsoleCommandEventHandler func(srv *Server, command string, params ConsoleParams)
type ConnectionOpenedAfterEventHandle func(srv *Server, conn *Conn) type ConnectionOpenedAfterEventHandler func(srv *Server, conn *Conn)
type ConnectionWritePacketBeforeEventHandle func(srv *Server, conn *Conn, packet []byte) []byte type ConnectionWritePacketBeforeEventHandler func(srv *Server, conn *Conn, packet []byte) []byte
type ShuntChannelCreatedEventHandle func(srv *Server, guid int64) type ShuntChannelCreatedEventHandler func(srv *Server, guid int64)
type ShuntChannelClosedEventHandle func(srv *Server, guid int64) type ShuntChannelClosedEventHandler func(srv *Server, guid int64)
type ConnectionPacketPreprocessEventHandle func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte)) type ConnectionPacketPreprocessEventHandler func(srv *Server, conn *Conn, packet []byte, abort func(), usePacket func(newPacket []byte))
type MessageExecBeforeEventHandle func(srv *Server, message *Message) bool type MessageExecBeforeEventHandler func(srv *Server, message *Message) bool
type MessageReadyEventHandle func(srv *Server) type MessageReadyEventHandler func(srv *Server)
func newEvent(srv *Server) *event { func newEvent(srv *Server) *event {
return &event{ return &event{
Server: srv, Server: srv,
startBeforeEventHandles: slice.NewPriority[StartBeforeEventHandle](), startBeforeEventHandlers: slice.NewPriority[StartBeforeEventHandler](),
startFinishEventHandles: slice.NewPriority[StartFinishEventHandle](), startFinishEventHandlers: slice.NewPriority[StartFinishEventHandler](),
stopEventHandles: slice.NewPriority[StopEventHandle](), stopEventHandlers: slice.NewPriority[StopEventHandler](),
connectionReceivePacketEventHandles: slice.NewPriority[ConnectionReceivePacketEventHandle](), connectionReceivePacketEventHandlers: slice.NewPriority[ConnectionReceivePacketEventHandler](),
connectionOpenedEventHandles: slice.NewPriority[ConnectionOpenedEventHandle](), connectionOpenedEventHandlers: slice.NewPriority[ConnectionOpenedEventHandler](),
connectionClosedEventHandles: slice.NewPriority[ConnectionClosedEventHandle](), connectionClosedEventHandlers: slice.NewPriority[ConnectionClosedEventHandler](),
messageErrorEventHandles: slice.NewPriority[MessageErrorEventHandle](), messageErrorEventHandlers: slice.NewPriority[MessageErrorEventHandler](),
messageLowExecEventHandles: slice.NewPriority[MessageLowExecEventHandle](), messageLowExecEventHandlers: slice.NewPriority[MessageLowExecEventHandler](),
connectionOpenedAfterEventHandles: slice.NewPriority[ConnectionOpenedAfterEventHandle](), connectionOpenedAfterEventHandlers: slice.NewPriority[ConnectionOpenedAfterEventHandler](),
connectionWritePacketBeforeHandles: slice.NewPriority[ConnectionWritePacketBeforeEventHandle](), connectionWritePacketBeforeHandlers: slice.NewPriority[ConnectionWritePacketBeforeEventHandler](),
shuntChannelCreatedEventHandles: slice.NewPriority[ShuntChannelCreatedEventHandle](), shuntChannelCreatedEventHandlers: slice.NewPriority[ShuntChannelCreatedEventHandler](),
shuntChannelClosedEventHandles: slice.NewPriority[ShuntChannelClosedEventHandle](), shuntChannelClosedEventHandlers: slice.NewPriority[ShuntChannelClosedEventHandler](),
connectionPacketPreprocessEventHandles: slice.NewPriority[ConnectionPacketPreprocessEventHandle](), connectionPacketPreprocessEventHandlers: slice.NewPriority[ConnectionPacketPreprocessEventHandler](),
messageExecBeforeEventHandles: slice.NewPriority[MessageExecBeforeEventHandle](), messageExecBeforeEventHandlers: slice.NewPriority[MessageExecBeforeEventHandler](),
messageReadyEventHandles: slice.NewPriority[MessageReadyEventHandle](), messageReadyEventHandlers: slice.NewPriority[MessageReadyEventHandler](),
} }
} }
type event struct { type event struct {
*Server *Server
startBeforeEventHandles *slice.Priority[StartBeforeEventHandle] startBeforeEventHandlers *slice.Priority[StartBeforeEventHandler]
startFinishEventHandles *slice.Priority[StartFinishEventHandle] startFinishEventHandlers *slice.Priority[StartFinishEventHandler]
stopEventHandles *slice.Priority[StopEventHandle] stopEventHandlers *slice.Priority[StopEventHandler]
connectionReceivePacketEventHandles *slice.Priority[ConnectionReceivePacketEventHandle] connectionReceivePacketEventHandlers *slice.Priority[ConnectionReceivePacketEventHandler]
connectionOpenedEventHandles *slice.Priority[ConnectionOpenedEventHandle] connectionOpenedEventHandlers *slice.Priority[ConnectionOpenedEventHandler]
connectionClosedEventHandles *slice.Priority[ConnectionClosedEventHandle] connectionClosedEventHandlers *slice.Priority[ConnectionClosedEventHandler]
messageErrorEventHandles *slice.Priority[MessageErrorEventHandle] messageErrorEventHandlers *slice.Priority[MessageErrorEventHandler]
messageLowExecEventHandles *slice.Priority[MessageLowExecEventHandle] messageLowExecEventHandlers *slice.Priority[MessageLowExecEventHandler]
connectionOpenedAfterEventHandles *slice.Priority[ConnectionOpenedAfterEventHandle] connectionOpenedAfterEventHandlers *slice.Priority[ConnectionOpenedAfterEventHandler]
connectionWritePacketBeforeHandles *slice.Priority[ConnectionWritePacketBeforeEventHandle] connectionWritePacketBeforeHandlers *slice.Priority[ConnectionWritePacketBeforeEventHandler]
shuntChannelCreatedEventHandles *slice.Priority[ShuntChannelCreatedEventHandle] shuntChannelCreatedEventHandlers *slice.Priority[ShuntChannelCreatedEventHandler]
shuntChannelClosedEventHandles *slice.Priority[ShuntChannelClosedEventHandle] shuntChannelClosedEventHandlers *slice.Priority[ShuntChannelClosedEventHandler]
connectionPacketPreprocessEventHandles *slice.Priority[ConnectionPacketPreprocessEventHandle] connectionPacketPreprocessEventHandlers *slice.Priority[ConnectionPacketPreprocessEventHandler]
messageExecBeforeEventHandles *slice.Priority[MessageExecBeforeEventHandle] messageExecBeforeEventHandlers *slice.Priority[MessageExecBeforeEventHandler]
messageReadyEventHandles *slice.Priority[MessageReadyEventHandle] messageReadyEventHandlers *slice.Priority[MessageReadyEventHandler]
consoleCommandEventHandles map[string]*slice.Priority[ConsoleCommandEventHandle] consoleCommandEventHandlers map[string]*slice.Priority[ConsoleCommandEventHandler]
consoleCommandEventHandleInitOnce sync.Once consoleCommandEventHandlerInitOnce sync.Once
} }
// RegStopEvent 服务器停止时将立即执行被注册的事件处理函数 // RegStopEvent 服务器停止时将立即执行被注册的事件处理函数
func (slf *event) RegStopEvent(handle StopEventHandle, priority ...int) { func (slf *event) RegStopEvent(handler StopEventHandler, priority ...int) {
slf.stopEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.stopEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnStopEvent() { func (slf *event) OnStopEvent() {
slf.stopEventHandles.RangeValue(func(index int, value StopEventHandle) bool { slf.stopEventHandlers.RangeValue(func(index int, value StopEventHandler) bool {
value(slf.Server) value(slf.Server)
return true return true
}) })
@ -91,15 +91,15 @@ func (slf *event) OnStopEvent() {
// RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数 // RegConsoleCommandEvent 控制台收到指令时将立即执行被注册的事件处理函数
// - 默认将注册 "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN" 指令作为关闭服务器的指令 // - 默认将注册 "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN" 指令作为关闭服务器的指令
// - 可通过注册默认指令进行默认行为的覆盖 // - 可通过注册默认指令进行默认行为的覆盖
func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEventHandle, priority ...int) { func (slf *event) RegConsoleCommandEvent(command string, handler ConsoleCommandEventHandler, priority ...int) {
fd := int(os.Stdin.Fd()) fd := int(os.Stdin.Fd())
if !terminal.IsTerminal(fd) { if !terminal.IsTerminal(fd) {
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("ignore", "system not terminal")) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("ignore", "system not terminal"))
return return
} }
slf.consoleCommandEventHandleInitOnce.Do(func() { slf.consoleCommandEventHandlerInitOnce.Do(func() {
slf.consoleCommandEventHandles = map[string]*slice.Priority[ConsoleCommandEventHandle]{} slf.consoleCommandEventHandlers = map[string]*slice.Priority[ConsoleCommandEventHandler]{}
go func() { go func() {
for { for {
var input string var input string
@ -112,18 +112,18 @@ func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEv
} }
}() }()
}) })
list, exist := slf.consoleCommandEventHandles[command] list, exist := slf.consoleCommandEventHandlers[command]
if !exist { if !exist {
list = slice.NewPriority[ConsoleCommandEventHandle]() list = slice.NewPriority[ConsoleCommandEventHandler]()
slf.consoleCommandEventHandles[command] = list slf.consoleCommandEventHandlers[command] = list
} }
list.Append(handle, slice.GetValue(priority, 0)) list.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) { func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) {
slf.PushSystemMessage(func() { slf.PushSystemMessage(func() {
handles, exist := slf.consoleCommandEventHandles[command] handles, exist := slf.consoleCommandEventHandlers[command]
if !exist { if !exist {
switch command { switch command {
case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN": case "exit", "quit", "close", "shutdown", "EXIT", "QUIT", "CLOSE", "SHUTDOWN":
@ -142,7 +142,7 @@ func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) {
for key, value := range v { for key, value := range v {
params[key] = value params[key] = value
} }
handles.RangeValue(func(index int, value ConsoleCommandEventHandle) bool { handles.RangeValue(func(index int, value ConsoleCommandEventHandler) bool {
value(slf.Server, command, params) value(slf.Server, command, params)
return true return true
}) })
@ -151,9 +151,9 @@ func (slf *event) OnConsoleCommandEvent(command string, paramsStr string) {
} }
// RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数
func (slf *event) RegStartBeforeEvent(handle StartBeforeEventHandle, priority ...int) { func (slf *event) RegStartBeforeEvent(handler StartBeforeEventHandler, priority ...int) {
slf.startBeforeEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.startBeforeEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnStartBeforeEvent() { func (slf *event) OnStartBeforeEvent() {
@ -163,7 +163,7 @@ func (slf *event) OnStartBeforeEvent() {
debug.PrintStack() debug.PrintStack()
} }
}() }()
slf.startBeforeEventHandles.RangeValue(func(index int, value StartBeforeEventHandle) bool { slf.startBeforeEventHandlers.RangeValue(func(index int, value StartBeforeEventHandler) bool {
value(slf.Server) value(slf.Server)
return true return true
}) })
@ -171,14 +171,14 @@ func (slf *event) OnStartBeforeEvent() {
// RegStartFinishEvent 在服务器启动完成时将立刻执行被注册的事件处理函数 // RegStartFinishEvent 在服务器启动完成时将立刻执行被注册的事件处理函数
// - 需要注意该时刻服务器已经启动完成,但是还有可能未开始处理消息,客户端有可能无法连接,如果需要在消息处理器准备就绪后执行,请使用 RegMessageReadyEvent 函数 // - 需要注意该时刻服务器已经启动完成,但是还有可能未开始处理消息,客户端有可能无法连接,如果需要在消息处理器准备就绪后执行,请使用 RegMessageReadyEvent 函数
func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle, priority ...int) { func (slf *event) RegStartFinishEvent(handler StartFinishEventHandler, priority ...int) {
slf.startFinishEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.startFinishEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnStartFinishEvent() { func (slf *event) OnStartFinishEvent() {
slf.PushSystemMessage(func() { slf.PushSystemMessage(func() {
slf.startFinishEventHandles.RangeValue(func(index int, value StartFinishEventHandle) bool { slf.startFinishEventHandlers.RangeValue(func(index int, value StartFinishEventHandler) bool {
value(slf.Server) value(slf.Server)
return true return true
}) })
@ -192,18 +192,18 @@ func (slf *event) OnStartFinishEvent() {
} }
// RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数 // RegConnectionClosedEvent 在连接关闭后将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle, priority ...int) { func (slf *event) RegConnectionClosedEvent(handler ConnectionClosedEventHandler, priority ...int) {
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
slf.connectionClosedEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.connectionClosedEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
slf.PushSystemMessage(func() { slf.PushSystemMessage(func() {
slf.Server.online.Delete(conn.GetID()) slf.Server.online.Delete(conn.GetID())
slf.connectionClosedEventHandles.RangeValue(func(index int, value ConnectionClosedEventHandle) bool { slf.connectionClosedEventHandlers.RangeValue(func(index int, value ConnectionClosedEventHandler) bool {
value(slf.Server, conn, err) value(slf.Server, conn, err)
return true return true
}) })
@ -211,51 +211,53 @@ func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
} }
// RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数 // RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionOpenedEvent(handle ConnectionOpenedEventHandle, priority ...int) { // - 该阶段的事件将会在系统消息中进行处理,不适合处理耗时操作
func (slf *event) RegConnectionOpenedEvent(handler ConnectionOpenedEventHandler, priority ...int) {
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
slf.connectionOpenedEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.connectionOpenedEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnConnectionOpenedEvent(conn *Conn) { func (slf *event) OnConnectionOpenedEvent(conn *Conn) {
slf.PushSystemMessage(func() { slf.PushSystemMessage(func() {
slf.Server.online.Set(conn.GetID(), conn) slf.Server.online.Set(conn.GetID(), conn)
slf.connectionOpenedEventHandles.RangeValue(func(index int, value ConnectionOpenedEventHandle) bool { slf.connectionOpenedEventHandlers.RangeValue(func(index int, value ConnectionOpenedEventHandler) bool {
value(slf.Server, conn) value(slf.Server, conn)
return true return true
}) })
slf.OnConnectionOpenedAfterEvent(conn)
}, log.String("Event", "OnConnectionOpenedEvent")) }, log.String("Event", "OnConnectionOpenedEvent"))
} }
// RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数 // RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacketEventHandle, priority ...int) { func (slf *event) RegConnectionReceivePacketEvent(handler ConnectionReceivePacketEventHandler, priority ...int) {
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
slf.connectionReceivePacketEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.connectionReceivePacketEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) { func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) {
if slf.Server.runtime.packetWarnSize > 0 && len(packet) > slf.Server.runtime.packetWarnSize { if slf.Server.runtime.packetWarnSize > 0 && len(packet) > slf.Server.runtime.packetWarnSize {
log.Warn("Server", log.String("OnConnectionReceivePacketEvent", fmt.Sprintf("packet size %d > %d", len(packet), slf.Server.runtime.packetWarnSize))) log.Warn("Server", log.String("OnConnectionReceivePacketEvent", fmt.Sprintf("packet size %d > %d", len(packet), slf.Server.runtime.packetWarnSize)))
} }
slf.connectionReceivePacketEventHandles.RangeValue(func(index int, value ConnectionReceivePacketEventHandle) bool { slf.connectionReceivePacketEventHandlers.RangeValue(func(index int, value ConnectionReceivePacketEventHandler) bool {
value(slf.Server, conn, packet) value(slf.Server, conn, packet)
return true return true
}) })
} }
// RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数 // RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数
func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle, priority ...int) { func (slf *event) RegMessageErrorEvent(handler MessageErrorEventHandler, priority ...int) {
slf.messageErrorEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.messageErrorEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnMessageErrorEvent(message *Message, err error) { func (slf *event) OnMessageErrorEvent(message *Message, err error) {
if slf.messageErrorEventHandles.Len() == 0 { if slf.messageErrorEventHandlers.Len() == 0 {
return return
} }
defer func() { defer func() {
@ -264,41 +266,42 @@ func (slf *event) OnMessageErrorEvent(message *Message, err error) {
debug.PrintStack() debug.PrintStack()
} }
}() }()
slf.messageErrorEventHandles.RangeValue(func(index int, value MessageErrorEventHandle) bool { slf.messageErrorEventHandlers.RangeValue(func(index int, value MessageErrorEventHandler) bool {
value(slf.Server, message, err) value(slf.Server, message, err)
return true return true
}) })
} }
// RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数 // RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数
func (slf *event) RegMessageLowExecEvent(handle MessageLowExecEventHandle, priority ...int) { func (slf *event) RegMessageLowExecEvent(handler MessageLowExecEventHandler, priority ...int) {
slf.messageLowExecEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.messageLowExecEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) { func (slf *event) OnMessageLowExecEvent(message *Message, cost time.Duration) {
if slf.messageLowExecEventHandles.Len() == 0 { if slf.messageLowExecEventHandlers.Len() == 0 {
return return
} }
// 慢消息不再占用消息通道 // 慢消息不再占用消息通道
slf.messageLowExecEventHandles.RangeValue(func(index int, value MessageLowExecEventHandle) bool { slf.messageLowExecEventHandlers.RangeValue(func(index int, value MessageLowExecEventHandler) bool {
value(slf.Server, message, cost) value(slf.Server, message, cost)
return true return true
}) })
} }
// RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数 // RegConnectionOpenedAfterEvent 在连接打开事件处理完成后将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionOpenedAfterEvent(handle ConnectionOpenedAfterEventHandle, priority ...int) { // - 该阶段事件将会转到对应消息分流渠道中进行处理
func (slf *event) RegConnectionOpenedAfterEvent(handler ConnectionOpenedAfterEventHandler, priority ...int) {
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
slf.connectionOpenedAfterEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.connectionOpenedAfterEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) { func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) {
slf.PushSystemMessage(func() { slf.PushShuntMessage(conn, func() {
slf.connectionOpenedAfterEventHandles.RangeValue(func(index int, value ConnectionOpenedAfterEventHandle) bool { slf.connectionOpenedAfterEventHandlers.RangeValue(func(index int, value ConnectionOpenedAfterEventHandler) bool {
value(slf.Server, conn) value(slf.Server, conn)
return true return true
}) })
@ -306,20 +309,20 @@ func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) {
} }
// RegConnectionWritePacketBeforeEvent 在发送数据包前将立刻执行被注册的事件处理函数 // RegConnectionWritePacketBeforeEvent 在发送数据包前将立刻执行被注册的事件处理函数
func (slf *event) RegConnectionWritePacketBeforeEvent(handle ConnectionWritePacketBeforeEventHandle, priority ...int) { func (slf *event) RegConnectionWritePacketBeforeEvent(handler ConnectionWritePacketBeforeEventHandler, priority ...int) {
if slf.network == NetworkHttp { if slf.network == NetworkHttp {
panic(ErrNetworkIncompatibleHttp) panic(ErrNetworkIncompatibleHttp)
} }
slf.connectionWritePacketBeforeHandles.Append(handle, slice.GetValue(priority, 0)) slf.connectionWritePacketBeforeHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet []byte) (newPacket []byte) { func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet []byte) (newPacket []byte) {
if slf.connectionWritePacketBeforeHandles.Len() == 0 { if slf.connectionWritePacketBeforeHandlers.Len() == 0 {
return packet return packet
} }
newPacket = packet newPacket = packet
slf.connectionWritePacketBeforeHandles.RangeValue(func(index int, value ConnectionWritePacketBeforeEventHandle) bool { slf.connectionWritePacketBeforeHandlers.RangeValue(func(index int, value ConnectionWritePacketBeforeEventHandler) bool {
newPacket = value(slf.Server, conn, newPacket) newPacket = value(slf.Server, conn, newPacket)
return true return true
}) })
@ -327,14 +330,14 @@ func (slf *event) OnConnectionWritePacketBeforeEvent(conn *Conn, packet []byte)
} }
// RegShuntChannelCreatedEvent 在分流通道创建时将立刻执行被注册的事件处理函数 // RegShuntChannelCreatedEvent 在分流通道创建时将立刻执行被注册的事件处理函数
func (slf *event) RegShuntChannelCreatedEvent(handle ShuntChannelCreatedEventHandle, priority ...int) { func (slf *event) RegShuntChannelCreatedEvent(handler ShuntChannelCreatedEventHandler, priority ...int) {
slf.shuntChannelCreatedEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.shuntChannelCreatedEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnShuntChannelCreatedEvent(guid int64) { func (slf *event) OnShuntChannelCreatedEvent(guid int64) {
slf.PushSystemMessage(func() { slf.PushSystemMessage(func() {
slf.shuntChannelCreatedEventHandles.RangeValue(func(index int, value ShuntChannelCreatedEventHandle) bool { slf.shuntChannelCreatedEventHandlers.RangeValue(func(index int, value ShuntChannelCreatedEventHandler) bool {
value(slf.Server, guid) value(slf.Server, guid)
return true return true
}) })
@ -342,14 +345,14 @@ func (slf *event) OnShuntChannelCreatedEvent(guid int64) {
} }
// RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数 // RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数
func (slf *event) RegShuntChannelCloseEvent(handle ShuntChannelClosedEventHandle, priority ...int) { func (slf *event) RegShuntChannelCloseEvent(handler ShuntChannelClosedEventHandler, priority ...int) {
slf.shuntChannelClosedEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.shuntChannelClosedEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnShuntChannelClosedEvent(guid int64) { func (slf *event) OnShuntChannelClosedEvent(guid int64) {
slf.PushSystemMessage(func() { slf.PushSystemMessage(func() {
slf.shuntChannelClosedEventHandles.RangeValue(func(index int, value ShuntChannelClosedEventHandle) bool { slf.shuntChannelClosedEventHandlers.RangeValue(func(index int, value ShuntChannelClosedEventHandler) bool {
value(slf.Server, guid) value(slf.Server, guid)
return true return true
}) })
@ -364,17 +367,17 @@ func (slf *event) OnShuntChannelClosedEvent(guid int64) {
// 场景: // 场景:
// - 数据包格式校验 // - 数据包格式校验
// - 数据包分包等情况处理 // - 数据包分包等情况处理
func (slf *event) RegConnectionPacketPreprocessEvent(handle ConnectionPacketPreprocessEventHandle, priority ...int) { func (slf *event) RegConnectionPacketPreprocessEvent(handler ConnectionPacketPreprocessEventHandler, priority ...int) {
slf.connectionPacketPreprocessEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.connectionPacketPreprocessEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, usePacket func(newPacket []byte)) bool { func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, usePacket func(newPacket []byte)) bool {
if slf.connectionPacketPreprocessEventHandles.Len() == 0 { if slf.connectionPacketPreprocessEventHandlers.Len() == 0 {
return false return false
} }
var abort = false var abort = false
slf.connectionPacketPreprocessEventHandles.RangeValue(func(index int, value ConnectionPacketPreprocessEventHandle) bool { slf.connectionPacketPreprocessEventHandlers.RangeValue(func(index int, value ConnectionPacketPreprocessEventHandler) bool {
value(slf.Server, conn, packet, func() { abort = true }, usePacket) value(slf.Server, conn, packet, func() { abort = true }, usePacket)
if abort { if abort {
return false return false
@ -388,13 +391,13 @@ func (slf *event) OnConnectionPacketPreprocessEvent(conn *Conn, packet []byte, u
// - 当返回 true 时,将继续执行后续的消息处理函数,否则将不会执行后续的消息处理函数,并且该消息将被丢弃 // - 当返回 true 时,将继续执行后续的消息处理函数,否则将不会执行后续的消息处理函数,并且该消息将被丢弃
// //
// 适用于限流等场景 // 适用于限流等场景
func (slf *event) RegMessageExecBeforeEvent(handle MessageExecBeforeEventHandle, priority ...int) { func (slf *event) RegMessageExecBeforeEvent(handler MessageExecBeforeEventHandler, priority ...int) {
slf.messageExecBeforeEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.messageExecBeforeEventHandlers.Append(handler, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handler", reflect.TypeOf(handler).String()))
} }
func (slf *event) OnMessageExecBeforeEvent(message *Message) bool { func (slf *event) OnMessageExecBeforeEvent(message *Message) bool {
if slf.messageExecBeforeEventHandles.Len() == 0 { if slf.messageExecBeforeEventHandlers.Len() == 0 {
return true return true
} }
var result = true var result = true
@ -404,7 +407,7 @@ func (slf *event) OnMessageExecBeforeEvent(message *Message) bool {
debug.PrintStack() debug.PrintStack()
} }
}() }()
slf.messageExecBeforeEventHandles.RangeValue(func(index int, value MessageExecBeforeEventHandle) bool { slf.messageExecBeforeEventHandlers.RangeValue(func(index int, value MessageExecBeforeEventHandler) bool {
result = value(slf.Server, message) result = value(slf.Server, message)
return result return result
}) })
@ -412,12 +415,12 @@ func (slf *event) OnMessageExecBeforeEvent(message *Message) bool {
} }
// RegMessageReadyEvent 在服务器消息处理器准备就绪时立即执行被注册的事件处理函数 // RegMessageReadyEvent 在服务器消息处理器准备就绪时立即执行被注册的事件处理函数
func (slf *event) RegMessageReadyEvent(handle MessageReadyEventHandle, priority ...int) { func (slf *event) RegMessageReadyEvent(handler MessageReadyEventHandler, priority ...int) {
slf.messageReadyEventHandles.Append(handle, slice.GetValue(priority, 0)) slf.messageReadyEventHandlers.Append(handler, slice.GetValue(priority, 0))
} }
func (slf *event) OnMessageReadyEvent() { func (slf *event) OnMessageReadyEvent() {
if slf.messageReadyEventHandles.Len() == 0 { if slf.messageReadyEventHandlers.Len() == 0 {
return return
} }
defer func() { defer func() {
@ -426,7 +429,7 @@ func (slf *event) OnMessageReadyEvent() {
debug.PrintStack() debug.PrintStack()
} }
}() }()
slf.messageReadyEventHandles.RangeValue(func(index int, value MessageReadyEventHandle) bool { slf.messageReadyEventHandlers.RangeValue(func(index int, value MessageReadyEventHandler) bool {
value(slf.Server) value(slf.Server)
return true return true
}) })
@ -436,7 +439,7 @@ func (slf *event) check() {
switch slf.network { switch slf.network {
case NetworkHttp, NetworkGRPC, NetworkNone: case NetworkHttp, NetworkGRPC, NetworkNone:
default: default:
if slf.connectionReceivePacketEventHandles.Len() == 0 { if slf.connectionReceivePacketEventHandlers.Len() == 0 {
log.Warn("Server", log.String("ConnectionReceivePacketEvent", "invalid server, no packets processed")) log.Warn("Server", log.String("ConnectionReceivePacketEvent", "invalid server, no packets processed"))
} }
} }

View File

@ -0,0 +1,10 @@
package logger
import "github.com/kercylan98/minotaur/utils/log"
type Ants struct {
}
func (slf *Ants) Printf(format string, args ...interface{}) {
log.Warn(format, log.Any("args", args))
}

View File

@ -0,0 +1,29 @@
package logger
import (
"fmt"
"github.com/kercylan98/minotaur/utils/log"
)
type GNet struct {
}
func (slf *GNet) Debugf(format string, args ...interface{}) {
log.Debug(fmt.Sprintf(format, args...))
}
func (slf *GNet) Infof(format string, args ...interface{}) {
log.Info(fmt.Sprintf(format, args...))
}
func (slf *GNet) Warnf(format string, args ...interface{}) {
log.Warn(fmt.Sprintf(format, args...))
}
func (slf *GNet) Errorf(format string, args ...interface{}) {
log.Error(fmt.Sprintf(format, args...))
}
func (slf *GNet) Fatalf(format string, args ...interface{}) {
log.Fatal(fmt.Sprintf(format, args...))
}

View File

@ -44,6 +44,9 @@ const (
// MessageTypeSystem 系统消息类型 // MessageTypeSystem 系统消息类型
MessageTypeSystem MessageTypeSystem
// MessageTypeShunt 普通分流消息类型
MessageTypeShunt
) )
var messageNames = map[MessageType]string{ var messageNames = map[MessageType]string{
@ -60,6 +63,7 @@ var messageNames = map[MessageType]string{
MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync", MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync",
MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback", MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback",
MessageTypeSystem: "MessageTypeSystem", MessageTypeSystem: "MessageTypeSystem",
MessageTypeShunt: "MessageTypeShunt",
} }
const ( const (
@ -209,3 +213,9 @@ func (slf *Message) castToErrorMessage(err error, action MessageErrorAction, mar
slf.t, slf.err, slf.errAction, slf.marks = MessageTypeError, err, action, mark slf.t, slf.err, slf.errAction, slf.marks = MessageTypeError, err, action, mark
return slf return slf
} }
// castToShuntMessage 将消息转换为分流消息
func (slf *Message) castToShuntMessage(conn *Conn, caller func(), mark ...log.Field) *Message {
slf.t, slf.conn, slf.ordinaryHandler, slf.marks = MessageTypeShunt, conn, caller, mark
return slf
}

View File

@ -40,7 +40,7 @@ func (slf *MultipleServer) Run() {
go func(address string, server *Server) { go func(address string, server *Server) {
var lock sync.Mutex var lock sync.Mutex
var startFinish bool var startFinish bool
server.startFinishEventHandles.Append(func(srv *Server) { server.startFinishEventHandlers.Append(func(srv *Server) {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
if !startFinish { if !startFinish {

View File

@ -5,7 +5,6 @@ import (
"github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/timer" "github.com/kercylan98/minotaur/utils/timer"
"google.golang.org/grpc" "google.golang.org/grpc"
"runtime/debug"
"time" "time"
) )
@ -29,19 +28,18 @@ type option struct {
} }
type runtime struct { type runtime struct {
deadlockDetect time.Duration // 是否开启死锁检测 deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型 supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件 certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小 messagePoolSize int // 消息池大小
ticker *timer.Ticker // 定时器 ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行 tickerAutonomy bool // 定时器是否独立运行
connTickerSize int // 连接定时器大小 connTickerSize int // 连接定时器大小
websocketReadDeadline time.Duration // websocket连接超时时间 websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级 websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩 websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期 limitLife time.Duration // 限制最大生命周期
shuntMatcher func(conn *Conn) string // 分流匹配器 packetWarnSize int // 数据包大小警告
packetWarnSize int // 数据包大小警告
} }
// WithPacketWarnSize 通过数据包大小警告的方式创建服务器,当数据包大小超过指定大小时,将会输出 WARN 类型的日志 // WithPacketWarnSize 通过数据包大小警告的方式创建服务器,当数据包大小超过指定大小时,将会输出 WARN 类型的日志
@ -58,32 +56,6 @@ func WithPacketWarnSize(size int) Option {
} }
} }
// WithShunt 通过连接数据包分流的方式创建服务器
// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况
// - shuntMatcher用于匹配连接的函数返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道
//
// 将被分流的消息类型(更多类型有待斟酌):
// - MessageTypePacket
//
// 注意事项:
// - 当分流匹配过程发生 panic 将会在系统通道内处理消息,并打印日志
func WithShunt(shuntMatcher func(conn *Conn) string) Option {
return func(srv *Server) {
if shuntMatcher == nil {
log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil"))
return
}
srv.shuntMatcher = func(conn *Conn) string {
defer func() {
if err := recover(); err != nil {
log.Error("ShuntMatcher", log.String("State", "Panic"), log.Any("Error", err), log.String("Stack", string(debug.Stack())))
}
}()
return shuntMatcher(conn)
}
}
}
// WithLimitLife 通过限制最大生命周期的方式创建服务器 // WithLimitLife 通过限制最大生命周期的方式创建服务器
// - 通常用于测试服务器,服务器将在到达最大生命周期时自动关闭 // - 通常用于测试服务器,服务器将在到达最大生命周期时自动关闭
func WithLimitLife(t time.Duration) Option { func WithLimitLife(t time.Duration) Option {
@ -158,6 +130,8 @@ func WithWebsocketReadDeadline(t time.Duration) Option {
} }
// WithTicker 通过定时器创建服务器,为服务器添加定时器功能 // WithTicker 通过定时器创建服务器,为服务器添加定时器功能
// - size服务器定时器时间轮大小
// - connSize服务器连接定时器时间轮大小
// - autonomy定时器是否独立运行独立运行的情况下不会作为服务器消息运行会导致并发问题 // - autonomy定时器是否独立运行独立运行的情况下不会作为服务器消息运行会导致并发问题
func WithTicker(size, connSize int, autonomy bool) Option { func WithTicker(size, connSize int, autonomy bool) Option {
return func(srv *Server) { return func(srv *Server) {
@ -195,14 +169,6 @@ func WithGRPCServerOptions(options ...grpc.ServerOption) Option {
} }
} }
// WithRunMode 通过特定模式运行服务器
// - 默认为 RunModeDev
func WithRunMode(mode RunMode) Option {
return func(srv *Server) {
srv.runMode = mode
}
}
// WithWebsocketMessageType 设置仅支持特定类型的Websocket消息 // WithWebsocketMessageType 设置仅支持特定类型的Websocket消息
func WithWebsocketMessageType(messageTypes ...int) Option { func WithWebsocketMessageType(messageTypes ...int) Option {
return func(srv *Server) { return func(srv *Server) {

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/kercylan98/minotaur/server/internal/logger"
"github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/network" "github.com/kercylan98/minotaur/utils/network"
@ -14,7 +15,6 @@ import (
"github.com/kercylan98/minotaur/utils/timer" "github.com/kercylan98/minotaur/utils/timer"
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
"github.com/panjf2000/gnet" "github.com/panjf2000/gnet"
"github.com/panjf2000/gnet/pkg/logging"
"github.com/xtaci/kcp-go/v5" "github.com/xtaci/kcp-go/v5"
"google.golang.org/grpc" "google.golang.org/grpc"
"net" "net"
@ -36,13 +36,15 @@ func New(network Network, options ...Option) *Server {
messagePoolSize: DefaultMessageBufferSize, messagePoolSize: DefaultMessageBufferSize,
packetWarnSize: DefaultPacketWarnSize, packetWarnSize: DefaultPacketWarnSize,
}, },
option: &option{}, option: &option{},
network: network, network: network,
online: concurrent.NewBalanceMap[string, *Conn](), online: concurrent.NewBalanceMap[string, *Conn](),
closeChannel: make(chan struct{}, 1), closeChannel: make(chan struct{}, 1),
systemSignal: make(chan os.Signal, 1), systemSignal: make(chan os.Signal, 1),
ctx: context.Background(), ctx: context.Background(),
dispatchers: make(map[string]*dispatcher), dispatchers: make(map[string]*dispatcher),
dispatcherMember: map[string]map[string]*Conn{},
currDispatcher: map[string]*dispatcher{},
} }
server.event = newEvent(server) server.event = newEvent(server)
@ -67,7 +69,7 @@ func New(network Network, options ...Option) *Server {
server.antsPoolSize = DefaultAsyncPoolSize server.antsPoolSize = DefaultAsyncPoolSize
} }
var err error var err error
server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(log.GetLogger())) server.ants, err = ants.NewPool(server.antsPoolSize, ants.WithLogger(new(logger.Ants)))
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -91,9 +93,9 @@ type Server struct {
messagePool *concurrent.Pool[*Message] // 消息池 messagePool *concurrent.Pool[*Message] // 消息池
ctx context.Context // 上下文 ctx context.Context // 上下文
online *concurrent.BalanceMap[string, *Conn] // 在线连接 online *concurrent.BalanceMap[string, *Conn] // 在线连接
systemDispatcher *dispatcher // 系统消息分发器
network Network // 网络类型 network Network // 网络类型
addr string // 侦听地址 addr string // 侦听地址
runMode RunMode // 运行模式
systemSignal chan os.Signal // 系统信号 systemSignal chan os.Signal // 系统信号
closeChannel chan struct{} // 关闭信号 closeChannel chan struct{} // 关闭信号
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
@ -102,7 +104,9 @@ type Server struct {
isShutdown atomic.Bool // 是否已关闭 isShutdown atomic.Bool // 是否已关闭
messageCounter atomic.Int64 // 消息计数器 messageCounter atomic.Int64 // 消息计数器
isRunning bool // 是否正在运行 isRunning bool // 是否正在运行
dispatchers map[string]*dispatcher // 消息分发器 dispatchers map[string]*dispatcher // 消息分发器集合
dispatcherMember map[string]map[string]*Conn // 消息分发器包含的连接
currDispatcher map[string]*dispatcher // 当前连接所处消息分发器
} }
// Run 使用特定地址运行服务器 // Run 使用特定地址运行服务器
@ -126,6 +130,7 @@ func (slf *Server) Run(addr string) error {
} }
slf.event.check() slf.event.check()
slf.addr = addr slf.addr = addr
slf.systemDispatcher = generateDispatcher(serverSystemDispatcher, slf.dispatchMessage)
var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr) var protoAddr = fmt.Sprintf("%s://%s", slf.network, slf.addr)
var messageInitFinish = make(chan struct{}, 1) var messageInitFinish = make(chan struct{}, 1)
var connectionInitHandle = func(callback func()) { var connectionInitHandle = func(callback func()) {
@ -147,8 +152,7 @@ func (slf *Server) Run(addr string) error {
} }
go func() { go func() {
messageInitFinish <- struct{}{} messageInitFinish <- struct{}{}
d, _ := slf.useDispatcher(serverSystemDispatcher) slf.systemDispatcher.start()
d.start()
}() }()
} }
@ -177,8 +181,7 @@ func (slf *Server) Run(addr string) error {
slf.isRunning = true slf.isRunning = true
slf.OnStartBeforeEvent() slf.OnStartBeforeEvent()
if err := gnet.Serve(slf.gServer, protoAddr, if err := gnet.Serve(slf.gServer, protoAddr,
gnet.WithLogger(log.GetLogger()), gnet.WithLogger(new(logger.GNet)),
gnet.WithLogLevel(super.If(slf.runMode == RunModeProd, logging.ErrorLevel, logging.DebugLevel)),
gnet.WithTicker(true), gnet.WithTicker(true),
gnet.WithMulticore(true), gnet.WithMulticore(true),
); err != nil { ); err != nil {
@ -202,7 +205,6 @@ func (slf *Server) Run(addr string) error {
conn := newKcpConn(slf, session) conn := newKcpConn(slf, session)
slf.OnConnectionOpenedEvent(conn) slf.OnConnectionOpenedEvent(conn)
slf.OnConnectionOpenedAfterEvent(conn)
go func(conn *Conn) { go func(conn *Conn) {
defer func() { defer func() {
@ -230,18 +232,19 @@ func (slf *Server) Run(addr string) error {
} }
}) })
case NetworkHttp: case NetworkHttp:
switch slf.runMode {
case RunModeDev:
gin.SetMode(gin.DebugMode)
case RunModeTest:
gin.SetMode(gin.TestMode)
case RunModeProd:
gin.SetMode(gin.ReleaseMode)
}
go func() { go func() {
slf.isRunning = true slf.isRunning = true
slf.OnStartBeforeEvent() slf.OnStartBeforeEvent()
slf.httpServer.Addr = slf.addr slf.httpServer.Addr = slf.addr
gin.SetMode(gin.ReleaseMode)
slf.ginServer.Use(func(c *gin.Context) {
t := time.Now()
c.Next()
log.Info("Server", log.String("type", "http"),
log.String("method", c.Request.Method), log.Int("status", c.Writer.Status()),
log.String("ip", c.ClientIP()), log.String("path", c.Request.URL.Path),
log.Duration("cost", time.Since(t)))
})
go connectionInitHandle(nil) go connectionInitHandle(nil)
if len(slf.certFile)+len(slf.keyFile) > 0 { if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil { if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil {
@ -569,30 +572,70 @@ func (slf *Server) GetMessageCount() int64 {
return slf.messageCounter.Load() return slf.messageCounter.Load()
} }
// useDispatcher 添加消息分发器 // UseShunt 切换连接所使用的消息分流渠道,当分流渠道 name 不存在时将会创建一个新的分流渠道,否则将会加入已存在的分流渠道
// - 该函数在分发器不重复的情况下将创建分发器,当分发器已存在将直接返回 // - 默认情况下,所有连接都使用系统通道进行消息分发,当指定消息分流渠道时,将会使用指定的消息分流渠道进行消息分发
func (slf *Server) useDispatcher(name string) (*dispatcher, bool) { func (slf *Server) UseShunt(conn *Conn, name string) {
slf.dispatcherLock.Lock() slf.dispatcherLock.Lock()
defer slf.dispatcherLock.Unlock()
d, exist := slf.dispatchers[name] d, exist := slf.dispatchers[name]
if exist { if !exist {
slf.dispatcherLock.Unlock() d = generateDispatcher(name, slf.dispatchMessage)
return d, false go d.start()
slf.dispatchers[name] = d
} }
d = generateDispatcher(slf.dispatchMessage)
slf.dispatchers[name] = d curr, exist := slf.currDispatcher[conn.GetID()]
slf.dispatcherLock.Unlock() if exist {
return d, true if curr.name == name {
return
}
delete(slf.dispatcherMember[curr.name], conn.GetID())
if len(slf.dispatcherMember[curr.name]) == 0 {
curr.close()
delete(slf.dispatchers, curr.name)
}
}
member, exist := slf.dispatcherMember[name]
if !exist {
member = map[string]*Conn{}
slf.dispatcherMember[name] = member
}
member[conn.GetID()] = conn
}
// getConnDispatcher 获取连接所使用的消息分发器
func (slf *Server) getConnDispatcher(conn *Conn) *dispatcher {
if conn == nil {
return slf.systemDispatcher
}
slf.dispatcherLock.RLock()
defer slf.dispatcherLock.RUnlock()
d, exist := slf.currDispatcher[conn.GetID()]
if exist {
return d
}
return slf.systemDispatcher
} }
// releaseDispatcher 关闭消息分发器 // releaseDispatcher 关闭消息分发器
func (slf *Server) releaseDispatcher(name string) { func (slf *Server) releaseDispatcher(conn *Conn) {
slf.dispatcherLock.Lock() if conn == nil {
d, exist := slf.dispatchers[name] return
if exist { }
delete(slf.dispatchers, name) slf.dispatcherLock.Lock()
d.close() defer slf.dispatcherLock.Unlock()
d, exist := slf.currDispatcher[conn.GetID()]
if exist {
delete(slf.dispatcherMember[d.name], conn.GetID())
if len(slf.dispatcherMember[d.name]) == 0 {
d.close()
delete(slf.dispatchers, d.name)
}
delete(slf.currDispatcher, conn.GetID())
} }
slf.dispatcherLock.Unlock()
} }
// pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求 // pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求
@ -603,20 +646,13 @@ func (slf *Server) pushMessage(message *Message) {
} }
var dispatcher *dispatcher var dispatcher *dispatcher
switch message.t { switch message.t {
case MessageTypePacket: case MessageTypePacket,
if slf.shuntMatcher == nil { MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback,
dispatcher, _ = slf.useDispatcher(serverSystemDispatcher) MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback,
break MessageTypeShunt:
} dispatcher = slf.getConnDispatcher(message.conn)
fallthrough
case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback, MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback:
var created bool
dispatcher, created = slf.useDispatcher(slf.shuntMatcher(message.conn))
if created {
go dispatcher.start()
}
case MessageTypeSystem, MessageTypeAsync, MessageTypeUniqueAsync, MessageTypeAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeError, MessageTypeTicker: case MessageTypeSystem, MessageTypeAsync, MessageTypeUniqueAsync, MessageTypeAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeError, MessageTypeTicker:
dispatcher, _ = slf.useDispatcher(serverSystemDispatcher) dispatcher = slf.systemDispatcher
} }
if dispatcher == nil { if dispatcher == nil {
return return
@ -757,7 +793,7 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
} }
case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeUniqueShuntAsyncCallback: case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeUniqueShuntAsyncCallback:
msg.errHandler(msg.err) msg.errHandler(msg.err)
case MessageTypeSystem: case MessageTypeSystem, MessageTypeShunt:
msg.ordinaryHandler() msg.ordinaryHandler()
default: default:
log.Warn("Server", log.String("not support message type", msg.t.String())) log.Warn("Server", log.String("not support message type", msg.t.String()))
@ -791,20 +827,12 @@ func (slf *Server) PushAsyncCallbackMessage(err error, callback func(err error),
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发 // - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 // - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) { func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushAsyncMessage(caller, callback)
return
}
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...)) slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...))
} }
// PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 // PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发 // - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发
func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) { func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushAsyncCallbackMessage(err, callback)
return
}
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...)) slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...))
} }
@ -812,7 +840,7 @@ func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback
// - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息 // - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息
func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) { func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToPacketMessage( slf.pushMessage(slf.messagePool.Get().castToPacketMessage(
&Conn{ctx: context.WithValue(conn.ctx, contextKeyWST, wst), connection: conn.connection}, &Conn{wst: wst, connection: conn.connection},
packet, packet,
)) ))
} }
@ -829,13 +857,9 @@ func (slf *Server) PushTickerMessage(name string, caller func(), mark ...log.Fie
} }
// PushShuntTickerMessage 向特定分发器中推送 MessageTypeTicker 消息,消息执行与 MessageTypeTicker 一致 // PushShuntTickerMessage 向特定分发器中推送 MessageTypeTicker 消息,消息执行与 MessageTypeTicker 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushTickerMessage 进行转发 // - 需要注意的是,当未指定 UseShunt 时,将会通过 PushTickerMessage 进行转发
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 // - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func(), mark ...log.Field) { func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func(), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushTickerMessage(name, caller)
return
}
slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...)) slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...))
} }
@ -851,23 +875,15 @@ func (slf *Server) PushUniqueAsyncCallbackMessage(unique string, err error, call
} }
// PushUniqueShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致 // PushUniqueShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncMessage 进行转发 // - 需要注意的是,当未指定 UseShunt 时,将会通过系统分流渠道进行转发
// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息 // - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息
func (slf *Server) PushUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) { func (slf *Server) PushUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushUniqueAsyncMessage(unique, caller, callback)
return
}
slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncMessage(conn, unique, caller, callback, mark...)) slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncMessage(conn, unique, caller, callback, mark...))
} }
// PushUniqueShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 // PushUniqueShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncCallbackMessage 进行转发 // - 需要注意的是,当未指定 UseShunt 时,将会通过系统分流渠道进行转发
func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, callback func(err error), mark ...log.Field) { func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, callback func(err error), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushUniqueAsyncCallbackMessage(unique, err, callback)
return
}
slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncCallbackMessage(conn, unique, err, callback, mark...)) slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncCallbackMessage(conn, unique, err, callback, mark...))
} }
@ -879,3 +895,8 @@ func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string
func (slf *Server) PushErrorMessage(err error, errAction MessageErrorAction, mark ...log.Field) { func (slf *Server) PushErrorMessage(err error, errAction MessageErrorAction, mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToErrorMessage(err, errAction, mark...)) slf.pushMessage(slf.messagePool.Get().castToErrorMessage(err, errAction, mark...))
} }
// PushShuntMessage 向特定分发器中推送 MessageTypeShunt 消息,消息执行与 MessageTypeSystem 一致,不同的是将会在特定分发器中执行
func (slf *Server) PushShuntMessage(conn *Conn, caller func(), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToShuntMessage(conn, caller, mark...))
}

View File

@ -0,0 +1,36 @@
package buffer_test
import (
"github.com/kercylan98/minotaur/utils/buffer"
"testing"
)
func BenchmarkUnboundedBuffer(b *testing.B) {
ub := buffer.NewUnboundedN[int]()
b.Run("Put", func(b *testing.B) {
for i := 0; i < b.N; i++ {
ub.Put(i)
}
})
b.Run("Load", func(b *testing.B) {
for i := 0; i < b.N; i++ {
ub.Load()
}
})
// 先填充数据以防止 Get 被阻塞
for i := 0; i < b.N; i++ {
ub.Put(i)
}
b.Run("Get", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
ub.Put(i)
<-ub.Get()
ub.Load()
}
})
}

View File

@ -163,6 +163,14 @@ func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(str
_ = file.Close() _ = file.Close()
}() }()
fileSize, err := file.Seek(0, io.SeekEnd)
if err != nil {
return offset, err
}
if offset-1 >= fileSize {
return fileSize + 1, nil
}
chunks := FindLineChunksByOffset(file, offset, chunkSize) chunks := FindLineChunksByOffset(file, offset, chunkSize)
var end int64 var end int64
var endMutex sync.Mutex var endMutex sync.Mutex

View File

@ -1,6 +1,9 @@
package huge package huge
import "math/big" import (
"github.com/kercylan98/minotaur/utils/generic"
"math/big"
)
var ( var (
IntNegativeOne = NewInt(-1) // 默认初始化的-1值Int应当将其当作常量使用 IntNegativeOne = NewInt(-1) // 默认初始化的-1值Int应当将其当作常量使用
@ -12,19 +15,25 @@ var (
IntTenThousand = NewInt(10000) // 默认初始化的10000值Int应当将其当作常量使用 IntTenThousand = NewInt(10000) // 默认初始化的10000值Int应当将其当作常量使用
) )
type IntRestrain interface {
uint | uint8 | uint16 | uint32 | uint64 | int | int8 | int16 | int32 | int64
}
type Int big.Int type Int big.Int
func NewInt[T IntRestrain](x T, exp ...T) *Int { // NewInt 创建一个 Int
num := int64(x) func NewInt[T generic.Number](x T) *Int {
i := big.NewInt(num) return (*Int)(big.NewInt(int64(x)))
for _, t := range exp { }
i = i.Exp(i, big.NewInt(int64(t)), nil)
// NewIntByString 通过字符串创建一个 Int
// - 如果字符串不是一个合法的数字,则返回 0
func NewIntByString(i string) *Int {
v, suc := new(big.Int).SetString(i, 10)
if !suc {
return IntZero.Copy()
} }
return (*Int)(i) return (*Int)(v)
}
func applyIntOperation[T generic.Number](v *Int, i T, op func(*big.Int, *big.Int) *big.Int) *Int {
return (*Int)(op(v.ToBigint(), NewInt(i).ToBigint()))
} }
func (slf *Int) Copy() *Int { func (slf *Int) Copy() *Int {
@ -86,6 +95,7 @@ func (slf *Int) ToBigint() *big.Int {
return (*big.Int)(slf) return (*big.Int)(slf)
} }
// Cmp 比较,当 slf > i 时返回 1当 slf < i 时返回 -1当 slf == i 时返回 0
func (slf *Int) Cmp(i *Int) int { func (slf *Int) Cmp(i *Int) int {
return slf.ToBigint().Cmp(i.ToBigint()) return slf.ToBigint().Cmp(i.ToBigint())
} }
@ -286,3 +296,286 @@ func (slf *Int) SubUint64(i uint64) *Int {
x := slf.ToBigint() x := slf.ToBigint()
return (*Int)(x.Sub(x, NewInt(i).ToBigint())) return (*Int)(x.Sub(x, NewInt(i).ToBigint()))
} }
func (slf *Int) Div(i *Int) *Int {
x := slf.ToBigint()
return (*Int)(x.Div(x, i.ToBigint()))
}
func (slf *Int) DivInt(i int) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Div)
}
func (slf *Int) DivInt8(i int8) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Div)
}
func (slf *Int) DivInt16(i int16) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Div)
}
func (slf *Int) DivInt32(i int32) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Div)
}
func (slf *Int) DivInt64(i int64) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Div)
}
func (slf *Int) DivUint(i uint) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Div)
}
func (slf *Int) DivUint8(i uint8) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Div)
}
func (slf *Int) DivUint16(i uint16) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Div)
}
func (slf *Int) DivUint32(i uint32) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Div)
}
func (slf *Int) DivUint64(i uint64) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Div)
}
func (slf *Int) Mod(i *Int) *Int {
x := slf.ToBigint()
return (*Int)(x.Mod(x, i.ToBigint()))
}
func (slf *Int) ModInt(i int) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod)
}
func (slf *Int) ModInt8(i int8) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod)
}
func (slf *Int) ModInt16(i int16) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod)
}
func (slf *Int) ModInt32(i int32) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod)
}
func (slf *Int) ModInt64(i int64) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod)
}
func (slf *Int) ModUint(i uint) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod)
}
func (slf *Int) ModUint8(i uint8) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod)
}
func (slf *Int) ModUint16(i uint16) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod)
}
func (slf *Int) ModUint32(i uint32) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod)
}
func (slf *Int) ModUint64(i uint64) *Int {
return applyIntOperation(slf, i, NewInt(i).ToBigint().Mod)
}
func (slf *Int) Pow(i *Int) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, i.ToBigint(), nil))
}
func (slf *Int) PowInt(i int) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil))
}
func (slf *Int) PowInt8(i int8) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil))
}
func (slf *Int) PowInt16(i int16) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil))
}
func (slf *Int) PowInt32(i int32) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil))
}
func (slf *Int) PowInt64(i int64) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil))
}
func (slf *Int) PowUint(i uint) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil))
}
func (slf *Int) PowUint8(i uint8) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil))
}
func (slf *Int) PowUint16(i uint16) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil))
}
func (slf *Int) PowUint32(i uint32) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil))
}
func (slf *Int) PowUint64(i uint64) *Int {
x := slf.ToBigint()
return (*Int)(x.Exp(x, NewInt(i).ToBigint(), nil))
}
// Lsh 左移
func (slf *Int) Lsh(i int) *Int {
x := slf.ToBigint()
return (*Int)(x.Lsh(x, uint(i)))
}
// Rsh 右移
func (slf *Int) Rsh(i int) *Int {
x := slf.ToBigint()
return (*Int)(x.Rsh(x, uint(i)))
}
// And 与
func (slf *Int) And(i *Int) *Int {
x := slf.ToBigint()
return (*Int)(x.And(x, i.ToBigint()))
}
// AndNot 与非
func (slf *Int) AndNot(i *Int) *Int {
x := slf.ToBigint()
return (*Int)(x.AndNot(x, i.ToBigint()))
}
// Or 或
func (slf *Int) Or(i *Int) *Int {
x := slf.ToBigint()
return (*Int)(x.Or(x, i.ToBigint()))
}
// Xor 异或
func (slf *Int) Xor(i *Int) *Int {
x := slf.ToBigint()
return (*Int)(x.Xor(x, i.ToBigint()))
}
// Not 非
func (slf *Int) Not() *Int {
x := slf.ToBigint()
return (*Int)(x.Not(x))
}
// Sqrt 平方根
func (slf *Int) Sqrt() *Int {
x := slf.ToBigint()
return (*Int)(x.Sqrt(x))
}
// GCD 最大公约数
func (slf *Int) GCD(i *Int) *Int {
x := slf.ToBigint()
return (*Int)(x.GCD(nil, nil, x, i.ToBigint()))
}
// LCM 最小公倍数
func (slf *Int) LCM(i *Int) *Int {
sb := slf.ToBigint()
ib := i.ToBigint()
gcd := new(big.Int).GCD(nil, nil, sb, ib)
absProduct := new(big.Int).Mul(sb, ib).Abs(new(big.Int).Mul(sb, ib))
lcm := new(big.Int).Div(absProduct, gcd)
return (*Int)(lcm)
}
// ModInverse 模反元素
func (slf *Int) ModInverse(i *Int) *Int {
x := slf.ToBigint()
return (*Int)(x.ModInverse(x, i.ToBigint()))
}
// ModSqrt 模平方根
func (slf *Int) ModSqrt(i *Int) *Int {
x := slf.ToBigint()
return (*Int)(x.ModSqrt(x, i.ToBigint()))
}
// BitLen 二进制长度
func (slf *Int) BitLen() int {
return slf.ToBigint().BitLen()
}
// Bit 二进制位
func (slf *Int) Bit(i int) uint {
return slf.ToBigint().Bit(i)
}
// SetBit 设置二进制位
func (slf *Int) SetBit(i int, v uint) *Int {
x := slf.ToBigint()
return (*Int)(x.SetBit(x, i, v))
}
// Neg 返回数字的相反数
func (slf *Int) Neg() *Int {
x := slf.ToBigint()
return (*Int)(x.Neg(x))
}
// Abs 返回数字的绝对值
func (slf *Int) Abs() *Int {
x := slf.ToBigint()
return (*Int)(x.Abs(x))
}
// Sign 返回数字的符号
// - 1正数
// - 0
// - -1负数
func (slf *Int) Sign() int {
return slf.ToBigint().Sign()
}
// IsPositive 是否为正数
func (slf *Int) IsPositive() bool {
return slf.Sign() > 0
}
// IsNegative 是否为负数
func (slf *Int) IsNegative() bool {
return slf.Sign() < 0
}
// IsEven 是否为偶数
func (slf *Int) IsEven() bool {
return slf.ToBigint().Bit(0) == 0
}
// IsOdd 是否为奇数
func (slf *Int) IsOdd() bool {
return slf.ToBigint().Bit(0) == 1
}
// ProportionalCalc 比例计算,该函数会再 formula 返回值的基础上除以 proportional
// - formula 为计算公式,该公式的参数为调用该函数的 Int 的拷贝
func (slf *Int) ProportionalCalc(proportional *Int, formula func(v *Int) *Int) *Int {
return formula(slf.Copy()).Div(proportional)
}

View File

@ -1,5 +0,0 @@
package log
import "go.uber.org/zap/zapcore"
type Core = zapcore.Core

View File

@ -1,26 +1,39 @@
package log package log
import ( import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
"time" "gopkg.in/natefinch/lumberjack.v2"
) )
type Encoder = zapcore.Encoder type Encoder struct {
e zapcore.Encoder
// NewEncoder 创建一个 Minotaur 默认使用的编码器 cores []Core
func NewEncoder() Encoder { conf *Config
return zapcore.NewConsoleEncoder(zapcore.EncoderConfig{ }
MessageKey: "msg",
LevelKey: "level", func (slf *Encoder) Split(config *lumberjack.Logger) *Encoder {
EncodeLevel: zapcore.CapitalLevelEncoder, slf.cores = append(slf.cores, zapcore.NewCore(slf.e, zapcore.AddSync(config), zapcore.DebugLevel))
TimeKey: "ts", return slf
EncodeTime: func(t time.Time, enc zapcore.PrimitiveArrayEncoder) { }
enc.AppendString(t.Format(time.DateTime))
}, func (slf *Encoder) AddCore(ws WriteSyncer, enab LevelEnabler) *Encoder {
CallerKey: "file", slf.cores = append(slf.cores, zapcore.NewCore(slf.e, ws, enab))
EncodeCaller: zapcore.ShortCallerEncoder, return slf
EncodeDuration: func(d time.Duration, enc zapcore.PrimitiveArrayEncoder) { }
enc.AppendInt64(int64(d) / 1000000)
}, func (slf *Encoder) Build(options ...LoggerOption) *Minotaur {
}) l, err := slf.conf.Build()
if err != nil {
panic(err)
}
options = append([]LoggerOption{zap.AddCaller(), zap.AddCallerSkip(1)}, options...)
options = append(options, zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewTee(append(slf.cores, core)...)
}))
l = l.WithOptions(options...)
return &Minotaur{
Logger: l,
Sugared: l.Sugar(),
}
} }

View File

@ -1,13 +1,6 @@
package log package log
import ( import "go.uber.org/zap/zapcore"
"github.com/kercylan98/minotaur/utils/hash"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
type Level = zapcore.Level
type LevelEnablerFunc = zap.LevelEnablerFunc
const ( const (
// DebugLevel 调试级别日志通常非常庞大,并且通常在生产中被禁用 // DebugLevel 调试级别日志通常非常庞大,并且通常在生产中被禁用
@ -25,86 +18,3 @@ const (
// FatalLevel 记录一条消息,然后调用 os.Exit(1) // FatalLevel 记录一条消息,然后调用 os.Exit(1)
FatalLevel Level = zapcore.FatalLevel FatalLevel Level = zapcore.FatalLevel
) )
var (
levels = []Level{DebugLevel, InfoLevel, WarnLevel, ErrorLevel, DPanicLevel, PanicLevel, FatalLevel}
defaultLevelPartition = map[Level]func() LevelEnablerFunc{
DebugLevel: DebugLevelPartition,
InfoLevel: InfoLevelPartition,
WarnLevel: WarnLevelPartition,
ErrorLevel: ErrorLevelPartition,
DPanicLevel: DPanicLevelPartition,
PanicLevel: PanicLevelPartition,
FatalLevel: FatalLevelPartition,
}
)
// Levels 返回所有日志级别
func Levels() []Level {
return levels
}
// MultiLevelPartition 返回一个 LevelEnablerFunc该函数在指定的多个级别时返回 true
// - 该函数被用于划分不同级别的日志输出
func MultiLevelPartition(levels ...Level) LevelEnablerFunc {
var levelMap = hash.ToIterator(levels)
return func(level zapcore.Level) bool {
return hash.Exist(levelMap, level)
}
}
// DebugLevelPartition 返回一个 LevelEnablerFunc该函数在 DebugLevel 时返回 true
// - 该函数被用于划分不同级别的日志输出
func DebugLevelPartition() LevelEnablerFunc {
return func(level zapcore.Level) bool {
return level == DebugLevel
}
}
// InfoLevelPartition 返回一个 LevelEnablerFunc该函数在 InfoLevel 时返回 true
// - 该函数被用于划分不同级别的日志输出
func InfoLevelPartition() LevelEnablerFunc {
return func(level zapcore.Level) bool {
return level == InfoLevel
}
}
// WarnLevelPartition 返回一个 LevelEnablerFunc该函数在 WarnLevel 时返回 true
// - 该函数被用于划分不同级别的日志输出
func WarnLevelPartition() LevelEnablerFunc {
return func(level zapcore.Level) bool {
return level == WarnLevel
}
}
// ErrorLevelPartition 返回一个 LevelEnablerFunc该函数在 ErrorLevel 时返回 true
// - 该函数被用于划分不同级别的日志输出
func ErrorLevelPartition() LevelEnablerFunc {
return func(level zapcore.Level) bool {
return level == ErrorLevel
}
}
// DPanicLevelPartition 返回一个 LevelEnablerFunc该函数在 DPanicLevel 时返回 true
// - 该函数被用于划分不同级别的日志输出
func DPanicLevelPartition() LevelEnablerFunc {
return func(level zapcore.Level) bool {
return level == DPanicLevel
}
}
// PanicLevelPartition 返回一个 LevelEnablerFunc该函数在 PanicLevel 时返回 true
// - 该函数被用于划分不同级别的日志输出
func PanicLevelPartition() LevelEnablerFunc {
return func(level zapcore.Level) bool {
return level == PanicLevel
}
}
// FatalLevelPartition 返回一个 LevelEnablerFunc该函数在 FatalLevel 时返回 true
// - 该函数被用于划分不同级别的日志输出
func FatalLevelPartition() LevelEnablerFunc {
return func(level zapcore.Level) bool {
return level == FatalLevel
}
}

View File

@ -1,170 +0,0 @@
package log
import (
"fmt"
"github.com/kercylan98/minotaur/utils/str"
"github.com/kercylan98/minotaur/utils/times"
rotateLogs "github.com/lestrrat-go/file-rotatelogs"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"os"
"path/filepath"
"strings"
)
// NewLog 创建一个日志记录器
func NewLog(options ...Option) *Log {
log := &Log{
filename: func(level Level) string {
return fmt.Sprintf("%s.log", level.String())
},
rotateFilename: func(level Level) string {
return strings.Join([]string{level.String(), "%Y%m%d.log"}, ".")
},
levelPartition: defaultLevelPartition,
}
for _, option := range options {
option(log)
}
if len(log.rotateOptions) == 0 {
log.rotateOptions = []rotateLogs.Option{
rotateLogs.WithMaxAge(times.Week),
rotateLogs.WithRotationTime(times.Day),
}
}
if len(log.cores) == 0 {
var encoder = NewEncoder()
switch log.mode {
case RunModeDev:
var partition LevelEnablerFunc = func(lvl Level) bool {
return true
}
log.cores = append(log.cores, zapcore.NewCore(encoder, os.Stdout, partition))
case RunModeTest, RunModeProd:
if log.mode == RunModeTest {
infoRotate, err := rotateLogs.New(
filepath.Join(log.rotateLogDir, log.rotateFilename(InfoLevel)),
append([]rotateLogs.Option{rotateLogs.WithLinkName(filepath.Join(log.logDir, log.filename(InfoLevel)))}, log.rotateOptions...)...,
)
if err != nil {
panic(err)
}
errRotate, err := rotateLogs.New(
filepath.Join(log.rotateLogDir, log.rotateFilename(ErrorLevel)),
append([]rotateLogs.Option{rotateLogs.WithLinkName(filepath.Join(log.logDir, log.filename(ErrorLevel)))}, log.rotateOptions...)...,
)
if err != nil {
panic(err)
}
if log.logDir != str.None {
log.cores = append(log.cores, zapcore.NewCore(encoder, zapcore.AddSync(infoRotate), LevelEnablerFunc(func(lvl Level) bool { return lvl < ErrorLevel })))
log.cores = append(log.cores, zapcore.NewCore(encoder, zapcore.AddSync(errRotate), LevelEnablerFunc(func(lvl Level) bool { return lvl >= ErrorLevel })))
log.cores = append(log.cores, zapcore.NewCore(encoder, os.Stdout, LevelEnablerFunc(func(lvl Level) bool { return lvl < ErrorLevel })))
log.cores = append(log.cores, zapcore.NewCore(encoder, os.Stdout, LevelEnablerFunc(func(lvl Level) bool { return lvl >= ErrorLevel })))
}
} else {
infoRotate, err := rotateLogs.New(
filepath.Join(log.rotateLogDir, log.rotateFilename(InfoLevel)),
append([]rotateLogs.Option{rotateLogs.WithLinkName(filepath.Join(log.logDir, log.filename(InfoLevel)))}, log.rotateOptions...)...,
)
if err != nil {
panic(err)
}
errRotate, err := rotateLogs.New(
filepath.Join(log.rotateLogDir, log.rotateFilename(ErrorLevel)),
append([]rotateLogs.Option{rotateLogs.WithLinkName(filepath.Join(log.logDir, log.filename(ErrorLevel)))}, log.rotateOptions...)...,
)
if err != nil {
panic(err)
}
if log.logDir != str.None {
log.cores = append(log.cores, zapcore.NewCore(encoder, zapcore.AddSync(infoRotate), LevelEnablerFunc(func(lvl Level) bool { return lvl == InfoLevel })))
log.cores = append(log.cores, zapcore.NewCore(encoder, zapcore.AddSync(errRotate), LevelEnablerFunc(func(lvl Level) bool { return lvl >= ErrorLevel })))
}
}
}
}
log.zap = zap.New(zapcore.NewTee(log.cores...), zap.AddCaller(), zap.AddCallerSkip(2))
log.sugar = log.zap.Sugar()
return log
}
type Log struct {
zap *zap.Logger
sugar *zap.SugaredLogger
filename func(level Level) string
rotateFilename func(level Level) string
rotateOptions []rotateLogs.Option
levelPartition map[Level]func() LevelEnablerFunc
cores []Core
mode RunMode
logDir string
rotateLogDir string
}
func (slf *Log) Debugf(format string, args ...interface{}) {
slf.sugar.Debugf(format, args...)
}
func (slf *Log) Infof(format string, args ...interface{}) {
slf.sugar.Infof(format, args...)
}
func (slf *Log) Warnf(format string, args ...interface{}) {
slf.sugar.Warnf(format, args...)
}
func (slf *Log) Errorf(format string, args ...interface{}) {
slf.sugar.Errorf(format, args...)
}
func (slf *Log) Fatalf(format string, args ...interface{}) {
slf.sugar.Fatalf(format, args...)
}
func (slf *Log) Printf(format string, args ...interface{}) {
slf.sugar.Infof(format, args...)
}
// Debug 在 DebugLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段
func (slf *Log) Debug(msg string, fields ...Field) {
slf.zap.Debug(msg, fields...)
}
// Info 在 InfoLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段
func (slf *Log) Info(msg string, fields ...Field) {
slf.zap.Info(msg, fields...)
}
// Warn 在 WarnLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段
func (slf *Log) Warn(msg string, fields ...Field) {
slf.zap.Warn(msg, fields...)
}
// Error 在 ErrorLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段
func (slf *Log) Error(msg string, fields ...Field) {
slf.zap.Error(msg, fields...)
}
// DPanic 在 DPanicLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段
// - 如果记录器处于开发模式,它就会出现 panicDPanic 的意思是“development panic”。这对于捕获可恢复但不应该发生的错误很有用
func (slf *Log) DPanic(msg string, fields ...Field) {
slf.zap.DPanic(msg, fields...)
}
// Panic 在 PanicLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段
// - 即使禁用了 PanicLevel 的日志记录,记录器也会出现 panic
func (slf *Log) Panic(msg string, fields ...Field) {
slf.zap.Panic(msg, fields...)
}
// Fatal 在 FatalLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段
// - 然后记录器调用 os.Exit(1),即使 FatalLevel 的日志记录被禁用
func (slf *Log) Fatal(msg string, fields ...Field) {
slf.zap.Fatal(msg, fields...)
}

View File

@ -1,16 +1,22 @@
package log package log
import ( var logger Logger
"github.com/panjf2000/ants/v2"
"github.com/panjf2000/gnet/pkg/logging"
)
var logger Logger = NewLog() func init() {
logger = Default().Build()
}
// SetLogger 设置日志记录器
func SetLogger(l Logger) {
if m, ok := l.(*Minotaur); ok && m != nil {
_ = m.Sync()
_ = m.Sugared.Sync()
}
logger = l
}
// Logger 适用于 Minotaur 的日志接口 // Logger 适用于 Minotaur 的日志接口
type Logger interface { type Logger interface {
ants.Logger
logging.Logger
// Debug 在 DebugLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 // Debug 在 DebugLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段
Debug(msg string, fields ...Field) Debug(msg string, fields ...Field)
// Info 在 InfoLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段 // Info 在 InfoLevel 记录一条消息。该消息包括在日志站点传递的任何字段以及记录器上累积的任何字段
@ -67,13 +73,3 @@ func Panic(msg string, fields ...Field) {
func Fatal(msg string, fields ...Field) { func Fatal(msg string, fields ...Field) {
logger.Fatal(msg, fields...) logger.Fatal(msg, fields...)
} }
// SetLogger 设置日志记录器
func SetLogger(log Logger) {
logger = log
}
// GetLogger 获取日志记录器
func GetLogger() Logger {
return logger
}

8
utils/log/minotaur.go Normal file
View File

@ -0,0 +1,8 @@
package log
import "go.uber.org/zap"
type Minotaur struct {
*zap.Logger
Sugared *zap.SugaredLogger
}

View File

@ -1,52 +1,239 @@
package log package log
import ( import (
rotateLogs "github.com/lestrrat-go/file-rotatelogs" "go.uber.org/zap"
"go.uber.org/zap/zapcore"
"io"
"time"
) )
type Option func(log *Log) type (
Config = zap.Config
Level = zapcore.Level
LevelEncoder = zapcore.LevelEncoder
TimeEncoder = zapcore.TimeEncoder
DurationEncoder = zapcore.DurationEncoder
CallerEncoder = zapcore.CallerEncoder
NameEncoder = zapcore.NameEncoder
ReflectedEncoder = zapcore.ReflectedEncoder
WriteSyncer = zapcore.WriteSyncer
LoggerOption = zap.Option
Core = zapcore.Core
LevelEnabler = zapcore.LevelEnabler
Option func(config *Config)
)
// WithRunMode 设置运行模式 func Default(opts ...Option) *Encoder {
// - 默认的运行模式为: RunModeDev config := &zap.Config{
// - 当 handle 不为空时,将会调用 handle(),并将返回值添加到日志记录器中,同时将会抑制默认的日志记录器 Level: zap.NewAtomicLevelAt(zap.DebugLevel),
func WithRunMode(mode RunMode, handle func() Core) Option { Development: true,
return func(log *Log) { DisableStacktrace: true,
log.mode = mode Sampling: &zap.SamplingConfig{
if handle != nil { Initial: 100,
log.cores = append(log.cores, handle()) Thereafter: 100,
},
EncoderConfig: zapcore.EncoderConfig{
TimeKey: "Time",
LevelKey: "Level",
NameKey: "Name",
CallerKey: "Caller",
MessageKey: "Msg",
StacktraceKey: "Stack",
EncodeLevel: zapcore.CapitalLevelEncoder,
EncodeTime: func(t time.Time, encoder zapcore.PrimitiveArrayEncoder) {
encoder.AppendString(t.Format(time.DateTime))
},
EncodeDuration: zapcore.StringDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
},
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
}
// 应用选项
for _, opt := range opts {
opt(config)
}
if len(config.Encoding) == 0 {
if config.Development {
config.Encoding = "console"
} else {
config.Encoding = "json"
} }
} }
var encoder = new(Encoder)
encoder.conf = config
switch config.Encoding {
case "console":
encoder.e = zapcore.NewConsoleEncoder(config.EncoderConfig)
case "json":
encoder.e = zapcore.NewJSONEncoder(config.EncoderConfig)
default:
panic("unknown encoding")
}
return encoder
} }
// WithFilename 设置日志文件名 // WithLevel 设置日志级别
// - 默认的日志文件名为: {level}.log func WithLevel(level Level) Option {
func WithFilename(filename func(level Level) string) Option { return func(config *Config) {
return func(log *Log) { config.Level.SetLevel(level)
log.filename = filename
} }
} }
// WithRotateFilename 设置日志分割文件名 // WithDevelopment 设置是否为开发模式
// - 默认的日志分割文件名为: {level}.%Y%m%d.log func WithDevelopment(development bool) Option {
func WithRotateFilename(filename func(level Level) string) Option { return func(config *Config) {
return func(log *Log) { config.Development = development
log.rotateFilename = filename
} }
} }
// WithRotateOption 设置日志分割选项 // WithDisableCaller 设置是否禁用调用者
// - 默认的日志分割选项为: WithMaxAge(7天), WithRotationTime(1天) func WithDisableCaller(disableCaller bool) Option {
func WithRotateOption(options ...rotateLogs.Option) Option { return func(config *Config) {
return func(log *Log) { config.DisableCaller = disableCaller
log.rotateOptions = options
} }
} }
// WithLogDir 设置日志文件夹 // WithDisableStacktrace 设置是否禁用堆栈跟踪
// - 默认情况下不会设置日志文件夹,日志将不会被文件存储 func WithDisableStacktrace(disableStacktrace bool) Option {
func WithLogDir(logDir, rotateLogDir string) Option { return func(config *Config) {
return func(log *Log) { config.DisableStacktrace = disableStacktrace
log.logDir = logDir }
log.rotateLogDir = rotateLogDir }
// WithSampling 设置采样策略
func WithSampling(sampling *zap.SamplingConfig) Option {
return func(config *Config) {
config.Sampling = sampling
}
}
// WithEncoding 设置编码器
func WithEncoding(encoding string) Option {
return func(config *Config) {
config.Encoding = encoding
}
}
// WithEncoderMessageKey 设置消息键
func WithEncoderMessageKey(encoderMessageKey string) Option {
return func(config *Config) {
config.EncoderConfig.MessageKey = encoderMessageKey
}
}
// WithEncoderLevelKey 设置级别键
func WithEncoderLevelKey(encoderLevelKey string) Option {
return func(config *Config) {
config.EncoderConfig.LevelKey = encoderLevelKey
}
}
// WithEncoderTimeKey 设置时间键
func WithEncoderTimeKey(encoderTimeKey string) Option {
return func(config *Config) {
config.EncoderConfig.TimeKey = encoderTimeKey
}
}
// WithEncoderNameKey 设置名称键
func WithEncoderNameKey(encoderNameKey string) Option {
return func(config *Config) {
config.EncoderConfig.NameKey = encoderNameKey
}
}
// WithEncoderCallerKey 设置调用者键
func WithEncoderCallerKey(encoderCallerKey string) Option {
return func(config *Config) {
config.EncoderConfig.CallerKey = encoderCallerKey
}
}
// WithEncoderFunctionKey 设置函数键
func WithEncoderFunctionKey(encoderFunctionKey string) Option {
return func(config *Config) {
config.EncoderConfig.FunctionKey = encoderFunctionKey
}
}
// WithEncoderStacktraceKey 设置堆栈跟踪键
func WithEncoderStacktraceKey(encoderStacktraceKey string) Option {
return func(config *Config) {
config.EncoderConfig.StacktraceKey = encoderStacktraceKey
}
}
// WithEncoderLineEnding 设置行尾
func WithEncoderLineEnding(encoderLineEnding string) Option {
return func(config *Config) {
config.EncoderConfig.LineEnding = encoderLineEnding
}
}
// WithEncoderLevel 设置级别编码器
func WithEncoderLevel(encoderLevel LevelEncoder) Option {
return func(config *Config) {
config.EncoderConfig.EncodeLevel = encoderLevel
}
}
// WithEncoderTime 设置时间编码器
func WithEncoderTime(encoderTime TimeEncoder) Option {
return func(config *Config) {
config.EncoderConfig.EncodeTime = encoderTime
}
}
// WithEncoderDuration 设置持续时间编码器
func WithEncoderDuration(encoderDuration DurationEncoder) Option {
return func(config *Config) {
config.EncoderConfig.EncodeDuration = encoderDuration
}
}
// WithEncoderCaller 设置调用者编码器
func WithEncoderCaller(encoderCaller CallerEncoder) Option {
return func(config *Config) {
config.EncoderConfig.EncodeCaller = encoderCaller
}
}
// WithEncoderName 设置名称编码器
func WithEncoderName(encoderName NameEncoder) Option {
return func(config *Config) {
config.EncoderConfig.EncodeName = encoderName
}
}
// WithEncoderNewReflectedEncoder 设置反射编码器
func WithEncoderNewReflectedEncoder(encoderNewReflectedEncoder func(io.Writer) ReflectedEncoder) Option {
return func(config *Config) {
config.EncoderConfig.NewReflectedEncoder = encoderNewReflectedEncoder
}
}
// WithOutputPaths 设置输出路径
func WithOutputPaths(outputPaths ...string) Option {
return func(config *Config) {
config.OutputPaths = outputPaths
}
}
// WithErrorOutputPaths 设置错误输出路径
func WithErrorOutputPaths(errorOutputPaths ...string) Option {
return func(config *Config) {
config.ErrorOutputPaths = errorOutputPaths
}
}
// WithInitialFields 设置初始字段
func WithInitialFields(initialFields map[string]interface{}) Option {
return func(config *Config) {
config.InitialFields = initialFields
} }
} }

View File

@ -1,16 +0,0 @@
package log
const (
// RunModeDev 开发模式是默认的运行模式,同时也是最基础的运行模式
// - 开发模式下,将会输出所有级别的日志到控制台
// - 默认不再输出日志到文件
RunModeDev RunMode = iota
// RunModeTest 测试模式是一种特殊的运行模式,用于测试
// - 测试模式下,将会输出所有级别的日志到控制台和文件
RunModeTest
// RunModeProd 生产模式是一种特殊的运行模式,用于生产
// - 生产模式下,将会输出 InfoLevel 及以上级别的日志到控制台和文件
RunModeProd
)
type RunMode uint8

View File

@ -156,8 +156,11 @@ func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R))
func IncrementAnalyze(filePath string, handle func(analyzer *Analyzer, record R)) func() (*Report, error) { func IncrementAnalyze(filePath string, handle func(analyzer *Analyzer, record R)) func() (*Report, error) {
var analyzer = new(Analyzer) var analyzer = new(Analyzer)
var offset int64 var offset int64
var m = new(sync.Mutex)
return func() (*Report, error) { return func() (*Report, error) {
var err error var err error
m.Lock()
defer m.Unlock()
offset, err = file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { offset, err = file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
handle(analyzer, R(s)) handle(analyzer, R(s))
}, offset) }, offset)

45
utils/sole/unique.go Normal file
View File

@ -0,0 +1,45 @@
package sole
import (
"strconv"
"sync/atomic"
)
var autoIncrementUint32 uint32 = 0
var autoIncrementUint64 uint64 = 0
var autoIncrementInt32 int32 = 0
var autoIncrementInt64 int64 = 0
var autoIncrementInt uint64 = 0
var autoIncrementString uint64 = 0
// AutoIncrementUint32 获取一个自增的 uint32 值
func AutoIncrementUint32() uint32 {
return atomic.AddUint32(&autoIncrementUint32, 1) - 1
}
// AutoIncrementUint64 获取一个自增的 uint64 值
func AutoIncrementUint64() uint64 {
return atomic.AddUint64(&autoIncrementUint64, 1) - 1
}
// AutoIncrementInt32 获取一个自增的 int32 值
func AutoIncrementInt32() int32 {
return atomic.AddInt32(&autoIncrementInt32, 1) - 1
}
// AutoIncrementInt64 获取一个自增的 int64 值
func AutoIncrementInt64() int64 {
return atomic.AddInt64(&autoIncrementInt64, 1) - 1
}
// AutoIncrementInt 获取一个自增的 int 值
func AutoIncrementInt() int {
num := atomic.AddUint64(&autoIncrementInt, 1)
result := num % (1 << (strconv.IntSize - 1))
return int(result)
}
// AutoIncrementString 获取一个自增的字符串
func AutoIncrementString() string {
return strconv.FormatUint(atomic.AddUint64(&autoIncrementString, 1)-1, 10)
}

View File

@ -6,6 +6,11 @@ import (
) )
var launchTime = time.Now() var launchTime = time.Now()
var pid int
func init() {
pid = os.Getpid()
}
// LaunchTime 获取程序启动时间 // LaunchTime 获取程序启动时间
func LaunchTime() time.Time { func LaunchTime() time.Time {
@ -16,3 +21,8 @@ func LaunchTime() time.Time {
func Hostname() string { func Hostname() string {
return os.Getenv("HOSTNAME") return os.Getenv("HOSTNAME")
} }
// PID 获取进程 PID
func PID() int {
return pid
}

57
utils/super/version.go Normal file
View File

@ -0,0 +1,57 @@
package super
import (
"regexp"
"strings"
)
// OldVersion 检查 version2 对于 version1 来说是不是旧版本
func OldVersion(version1, version2 string) bool {
return CompareVersion(version1, version2) == 1
}
// CompareVersion 返回一个整数,用于表示两个版本号的比较结果:
// - 如果 version1 大于 version2它将返回 1
// - 如果 version1 小于 version2它将返回 -1
// - 如果 version1 和 version2 相等,它将返回 0
func CompareVersion(version1, version2 string) int {
reg, _ := regexp.Compile("[^0-9.]+")
processedVersion1 := processVersion(reg.ReplaceAllString(version1, ""))
processedVersion2 := processVersion(reg.ReplaceAllString(version2, ""))
n, m := len(processedVersion1), len(processedVersion2)
i, j := 0, 0
for i < n || j < m {
x := 0
for ; i < n && processedVersion1[i] != '.'; i++ {
x = x*10 + int(processedVersion1[i]-'0')
}
i++ // skip the dot
y := 0
for ; j < m && processedVersion2[j] != '.'; j++ {
y = y*10 + int(processedVersion2[j]-'0')
}
j++ // skip the dot
if x > y {
return 1
}
if x < y {
return -1
}
}
return 0
}
func processVersion(version string) string {
// 移除首尾可能存在的非数字字符
reg, _ := regexp.Compile("^[^.0-9]+|[^.0-9]+$")
version = reg.ReplaceAllString(version, "")
// 确保不出现连续的点
version = strings.ReplaceAll(version, "..", ".")
// 移除每一部分起始的 0
versionParts := strings.Split(version, ".")
for idx, part := range versionParts {
versionParts[idx] = strings.TrimLeft(part, "0")
}
return strings.Join(versionParts, ".")
}

View File

@ -0,0 +1,13 @@
package super_test
import (
"github.com/kercylan98/minotaur/utils/super"
"testing"
)
func TestCompareVersion(t *testing.T) {
t.Log(super.CompareVersion("1", "2"), -1)
t.Log(super.CompareVersion("1", "vv2"), -1)
t.Log(super.CompareVersion("1", "vv2.3.1"), -1)
t.Log(super.CompareVersion("11", "vv2.3.1"), 1)
}