refactor: server 包重构及性能优化

1、由于设计不合理,移除 server
包中跨服相关的内容;
2、重构消息处理机制和消息分流机制并优化性能;
3、分流消息支持数据包、定时任务、异步回调的分流;
4、长连接支持获取定时器。
This commit is contained in:
kercylan98 2023-11-10 18:40:40 +08:00
parent 707fc6c5de
commit 70f7a79c88
18 changed files with 439 additions and 642 deletions

View File

@ -18,4 +18,4 @@ jobs:
bump-minor-pre-major: true
bump-patch-for-minor-pre-major: true
changelog-types: '[{"type":"other","section":"Other | 其他更改","hidden":false},{"type":"revert","section":"Reverts | 回退","hidden":false},{"type":"feat","section":"Features | 新特性","hidden":false},{"type":"fix","section":"Bug Fixes | 修复","hidden":false},{"type":"improvement","section":"Feature Improvements | 改进","hidden":false},{"type":"docs","section":"Docs | 文档优化","hidden":false},{"type":"style","section":"Styling | 可读性优化","hidden":false},{"type":"refactor","section":"Code Refactoring | 重构","hidden":false},{"type":"perf","section":"Performance Improvements | 性能优化","hidden":false},{"type":"test","section":"Tests | 新增或优化测试用例","hidden":false},{"type":"build","section":"Build System | 影响构建的修改","hidden":false},{"type":"ci","section":"CI | 更改我们的 CI 配置文件和脚本","hidden":false}]'
# release-as: 0.2.0
release-as: 0.3.0

View File

@ -18,8 +18,9 @@ mindmap
/planner 策划相关工具目录
/pce 配置导表功能实现
/server 网络服务器支持
/cross 内置跨服功能实现
/router 内置路由器功能实现
/client 长连接客户端
/lockstep 帧同步组件
/router 消息路由器
/utils 工具结构函数目录
/examples 示例代码目录
```
@ -97,9 +98,8 @@ import "github.com/kercylan98/minotaur/server"
func main() {
srv := server.New(server.NetworkWebsocket,
server.WithShunt(func(conn *server.Conn) (guid int64, allowToCreate bool) {
guid, allowToCreate = conn.GetData("roomId").(int64)
return
server.WithShunt(func(conn *server.Conn) string {
return conn.GetData("roomId").(string)
}),
)
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {

13
go.mod
View File

@ -11,17 +11,17 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/json-iterator/go v1.1.12
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/nats-io/nats.go v1.31.0
github.com/panjf2000/ants/v2 v2.8.1
github.com/panjf2000/gnet v1.6.7
github.com/smartystreets/goconvey v1.8.1
github.com/sony/sonyflake v1.2.0
github.com/spf13/cobra v1.7.0
github.com/tealeg/xlsx v1.0.5
github.com/tidwall/gjson v1.16.0
github.com/xtaci/kcp-go/v5 v5.6.3
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.25.0
golang.org/x/time v0.4.0
golang.org/x/crypto v0.14.0
google.golang.org/grpc v1.59.0
)
@ -39,24 +39,17 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/klauspost/reedsolomon v1.11.8 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
github.com/nats-io/nats-server/v2 v2.10.4 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/smarty/assertions v1.15.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/templexxx/cpu v0.1.0 // indirect
github.com/templexxx/xorsimd v0.4.2 // indirect
@ -66,10 +59,8 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.14.0 // indirect

63
go.sum
View File

@ -84,10 +84,6 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
@ -114,35 +110,11 @@ github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.9.16 h1:SuNe6AyCcVy0g5326wtyU8TdqYmcPqzTjhkHojAjprc=
github.com/nats-io/nats-server/v2 v2.9.16/go.mod h1:z1cc5Q+kqJkz9mLUdlcSsdYnId4pyImHjNgoh6zxSC0=
github.com/nats-io/nats-server/v2 v2.10.3 h1:nk2QVLpJUh3/AhZCJlQdTfj2oeLDvWnn1Z6XzGlNFm0=
github.com/nats-io/nats-server/v2 v2.10.3/go.mod h1:lzrskZ/4gyMAh+/66cCd+q74c6v7muBypzfWhP/MAaM=
github.com/nats-io/nats-server/v2 v2.10.4 h1:uB9xcwon3tPXWAdmTJqqqC6cie3yuPWHJjjTBgaPNus=
github.com/nats-io/nats-server/v2 v2.10.4/go.mod h1:eWm2JmHP9Lqm2oemB6/XGi0/GwsZwtWf8HIPUsh+9ns=
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ=
github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
@ -217,8 +189,6 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
@ -237,15 +207,10 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 h1:QfTh0HpN6hlw6D3vu8DAwC8pBIwikq0AI1evdm+FksE=
golang.org/x/exp v0.0.0-20221031165847-c99f073a8326/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@ -262,8 +227,6 @@ golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -271,10 +234,10 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -288,30 +251,16 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211204120058-94396e421777/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.12.0 h1:/ZfYdc3zq+q02Rv9vGqTeSItdzZTSNDmfTi0mBAuidU=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY=
golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@ -327,16 +276,12 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
@ -348,8 +293,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -10,6 +10,7 @@ import (
"github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/random"
"github.com/kercylan98/minotaur/utils/timer"
"github.com/panjf2000/gnet"
"github.com/xtaci/kcp-go/v5"
"net"
@ -38,7 +39,7 @@ func newKcpConn(server *Server, session *kcp.UDPSession) *Conn {
if index := strings.LastIndex(c.ip, ":"); index != -1 {
c.ip = c.ip[0:index]
}
c.writeLoop()
c.init()
return c
}
@ -58,7 +59,7 @@ func newGNetConn(server *Server, conn gnet.Conn) *Conn {
if index := strings.LastIndex(c.ip, ":"); index != -1 {
c.ip = c.ip[0:index]
}
c.writeLoop()
c.init()
return c
}
@ -75,7 +76,7 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn {
openTime: time.Now(),
},
}
c.writeLoop()
c.init()
return c
}
@ -91,7 +92,7 @@ func NewEmptyConn(server *Server) *Conn {
openTime: time.Now(),
},
}
c.writeLoop()
c.init()
return c
}
@ -104,6 +105,7 @@ type Conn struct {
// connection 长久保持的连接
type connection struct {
server *Server
ticker *timer.Ticker
remoteAddr net.Addr
ip string
ws *websocket.Conn
@ -118,6 +120,11 @@ type connection struct {
openTime time.Time
}
// Ticker 获取定时器
func (slf *Conn) Ticker() *timer.Ticker {
return slf.ticker
}
// GetServer 获取服务器
func (slf *Conn) GetServer() *Server {
return slf.server
@ -218,6 +225,12 @@ func (slf *Conn) SetWST(wst int) *Conn {
return slf
}
// PushAsyncMessage 推送异步消息,该消息将通过 Server.PushShuntAsyncMessage 函数推送
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Conn) PushAsyncMessage(caller func() error, callback func(err error), mark ...log.Field) {
slf.server.PushShuntAsyncMessage(slf, caller, callback, mark...)
}
// Write 向连接中写入数据
// - messageType: websocket模式中指定消息类型
func (slf *Conn) Write(packet []byte, callback ...func(err error)) {
@ -240,8 +253,16 @@ func (slf *Conn) Write(packet []byte, callback ...func(err error)) {
slf.loop.Put(cp)
}
// writeLoop 写循环
func (slf *Conn) writeLoop() {
func (slf *Conn) init() {
if slf.server.ticker != nil {
if slf.server.tickerAutonomy {
slf.ticker = timer.GetTicker(slf.server.connTickerSize)
} else {
slf.ticker = timer.GetTicker(slf.server.connTickerSize, timer.WithCaller(func(name string, caller func()) {
slf.server.PushShuntTickerMessage(slf, name, caller)
}))
}
}
slf.pool = concurrent.NewPool[*connPacket](10*1024,
func() *connPacket {
return &connPacket{}
@ -298,6 +319,12 @@ func (slf *Conn) Close(err ...error) {
} else if slf.kcp != nil {
_ = slf.kcp.Close()
}
if slf.ticker != nil {
slf.ticker.Release()
}
if slf.server.shuntMatcher != nil {
slf.server.releaseDispatcher(slf.server.shuntMatcher(slf))
}
slf.pool.Close()
slf.loop.Close()
slf.mu.Unlock()

View File

@ -16,8 +16,9 @@ const (
)
const (
serverMultipleMark = "Minotaur Multiple Server"
serverMark = "Minotaur Server"
serverMultipleMark = "Minotaur Multiple Server"
serverMark = "Minotaur Server"
serverSystemDispatcher = "system" // 系统消息分发器
)
const (

View File

@ -1,15 +0,0 @@
package server
// Cross 跨服接口
type Cross interface {
// Init 初始化跨服
// - serverId: 本服id
// - packetHandle.serverId: 发送跨服消息的服务器id
// - packetHandle.packet: 数据包
Init(server *Server, packetHandle func(serverId string, packet []byte)) error
// PushMessage 推送跨服消息
// - serverId: 目标服务器id
PushMessage(serverId string, packet []byte) error
// Release 释放资源
Release()
}

View File

@ -1,7 +0,0 @@
package cross
// Message 跨服消息数据结构
type Message struct {
ServerId string `json:"server_id"`
Packet []byte `json:"packet"`
}

View File

@ -1,89 +0,0 @@
package cross
import (
"encoding/json"
"fmt"
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log"
"github.com/nats-io/nats.go"
"time"
)
const (
nasMark = "Cross.Nats"
)
// NewNats 创建一个基于 Nats 实现的跨服消息功能组件
func NewNats(url string, options ...NatsOption) *Nats {
n := &Nats{
url: url,
subject: "MINOTAUR_CROSS",
messagePool: concurrent.NewPool[*Message](1024*100, func() *Message {
return new(Message)
}, func(data *Message) {
data.ServerId = ""
data.Packet = nil
}),
}
for _, option := range options {
option(n)
}
return n
}
// Nats 基于 Nats 实现的跨服消息功能组件
type Nats struct {
conn *nats.Conn
url string
subject string
options []nats.Option
messagePool *concurrent.Pool[*Message]
}
func (slf *Nats) Init(server *server.Server, packetHandle func(serverId string, packet []byte)) (err error) {
if slf.conn == nil {
if len(slf.options) == 0 {
slf.options = append(slf.options,
nats.ReconnectWait(time.Second*5),
nats.MaxReconnects(-1),
nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
log.Error(nasMark, log.String("info", "disconnect"), log.Err(err))
}),
nats.ReconnectHandler(func(conn *nats.Conn) {
log.Info(nasMark, log.String("info", "reconnect"))
}),
)
}
slf.conn, err = nats.Connect(slf.url, slf.options...)
if err != nil {
return err
}
}
_, err = slf.conn.Subscribe(fmt.Sprintf("%s_%d", slf.subject, server.GetID()), func(msg *nats.Msg) {
message := slf.messagePool.Get()
defer slf.messagePool.Release(message)
if err := json.Unmarshal(msg.Data, &message); err != nil {
log.Error(nasMark, log.Err(err))
return
}
packetHandle(message.ServerId, message.Packet)
})
return err
}
func (slf *Nats) PushMessage(serverId string, packet []byte) error {
message := slf.messagePool.Get()
defer slf.messagePool.Release(message)
message.ServerId = serverId
message.Packet = packet
data, err := json.Marshal(message)
if err != nil {
return err
}
return slf.conn.Publish(fmt.Sprintf("%s_%d", slf.subject, serverId), data)
}
func (slf *Nats) Release() {
slf.conn.Close()
}

View File

@ -1,28 +0,0 @@
package cross
import "github.com/nats-io/nats.go"
type NatsOption func(n *Nats)
// WithNatsSubject 通过对应的主题名称创建
// - 默认为MINOTAUR_CROSS
func WithNatsSubject(subject string) NatsOption {
return func(n *Nats) {
n.subject = subject
}
}
// WithNatsOptions 通过nats自带的可选项创建连接
func WithNatsOptions(options ...nats.Option) NatsOption {
return func(n *Nats) {
n.options = options
}
}
// WithNatsConn 指定通过特定的连接创建
// - 这将导致 WithNatsOptions 失效
func WithNatsConn(conn *nats.Conn) NatsOption {
return func(n *Nats) {
n.conn = conn
}
}

38
server/dispatcher.go Normal file
View File

@ -0,0 +1,38 @@
package server
import "github.com/kercylan98/minotaur/utils/buffer"
// generateDispatcher 生成消息分发器
func generateDispatcher(handler func(message *Message)) *dispatcher {
return &dispatcher{
buffer: buffer.NewUnboundedN[*Message](),
handler: handler,
}
}
// dispatcher 消息分发器
type dispatcher struct {
buffer *buffer.Unbounded[*Message]
handler func(message *Message)
}
func (slf *dispatcher) start() {
for {
select {
case message, ok := <-slf.buffer.Get():
if !ok {
return
}
slf.buffer.Load()
slf.handler(message)
}
}
}
func (slf *dispatcher) put(message *Message) {
slf.buffer.Put(message)
}
func (slf *dispatcher) close() {
slf.buffer.Close()
}

View File

@ -9,6 +9,5 @@ var (
ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported")
ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp")
ErrWebsocketIllegalMessageType = errors.New("illegal message type")
ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server")
ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server")
)

View File

@ -19,7 +19,6 @@ type StopEventHandle func(srv *Server)
type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte)
type ConnectionOpenedEventHandle func(srv *Server, conn *Conn)
type ConnectionClosedEventHandle func(srv *Server, conn *Conn, err any)
type ReceiveCrossPacketEventHandle func(srv *Server, senderServerId string, packet []byte)
type MessageErrorEventHandle func(srv *Server, message *Message, err error)
type MessageLowExecEventHandle func(srv *Server, message *Message, cost time.Duration)
type ConsoleCommandEventHandle func(srv *Server)
@ -40,7 +39,6 @@ func newEvent(srv *Server) *event {
connectionReceivePacketEventHandles: slice.NewPriority[ConnectionReceivePacketEventHandle](),
connectionOpenedEventHandles: slice.NewPriority[ConnectionOpenedEventHandle](),
connectionClosedEventHandles: slice.NewPriority[ConnectionClosedEventHandle](),
receiveCrossPacketEventHandles: slice.NewPriority[ReceiveCrossPacketEventHandle](),
messageErrorEventHandles: slice.NewPriority[MessageErrorEventHandle](),
messageLowExecEventHandles: slice.NewPriority[MessageLowExecEventHandle](),
connectionOpenedAfterEventHandles: slice.NewPriority[ConnectionOpenedAfterEventHandle](),
@ -61,7 +59,6 @@ type event struct {
connectionReceivePacketEventHandles *slice.Priority[ConnectionReceivePacketEventHandle]
connectionOpenedEventHandles *slice.Priority[ConnectionOpenedEventHandle]
connectionClosedEventHandles *slice.Priority[ConnectionClosedEventHandle]
receiveCrossPacketEventHandles *slice.Priority[ReceiveCrossPacketEventHandle]
messageErrorEventHandles *slice.Priority[MessageErrorEventHandle]
messageLowExecEventHandles *slice.Priority[MessageLowExecEventHandle]
connectionOpenedAfterEventHandles *slice.Priority[ConnectionOpenedAfterEventHandle]
@ -119,7 +116,7 @@ func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEv
}
func (slf *event) OnConsoleCommandEvent(command string) {
PushSystemMessage(slf.Server, func() {
slf.PushSystemMessage(func() {
handles, exist := slf.consoleCommandEventHandles[command]
if !exist {
switch command {
@ -135,7 +132,7 @@ func (slf *event) OnConsoleCommandEvent(command string) {
return true
})
}
}, "ConsoleCommandEvent")
}, log.String("Event", "OnConsoleCommandEvent"))
}
// RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数
@ -165,12 +162,12 @@ func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle, priority ..
}
func (slf *event) OnStartFinishEvent() {
PushSystemMessage(slf.Server, func() {
slf.PushSystemMessage(func() {
slf.startFinishEventHandles.RangeValue(func(index int, value StartFinishEventHandle) bool {
value(slf.Server)
return true
})
}, "StartFinishEvent")
}, log.String("Event", "OnStartFinishEvent"))
if slf.Server.limitLife > 0 {
go func() {
time.Sleep(slf.Server.limitLife)
@ -189,13 +186,13 @@ func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle, p
}
func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) {
PushSystemMessage(slf.Server, func() {
slf.PushSystemMessage(func() {
slf.Server.online.Delete(conn.GetID())
slf.connectionClosedEventHandles.RangeValue(func(index int, value ConnectionClosedEventHandle) bool {
value(slf.Server, conn, err)
return true
})
}, "ConnectionClosedEvent")
}, log.String("Event", "OnConnectionClosedEvent"))
}
// RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数
@ -208,13 +205,13 @@ func (slf *event) RegConnectionOpenedEvent(handle ConnectionOpenedEventHandle, p
}
func (slf *event) OnConnectionOpenedEvent(conn *Conn) {
PushSystemMessage(slf.Server, func() {
slf.PushSystemMessage(func() {
slf.Server.online.Set(conn.GetID(), conn)
slf.connectionOpenedEventHandles.RangeValue(func(index int, value ConnectionOpenedEventHandle) bool {
value(slf.Server, conn)
return true
})
}, "ConnectionOpenedEvent")
}, log.String("Event", "OnConnectionOpenedEvent"))
}
// RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数
@ -233,19 +230,6 @@ func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) {
})
}
// RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数
func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle, priority ...int) {
slf.receiveCrossPacketEventHandles.Append(handle, slice.GetValue(priority, 0))
log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String()))
}
func (slf *event) OnReceiveCrossPacketEvent(serverId string, packet []byte) {
slf.receiveCrossPacketEventHandles.RangeValue(func(index int, value ReceiveCrossPacketEventHandle) bool {
value(slf.Server, serverId, packet)
return true
})
}
// RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数
func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle, priority ...int) {
slf.messageErrorEventHandles.Append(handle, slice.GetValue(priority, 0))
@ -266,9 +250,6 @@ func (slf *event) OnMessageErrorEvent(message *Message, err error) {
value(slf.Server, message, err)
return true
})
PushSystemMessage(slf.Server, func() {
}, "MessageErrorEvent")
}
// RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数
@ -298,12 +279,12 @@ func (slf *event) RegConnectionOpenedAfterEvent(handle ConnectionOpenedAfterEven
}
func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) {
PushSystemMessage(slf.Server, func() {
slf.PushSystemMessage(func() {
slf.connectionOpenedAfterEventHandles.RangeValue(func(index int, value ConnectionOpenedAfterEventHandle) bool {
value(slf.Server, conn)
return true
})
}, "ConnectionOpenedAfterEvent")
}, log.String("Event", "OnConnectionOpenedAfterEvent"))
}
// RegConnectionWritePacketBeforeEvent 在发送数据包前将立刻执行被注册的事件处理函数
@ -334,12 +315,12 @@ func (slf *event) RegShuntChannelCreatedEvent(handle ShuntChannelCreatedEventHan
}
func (slf *event) OnShuntChannelCreatedEvent(guid int64) {
PushSystemMessage(slf.Server, func() {
slf.PushSystemMessage(func() {
slf.shuntChannelCreatedEventHandles.RangeValue(func(index int, value ShuntChannelCreatedEventHandle) bool {
value(slf.Server, guid)
return true
})
}, "ShuntChannelCreatedEvent")
}, log.String("Event", "OnShuntChannelCreatedEvent"))
}
// RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数
@ -349,12 +330,12 @@ func (slf *event) RegShuntChannelCloseEvent(handle ShuntChannelClosedEventHandle
}
func (slf *event) OnShuntChannelClosedEvent(guid int64) {
PushSystemMessage(slf.Server, func() {
slf.PushSystemMessage(func() {
slf.shuntChannelClosedEventHandles.RangeValue(func(index int, value ShuntChannelClosedEventHandle) bool {
value(slf.Server, guid)
return true
})
}, "ShuntChannelCloseEvent")
}, log.String("Event", "OnShuntChannelClosedEvent"))
}
// RegConnectionPacketPreprocessEvent 在接收到数据包后将立刻执行被注册的事件处理函数
@ -441,9 +422,4 @@ func (slf *event) check() {
log.Warn("Server", log.String("ConnectionReceivePacketEvent", "invalid server, no packets processed"))
}
}
if slf.receiveCrossPacketEventHandles.Len() > 0 && slf.cross == nil {
log.Warn("Server", log.String("ReceiveCrossPacketEvent", "invalid server, not register cross server"))
}
}

View File

@ -40,7 +40,7 @@ func (slf *gNet) AfterWrite(c gnet.Conn, b []byte) {
}
func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Action) {
PushPacketMessage(slf.Server, c.Context().(*Conn), 0, bytes.Clone(packet))
slf.Server.PushPacketMessage(c.Context().(*Conn), 0, bytes.Clone(packet))
return nil, gnet.None
}

View File

@ -1,44 +1,51 @@
package server
import (
"context"
"fmt"
"github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
"reflect"
)
const (
// MessageTypePacket 数据包消息类型:该类型的数据将被发送到 ConnectionReceivePacketEvent 进行处理
MessageTypePacket MessageType = iota
MessageTypePacket MessageType = iota + 1
// MessageTypeError 错误消息类型:根据不同的错误状态,将交由 Server 进行统一处理
MessageTypeError
// MessageTypeCross 跨服消息类型:将被推送到跨服的 Cross 实现中进行处理
MessageTypeCross
// MessageTypeTicker 定时器消息类型
MessageTypeTicker
// MessageTypeShuntTicker 分流定时器消息类型
MessageTypeShuntTicker
// MessageTypeAsync 异步消息类型
MessageTypeAsync
// MessageTypeAsyncCallback 异步回调消息类型
MessageTypeAsyncCallback
// MessageTypeShuntAsync 分流异步消息类型
MessageTypeShuntAsync
// MessageTypeShuntAsyncCallback 分流异步回调消息类型
MessageTypeShuntAsyncCallback
// MessageTypeSystem 系统消息类型
MessageTypeSystem
)
var messageNames = map[MessageType]string{
MessageTypePacket: "MessageTypePacket",
MessageTypeError: "MessageTypeError",
MessageTypeCross: "MessageTypeCross",
MessageTypeTicker: "MessageTypeTicker",
MessageTypeAsync: "MessageTypeAsync",
MessageTypeAsyncCallback: "MessageTypeAsyncCallback",
MessageTypeSystem: "MessageTypeSystem",
MessageTypePacket: "MessageTypePacket",
MessageTypeError: "MessageTypeError",
MessageTypeTicker: "MessageTypeTicker",
MessageTypeShuntTicker: "MessageTypeShuntTicker",
MessageTypeAsync: "MessageTypeAsync",
MessageTypeAsyncCallback: "MessageTypeAsyncCallback",
MessageTypeShuntAsync: "MessageTypeShuntAsync",
MessageTypeShuntAsyncCallback: "MessageTypeShuntAsyncCallback",
MessageTypeSystem: "MessageTypeSystem",
}
const (
@ -51,12 +58,6 @@ var messageErrorActionNames = map[MessageErrorAction]string{
MessageErrorActionShutdown: "Shutdown",
}
var (
messagePacketVisualization = func(packet []byte) string {
return string(packet)
}
)
type (
// MessageType 消息类型
MessageType byte
@ -76,8 +77,30 @@ func (slf MessageErrorAction) String() string {
// Message 服务器消息
type Message struct {
t MessageType // 消息类型
attrs []any // 消息属性
conn *Conn
ordinaryHandler func()
exceptionHandler func() error
errHandler func(err error)
packet []byte
err error
name string
t MessageType
errAction MessageErrorAction
marks []log.Field
}
// reset 重置消息结构体
func (slf *Message) reset() {
slf.conn = nil
slf.ordinaryHandler = nil
slf.exceptionHandler = nil
slf.errHandler = nil
slf.packet = nil
slf.err = nil
slf.name = ""
slf.t = 0
slf.errAction = 0
slf.marks = nil
}
// MessageType 返回消息类型
@ -85,37 +108,9 @@ func (slf *Message) MessageType() MessageType {
return slf.t
}
// AttrsString 返回消息属性的字符串表示
func (slf *Message) AttrsString() string {
var attrs = make([]any, 0, len(slf.attrs))
for _, attr := range slf.attrs {
if conn, hit := attr.(*Conn); hit {
attrs = append(attrs, conn.GetID())
continue
}
if tof := reflect.TypeOf(attr); tof.Kind() == reflect.Func {
attrs = append(attrs, tof.String())
continue
}
attrs = append(attrs, attr)
}
if len(attrs) == 0 {
return "NoneAttr"
}
return string(super.MarshalJSON(attrs))
}
// String 返回消息的字符串表示
func (slf *Message) String() string {
var attrs = make([]any, 0, len(slf.attrs))
for _, attr := range slf.attrs {
if reflect.TypeOf(attr).Kind() == reflect.Func {
continue
}
attrs = append(attrs, attr)
}
return fmt.Sprintf("[%s] %s", slf.t, slf.AttrsString())
return fmt.Sprintf("[%s] %s", slf.t, super.MarshalJSON(slf.marks))
}
// String 返回消息类型的字符串表示
@ -123,119 +118,56 @@ func (slf MessageType) String() string {
return messageNames[slf]
}
// GetPacketMessageAttrs 获取消息中的数据包属性
func (slf *Message) GetPacketMessageAttrs() (conn *Conn, packet []byte) {
conn = slf.attrs[0].(*Conn)
packet = slf.attrs[1].([]byte)
return
// castToPacketMessage 将消息转换为数据包消息
func (slf *Message) castToPacketMessage(conn *Conn, packet []byte, mark ...log.Field) *Message {
slf.t, slf.conn, slf.packet, slf.marks = MessageTypePacket, conn, packet, mark
return slf
}
// PushPacketMessage 向特定服务器中推送 MessageTypePacket 消息
func PushPacketMessage(srv *Server, conn *Conn, wst int, packet []byte, mark ...any) {
msg := srv.messagePool.Get()
msg.t = MessageTypePacket
msg.attrs = append([]any{&Conn{ctx: context.WithValue(conn.ctx, contextKeyWST, wst), connection: conn.connection}, packet}, mark...)
srv.pushMessage(msg)
// castToTickerMessage 将消息转换为定时器消息
func (slf *Message) castToTickerMessage(name string, caller func(), mark ...log.Field) *Message {
slf.t, slf.name, slf.ordinaryHandler, slf.marks = MessageTypeTicker, name, caller, mark
return slf
}
// GetErrorMessageAttrs 获取消息中的错误属性
func (slf *Message) GetErrorMessageAttrs() (err error, action MessageErrorAction) {
err = slf.attrs[0].(error)
action = slf.attrs[1].(MessageErrorAction)
return
// castToShuntTickerMessage 将消息转换为分发器定时器消息
func (slf *Message) castToShuntTickerMessage(conn *Conn, name string, caller func(), mark ...log.Field) *Message {
slf.t, slf.conn, slf.name, slf.ordinaryHandler, slf.marks = MessageTypeShuntTicker, slf.conn, name, caller, mark
return slf
}
// PushErrorMessage 向特定服务器中推送 MessageTypeError 消息
func PushErrorMessage(srv *Server, err error, action MessageErrorAction, mark ...any) {
msg := srv.messagePool.Get()
msg.t = MessageTypeError
msg.attrs = append([]any{err, action}, mark...)
srv.pushMessage(msg)
// castToAsyncMessage 将消息转换为异步消息
func (slf *Message) castToAsyncMessage(caller func() error, callback func(err error), mark ...log.Field) *Message {
slf.t, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeAsync, caller, callback, mark
return slf
}
// GetCrossMessageAttrs 获取消息中的跨服属性
func (slf *Message) GetCrossMessageAttrs() (serverId string, packet []byte) {
serverId = slf.attrs[0].(string)
packet = slf.attrs[1].([]byte)
return
// castToAsyncCallbackMessage 将消息转换为异步回调消息
func (slf *Message) castToAsyncCallbackMessage(err error, caller func(err error), mark ...log.Field) *Message {
slf.t, slf.err, slf.errHandler, slf.marks = MessageTypeAsyncCallback, err, caller, mark
return slf
}
// PushCrossMessage 向特定服务器中推送 MessageTypeCross 消息
func PushCrossMessage(srv *Server, crossName string, serverId string, packet []byte, mark ...any) {
if serverId == srv.id {
msg := srv.messagePool.Get()
msg.t = MessageTypeCross
msg.attrs = append([]any{serverId, packet}, mark...)
srv.pushMessage(msg)
} else {
if len(srv.cross) == 0 {
return
}
cross, exist := srv.cross[crossName]
if !exist {
return
}
_ = cross.PushMessage(serverId, packet)
}
// castToShuntAsyncMessage 将消息转换为分流异步消息
func (slf *Message) castToShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) *Message {
slf.t, slf.conn, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeShuntAsync, conn, caller, callback, mark
return slf
}
// GetTickerMessageAttrs 获取消息中的定时器属性
func (slf *Message) GetTickerMessageAttrs() (caller func()) {
caller = slf.attrs[0].(func())
return
// castToShuntAsyncCallbackMessage 将消息转换为分流异步回调消息
func (slf *Message) castToShuntAsyncCallbackMessage(conn *Conn, err error, caller func(err error), mark ...log.Field) *Message {
slf.t, slf.conn, slf.err, slf.errHandler, slf.marks = MessageTypeShuntAsyncCallback, conn, err, caller, mark
return slf
}
// PushTickerMessage 向特定服务器中推送 MessageTypeTicker 消息
func PushTickerMessage(srv *Server, caller func(), mark ...any) {
srv.messageLock.RLock()
if srv.messagePool == nil {
srv.messageLock.RUnlock()
return
}
srv.messageLock.RUnlock()
msg := srv.messagePool.Get()
msg.t = MessageTypeTicker
msg.attrs = append([]any{caller}, mark...)
srv.pushMessage(msg)
// castToSystemMessage 将消息转换为系统消息
func (slf *Message) castToSystemMessage(caller func(), mark ...log.Field) *Message {
slf.t, slf.ordinaryHandler, slf.marks = MessageTypeSystem, caller, mark
return slf
}
// GetAsyncMessageAttrs 获取消息中的异步消息属性
func (slf *Message) GetAsyncMessageAttrs() (caller func() error, callback func(err error), hasCallback bool) {
caller = slf.attrs[0].(func() error)
callback, hasCallback = slf.attrs[1].(func(err error))
return
}
// PushAsyncMessage 向特定服务器中推送 MessageTypeAsync 消息
// - 异步消息将在服务器的异步消息队列中进行处理,处理完成 caller 的阻塞操作后,将会通过系统消息执行 callback 函数
// - callback 函数将在异步消息处理完成后进行调用,无论过程是否产生 err都将被执行允许为 nil
// - 需要注意的是为了避免并发问题caller 函数请仅处理阻塞操作,其他操作应该在 callback 函数中进行
//
// 在通过 WithShunt 使用分流服务器时,异步消息不会转换到分流通道中进行处理。依旧需要注意上方第三条
func PushAsyncMessage(srv *Server, caller func() error, callback func(err error), mark ...any) {
msg := srv.messagePool.Get()
msg.t = MessageTypeAsync
msg.attrs = append([]any{caller, callback}, mark...)
srv.pushMessage(msg)
}
// GetSystemMessageAttrs 获取消息中的系统消息属性
func (slf *Message) GetSystemMessageAttrs() (handle func()) {
handle = slf.attrs[0].(func())
return
}
// PushSystemMessage 向特定服务器中推送 MessageTypeSystem 消息
func PushSystemMessage(srv *Server, handle func(), mark ...any) {
msg := srv.messagePool.Get()
msg.t = MessageTypeSystem
msg.attrs = append([]any{handle}, mark...)
srv.pushMessage(msg)
}
// SetMessagePacketVisualizer 设置消息可视化函数
// - 消息可视化将在慢消息等情况用于打印,使用自定消息可视化函数可以便于开发者进行调试
// - 默认的消息可视化函数将直接返回消息的字符串表示
func SetMessagePacketVisualizer(handle func(packet []byte) string) {
messagePacketVisualization = handle
// castToErrorMessage 将消息转换为错误消息
func (slf *Message) castToErrorMessage(err error, action MessageErrorAction, mark ...log.Field) *Message {
slf.t, slf.err, slf.errAction, slf.marks = MessageTypeError, err, action, mark
return slf
}

View File

@ -2,12 +2,10 @@ package server
import (
"github.com/gin-contrib/pprof"
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/timer"
"google.golang.org/grpc"
"reflect"
"runtime/debug"
"time"
)
@ -31,17 +29,44 @@ type option struct {
}
type runtime struct {
id string // 服务器id
cross map[string]Cross // 跨服
deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小
ticker *timer.Ticker // 定时器
websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期
deadlockDetect time.Duration // 是否开启死锁检测
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小
ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行
connTickerSize int // 连接定时器大小
websocketReadDeadline time.Duration // websocket连接超时时间
websocketCompression int // websocket压缩等级
websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期
shuntMatcher func(conn *Conn) string // 分流匹配器
}
// WithShunt 通过连接数据包分流的方式创建服务器
// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况
// - shuntMatcher用于匹配连接的函数返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道
//
// 将被分流的消息类型(更多类型有待斟酌):
// - MessageTypePacket
//
// 注意事项:
// - 当分流匹配过程发生 panic 将会在系统通道内处理消息,并打印日志
func WithShunt(shuntMatcher func(conn *Conn) string) Option {
return func(srv *Server) {
if shuntMatcher == nil {
log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil"))
return
}
srv.shuntMatcher = func(conn *Conn) string {
defer func() {
if err := recover(); err != nil {
log.Error("ShuntMatcher", log.String("State", "Panic"), log.Any("Error", err), log.String("Stack", string(debug.Stack())))
}
}()
return shuntMatcher(conn)
}
}
}
// WithLimitLife 通过限制最大生命周期的方式创建服务器
@ -119,46 +144,20 @@ func WithWebsocketReadDeadline(t time.Duration) Option {
// WithTicker 通过定时器创建服务器,为服务器添加定时器功能
// - autonomy定时器是否独立运行独立运行的情况下不会作为服务器消息运行会导致并发问题
func WithTicker(size int, autonomy bool) Option {
func WithTicker(size, connSize int, autonomy bool) Option {
return func(srv *Server) {
srv.connTickerSize = connSize
srv.tickerAutonomy = autonomy
if !autonomy {
srv.ticker = timer.GetTicker(size)
} else {
srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) {
PushTickerMessage(srv, caller, name)
srv.PushTickerMessage(name, caller)
}))
}
}
}
// WithCross 通过跨服的方式创建服务器
// - 推送跨服消息时,将推送到对应 crossName 的跨服中间件中crossName 可以满足不同功能采用不同的跨服/消息中间件
// - 通常情况下 crossName 仅需一个即可
func WithCross(crossName string, serverId string, cross Cross) Option {
return func(srv *Server) {
start:
{
srv.id = serverId
if srv.cross == nil {
srv.cross = map[string]Cross{}
}
srv.cross[crossName] = cross
err := cross.Init(srv, func(serverId string, packet []byte) {
msg := srv.messagePool.Get()
msg.t = MessageTypeCross
msg.attrs = []any{serverId, packet}
srv.pushMessage(msg)
})
if err != nil {
log.Info("Cross", log.String("ServerID", serverId), log.String("Cross", reflect.TypeOf(cross).String()), log.String("State", "WaitNatsRun"))
time.Sleep(1 * time.Second)
goto start
}
log.Info("Cross", log.String("ServerID", serverId), log.String("Cross", reflect.TypeOf(cross).String()))
}
}
}
// WithTLS 通过安全传输层协议TLS创建服务器
// - 支持Http、Websocket
func WithTLS(certFile, keyFile string) Option {
@ -227,23 +226,3 @@ func WithPProf(pattern ...string) Option {
pprof.Register(srv.ginServer, pattern...)
}
}
// WithShunt 通过连接数据包分流的方式创建服务器
// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况
// - shuntMatcher用于匹配连接的函数返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道
//
// 将被分流的消息类型(更多类型有待斟酌):
// - MessageTypePacket
//
// 注意事项:
// - 需要在分流通道使用完成后主动调用 Server.ShuntChannelFreed 函数释放分流通道,避免内存泄漏
func WithShunt(shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool)) Option {
return func(srv *Server) {
if shuntMatcher == nil {
log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil"))
return
}
srv.shuntChannels = concurrent.NewBalanceMap[int64, *buffer.Unbounded[*Message]]()
srv.shuntMatcher = shuntMatcher
}
}

View File

@ -6,7 +6,6 @@ import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/kercylan98/minotaur/utils/concurrent"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/network"
@ -42,6 +41,7 @@ func New(network Network, options ...Option) *Server {
closeChannel: make(chan struct{}, 1),
systemSignal: make(chan os.Signal, 1),
ctx: context.Background(),
dispatchers: make(map[string]*dispatcher),
}
server.event = newEvent(server)
@ -78,31 +78,30 @@ func New(network Network, options ...Option) *Server {
// Server 网络服务器
type Server struct {
*event // 事件
*runtime // 运行时
*option // 可选项
network Network // 网络类型
addr string // 侦听地址
systemSignal chan os.Signal // 系统信号
online *concurrent.BalanceMap[string, *Conn] // 在线连接
ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器
gServer *gNet // TCP或UDP模式下的服务器
isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号
ants *ants.Pool // 协程池
messagePool *concurrent.Pool[*Message] // 消息池
messageLock sync.RWMutex // 消息锁
messageChannel *buffer.Unbounded[*Message] // 消息无界缓冲区
multiple *MultipleServer // 多服务器模式下的服务器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
runMode RunMode // 运行模式
shuntChannels *concurrent.BalanceMap[int64, *buffer.Unbounded[*Message]] // 分流管道
shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器
messageCounter atomic.Int64 // 消息计数器
ctx context.Context // 上下文
*event // 事件
*runtime // 运行时
*option // 可选项
network Network // 网络类型
addr string // 侦听地址
systemSignal chan os.Signal // 系统信号
online *concurrent.BalanceMap[string, *Conn] // 在线连接
ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器
gServer *gNet // TCP或UDP模式下的服务器
isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号
ants *ants.Pool // 协程池
messagePool *concurrent.Pool[*Message] // 消息池
messageLock sync.RWMutex // 消息锁
multiple *MultipleServer // 多服务器模式下的服务器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
runMode RunMode // 运行模式
messageCounter atomic.Int64 // 消息计数器
ctx context.Context // 上下文
dispatchers map[string]*dispatcher // 消息分发器
dispatcherLock sync.RWMutex // 消息分发器锁
}
// Run 使用特定地址运行服务器
@ -135,13 +134,9 @@ func (slf *Server) Run(addr string) error {
return &Message{}
},
func(data *Message) {
data.t = 0
data.attrs = nil
data.reset()
},
)
slf.messageChannel = buffer.NewUnbounded[*Message](func() *Message {
return nil
})
slf.messageLock.Unlock()
if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC {
slf.gServer = &gNet{Server: slf}
@ -151,16 +146,8 @@ func (slf *Server) Run(addr string) error {
}
go func() {
messageInitFinish <- struct{}{}
for {
select {
case msg, ok := <-slf.messageChannel.Get():
if !ok {
return
}
slf.messageChannel.Load()
slf.dispatchMessage(msg)
}
}
d, _ := slf.useDispatcher(serverSystemDispatcher)
d.start()
}()
}
@ -181,7 +168,7 @@ func (slf *Server) Run(addr string) error {
slf.OnStartBeforeEvent()
if err := slf.grpcServer.Serve(listener); err != nil {
slf.isRunning = false
PushErrorMessage(slf, err, MessageErrorActionShutdown)
slf.PushErrorMessage(err, MessageErrorActionShutdown)
}
}()
case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix:
@ -195,7 +182,7 @@ func (slf *Server) Run(addr string) error {
gnet.WithMulticore(true),
); err != nil {
slf.isRunning = false
PushErrorMessage(slf, err, MessageErrorActionShutdown)
slf.PushErrorMessage(err, MessageErrorActionShutdown)
}
})
case NetworkKcp:
@ -236,7 +223,7 @@ func (slf *Server) Run(addr string) error {
}
panic(err)
}
PushPacketMessage(slf, conn, 0, buf[:n])
slf.PushPacketMessage(conn, 0, buf[:n])
}
}(conn)
}
@ -258,12 +245,12 @@ func (slf *Server) Run(addr string) error {
if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil {
slf.isRunning = false
PushErrorMessage(slf, err, MessageErrorActionShutdown)
slf.PushErrorMessage(err, MessageErrorActionShutdown)
}
} else {
if err := slf.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
slf.isRunning = false
PushErrorMessage(slf, err, MessageErrorActionShutdown)
slf.PushErrorMessage(err, MessageErrorActionShutdown)
}
}
@ -337,7 +324,7 @@ func (slf *Server) Run(addr string) error {
if len(slf.supportMessageTypes) > 0 && !slf.supportMessageTypes[messageType] {
panic(ErrWebsocketIllegalMessageType)
}
PushPacketMessage(slf, conn, messageType, packet)
slf.PushPacketMessage(conn, messageType, packet)
}
})
go func() {
@ -346,12 +333,12 @@ func (slf *Server) Run(addr string) error {
if len(slf.certFile)+len(slf.keyFile) > 0 {
if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil {
slf.isRunning = false
PushErrorMessage(slf, err, MessageErrorActionShutdown)
slf.PushErrorMessage(err, MessageErrorActionShutdown)
}
} else {
if err := http.ListenAndServe(slf.addr, nil); err != nil {
slf.isRunning = false
PushErrorMessage(slf, err, MessageErrorActionShutdown)
slf.PushErrorMessage(err, MessageErrorActionShutdown)
}
}
@ -372,7 +359,7 @@ func (slf *Server) Run(addr string) error {
log.String("ip", ip.String()),
log.String("listen", slf.addr),
)
log.Info("Server", log.String(serverMark, "===================================================================="))
log.Error("Server", log.String(serverMark, "===================================================================="))
slf.OnStartFinishEvent()
time.Sleep(time.Second)
if !slf.isShutdown.Load() {
@ -442,14 +429,6 @@ func (slf *Server) CloseConn(id string) {
}
}
// GetID 获取服务器id
func (slf *Server) GetID() string {
if slf.cross == nil {
panic(ErrNoSupportCross)
}
return slf.id
}
// Ticker 获取服务器定时器
func (slf *Server) Ticker() *timer.Ticker {
if slf.ticker == nil {
@ -491,21 +470,12 @@ func (slf *Server) shutdown(err error) {
slf.ants.Release()
slf.ants = nil
}
for _, cross := range slf.cross {
cross.Release()
}
if slf.messageChannel != nil {
slf.messageChannel.Close()
slf.messagePool.Close()
}
if slf.shuntChannels != nil {
slf.shuntChannels.Range(func(key int64, c *buffer.Unbounded[*Message]) bool {
c.Close()
return false
})
slf.shuntChannels.Clear()
slf.shuntChannels = nil
slf.dispatcherLock.Lock()
for s, d := range slf.dispatchers {
d.close()
delete(slf.dispatchers, s)
}
slf.dispatcherLock.Unlock()
if slf.grpcServer != nil && slf.isRunning {
slf.grpcServer.GracefulStop()
}
@ -576,17 +546,30 @@ func (slf *Server) GetMessageCount() int64 {
return slf.messageCounter.Load()
}
// ShuntChannelFreed 释放分流通道
func (slf *Server) ShuntChannelFreed(channelGuid int64) {
if slf.shuntChannels == nil {
return
}
channel, exist := slf.shuntChannels.GetExist(channelGuid)
// useDispatcher 添加消息分发器
// - 该函数在分发器不重复的情况下将创建分发器,当分发器已存在将直接返回
func (slf *Server) useDispatcher(name string) (*dispatcher, bool) {
slf.dispatcherLock.Lock()
d, exist := slf.dispatchers[name]
if exist {
channel.Close()
slf.shuntChannels.Delete(channelGuid)
slf.OnShuntChannelClosedEvent(channelGuid)
slf.dispatcherLock.Unlock()
return d, false
}
d = generateDispatcher(slf.dispatchMessage)
slf.dispatchers[name] = d
slf.dispatcherLock.Unlock()
return d, true
}
// releaseDispatcher 关闭消息分发器
func (slf *Server) releaseDispatcher(name string) {
slf.dispatcherLock.Lock()
d, exist := slf.dispatchers[name]
if exist {
delete(slf.dispatchers, name)
d.close()
}
slf.dispatcherLock.Unlock()
}
// pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求
@ -595,49 +578,39 @@ func (slf *Server) pushMessage(message *Message) {
slf.messagePool.Release(message)
return
}
if slf.shuntChannels != nil && message.t == MessageTypePacket {
conn := message.attrs[0].(*Conn)
channelGuid, allowToCreate := slf.shuntMatcher(conn)
channel, exist := slf.shuntChannels.GetExist(channelGuid)
if !exist && allowToCreate {
channel = buffer.NewUnbounded[*Message](func() *Message {
return nil
})
slf.shuntChannels.Set(channelGuid, channel)
go func(channel *buffer.Unbounded[*Message]) {
for {
select {
case msg, ok := <-channel.Get():
if !ok {
return
}
channel.Load()
slf.dispatchMessage(msg)
}
}
}(channel)
defer slf.OnShuntChannelCreatedEvent(channelGuid)
var dispatcher *dispatcher
switch message.t {
case MessageTypePacket:
if slf.shuntMatcher == nil {
dispatcher, _ = slf.useDispatcher(serverSystemDispatcher)
break
}
if channel != nil {
slf.messageCounter.Add(1)
channel.Put(message)
return
fallthrough
case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback:
var created bool
dispatcher, created = slf.useDispatcher(slf.shuntMatcher(message.conn))
if created {
go dispatcher.start()
}
case MessageTypeSystem, MessageTypeAsync, MessageTypeAsyncCallback, MessageTypeError, MessageTypeTicker:
dispatcher, _ = slf.useDispatcher(serverSystemDispatcher)
}
if dispatcher == nil {
return
}
slf.messageCounter.Add(1)
slf.messageChannel.Put(message)
dispatcher.put(message)
}
func (slf *Server) low(message *Message, present time.Time, expect time.Duration, messageReplace ...string) {
cost := time.Since(present)
if cost > expect {
var m = "unknown"
if message != nil {
m = message.String()
} else if len(messageReplace) > 0 {
m = messageReplace[0]
if len(messageReplace) > 0 {
for i, s := range messageReplace {
message.marks = append(message.marks, log.String(fmt.Sprintf("Other-%d", i+1), s))
}
}
log.Warn("Server", log.String("type", "low-message"), log.String("cost", cost.String()), log.String("message", m), log.Stack("stack"))
log.Warn("Server", log.String("type", "low-message"), log.String("cost", cost.String()), log.String("message", message.String()), log.Stack("stack"))
slf.OnMessageLowExecEvent(message, cost)
}
}
@ -654,7 +627,7 @@ func (slf *Server) dispatchMessage(msg *Message) {
select {
case <-ctx.Done():
if err := ctx.Err(); err == context.DeadlineExceeded {
log.Warn("Server", log.String("MessageType", messageNames[msg.t]), log.Any("SuspectedDeadlock", msg))
log.Warn("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("SuspectedDeadlock", msg))
}
}
}(ctx, msg)
@ -665,7 +638,7 @@ func (slf *Server) dispatchMessage(msg *Message) {
defer func(msg *Message) {
if err := recover(); err != nil {
stack := string(debug.Stack())
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("MessageAttrs", msg.AttrsString()), log.Any("error", err), log.String("stack", stack))
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("error", err), log.String("stack", stack))
fmt.Println(stack)
if e, ok := err.(error); ok {
slf.OnMessageErrorEvent(msg, e)
@ -683,29 +656,23 @@ func (slf *Server) dispatchMessage(msg *Message) {
}(msg)
}
var attrs = msg.attrs
switch msg.t {
case MessageTypePacket:
var conn, packet = msg.GetPacketMessageAttrs()
if !slf.OnConnectionPacketPreprocessEvent(conn, packet, func(newPacket []byte) { packet = newPacket }) {
slf.OnConnectionReceivePacketEvent(conn, packet)
if !slf.OnConnectionPacketPreprocessEvent(msg.conn, msg.packet, func(newPacket []byte) { msg.packet = newPacket }) {
slf.OnConnectionReceivePacketEvent(msg.conn, msg.packet)
}
case MessageTypeError:
var err, action = msg.GetErrorMessageAttrs()
switch action {
switch msg.errAction {
case MessageErrorActionNone:
log.Panic("Server", log.Err(err))
log.Panic("Server", log.Err(msg.err))
case MessageErrorActionShutdown:
slf.shutdown(err)
slf.shutdown(msg.err)
default:
log.Warn("Server", log.String("not support message error action", action.String()))
log.Warn("Server", log.String("not support message error action", msg.errAction.String()))
}
case MessageTypeCross:
slf.OnReceiveCrossPacketEvent(msg.GetCrossMessageAttrs())
case MessageTypeTicker:
msg.GetTickerMessageAttrs()()
case MessageTypeAsync:
handle, callback, cb := msg.GetAsyncMessageAttrs()
case MessageTypeTicker, MessageTypeShuntTicker:
msg.ordinaryHandler()
case MessageTypeAsync, MessageTypeShuntAsync:
if err := slf.ants.Submit(func() {
defer func() {
if err := recover(); err != nil {
@ -724,32 +691,113 @@ func (slf *Server) dispatchMessage(msg *Message) {
slf.messagePool.Release(msg)
}
}()
err := handle()
if cb && callback != nil {
acm := slf.messagePool.Get()
acm.t = MessageTypeAsyncCallback
if len(attrs) > 2 {
acm.attrs = append([]any{func() { callback(err) }}, attrs[2:]...)
} else {
acm.attrs = []any{func() { callback(err) }}
var err error
if msg.exceptionHandler != nil {
err = msg.exceptionHandler()
}
if msg.errHandler != nil {
if msg.conn == nil {
slf.PushAsyncCallbackMessage(err, msg.errHandler)
return
}
slf.pushMessage(acm)
} else if err != nil {
slf.PushShuntAsyncCallbackMessage(msg.conn, err, msg.errHandler)
return
}
if err != nil {
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack())))
}
}); err != nil {
panic(err)
}
case MessageTypeAsyncCallback: // 特殊类型
attrs[0].(func())()
case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback:
msg.errHandler(msg.err)
case MessageTypeSystem:
msg.GetSystemMessageAttrs()()
msg.ordinaryHandler()
default:
log.Warn("Server", log.String("not support message type", msg.t.String()))
}
}
// PushAsyncMessage 是 PushAsyncMessage 的快捷方式
func (slf *Server) PushAsyncMessage(caller func() error, callback func(err error), mark ...any) {
PushAsyncMessage(slf, caller, callback, mark...)
// PushSystemMessage 向服务器中推送 MessageTypeSystem 消息
// - 系统消息仅包含一个可执行函数,将在系统分发器中执行
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushSystemMessage(handler func(), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToSystemMessage(handler, mark...))
}
// PushAsyncMessage 向服务器中推送 MessageTypeAsync 消息
// - 异步消息将在服务器的异步消息队列中进行处理,处理完成 caller 的阻塞操作后,将会通过系统消息执行 callback 函数
// - callback 函数将在异步消息处理完成后进行调用,无论过程是否产生 err都将被执行允许为 nil
// - 需要注意的是为了避免并发问题caller 函数请仅处理阻塞操作,其他操作应该在 callback 函数中进行
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushAsyncMessage(caller func() error, callback func(err error), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToAsyncMessage(caller, callback, mark...))
}
// PushAsyncCallbackMessage 向服务器中推送 MessageTypeAsyncCallback 消息
// - 异步消息回调将会通过一个接收 error 的函数进行处理,该函数将在系统分发器中执行
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushAsyncCallbackMessage(err error, callback func(err error), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToAsyncCallbackMessage(err, callback, mark...))
}
// PushShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushAsyncMessage(caller, callback)
return
}
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...))
}
// PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发
func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushAsyncCallbackMessage(err, callback)
return
}
slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...))
}
// PushPacketMessage 向服务器中推送 MessageTypePacket 消息
// - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息
func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToPacketMessage(
&Conn{ctx: context.WithValue(conn.ctx, contextKeyWST, wst), connection: conn.connection},
packet,
))
}
// PushTickerMessage 向服务器中推送 MessageTypeTicker 消息
// - 通过该函数推送定时消息,当消息触发时将在系统分发器中处理消息
// - 可通过 timer.Ticker 或第三方定时器将执行函数(caller)推送到该消息中进行处理,可有效的避免线程安全问题
// - 参数 name 仅用作标识该定时器名称
//
// 定时消息执行不会有特殊的处理,仅标记为定时任务,也就是允许将各类函数通过该消息发送处理,但是并不建议
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushTickerMessage(name string, caller func(), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToTickerMessage(name, caller, mark...))
}
// PushShuntTickerMessage 向特定分发器中推送 MessageTypeTicker 消息,消息执行与 MessageTypeTicker 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushTickerMessage 进行转发
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func(), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushTickerMessage(name, caller)
return
}
slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...))
}
// PushErrorMessage 向服务器中推送 MessageTypeError 消息
// - 通过该函数推送错误消息,当消息触发时将在系统分发器中处理消息
// - 参数 errAction 用于指定错误消息的处理方式,可选值为 MessageErrorActionNone 和 MessageErrorActionShutdown
// - 参数 errAction 为 MessageErrorActionShutdown 时,将会停止服务器的运行
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
func (slf *Server) PushErrorMessage(err error, errAction MessageErrorAction, mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToErrorMessage(err, errAction, mark...))
}

View File

@ -5,29 +5,28 @@ import (
"github.com/kercylan98/minotaur/server"
"github.com/kercylan98/minotaur/server/client"
"github.com/kercylan98/minotaur/utils/times"
"golang.org/x/time/rate"
"testing"
"time"
)
func TestNew(t *testing.T) {
limiter := rate.NewLimiter(rate.Every(time.Second), 100)
//limiter := rate.NewLimiter(rate.Every(time.Second), 100)
srv := server.New(server.NetworkWebsocket, server.WithMessageBufferSize(1024*1024), server.WithPProf())
srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool {
t, c := srv.TimeoutContext(time.Second * 5)
defer c()
if err := limiter.Wait(t); err != nil {
return false
}
return true
})
//srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool {
// t, c := srv.TimeoutContext(time.Second * 5)
// defer c()
// if err := limiter.Wait(t); err != nil {
// return false
// }
// return true
//})
srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) {
fmt.Println("关闭", conn.GetID(), err, "Count", srv.GetOnlineCount())
})
srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) {
if srv.GetOnlineCount() > 1 {
conn.Close()
}
//if srv.GetOnlineCount() > 1 {
// conn.Close()
//}
})
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
@ -39,21 +38,24 @@ func TestNew(t *testing.T) {
}
func TestNewClient(t *testing.T) {
for i := 0; i < 1000; i++ {
count := 500
for i := 0; i < count; i++ {
id := i
fmt.Println("启动", i+1)
cli := client.NewWebsocket("ws://127.0.0.1:8888")
cli := client.NewWebsocket("ws://172.28.102.242:9999")
cli.RegConnectionReceivePacketEvent(func(conn *client.Client, wst int, packet []byte) {
fmt.Println("收到", id+1, string(packet))
})
cli.RegConnectionOpenedEvent(func(conn *client.Client) {
go func() {
for i < 1000 {
for i < count {
time.Sleep(time.Second)
}
for {
time.Sleep(time.Millisecond * 100)
cli.WriteWS(2, []byte("hello"))
time.Sleep(time.Second)
for i := 0; i < 10; i++ {
cli.WriteWS(2, []byte("hello"))
}
}
}()
})