diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b25ccfa..af13a59 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,4 +18,4 @@ jobs: bump-minor-pre-major: true bump-patch-for-minor-pre-major: true changelog-types: '[{"type":"other","section":"Other | 其他更改","hidden":false},{"type":"revert","section":"Reverts | 回退","hidden":false},{"type":"feat","section":"Features | 新特性","hidden":false},{"type":"fix","section":"Bug Fixes | 修复","hidden":false},{"type":"improvement","section":"Feature Improvements | 改进","hidden":false},{"type":"docs","section":"Docs | 文档优化","hidden":false},{"type":"style","section":"Styling | 可读性优化","hidden":false},{"type":"refactor","section":"Code Refactoring | 重构","hidden":false},{"type":"perf","section":"Performance Improvements | 性能优化","hidden":false},{"type":"test","section":"Tests | 新增或优化测试用例","hidden":false},{"type":"build","section":"Build System | 影响构建的修改","hidden":false},{"type":"ci","section":"CI | 更改我们的 CI 配置文件和脚本","hidden":false}]' -# release-as: 0.2.0 \ No newline at end of file + release-as: 0.3.0 \ No newline at end of file diff --git a/README.md b/README.md index 981b14c..527cc4c 100644 --- a/README.md +++ b/README.md @@ -18,8 +18,9 @@ mindmap /planner 策划相关工具目录 /pce 配置导表功能实现 /server 网络服务器支持 - /cross 内置跨服功能实现 - /router 内置路由器功能实现 + /client 长连接客户端 + /lockstep 帧同步组件 + /router 消息路由器 /utils 工具结构函数目录 /examples 示例代码目录 ``` @@ -97,9 +98,8 @@ import "github.com/kercylan98/minotaur/server" func main() { srv := server.New(server.NetworkWebsocket, - server.WithShunt(func(conn *server.Conn) (guid int64, allowToCreate bool) { - guid, allowToCreate = conn.GetData("roomId").(int64) - return + server.WithShunt(func(conn *server.Conn) string { + return conn.GetData("roomId").(string) }), ) srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { diff --git a/go.mod b/go.mod index 3dc5cb1..7772ed2 100644 --- a/go.mod +++ b/go.mod @@ -11,17 +11,17 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/json-iterator/go v1.1.12 github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible - github.com/nats-io/nats.go v1.31.0 github.com/panjf2000/ants/v2 v2.8.1 github.com/panjf2000/gnet v1.6.7 github.com/smartystreets/goconvey v1.8.1 github.com/sony/sonyflake v1.2.0 + github.com/spf13/cobra v1.7.0 github.com/tealeg/xlsx v1.0.5 github.com/tidwall/gjson v1.16.0 github.com/xtaci/kcp-go/v5 v5.6.3 go.uber.org/atomic v1.10.0 go.uber.org/zap v1.25.0 - golang.org/x/time v0.4.0 + golang.org/x/crypto v0.14.0 google.golang.org/grpc v1.59.0 ) @@ -39,24 +39,17 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jonboulle/clockwork v0.3.0 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect - github.com/klauspost/compress v1.17.2 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/klauspost/reedsolomon v1.11.8 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/leodido/go-urn v1.2.4 // indirect github.com/lestrrat-go/strftime v1.0.6 // indirect github.com/mattn/go-isatty v0.0.19 // indirect - github.com/minio/highwayhash v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.5.3 // indirect - github.com/nats-io/nats-server/v2 v2.10.4 // indirect - github.com/nats-io/nkeys v0.4.6 // indirect - github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/smarty/assertions v1.15.0 // indirect - github.com/spf13/cobra v1.7.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/templexxx/cpu v0.1.0 // indirect github.com/templexxx/xorsimd v0.4.2 // indirect @@ -66,10 +59,8 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - go.uber.org/automaxprocs v1.5.3 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.3.0 // indirect - golang.org/x/crypto v0.14.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.14.0 // indirect diff --git a/go.sum b/go.sum index bd5b1ea..0d15992 100644 --- a/go.sum +++ b/go.sum @@ -84,10 +84,6 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= -github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= -github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -114,35 +110,11 @@ github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= -github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= -github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= -github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= -github.com/nats-io/nats-server/v2 v2.9.16 h1:SuNe6AyCcVy0g5326wtyU8TdqYmcPqzTjhkHojAjprc= -github.com/nats-io/nats-server/v2 v2.9.16/go.mod h1:z1cc5Q+kqJkz9mLUdlcSsdYnId4pyImHjNgoh6zxSC0= -github.com/nats-io/nats-server/v2 v2.10.3 h1:nk2QVLpJUh3/AhZCJlQdTfj2oeLDvWnn1Z6XzGlNFm0= -github.com/nats-io/nats-server/v2 v2.10.3/go.mod h1:lzrskZ/4gyMAh+/66cCd+q74c6v7muBypzfWhP/MAaM= -github.com/nats-io/nats-server/v2 v2.10.4 h1:uB9xcwon3tPXWAdmTJqqqC6cie3yuPWHJjjTBgaPNus= -github.com/nats-io/nats-server/v2 v2.10.4/go.mod h1:eWm2JmHP9Lqm2oemB6/XGi0/GwsZwtWf8HIPUsh+9ns= -github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= -github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= -github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= -github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= -github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= -github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= -github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= -github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= -github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ= github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8= @@ -217,8 +189,6 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= -go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= @@ -237,15 +207,10 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk= -golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= -golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= -golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20221031165847-c99f073a8326 h1:QfTh0HpN6hlw6D3vu8DAwC8pBIwikq0AI1evdm+FksE= -golang.org/x/exp v0.0.0-20221031165847-c99f073a8326/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -262,8 +227,6 @@ golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= -golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -271,10 +234,10 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -288,30 +251,16 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211204120058-94396e421777/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.12.0 h1:/ZfYdc3zq+q02Rv9vGqTeSItdzZTSNDmfTi0mBAuidU= -golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= -golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY= -golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -327,16 +276,12 @@ google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9Ywl google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo= google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -348,8 +293,6 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/server/conn.go b/server/conn.go index 1f67e69..81aa215 100644 --- a/server/conn.go +++ b/server/conn.go @@ -10,6 +10,7 @@ import ( "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/random" + "github.com/kercylan98/minotaur/utils/timer" "github.com/panjf2000/gnet" "github.com/xtaci/kcp-go/v5" "net" @@ -38,7 +39,7 @@ func newKcpConn(server *Server, session *kcp.UDPSession) *Conn { if index := strings.LastIndex(c.ip, ":"); index != -1 { c.ip = c.ip[0:index] } - c.writeLoop() + c.init() return c } @@ -58,7 +59,7 @@ func newGNetConn(server *Server, conn gnet.Conn) *Conn { if index := strings.LastIndex(c.ip, ":"); index != -1 { c.ip = c.ip[0:index] } - c.writeLoop() + c.init() return c } @@ -75,7 +76,7 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn { openTime: time.Now(), }, } - c.writeLoop() + c.init() return c } @@ -91,7 +92,7 @@ func NewEmptyConn(server *Server) *Conn { openTime: time.Now(), }, } - c.writeLoop() + c.init() return c } @@ -104,6 +105,7 @@ type Conn struct { // connection 长久保持的连接 type connection struct { server *Server + ticker *timer.Ticker remoteAddr net.Addr ip string ws *websocket.Conn @@ -118,6 +120,11 @@ type connection struct { openTime time.Time } +// Ticker 获取定时器 +func (slf *Conn) Ticker() *timer.Ticker { + return slf.ticker +} + // GetServer 获取服务器 func (slf *Conn) GetServer() *Server { return slf.server @@ -218,6 +225,12 @@ func (slf *Conn) SetWST(wst int) *Conn { return slf } +// PushAsyncMessage 推送异步消息,该消息将通过 Server.PushShuntAsyncMessage 函数推送 +// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 +func (slf *Conn) PushAsyncMessage(caller func() error, callback func(err error), mark ...log.Field) { + slf.server.PushShuntAsyncMessage(slf, caller, callback, mark...) +} + // Write 向连接中写入数据 // - messageType: websocket模式中指定消息类型 func (slf *Conn) Write(packet []byte, callback ...func(err error)) { @@ -240,8 +253,16 @@ func (slf *Conn) Write(packet []byte, callback ...func(err error)) { slf.loop.Put(cp) } -// writeLoop 写循环 -func (slf *Conn) writeLoop() { +func (slf *Conn) init() { + if slf.server.ticker != nil { + if slf.server.tickerAutonomy { + slf.ticker = timer.GetTicker(slf.server.connTickerSize) + } else { + slf.ticker = timer.GetTicker(slf.server.connTickerSize, timer.WithCaller(func(name string, caller func()) { + slf.server.PushShuntTickerMessage(slf, name, caller) + })) + } + } slf.pool = concurrent.NewPool[*connPacket](10*1024, func() *connPacket { return &connPacket{} @@ -298,6 +319,12 @@ func (slf *Conn) Close(err ...error) { } else if slf.kcp != nil { _ = slf.kcp.Close() } + if slf.ticker != nil { + slf.ticker.Release() + } + if slf.server.shuntMatcher != nil { + slf.server.releaseDispatcher(slf.server.shuntMatcher(slf)) + } slf.pool.Close() slf.loop.Close() slf.mu.Unlock() diff --git a/server/constants.go b/server/constants.go index fbbe1c8..e02720a 100644 --- a/server/constants.go +++ b/server/constants.go @@ -16,8 +16,9 @@ const ( ) const ( - serverMultipleMark = "Minotaur Multiple Server" - serverMark = "Minotaur Server" + serverMultipleMark = "Minotaur Multiple Server" + serverMark = "Minotaur Server" + serverSystemDispatcher = "system" // 系统消息分发器 ) const ( diff --git a/server/cross.go b/server/cross.go deleted file mode 100644 index ad77c2b..0000000 --- a/server/cross.go +++ /dev/null @@ -1,15 +0,0 @@ -package server - -// Cross 跨服接口 -type Cross interface { - // Init 初始化跨服 - // - serverId: 本服id - // - packetHandle.serverId: 发送跨服消息的服务器id - // - packetHandle.packet: 数据包 - Init(server *Server, packetHandle func(serverId string, packet []byte)) error - // PushMessage 推送跨服消息 - // - serverId: 目标服务器id - PushMessage(serverId string, packet []byte) error - // Release 释放资源 - Release() -} diff --git a/server/cross/message.go b/server/cross/message.go deleted file mode 100644 index 17104ed..0000000 --- a/server/cross/message.go +++ /dev/null @@ -1,7 +0,0 @@ -package cross - -// Message 跨服消息数据结构 -type Message struct { - ServerId string `json:"server_id"` - Packet []byte `json:"packet"` -} diff --git a/server/cross/nats.go b/server/cross/nats.go deleted file mode 100644 index e52c8ac..0000000 --- a/server/cross/nats.go +++ /dev/null @@ -1,89 +0,0 @@ -package cross - -import ( - "encoding/json" - "fmt" - "github.com/kercylan98/minotaur/server" - "github.com/kercylan98/minotaur/utils/concurrent" - "github.com/kercylan98/minotaur/utils/log" - "github.com/nats-io/nats.go" - "time" -) - -const ( - nasMark = "Cross.Nats" -) - -// NewNats 创建一个基于 Nats 实现的跨服消息功能组件 -func NewNats(url string, options ...NatsOption) *Nats { - n := &Nats{ - url: url, - subject: "MINOTAUR_CROSS", - messagePool: concurrent.NewPool[*Message](1024*100, func() *Message { - return new(Message) - }, func(data *Message) { - data.ServerId = "" - data.Packet = nil - }), - } - for _, option := range options { - option(n) - } - return n -} - -// Nats 基于 Nats 实现的跨服消息功能组件 -type Nats struct { - conn *nats.Conn - url string - subject string - options []nats.Option - messagePool *concurrent.Pool[*Message] -} - -func (slf *Nats) Init(server *server.Server, packetHandle func(serverId string, packet []byte)) (err error) { - if slf.conn == nil { - if len(slf.options) == 0 { - slf.options = append(slf.options, - nats.ReconnectWait(time.Second*5), - nats.MaxReconnects(-1), - nats.DisconnectErrHandler(func(conn *nats.Conn, err error) { - log.Error(nasMark, log.String("info", "disconnect"), log.Err(err)) - }), - nats.ReconnectHandler(func(conn *nats.Conn) { - log.Info(nasMark, log.String("info", "reconnect")) - }), - ) - } - slf.conn, err = nats.Connect(slf.url, slf.options...) - if err != nil { - return err - } - } - _, err = slf.conn.Subscribe(fmt.Sprintf("%s_%d", slf.subject, server.GetID()), func(msg *nats.Msg) { - message := slf.messagePool.Get() - defer slf.messagePool.Release(message) - if err := json.Unmarshal(msg.Data, &message); err != nil { - log.Error(nasMark, log.Err(err)) - return - } - packetHandle(message.ServerId, message.Packet) - }) - return err -} - -func (slf *Nats) PushMessage(serverId string, packet []byte) error { - message := slf.messagePool.Get() - defer slf.messagePool.Release(message) - message.ServerId = serverId - message.Packet = packet - data, err := json.Marshal(message) - if err != nil { - return err - } - return slf.conn.Publish(fmt.Sprintf("%s_%d", slf.subject, serverId), data) -} - -func (slf *Nats) Release() { - slf.conn.Close() -} diff --git a/server/cross/nats_options.go b/server/cross/nats_options.go deleted file mode 100644 index 97dd173..0000000 --- a/server/cross/nats_options.go +++ /dev/null @@ -1,28 +0,0 @@ -package cross - -import "github.com/nats-io/nats.go" - -type NatsOption func(n *Nats) - -// WithNatsSubject 通过对应的主题名称创建 -// - 默认为:MINOTAUR_CROSS -func WithNatsSubject(subject string) NatsOption { - return func(n *Nats) { - n.subject = subject - } -} - -// WithNatsOptions 通过nats自带的可选项创建连接 -func WithNatsOptions(options ...nats.Option) NatsOption { - return func(n *Nats) { - n.options = options - } -} - -// WithNatsConn 指定通过特定的连接创建 -// - 这将导致 WithNatsOptions 失效 -func WithNatsConn(conn *nats.Conn) NatsOption { - return func(n *Nats) { - n.conn = conn - } -} diff --git a/server/dispatcher.go b/server/dispatcher.go new file mode 100644 index 0000000..e3b4c17 --- /dev/null +++ b/server/dispatcher.go @@ -0,0 +1,38 @@ +package server + +import "github.com/kercylan98/minotaur/utils/buffer" + +// generateDispatcher 生成消息分发器 +func generateDispatcher(handler func(message *Message)) *dispatcher { + return &dispatcher{ + buffer: buffer.NewUnboundedN[*Message](), + handler: handler, + } +} + +// dispatcher 消息分发器 +type dispatcher struct { + buffer *buffer.Unbounded[*Message] + handler func(message *Message) +} + +func (slf *dispatcher) start() { + for { + select { + case message, ok := <-slf.buffer.Get(): + if !ok { + return + } + slf.buffer.Load() + slf.handler(message) + } + } +} + +func (slf *dispatcher) put(message *Message) { + slf.buffer.Put(message) +} + +func (slf *dispatcher) close() { + slf.buffer.Close() +} diff --git a/server/errors.go b/server/errors.go index 5ddd56a..174f4d1 100644 --- a/server/errors.go +++ b/server/errors.go @@ -9,6 +9,5 @@ var ( ErrNetworkOnlySupportGRPC = errors.New("the current network mode is not compatible with RegGrpcServer, only NetworkGRPC is supported") ErrNetworkIncompatibleHttp = errors.New("the current network mode is not compatible with NetworkHttp") ErrWebsocketIllegalMessageType = errors.New("illegal message type") - ErrNoSupportCross = errors.New("the server does not support GetID or PushCrossMessage, please use the WithCross option to create the server") ErrNoSupportTicker = errors.New("the server does not support Ticker, please use the WithTicker option to create the server") ) diff --git a/server/event.go b/server/event.go index 246bf88..d84454a 100644 --- a/server/event.go +++ b/server/event.go @@ -19,7 +19,6 @@ type StopEventHandle func(srv *Server) type ConnectionReceivePacketEventHandle func(srv *Server, conn *Conn, packet []byte) type ConnectionOpenedEventHandle func(srv *Server, conn *Conn) type ConnectionClosedEventHandle func(srv *Server, conn *Conn, err any) -type ReceiveCrossPacketEventHandle func(srv *Server, senderServerId string, packet []byte) type MessageErrorEventHandle func(srv *Server, message *Message, err error) type MessageLowExecEventHandle func(srv *Server, message *Message, cost time.Duration) type ConsoleCommandEventHandle func(srv *Server) @@ -40,7 +39,6 @@ func newEvent(srv *Server) *event { connectionReceivePacketEventHandles: slice.NewPriority[ConnectionReceivePacketEventHandle](), connectionOpenedEventHandles: slice.NewPriority[ConnectionOpenedEventHandle](), connectionClosedEventHandles: slice.NewPriority[ConnectionClosedEventHandle](), - receiveCrossPacketEventHandles: slice.NewPriority[ReceiveCrossPacketEventHandle](), messageErrorEventHandles: slice.NewPriority[MessageErrorEventHandle](), messageLowExecEventHandles: slice.NewPriority[MessageLowExecEventHandle](), connectionOpenedAfterEventHandles: slice.NewPriority[ConnectionOpenedAfterEventHandle](), @@ -61,7 +59,6 @@ type event struct { connectionReceivePacketEventHandles *slice.Priority[ConnectionReceivePacketEventHandle] connectionOpenedEventHandles *slice.Priority[ConnectionOpenedEventHandle] connectionClosedEventHandles *slice.Priority[ConnectionClosedEventHandle] - receiveCrossPacketEventHandles *slice.Priority[ReceiveCrossPacketEventHandle] messageErrorEventHandles *slice.Priority[MessageErrorEventHandle] messageLowExecEventHandles *slice.Priority[MessageLowExecEventHandle] connectionOpenedAfterEventHandles *slice.Priority[ConnectionOpenedAfterEventHandle] @@ -119,7 +116,7 @@ func (slf *event) RegConsoleCommandEvent(command string, handle ConsoleCommandEv } func (slf *event) OnConsoleCommandEvent(command string) { - PushSystemMessage(slf.Server, func() { + slf.PushSystemMessage(func() { handles, exist := slf.consoleCommandEventHandles[command] if !exist { switch command { @@ -135,7 +132,7 @@ func (slf *event) OnConsoleCommandEvent(command string) { return true }) } - }, "ConsoleCommandEvent") + }, log.String("Event", "OnConsoleCommandEvent")) } // RegStartBeforeEvent 在服务器初始化完成启动前立刻执行被注册的事件处理函数 @@ -165,12 +162,12 @@ func (slf *event) RegStartFinishEvent(handle StartFinishEventHandle, priority .. } func (slf *event) OnStartFinishEvent() { - PushSystemMessage(slf.Server, func() { + slf.PushSystemMessage(func() { slf.startFinishEventHandles.RangeValue(func(index int, value StartFinishEventHandle) bool { value(slf.Server) return true }) - }, "StartFinishEvent") + }, log.String("Event", "OnStartFinishEvent")) if slf.Server.limitLife > 0 { go func() { time.Sleep(slf.Server.limitLife) @@ -189,13 +186,13 @@ func (slf *event) RegConnectionClosedEvent(handle ConnectionClosedEventHandle, p } func (slf *event) OnConnectionClosedEvent(conn *Conn, err any) { - PushSystemMessage(slf.Server, func() { + slf.PushSystemMessage(func() { slf.Server.online.Delete(conn.GetID()) slf.connectionClosedEventHandles.RangeValue(func(index int, value ConnectionClosedEventHandle) bool { value(slf.Server, conn, err) return true }) - }, "ConnectionClosedEvent") + }, log.String("Event", "OnConnectionClosedEvent")) } // RegConnectionOpenedEvent 在连接打开后将立刻执行被注册的事件处理函数 @@ -208,13 +205,13 @@ func (slf *event) RegConnectionOpenedEvent(handle ConnectionOpenedEventHandle, p } func (slf *event) OnConnectionOpenedEvent(conn *Conn) { - PushSystemMessage(slf.Server, func() { + slf.PushSystemMessage(func() { slf.Server.online.Set(conn.GetID(), conn) slf.connectionOpenedEventHandles.RangeValue(func(index int, value ConnectionOpenedEventHandle) bool { value(slf.Server, conn) return true }) - }, "ConnectionOpenedEvent") + }, log.String("Event", "OnConnectionOpenedEvent")) } // RegConnectionReceivePacketEvent 在接收到数据包时将立刻执行被注册的事件处理函数 @@ -233,19 +230,6 @@ func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) { }) } -// RegReceiveCrossPacketEvent 在接收到跨服数据包时将立即执行被注册的事件处理函数 -func (slf *event) RegReceiveCrossPacketEvent(handle ReceiveCrossPacketEventHandle, priority ...int) { - slf.receiveCrossPacketEventHandles.Append(handle, slice.GetValue(priority, 0)) - log.Info("Server", log.String("RegEvent", runtimes.CurrentRunningFuncName()), log.String("handle", reflect.TypeOf(handle).String())) -} - -func (slf *event) OnReceiveCrossPacketEvent(serverId string, packet []byte) { - slf.receiveCrossPacketEventHandles.RangeValue(func(index int, value ReceiveCrossPacketEventHandle) bool { - value(slf.Server, serverId, packet) - return true - }) -} - // RegMessageErrorEvent 在处理消息发生错误时将立即执行被注册的事件处理函数 func (slf *event) RegMessageErrorEvent(handle MessageErrorEventHandle, priority ...int) { slf.messageErrorEventHandles.Append(handle, slice.GetValue(priority, 0)) @@ -266,9 +250,6 @@ func (slf *event) OnMessageErrorEvent(message *Message, err error) { value(slf.Server, message, err) return true }) - PushSystemMessage(slf.Server, func() { - - }, "MessageErrorEvent") } // RegMessageLowExecEvent 在处理消息缓慢时将立即执行被注册的事件处理函数 @@ -298,12 +279,12 @@ func (slf *event) RegConnectionOpenedAfterEvent(handle ConnectionOpenedAfterEven } func (slf *event) OnConnectionOpenedAfterEvent(conn *Conn) { - PushSystemMessage(slf.Server, func() { + slf.PushSystemMessage(func() { slf.connectionOpenedAfterEventHandles.RangeValue(func(index int, value ConnectionOpenedAfterEventHandle) bool { value(slf.Server, conn) return true }) - }, "ConnectionOpenedAfterEvent") + }, log.String("Event", "OnConnectionOpenedAfterEvent")) } // RegConnectionWritePacketBeforeEvent 在发送数据包前将立刻执行被注册的事件处理函数 @@ -334,12 +315,12 @@ func (slf *event) RegShuntChannelCreatedEvent(handle ShuntChannelCreatedEventHan } func (slf *event) OnShuntChannelCreatedEvent(guid int64) { - PushSystemMessage(slf.Server, func() { + slf.PushSystemMessage(func() { slf.shuntChannelCreatedEventHandles.RangeValue(func(index int, value ShuntChannelCreatedEventHandle) bool { value(slf.Server, guid) return true }) - }, "ShuntChannelCreatedEvent") + }, log.String("Event", "OnShuntChannelCreatedEvent")) } // RegShuntChannelCloseEvent 在分流通道关闭时将立刻执行被注册的事件处理函数 @@ -349,12 +330,12 @@ func (slf *event) RegShuntChannelCloseEvent(handle ShuntChannelClosedEventHandle } func (slf *event) OnShuntChannelClosedEvent(guid int64) { - PushSystemMessage(slf.Server, func() { + slf.PushSystemMessage(func() { slf.shuntChannelClosedEventHandles.RangeValue(func(index int, value ShuntChannelClosedEventHandle) bool { value(slf.Server, guid) return true }) - }, "ShuntChannelCloseEvent") + }, log.String("Event", "OnShuntChannelClosedEvent")) } // RegConnectionPacketPreprocessEvent 在接收到数据包后将立刻执行被注册的事件处理函数 @@ -441,9 +422,4 @@ func (slf *event) check() { log.Warn("Server", log.String("ConnectionReceivePacketEvent", "invalid server, no packets processed")) } } - - if slf.receiveCrossPacketEventHandles.Len() > 0 && slf.cross == nil { - log.Warn("Server", log.String("ReceiveCrossPacketEvent", "invalid server, not register cross server")) - } - } diff --git a/server/gnet.go b/server/gnet.go index 4d7b61f..77f8d60 100644 --- a/server/gnet.go +++ b/server/gnet.go @@ -40,7 +40,7 @@ func (slf *gNet) AfterWrite(c gnet.Conn, b []byte) { } func (slf *gNet) React(packet []byte, c gnet.Conn) (out []byte, action gnet.Action) { - PushPacketMessage(slf.Server, c.Context().(*Conn), 0, bytes.Clone(packet)) + slf.Server.PushPacketMessage(c.Context().(*Conn), 0, bytes.Clone(packet)) return nil, gnet.None } diff --git a/server/message.go b/server/message.go index 8301138..74a7af9 100644 --- a/server/message.go +++ b/server/message.go @@ -1,44 +1,51 @@ package server import ( - "context" "fmt" "github.com/kercylan98/minotaur/utils/hash" + "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/super" - "reflect" ) const ( // MessageTypePacket 数据包消息类型:该类型的数据将被发送到 ConnectionReceivePacketEvent 进行处理 - MessageTypePacket MessageType = iota + MessageTypePacket MessageType = iota + 1 // MessageTypeError 错误消息类型:根据不同的错误状态,将交由 Server 进行统一处理 MessageTypeError - // MessageTypeCross 跨服消息类型:将被推送到跨服的 Cross 实现中进行处理 - MessageTypeCross - // MessageTypeTicker 定时器消息类型 MessageTypeTicker + // MessageTypeShuntTicker 分流定时器消息类型 + MessageTypeShuntTicker + // MessageTypeAsync 异步消息类型 MessageTypeAsync // MessageTypeAsyncCallback 异步回调消息类型 MessageTypeAsyncCallback + // MessageTypeShuntAsync 分流异步消息类型 + MessageTypeShuntAsync + + // MessageTypeShuntAsyncCallback 分流异步回调消息类型 + MessageTypeShuntAsyncCallback + // MessageTypeSystem 系统消息类型 MessageTypeSystem ) var messageNames = map[MessageType]string{ - MessageTypePacket: "MessageTypePacket", - MessageTypeError: "MessageTypeError", - MessageTypeCross: "MessageTypeCross", - MessageTypeTicker: "MessageTypeTicker", - MessageTypeAsync: "MessageTypeAsync", - MessageTypeAsyncCallback: "MessageTypeAsyncCallback", - MessageTypeSystem: "MessageTypeSystem", + MessageTypePacket: "MessageTypePacket", + MessageTypeError: "MessageTypeError", + MessageTypeTicker: "MessageTypeTicker", + MessageTypeShuntTicker: "MessageTypeShuntTicker", + MessageTypeAsync: "MessageTypeAsync", + MessageTypeAsyncCallback: "MessageTypeAsyncCallback", + MessageTypeShuntAsync: "MessageTypeShuntAsync", + MessageTypeShuntAsyncCallback: "MessageTypeShuntAsyncCallback", + MessageTypeSystem: "MessageTypeSystem", } const ( @@ -51,12 +58,6 @@ var messageErrorActionNames = map[MessageErrorAction]string{ MessageErrorActionShutdown: "Shutdown", } -var ( - messagePacketVisualization = func(packet []byte) string { - return string(packet) - } -) - type ( // MessageType 消息类型 MessageType byte @@ -76,8 +77,30 @@ func (slf MessageErrorAction) String() string { // Message 服务器消息 type Message struct { - t MessageType // 消息类型 - attrs []any // 消息属性 + conn *Conn + ordinaryHandler func() + exceptionHandler func() error + errHandler func(err error) + packet []byte + err error + name string + t MessageType + errAction MessageErrorAction + marks []log.Field +} + +// reset 重置消息结构体 +func (slf *Message) reset() { + slf.conn = nil + slf.ordinaryHandler = nil + slf.exceptionHandler = nil + slf.errHandler = nil + slf.packet = nil + slf.err = nil + slf.name = "" + slf.t = 0 + slf.errAction = 0 + slf.marks = nil } // MessageType 返回消息类型 @@ -85,37 +108,9 @@ func (slf *Message) MessageType() MessageType { return slf.t } -// AttrsString 返回消息属性的字符串表示 -func (slf *Message) AttrsString() string { - var attrs = make([]any, 0, len(slf.attrs)) - for _, attr := range slf.attrs { - if conn, hit := attr.(*Conn); hit { - attrs = append(attrs, conn.GetID()) - continue - } - if tof := reflect.TypeOf(attr); tof.Kind() == reflect.Func { - attrs = append(attrs, tof.String()) - continue - } - attrs = append(attrs, attr) - } - if len(attrs) == 0 { - return "NoneAttr" - } - return string(super.MarshalJSON(attrs)) -} - // String 返回消息的字符串表示 func (slf *Message) String() string { - var attrs = make([]any, 0, len(slf.attrs)) - for _, attr := range slf.attrs { - if reflect.TypeOf(attr).Kind() == reflect.Func { - continue - } - attrs = append(attrs, attr) - } - - return fmt.Sprintf("[%s] %s", slf.t, slf.AttrsString()) + return fmt.Sprintf("[%s] %s", slf.t, super.MarshalJSON(slf.marks)) } // String 返回消息类型的字符串表示 @@ -123,119 +118,56 @@ func (slf MessageType) String() string { return messageNames[slf] } -// GetPacketMessageAttrs 获取消息中的数据包属性 -func (slf *Message) GetPacketMessageAttrs() (conn *Conn, packet []byte) { - conn = slf.attrs[0].(*Conn) - packet = slf.attrs[1].([]byte) - return +// castToPacketMessage 将消息转换为数据包消息 +func (slf *Message) castToPacketMessage(conn *Conn, packet []byte, mark ...log.Field) *Message { + slf.t, slf.conn, slf.packet, slf.marks = MessageTypePacket, conn, packet, mark + return slf } -// PushPacketMessage 向特定服务器中推送 MessageTypePacket 消息 -func PushPacketMessage(srv *Server, conn *Conn, wst int, packet []byte, mark ...any) { - msg := srv.messagePool.Get() - msg.t = MessageTypePacket - msg.attrs = append([]any{&Conn{ctx: context.WithValue(conn.ctx, contextKeyWST, wst), connection: conn.connection}, packet}, mark...) - srv.pushMessage(msg) +// castToTickerMessage 将消息转换为定时器消息 +func (slf *Message) castToTickerMessage(name string, caller func(), mark ...log.Field) *Message { + slf.t, slf.name, slf.ordinaryHandler, slf.marks = MessageTypeTicker, name, caller, mark + return slf } -// GetErrorMessageAttrs 获取消息中的错误属性 -func (slf *Message) GetErrorMessageAttrs() (err error, action MessageErrorAction) { - err = slf.attrs[0].(error) - action = slf.attrs[1].(MessageErrorAction) - return +// castToShuntTickerMessage 将消息转换为分发器定时器消息 +func (slf *Message) castToShuntTickerMessage(conn *Conn, name string, caller func(), mark ...log.Field) *Message { + slf.t, slf.conn, slf.name, slf.ordinaryHandler, slf.marks = MessageTypeShuntTicker, slf.conn, name, caller, mark + return slf } -// PushErrorMessage 向特定服务器中推送 MessageTypeError 消息 -func PushErrorMessage(srv *Server, err error, action MessageErrorAction, mark ...any) { - msg := srv.messagePool.Get() - msg.t = MessageTypeError - msg.attrs = append([]any{err, action}, mark...) - srv.pushMessage(msg) +// castToAsyncMessage 将消息转换为异步消息 +func (slf *Message) castToAsyncMessage(caller func() error, callback func(err error), mark ...log.Field) *Message { + slf.t, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeAsync, caller, callback, mark + return slf } -// GetCrossMessageAttrs 获取消息中的跨服属性 -func (slf *Message) GetCrossMessageAttrs() (serverId string, packet []byte) { - serverId = slf.attrs[0].(string) - packet = slf.attrs[1].([]byte) - return +// castToAsyncCallbackMessage 将消息转换为异步回调消息 +func (slf *Message) castToAsyncCallbackMessage(err error, caller func(err error), mark ...log.Field) *Message { + slf.t, slf.err, slf.errHandler, slf.marks = MessageTypeAsyncCallback, err, caller, mark + return slf } -// PushCrossMessage 向特定服务器中推送 MessageTypeCross 消息 -func PushCrossMessage(srv *Server, crossName string, serverId string, packet []byte, mark ...any) { - if serverId == srv.id { - msg := srv.messagePool.Get() - msg.t = MessageTypeCross - msg.attrs = append([]any{serverId, packet}, mark...) - srv.pushMessage(msg) - } else { - if len(srv.cross) == 0 { - return - } - cross, exist := srv.cross[crossName] - if !exist { - return - } - _ = cross.PushMessage(serverId, packet) - } +// castToShuntAsyncMessage 将消息转换为分流异步消息 +func (slf *Message) castToShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) *Message { + slf.t, slf.conn, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeShuntAsync, conn, caller, callback, mark + return slf } -// GetTickerMessageAttrs 获取消息中的定时器属性 -func (slf *Message) GetTickerMessageAttrs() (caller func()) { - caller = slf.attrs[0].(func()) - return +// castToShuntAsyncCallbackMessage 将消息转换为分流异步回调消息 +func (slf *Message) castToShuntAsyncCallbackMessage(conn *Conn, err error, caller func(err error), mark ...log.Field) *Message { + slf.t, slf.conn, slf.err, slf.errHandler, slf.marks = MessageTypeShuntAsyncCallback, conn, err, caller, mark + return slf } -// PushTickerMessage 向特定服务器中推送 MessageTypeTicker 消息 -func PushTickerMessage(srv *Server, caller func(), mark ...any) { - srv.messageLock.RLock() - if srv.messagePool == nil { - srv.messageLock.RUnlock() - return - } - srv.messageLock.RUnlock() - msg := srv.messagePool.Get() - msg.t = MessageTypeTicker - msg.attrs = append([]any{caller}, mark...) - srv.pushMessage(msg) +// castToSystemMessage 将消息转换为系统消息 +func (slf *Message) castToSystemMessage(caller func(), mark ...log.Field) *Message { + slf.t, slf.ordinaryHandler, slf.marks = MessageTypeSystem, caller, mark + return slf } -// GetAsyncMessageAttrs 获取消息中的异步消息属性 -func (slf *Message) GetAsyncMessageAttrs() (caller func() error, callback func(err error), hasCallback bool) { - caller = slf.attrs[0].(func() error) - callback, hasCallback = slf.attrs[1].(func(err error)) - return -} - -// PushAsyncMessage 向特定服务器中推送 MessageTypeAsync 消息 -// - 异步消息将在服务器的异步消息队列中进行处理,处理完成 caller 的阻塞操作后,将会通过系统消息执行 callback 函数 -// - callback 函数将在异步消息处理完成后进行调用,无论过程是否产生 err,都将被执行,允许为 nil -// - 需要注意的是,为了避免并发问题,caller 函数请仅处理阻塞操作,其他操作应该在 callback 函数中进行 -// -// 在通过 WithShunt 使用分流服务器时,异步消息不会转换到分流通道中进行处理。依旧需要注意上方第三条 -func PushAsyncMessage(srv *Server, caller func() error, callback func(err error), mark ...any) { - msg := srv.messagePool.Get() - msg.t = MessageTypeAsync - msg.attrs = append([]any{caller, callback}, mark...) - srv.pushMessage(msg) -} - -// GetSystemMessageAttrs 获取消息中的系统消息属性 -func (slf *Message) GetSystemMessageAttrs() (handle func()) { - handle = slf.attrs[0].(func()) - return -} - -// PushSystemMessage 向特定服务器中推送 MessageTypeSystem 消息 -func PushSystemMessage(srv *Server, handle func(), mark ...any) { - msg := srv.messagePool.Get() - msg.t = MessageTypeSystem - msg.attrs = append([]any{handle}, mark...) - srv.pushMessage(msg) -} - -// SetMessagePacketVisualizer 设置消息可视化函数 -// - 消息可视化将在慢消息等情况用于打印,使用自定消息可视化函数可以便于开发者进行调试 -// - 默认的消息可视化函数将直接返回消息的字符串表示 -func SetMessagePacketVisualizer(handle func(packet []byte) string) { - messagePacketVisualization = handle +// castToErrorMessage 将消息转换为错误消息 +func (slf *Message) castToErrorMessage(err error, action MessageErrorAction, mark ...log.Field) *Message { + slf.t, slf.err, slf.errAction, slf.marks = MessageTypeError, err, action, mark + return slf } diff --git a/server/options.go b/server/options.go index 5f7f918..0dc122e 100644 --- a/server/options.go +++ b/server/options.go @@ -2,12 +2,10 @@ package server import ( "github.com/gin-contrib/pprof" - "github.com/kercylan98/minotaur/utils/buffer" - "github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/timer" "google.golang.org/grpc" - "reflect" + "runtime/debug" "time" ) @@ -31,17 +29,44 @@ type option struct { } type runtime struct { - id string // 服务器id - cross map[string]Cross // 跨服 - deadlockDetect time.Duration // 是否开启死锁检测 - supportMessageTypes map[int]bool // websocket模式下支持的消息类型 - certFile, keyFile string // TLS文件 - messagePoolSize int // 消息池大小 - ticker *timer.Ticker // 定时器 - websocketReadDeadline time.Duration // websocket连接超时时间 - websocketCompression int // websocket压缩等级 - websocketWriteCompression bool // websocket写入压缩 - limitLife time.Duration // 限制最大生命周期 + deadlockDetect time.Duration // 是否开启死锁检测 + supportMessageTypes map[int]bool // websocket模式下支持的消息类型 + certFile, keyFile string // TLS文件 + messagePoolSize int // 消息池大小 + ticker *timer.Ticker // 定时器 + tickerAutonomy bool // 定时器是否独立运行 + connTickerSize int // 连接定时器大小 + websocketReadDeadline time.Duration // websocket连接超时时间 + websocketCompression int // websocket压缩等级 + websocketWriteCompression bool // websocket写入压缩 + limitLife time.Duration // 限制最大生命周期 + shuntMatcher func(conn *Conn) string // 分流匹配器 +} + +// WithShunt 通过连接数据包分流的方式创建服务器 +// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况 +// - shuntMatcher:用于匹配连接的函数,返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道 +// +// 将被分流的消息类型(更多类型有待斟酌): +// - MessageTypePacket +// +// 注意事项: +// - 当分流匹配过程发生 panic 将会在系统通道内处理消息,并打印日志 +func WithShunt(shuntMatcher func(conn *Conn) string) Option { + return func(srv *Server) { + if shuntMatcher == nil { + log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil")) + return + } + srv.shuntMatcher = func(conn *Conn) string { + defer func() { + if err := recover(); err != nil { + log.Error("ShuntMatcher", log.String("State", "Panic"), log.Any("Error", err), log.String("Stack", string(debug.Stack()))) + } + }() + return shuntMatcher(conn) + } + } } // WithLimitLife 通过限制最大生命周期的方式创建服务器 @@ -119,46 +144,20 @@ func WithWebsocketReadDeadline(t time.Duration) Option { // WithTicker 通过定时器创建服务器,为服务器添加定时器功能 // - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题) -func WithTicker(size int, autonomy bool) Option { +func WithTicker(size, connSize int, autonomy bool) Option { return func(srv *Server) { + srv.connTickerSize = connSize + srv.tickerAutonomy = autonomy if !autonomy { srv.ticker = timer.GetTicker(size) } else { srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) { - PushTickerMessage(srv, caller, name) + srv.PushTickerMessage(name, caller) })) } } } -// WithCross 通过跨服的方式创建服务器 -// - 推送跨服消息时,将推送到对应 crossName 的跨服中间件中,crossName 可以满足不同功能采用不同的跨服/消息中间件 -// - 通常情况下 crossName 仅需一个即可 -func WithCross(crossName string, serverId string, cross Cross) Option { - return func(srv *Server) { - start: - { - srv.id = serverId - if srv.cross == nil { - srv.cross = map[string]Cross{} - } - srv.cross[crossName] = cross - err := cross.Init(srv, func(serverId string, packet []byte) { - msg := srv.messagePool.Get() - msg.t = MessageTypeCross - msg.attrs = []any{serverId, packet} - srv.pushMessage(msg) - }) - if err != nil { - log.Info("Cross", log.String("ServerID", serverId), log.String("Cross", reflect.TypeOf(cross).String()), log.String("State", "WaitNatsRun")) - time.Sleep(1 * time.Second) - goto start - } - log.Info("Cross", log.String("ServerID", serverId), log.String("Cross", reflect.TypeOf(cross).String())) - } - } -} - // WithTLS 通过安全传输层协议TLS创建服务器 // - 支持:Http、Websocket func WithTLS(certFile, keyFile string) Option { @@ -227,23 +226,3 @@ func WithPProf(pattern ...string) Option { pprof.Register(srv.ginServer, pattern...) } } - -// WithShunt 通过连接数据包分流的方式创建服务器 -// - 在分流的情况下,将会使用分流通道处理数据包,而不是使用系统通道,消息的执行将转移到对应的分流通道内进行串行处理,默认情况下所有消息都是串行处理的,适用于例如不同游戏房间并行处理,游戏房间内部消息串行处理的情况 -// - shuntMatcher:用于匹配连接的函数,返回值为分流通道的 GUID 和是否允许创建新的分流通道,当返回不允许创建新的分流通道时,将会使用使用默认的系统通道 -// -// 将被分流的消息类型(更多类型有待斟酌): -// - MessageTypePacket -// -// 注意事项: -// - 需要在分流通道使用完成后主动调用 Server.ShuntChannelFreed 函数释放分流通道,避免内存泄漏 -func WithShunt(shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool)) Option { - return func(srv *Server) { - if shuntMatcher == nil { - log.Warn("WithShunt", log.String("State", "Ignore"), log.String("Reason", "shuntMatcher is nil")) - return - } - srv.shuntChannels = concurrent.NewBalanceMap[int64, *buffer.Unbounded[*Message]]() - srv.shuntMatcher = shuntMatcher - } -} diff --git a/server/server.go b/server/server.go index 88d7acc..3b95420 100644 --- a/server/server.go +++ b/server/server.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" - "github.com/kercylan98/minotaur/utils/buffer" "github.com/kercylan98/minotaur/utils/concurrent" "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/network" @@ -42,6 +41,7 @@ func New(network Network, options ...Option) *Server { closeChannel: make(chan struct{}, 1), systemSignal: make(chan os.Signal, 1), ctx: context.Background(), + dispatchers: make(map[string]*dispatcher), } server.event = newEvent(server) @@ -78,31 +78,30 @@ func New(network Network, options ...Option) *Server { // Server 网络服务器 type Server struct { - *event // 事件 - *runtime // 运行时 - *option // 可选项 - network Network // 网络类型 - addr string // 侦听地址 - systemSignal chan os.Signal // 系统信号 - online *concurrent.BalanceMap[string, *Conn] // 在线连接 - ginServer *gin.Engine // HTTP模式下的路由器 - httpServer *http.Server // HTTP模式下的服务器 - grpcServer *grpc.Server // GRPC模式下的服务器 - gServer *gNet // TCP或UDP模式下的服务器 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 - ants *ants.Pool // 协程池 - messagePool *concurrent.Pool[*Message] // 消息池 - messageLock sync.RWMutex // 消息锁 - messageChannel *buffer.Unbounded[*Message] // 消息无界缓冲区 - multiple *MultipleServer // 多服务器模式下的服务器 - multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 - runMode RunMode // 运行模式 - shuntChannels *concurrent.BalanceMap[int64, *buffer.Unbounded[*Message]] // 分流管道 - shuntMatcher func(conn *Conn) (guid int64, allowToCreate bool) // 分流管道匹配器 - messageCounter atomic.Int64 // 消息计数器 - ctx context.Context // 上下文 + *event // 事件 + *runtime // 运行时 + *option // 可选项 + network Network // 网络类型 + addr string // 侦听地址 + systemSignal chan os.Signal // 系统信号 + online *concurrent.BalanceMap[string, *Conn] // 在线连接 + ginServer *gin.Engine // HTTP模式下的路由器 + httpServer *http.Server // HTTP模式下的服务器 + grpcServer *grpc.Server // GRPC模式下的服务器 + gServer *gNet // TCP或UDP模式下的服务器 + isRunning bool // 是否正在运行 + isShutdown atomic.Bool // 是否已关闭 + closeChannel chan struct{} // 关闭信号 + ants *ants.Pool // 协程池 + messagePool *concurrent.Pool[*Message] // 消息池 + messageLock sync.RWMutex // 消息锁 + multiple *MultipleServer // 多服务器模式下的服务器 + multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 + runMode RunMode // 运行模式 + messageCounter atomic.Int64 // 消息计数器 + ctx context.Context // 上下文 + dispatchers map[string]*dispatcher // 消息分发器 + dispatcherLock sync.RWMutex // 消息分发器锁 } // Run 使用特定地址运行服务器 @@ -135,13 +134,9 @@ func (slf *Server) Run(addr string) error { return &Message{} }, func(data *Message) { - data.t = 0 - data.attrs = nil + data.reset() }, ) - slf.messageChannel = buffer.NewUnbounded[*Message](func() *Message { - return nil - }) slf.messageLock.Unlock() if slf.network != NetworkHttp && slf.network != NetworkWebsocket && slf.network != NetworkGRPC { slf.gServer = &gNet{Server: slf} @@ -151,16 +146,8 @@ func (slf *Server) Run(addr string) error { } go func() { messageInitFinish <- struct{}{} - for { - select { - case msg, ok := <-slf.messageChannel.Get(): - if !ok { - return - } - slf.messageChannel.Load() - slf.dispatchMessage(msg) - } - } + d, _ := slf.useDispatcher(serverSystemDispatcher) + d.start() }() } @@ -181,7 +168,7 @@ func (slf *Server) Run(addr string) error { slf.OnStartBeforeEvent() if err := slf.grpcServer.Serve(listener); err != nil { slf.isRunning = false - PushErrorMessage(slf, err, MessageErrorActionShutdown) + slf.PushErrorMessage(err, MessageErrorActionShutdown) } }() case NetworkTcp, NetworkTcp4, NetworkTcp6, NetworkUdp, NetworkUdp4, NetworkUdp6, NetworkUnix: @@ -195,7 +182,7 @@ func (slf *Server) Run(addr string) error { gnet.WithMulticore(true), ); err != nil { slf.isRunning = false - PushErrorMessage(slf, err, MessageErrorActionShutdown) + slf.PushErrorMessage(err, MessageErrorActionShutdown) } }) case NetworkKcp: @@ -236,7 +223,7 @@ func (slf *Server) Run(addr string) error { } panic(err) } - PushPacketMessage(slf, conn, 0, buf[:n]) + slf.PushPacketMessage(conn, 0, buf[:n]) } }(conn) } @@ -258,12 +245,12 @@ func (slf *Server) Run(addr string) error { if len(slf.certFile)+len(slf.keyFile) > 0 { if err := slf.httpServer.ListenAndServeTLS(slf.certFile, slf.keyFile); err != nil { slf.isRunning = false - PushErrorMessage(slf, err, MessageErrorActionShutdown) + slf.PushErrorMessage(err, MessageErrorActionShutdown) } } else { if err := slf.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { slf.isRunning = false - PushErrorMessage(slf, err, MessageErrorActionShutdown) + slf.PushErrorMessage(err, MessageErrorActionShutdown) } } @@ -337,7 +324,7 @@ func (slf *Server) Run(addr string) error { if len(slf.supportMessageTypes) > 0 && !slf.supportMessageTypes[messageType] { panic(ErrWebsocketIllegalMessageType) } - PushPacketMessage(slf, conn, messageType, packet) + slf.PushPacketMessage(conn, messageType, packet) } }) go func() { @@ -346,12 +333,12 @@ func (slf *Server) Run(addr string) error { if len(slf.certFile)+len(slf.keyFile) > 0 { if err := http.ListenAndServeTLS(slf.addr, slf.certFile, slf.keyFile, nil); err != nil { slf.isRunning = false - PushErrorMessage(slf, err, MessageErrorActionShutdown) + slf.PushErrorMessage(err, MessageErrorActionShutdown) } } else { if err := http.ListenAndServe(slf.addr, nil); err != nil { slf.isRunning = false - PushErrorMessage(slf, err, MessageErrorActionShutdown) + slf.PushErrorMessage(err, MessageErrorActionShutdown) } } @@ -372,7 +359,7 @@ func (slf *Server) Run(addr string) error { log.String("ip", ip.String()), log.String("listen", slf.addr), ) - log.Info("Server", log.String(serverMark, "====================================================================")) + log.Error("Server", log.String(serverMark, "====================================================================")) slf.OnStartFinishEvent() time.Sleep(time.Second) if !slf.isShutdown.Load() { @@ -442,14 +429,6 @@ func (slf *Server) CloseConn(id string) { } } -// GetID 获取服务器id -func (slf *Server) GetID() string { - if slf.cross == nil { - panic(ErrNoSupportCross) - } - return slf.id -} - // Ticker 获取服务器定时器 func (slf *Server) Ticker() *timer.Ticker { if slf.ticker == nil { @@ -491,21 +470,12 @@ func (slf *Server) shutdown(err error) { slf.ants.Release() slf.ants = nil } - for _, cross := range slf.cross { - cross.Release() - } - if slf.messageChannel != nil { - slf.messageChannel.Close() - slf.messagePool.Close() - } - if slf.shuntChannels != nil { - slf.shuntChannels.Range(func(key int64, c *buffer.Unbounded[*Message]) bool { - c.Close() - return false - }) - slf.shuntChannels.Clear() - slf.shuntChannels = nil + slf.dispatcherLock.Lock() + for s, d := range slf.dispatchers { + d.close() + delete(slf.dispatchers, s) } + slf.dispatcherLock.Unlock() if slf.grpcServer != nil && slf.isRunning { slf.grpcServer.GracefulStop() } @@ -576,17 +546,30 @@ func (slf *Server) GetMessageCount() int64 { return slf.messageCounter.Load() } -// ShuntChannelFreed 释放分流通道 -func (slf *Server) ShuntChannelFreed(channelGuid int64) { - if slf.shuntChannels == nil { - return - } - channel, exist := slf.shuntChannels.GetExist(channelGuid) +// useDispatcher 添加消息分发器 +// - 该函数在分发器不重复的情况下将创建分发器,当分发器已存在将直接返回 +func (slf *Server) useDispatcher(name string) (*dispatcher, bool) { + slf.dispatcherLock.Lock() + d, exist := slf.dispatchers[name] if exist { - channel.Close() - slf.shuntChannels.Delete(channelGuid) - slf.OnShuntChannelClosedEvent(channelGuid) + slf.dispatcherLock.Unlock() + return d, false } + d = generateDispatcher(slf.dispatchMessage) + slf.dispatchers[name] = d + slf.dispatcherLock.Unlock() + return d, true +} + +// releaseDispatcher 关闭消息分发器 +func (slf *Server) releaseDispatcher(name string) { + slf.dispatcherLock.Lock() + d, exist := slf.dispatchers[name] + if exist { + delete(slf.dispatchers, name) + d.close() + } + slf.dispatcherLock.Unlock() } // pushMessage 向服务器中写入特定类型的消息,需严格遵守消息属性要求 @@ -595,49 +578,39 @@ func (slf *Server) pushMessage(message *Message) { slf.messagePool.Release(message) return } - if slf.shuntChannels != nil && message.t == MessageTypePacket { - conn := message.attrs[0].(*Conn) - channelGuid, allowToCreate := slf.shuntMatcher(conn) - channel, exist := slf.shuntChannels.GetExist(channelGuid) - if !exist && allowToCreate { - channel = buffer.NewUnbounded[*Message](func() *Message { - return nil - }) - slf.shuntChannels.Set(channelGuid, channel) - go func(channel *buffer.Unbounded[*Message]) { - for { - select { - case msg, ok := <-channel.Get(): - if !ok { - return - } - channel.Load() - slf.dispatchMessage(msg) - } - } - }(channel) - defer slf.OnShuntChannelCreatedEvent(channelGuid) + var dispatcher *dispatcher + switch message.t { + case MessageTypePacket: + if slf.shuntMatcher == nil { + dispatcher, _ = slf.useDispatcher(serverSystemDispatcher) + break } - if channel != nil { - slf.messageCounter.Add(1) - channel.Put(message) - return + fallthrough + case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback: + var created bool + dispatcher, created = slf.useDispatcher(slf.shuntMatcher(message.conn)) + if created { + go dispatcher.start() } + case MessageTypeSystem, MessageTypeAsync, MessageTypeAsyncCallback, MessageTypeError, MessageTypeTicker: + dispatcher, _ = slf.useDispatcher(serverSystemDispatcher) + } + if dispatcher == nil { + return } slf.messageCounter.Add(1) - slf.messageChannel.Put(message) + dispatcher.put(message) } func (slf *Server) low(message *Message, present time.Time, expect time.Duration, messageReplace ...string) { cost := time.Since(present) if cost > expect { - var m = "unknown" - if message != nil { - m = message.String() - } else if len(messageReplace) > 0 { - m = messageReplace[0] + if len(messageReplace) > 0 { + for i, s := range messageReplace { + message.marks = append(message.marks, log.String(fmt.Sprintf("Other-%d", i+1), s)) + } } - log.Warn("Server", log.String("type", "low-message"), log.String("cost", cost.String()), log.String("message", m), log.Stack("stack")) + log.Warn("Server", log.String("type", "low-message"), log.String("cost", cost.String()), log.String("message", message.String()), log.Stack("stack")) slf.OnMessageLowExecEvent(message, cost) } } @@ -654,7 +627,7 @@ func (slf *Server) dispatchMessage(msg *Message) { select { case <-ctx.Done(): if err := ctx.Err(); err == context.DeadlineExceeded { - log.Warn("Server", log.String("MessageType", messageNames[msg.t]), log.Any("SuspectedDeadlock", msg)) + log.Warn("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("SuspectedDeadlock", msg)) } } }(ctx, msg) @@ -665,7 +638,7 @@ func (slf *Server) dispatchMessage(msg *Message) { defer func(msg *Message) { if err := recover(); err != nil { stack := string(debug.Stack()) - log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("MessageAttrs", msg.AttrsString()), log.Any("error", err), log.String("stack", stack)) + log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("error", err), log.String("stack", stack)) fmt.Println(stack) if e, ok := err.(error); ok { slf.OnMessageErrorEvent(msg, e) @@ -683,29 +656,23 @@ func (slf *Server) dispatchMessage(msg *Message) { }(msg) } - var attrs = msg.attrs switch msg.t { case MessageTypePacket: - var conn, packet = msg.GetPacketMessageAttrs() - if !slf.OnConnectionPacketPreprocessEvent(conn, packet, func(newPacket []byte) { packet = newPacket }) { - slf.OnConnectionReceivePacketEvent(conn, packet) + if !slf.OnConnectionPacketPreprocessEvent(msg.conn, msg.packet, func(newPacket []byte) { msg.packet = newPacket }) { + slf.OnConnectionReceivePacketEvent(msg.conn, msg.packet) } case MessageTypeError: - var err, action = msg.GetErrorMessageAttrs() - switch action { + switch msg.errAction { case MessageErrorActionNone: - log.Panic("Server", log.Err(err)) + log.Panic("Server", log.Err(msg.err)) case MessageErrorActionShutdown: - slf.shutdown(err) + slf.shutdown(msg.err) default: - log.Warn("Server", log.String("not support message error action", action.String())) + log.Warn("Server", log.String("not support message error action", msg.errAction.String())) } - case MessageTypeCross: - slf.OnReceiveCrossPacketEvent(msg.GetCrossMessageAttrs()) - case MessageTypeTicker: - msg.GetTickerMessageAttrs()() - case MessageTypeAsync: - handle, callback, cb := msg.GetAsyncMessageAttrs() + case MessageTypeTicker, MessageTypeShuntTicker: + msg.ordinaryHandler() + case MessageTypeAsync, MessageTypeShuntAsync: if err := slf.ants.Submit(func() { defer func() { if err := recover(); err != nil { @@ -724,32 +691,113 @@ func (slf *Server) dispatchMessage(msg *Message) { slf.messagePool.Release(msg) } }() - err := handle() - if cb && callback != nil { - acm := slf.messagePool.Get() - acm.t = MessageTypeAsyncCallback - if len(attrs) > 2 { - acm.attrs = append([]any{func() { callback(err) }}, attrs[2:]...) - } else { - acm.attrs = []any{func() { callback(err) }} + var err error + if msg.exceptionHandler != nil { + err = msg.exceptionHandler() + } + if msg.errHandler != nil { + if msg.conn == nil { + slf.PushAsyncCallbackMessage(err, msg.errHandler) + return } - slf.pushMessage(acm) - } else if err != nil { + slf.PushShuntAsyncCallbackMessage(msg.conn, err, msg.errHandler) + return + } + if err != nil { log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack()))) } }); err != nil { panic(err) } - case MessageTypeAsyncCallback: // 特殊类型 - attrs[0].(func())() + case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback: + msg.errHandler(msg.err) case MessageTypeSystem: - msg.GetSystemMessageAttrs()() + msg.ordinaryHandler() default: log.Warn("Server", log.String("not support message type", msg.t.String())) } } -// PushAsyncMessage 是 PushAsyncMessage 的快捷方式 -func (slf *Server) PushAsyncMessage(caller func() error, callback func(err error), mark ...any) { - PushAsyncMessage(slf, caller, callback, mark...) +// PushSystemMessage 向服务器中推送 MessageTypeSystem 消息 +// - 系统消息仅包含一个可执行函数,将在系统分发器中执行 +// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 +func (slf *Server) PushSystemMessage(handler func(), mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToSystemMessage(handler, mark...)) +} + +// PushAsyncMessage 向服务器中推送 MessageTypeAsync 消息 +// - 异步消息将在服务器的异步消息队列中进行处理,处理完成 caller 的阻塞操作后,将会通过系统消息执行 callback 函数 +// - callback 函数将在异步消息处理完成后进行调用,无论过程是否产生 err,都将被执行,允许为 nil +// - 需要注意的是,为了避免并发问题,caller 函数请仅处理阻塞操作,其他操作应该在 callback 函数中进行 +// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 +func (slf *Server) PushAsyncMessage(caller func() error, callback func(err error), mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToAsyncMessage(caller, callback, mark...)) +} + +// PushAsyncCallbackMessage 向服务器中推送 MessageTypeAsyncCallback 消息 +// - 异步消息回调将会通过一个接收 error 的函数进行处理,该函数将在系统分发器中执行 +// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 +func (slf *Server) PushAsyncCallbackMessage(err error, callback func(err error), mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToAsyncCallbackMessage(err, callback, mark...)) +} + +// PushShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致 +// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncMessage 进行转发 +// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 +func (slf *Server) PushShuntAsyncMessage(conn *Conn, caller func() error, callback func(err error), mark ...log.Field) { + if slf.shuntMatcher == nil { + slf.PushAsyncMessage(caller, callback) + return + } + slf.pushMessage(slf.messagePool.Get().castToShuntAsyncMessage(conn, caller, callback, mark...)) +} + +// PushShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 +// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushAsyncCallbackMessage 进行转发 +func (slf *Server) PushShuntAsyncCallbackMessage(conn *Conn, err error, callback func(err error), mark ...log.Field) { + if slf.shuntMatcher == nil { + slf.PushAsyncCallbackMessage(err, callback) + return + } + slf.pushMessage(slf.messagePool.Get().castToShuntAsyncCallbackMessage(conn, err, callback, mark...)) +} + +// PushPacketMessage 向服务器中推送 MessageTypePacket 消息 +// - 当存在 WithShunt 的选项时,将会根据选项中的 shuntMatcher 进行分发,否则将在系统分发器中处理消息 +func (slf *Server) PushPacketMessage(conn *Conn, wst int, packet []byte, mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToPacketMessage( + &Conn{ctx: context.WithValue(conn.ctx, contextKeyWST, wst), connection: conn.connection}, + packet, + )) +} + +// PushTickerMessage 向服务器中推送 MessageTypeTicker 消息 +// - 通过该函数推送定时消息,当消息触发时将在系统分发器中处理消息 +// - 可通过 timer.Ticker 或第三方定时器将执行函数(caller)推送到该消息中进行处理,可有效的避免线程安全问题 +// - 参数 name 仅用作标识该定时器名称 +// +// 定时消息执行不会有特殊的处理,仅标记为定时任务,也就是允许将各类函数通过该消息发送处理,但是并不建议 +// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 +func (slf *Server) PushTickerMessage(name string, caller func(), mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToTickerMessage(name, caller, mark...)) +} + +// PushShuntTickerMessage 向特定分发器中推送 MessageTypeTicker 消息,消息执行与 MessageTypeTicker 一致 +// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushTickerMessage 进行转发 +// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 +func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func(), mark ...log.Field) { + if slf.shuntMatcher == nil { + slf.PushTickerMessage(name, caller) + return + } + slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...)) +} + +// PushErrorMessage 向服务器中推送 MessageTypeError 消息 +// - 通过该函数推送错误消息,当消息触发时将在系统分发器中处理消息 +// - 参数 errAction 用于指定错误消息的处理方式,可选值为 MessageErrorActionNone 和 MessageErrorActionShutdown +// - 参数 errAction 为 MessageErrorActionShutdown 时,将会停止服务器的运行 +// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 +func (slf *Server) PushErrorMessage(err error, errAction MessageErrorAction, mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToErrorMessage(err, errAction, mark...)) } diff --git a/server/server_test.go b/server/server_test.go index bd6a5e5..855280e 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -5,29 +5,28 @@ import ( "github.com/kercylan98/minotaur/server" "github.com/kercylan98/minotaur/server/client" "github.com/kercylan98/minotaur/utils/times" - "golang.org/x/time/rate" "testing" "time" ) func TestNew(t *testing.T) { - limiter := rate.NewLimiter(rate.Every(time.Second), 100) + //limiter := rate.NewLimiter(rate.Every(time.Second), 100) srv := server.New(server.NetworkWebsocket, server.WithMessageBufferSize(1024*1024), server.WithPProf()) - srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool { - t, c := srv.TimeoutContext(time.Second * 5) - defer c() - if err := limiter.Wait(t); err != nil { - return false - } - return true - }) + //srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool { + // t, c := srv.TimeoutContext(time.Second * 5) + // defer c() + // if err := limiter.Wait(t); err != nil { + // return false + // } + // return true + //}) srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) { fmt.Println("关闭", conn.GetID(), err, "Count", srv.GetOnlineCount()) }) srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) { - if srv.GetOnlineCount() > 1 { - conn.Close() - } + //if srv.GetOnlineCount() > 1 { + // conn.Close() + //} }) srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { @@ -39,21 +38,24 @@ func TestNew(t *testing.T) { } func TestNewClient(t *testing.T) { - for i := 0; i < 1000; i++ { + count := 500 + for i := 0; i < count; i++ { id := i fmt.Println("启动", i+1) - cli := client.NewWebsocket("ws://127.0.0.1:8888") + cli := client.NewWebsocket("ws://172.28.102.242:9999") cli.RegConnectionReceivePacketEvent(func(conn *client.Client, wst int, packet []byte) { fmt.Println("收到", id+1, string(packet)) }) cli.RegConnectionOpenedEvent(func(conn *client.Client) { go func() { - for i < 1000 { + for i < count { time.Sleep(time.Second) } for { - time.Sleep(time.Millisecond * 100) - cli.WriteWS(2, []byte("hello")) + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + cli.WriteWS(2, []byte("hello")) + } } }() })