diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index af13a59..7446fd9 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -18,4 +18,4 @@ jobs: bump-minor-pre-major: true bump-patch-for-minor-pre-major: true changelog-types: '[{"type":"other","section":"Other | 其他更改","hidden":false},{"type":"revert","section":"Reverts | 回退","hidden":false},{"type":"feat","section":"Features | 新特性","hidden":false},{"type":"fix","section":"Bug Fixes | 修复","hidden":false},{"type":"improvement","section":"Feature Improvements | 改进","hidden":false},{"type":"docs","section":"Docs | 文档优化","hidden":false},{"type":"style","section":"Styling | 可读性优化","hidden":false},{"type":"refactor","section":"Code Refactoring | 重构","hidden":false},{"type":"perf","section":"Performance Improvements | 性能优化","hidden":false},{"type":"test","section":"Tests | 新增或优化测试用例","hidden":false},{"type":"build","section":"Build System | 影响构建的修改","hidden":false},{"type":"ci","section":"CI | 更改我们的 CI 配置文件和脚本","hidden":false}]' - release-as: 0.3.0 \ No newline at end of file +# release-as: 0.3.0 \ No newline at end of file diff --git a/README.md b/README.md index 527cc4c..0f46bae 100644 --- a/README.md +++ b/README.md @@ -221,6 +221,80 @@ func main() { - 模板文件图例: ![exporter-xlsx-template.png](.github/images/exporter-xlsx-template.png) +#### 导出 JSON 文件(可供客户端直接使用,包含索引的配置导出后为键值模式,可直接读取) +```text +Flags: + -e, --exclude string excluded configuration names or display names (comma separated) | 排除的配置名或显示名(英文逗号分隔) + -h, --help help for json + -o, --output string directory path of the output json file | 输出的 json 文件所在目录路径 + -p, --prefix string export configuration file name prefix | 导出配置文件名前缀 + -t, --type string export server configuration[s] or client configuration[c] | 导出服务端配置[s]还是客户端配置[c] + -f, --xlsx string xlsx file path or directory path | xlsx 文件路径或所在目录路径 + +``` +```shell +expoter.exe json -t s -f xlsx_template.xlsx -o ./output +``` +导出结果示例 +```json +{ + "1": { + "b": { + "Id": 1, + "Count": "b", + "Info": { + "id": 1, + "name": "小明", + "info": { + "lv": 1, + "exp": { + "mux": 10, + "count": 100 + } + } + }, + "Other": [ + { + "id": 1, + "name": "张飞" + }, + { + "id": 2, + "name": "刘备" + } + ] + } + } +} + +``` + +#### 导出 Golang 文件 +```text +Flags: + -e, --exclude string excluded configuration names or display names (comma separated) | 排除的配置名或显示名(英文逗号分隔) + -h, --help help for go + -o, --output string output path | 输出的 go 文件路径 + -f, --xlsx string xlsx file path or directory path | xlsx 文件路径或所在目录路径 +``` +```shell +expoter.exe go -f xlsx_template.xlsx -o ./output +``` +使用示例 + +```go +package main + +import ( + "fmt" + "config" +) + +func main() { + fmt.Println(config.EasyConfig.Id) +} +``` + ### 持续更新的示例项目 - **[Minotaur-Example](https://github.com/kercylan98/minotaur-example)** diff --git a/go.mod b/go.mod index 7772ed2..3dadec3 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.21 require ( github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 - github.com/alphadose/haxmap v1.2.0 + github.com/alphadose/haxmap v1.3.0 github.com/gin-contrib/pprof v1.4.0 github.com/gin-gonic/gin v1.9.1 github.com/go-resty/resty/v2 v2.7.0 diff --git a/go.sum b/go.sum index 0d15992..91205e9 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7 github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10= github.com/alphadose/haxmap v1.2.0 h1:noGrAmCE+gNheZ4KpW+sYj9W5uMcO1UAjbAq9XBOAfM= github.com/alphadose/haxmap v1.2.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM= +github.com/alphadose/haxmap v1.3.0 h1:C/2LboOnPCZP27GmmSXOcwx360st0P8N0fTJ3voefKc= +github.com/alphadose/haxmap v1.3.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= diff --git a/planner/pce/cs/xlsx.go b/planner/pce/cs/xlsx.go index b407d41..a45c36c 100644 --- a/planner/pce/cs/xlsx.go +++ b/planner/pce/cs/xlsx.go @@ -194,7 +194,7 @@ func (slf *Xlsx) checkFieldInvalid(field pce.DataField) bool { return true } - if strings.HasPrefix(field.Name, "#") || strings.HasPrefix(field.Type, "#") { + if strings.HasPrefix(field.Name, "#") || strings.HasPrefix(field.Type, "#") || strings.HasPrefix(field.Desc, "#") { return true } diff --git a/planner/pce/exporter/exporter.exe b/planner/pce/exporter/exporter.exe index 11c332c..4093dbd 100644 Binary files a/planner/pce/exporter/exporter.exe and b/planner/pce/exporter/exporter.exe differ diff --git a/planner/pce/tmpls/golang.go b/planner/pce/tmpls/golang.go index 6d0f400..4144bd6 100644 --- a/planner/pce/tmpls/golang.go +++ b/planner/pce/tmpls/golang.go @@ -29,6 +29,7 @@ func (slf *Golang) Render(templates ...*pce.TmplStruct) (string, error) { "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/hash" "sync" + "sync/atomic" ) type Sign string @@ -41,7 +42,7 @@ func (slf *Golang) Render(templates ...*pce.TmplStruct) (string, error) { var ( json = jsonIter.ConfigCompatibleWithStandardLibrary - configs map[Sign]any + configs atomic.Pointer[map[Sign]any] signs = []Sign{ {{- range .Templates}} {{.Name}}Sign, @@ -188,14 +189,14 @@ func (slf *Golang) Render(templates ...*pce.TmplStruct) (string, error) { cs[{{.Name}}Sign] = {{.Name}} {{- end}} - configs = cs + configs.Store(&cs) } // GetConfigs 获取所有配置 func GetConfigs() map[Sign]any { mutex.Lock() defer mutex.Unlock() - return hash.Copy(configs) + return hash.Copy(*configs.Load()) } // GetConfigSigns 获取所有配置的标识 @@ -207,7 +208,7 @@ func (slf *Golang) Render(templates ...*pce.TmplStruct) (string, error) { func Sync(handle func(configs map[Sign]any)) { mutex.Lock() defer mutex.Unlock() - handle(hash.Copy(configs)) + handle(hash.Copy(*configs.Load())) } `, slf) } diff --git a/server/bot.go b/server/bot.go new file mode 100644 index 0000000..d8d0e6c --- /dev/null +++ b/server/bot.go @@ -0,0 +1,69 @@ +package server + +import ( + "fmt" + "io" + "sync/atomic" + "time" +) + +// NewBot 创建一个机器人,目前仅支持 Socket 服务器 +func NewBot(srv *Server, options ...BotOption) *Bot { + if !srv.IsSocket() { + panic(fmt.Errorf("server type[%s] is not socket", srv.network)) + } + bot := &Bot{ + conn: newBotConn(srv), + } + for _, option := range options { + option(bot) + } + return bot +} + +type Bot struct { + conn *Conn + joined atomic.Bool +} + +// JoinServer 加入服务器 +func (slf *Bot) JoinServer() { + if slf.joined.Swap(true) { + slf.conn.server.OnConnectionClosedEvent(slf.conn, nil) + } + slf.conn.server.OnConnectionOpenedEvent(slf.conn) +} + +// LeaveServer 离开服务器 +func (slf *Bot) LeaveServer() { + if slf.joined.Swap(false) { + slf.conn.server.OnConnectionClosedEvent(slf.conn, nil) + } +} + +// SetNetworkDelay 设置网络延迟和波动范围 +// - delay 延迟 +// - fluctuation 波动范围 +func (slf *Bot) SetNetworkDelay(delay, fluctuation time.Duration) { + slf.conn.delay = delay + slf.conn.fluctuation = fluctuation +} + +// SetWriter 设置写入器 +func (slf *Bot) SetWriter(writer io.Writer) { + slf.conn.botWriter.Store(&writer) +} + +// SendPacket 发送数据包到服务器 +func (slf *Bot) SendPacket(packet []byte) { + if slf.conn.server.IsOnline(slf.conn.GetID()) { + slf.conn.server.PushPacketMessage(slf.conn, 0, packet) + } +} + +// SendWSPacket 发送 WebSocket 数据包到服务器 +func (slf *Bot) SendWSPacket(wst int, packet []byte) { + if slf.conn.server.IsOnline(slf.conn.GetID()) { + slf.conn.server.PushPacketMessage(slf.conn, wst, packet) + } +} diff --git a/server/bot_options.go b/server/bot_options.go new file mode 100644 index 0000000..5796a91 --- /dev/null +++ b/server/bot_options.go @@ -0,0 +1,26 @@ +package server + +import ( + "io" + "time" +) + +type BotOption func(bot *Bot) + +// WithBotNetworkDelay 设置机器人网络延迟及波动范围 +// - delay 延迟 +// - fluctuation 波动范围 +func WithBotNetworkDelay(delay, fluctuation time.Duration) BotOption { + return func(bot *Bot) { + bot.conn.delay = delay + bot.conn.fluctuation = fluctuation + } +} + +// WithBotWriter 设置机器人写入器,默认为 os.Stdout +func WithBotWriter(construction func(bot *Bot) io.Writer) BotOption { + return func(bot *Bot) { + writer := construction(bot) + bot.conn.botWriter.Store(&writer) + } +} diff --git a/server/bot_test.go b/server/bot_test.go new file mode 100644 index 0000000..94d10d8 --- /dev/null +++ b/server/bot_test.go @@ -0,0 +1,49 @@ +package server_test + +import ( + "github.com/kercylan98/minotaur/server" + "io" + "testing" + "time" +) + +type Writer struct { + t *testing.T + bot *server.Bot +} + +func (slf *Writer) Write(p []byte) (n int, err error) { + slf.t.Log(string(p)) + switch string(p) { + case "hello": + slf.bot.SendPacket([]byte("world")) + } + return 0, nil +} + +func TestNewBot(t *testing.T) { + srv := server.New(server.NetworkWebsocket) + + srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) { + t.Logf("connection opened: %s", conn.GetID()) + conn.Close() + conn.Write([]byte("hello")) + }) + srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) { + t.Logf("connection closed: %s", conn.GetID()) + }) + srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { + t.Logf("connection %s receive packet: %s", conn.GetID(), string(packet)) + conn.Write([]byte("world")) + }) + srv.RegStartFinishEvent(func(srv *server.Server) { + bot := server.NewBot(srv, server.WithBotNetworkDelay(100, 20), server.WithBotWriter(func(bot *server.Bot) io.Writer { + return &Writer{t: t, bot: bot} + })) + bot.JoinServer() + time.Sleep(time.Second) + bot.SendPacket([]byte("hello")) + }) + + srv.Run(":9600") +} diff --git a/server/conn.go b/server/conn.go index 81aa215..e474faf 100644 --- a/server/conn.go +++ b/server/conn.go @@ -13,11 +13,14 @@ import ( "github.com/kercylan98/minotaur/utils/timer" "github.com/panjf2000/gnet" "github.com/xtaci/kcp-go/v5" + "io" "net" "net/http" + "os" "runtime/debug" "strings" "sync" + "sync/atomic" "time" ) @@ -80,18 +83,25 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn { return c } -// NewEmptyConn 创建一个适用于测试的空连接 -func NewEmptyConn(server *Server) *Conn { +// newBotConn 创建一个适用于测试等情况的机器人连接 +func newBotConn(server *Server) *Conn { + ip, port := random.NetIP(), random.Port() + var writer io.Writer = os.Stdout c := &Conn{ ctx: server.ctx, connection: &connection{ - server: server, - remoteAddr: &net.TCPAddr{}, - ip: "0.0.0.0:0", - data: map[any]any{}, - openTime: time.Now(), + server: server, + remoteAddr: &net.TCPAddr{ + IP: ip, + Port: port, + Zone: "", + }, + ip: fmt.Sprintf("BOT:%s:%d", ip.String(), port), + data: map[any]any{}, + openTime: time.Now(), }, } + c.botWriter.Store(&writer) c.init() return c } @@ -104,20 +114,23 @@ type Conn struct { // connection 长久保持的连接 type connection struct { - server *Server - ticker *timer.Ticker - remoteAddr net.Addr - ip string - ws *websocket.Conn - gn gnet.Conn - kcp *kcp.UDPSession - gw func(packet []byte) - data map[any]any - closed bool - pool *concurrent.Pool[*connPacket] - loop *writeloop.WriteLoop[*connPacket] - mu sync.Mutex - openTime time.Time + server *Server + ticker *timer.Ticker + remoteAddr net.Addr + ip string + ws *websocket.Conn + gn gnet.Conn + kcp *kcp.UDPSession + gw func(packet []byte) + data map[any]any + closed bool + pool *concurrent.Pool[*connPacket] + loop *writeloop.WriteLoop[*connPacket] + mu sync.Mutex + openTime time.Time + delay time.Duration + fluctuation time.Duration + botWriter atomic.Pointer[io.Writer] } // Ticker 获取定时器 @@ -145,8 +158,8 @@ func (slf *Conn) GetWebsocketRequest() *http.Request { return slf.GetData(wsRequestKey).(*http.Request) } -// IsEmpty 是否是空连接 -func (slf *Conn) IsEmpty() bool { +// IsBot 是否是机器人连接 +func (slf *Conn) IsBot() bool { return slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil } @@ -158,6 +171,9 @@ func (slf *Conn) RemoteAddr() net.Addr { // GetID 获取连接ID // - 为远程地址的字符串形式 func (slf *Conn) GetID() string { + if slf.IsBot() { + return slf.ip + } return slf.remoteAddr.String() } @@ -231,6 +247,13 @@ func (slf *Conn) PushAsyncMessage(caller func() error, callback func(err error), slf.server.PushShuntAsyncMessage(slf, caller, callback, mark...) } +// PushUniqueAsyncMessage 推送唯一异步消息,该消息将通过 Server.PushUniqueShuntAsyncMessage 函数推送 +// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现 +// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息 +func (slf *Conn) PushUniqueAsyncMessage(name string, caller func() error, callback func(err error), mark ...log.Field) { + slf.server.PushUniqueShuntAsyncMessage(slf, name, caller, callback, mark...) +} + // Write 向连接中写入数据 // - messageType: websocket模式中指定消息类型 func (slf *Conn) Write(packet []byte, callback ...func(err error)) { @@ -274,6 +297,14 @@ func (slf *Conn) init() { ) slf.loop = writeloop.NewWriteLoop[*connPacket](slf.pool, func(data *connPacket) error { var err error + if slf.delay > 0 || slf.fluctuation > 0 { + time.Sleep(random.Duration(int64(slf.delay-slf.fluctuation), int64(slf.delay+slf.fluctuation))) + _, err = (*slf.botWriter.Load()).Write(data.packet) + if data.callback != nil { + data.callback(err) + } + return err + } if slf.IsWebsocket() { err = slf.ws.WriteMessage(data.wst, data.packet) } else { diff --git a/server/dispatcher.go b/server/dispatcher.go index e3b4c17..e5639e3 100644 --- a/server/dispatcher.go +++ b/server/dispatcher.go @@ -1,19 +1,35 @@ package server -import "github.com/kercylan98/minotaur/utils/buffer" +import ( + "github.com/alphadose/haxmap" + "github.com/kercylan98/minotaur/utils/buffer" +) + +var dispatcherUnique = struct{}{} // generateDispatcher 生成消息分发器 -func generateDispatcher(handler func(message *Message)) *dispatcher { +func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) *dispatcher { return &dispatcher{ buffer: buffer.NewUnboundedN[*Message](), handler: handler, + uniques: haxmap.New[string, struct{}](), } } // dispatcher 消息分发器 type dispatcher struct { buffer *buffer.Unbounded[*Message] - handler func(message *Message) + uniques *haxmap.Map[string, struct{}] + handler func(dispatcher *dispatcher, message *Message) +} + +func (slf *dispatcher) unique(name string) bool { + _, loaded := slf.uniques.GetOrSet(name, dispatcherUnique) + return loaded +} + +func (slf *dispatcher) antiUnique(name string) { + slf.uniques.Del(name) } func (slf *dispatcher) start() { @@ -24,7 +40,7 @@ func (slf *dispatcher) start() { return } slf.buffer.Load() - slf.handler(message) + slf.handler(slf, message) } } } diff --git a/server/message.go b/server/message.go index 74a7af9..69c5082 100644 --- a/server/message.go +++ b/server/message.go @@ -1,10 +1,8 @@ package server import ( - "fmt" "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/log" - "github.com/kercylan98/minotaur/utils/super" ) const ( @@ -32,20 +30,36 @@ const ( // MessageTypeShuntAsyncCallback 分流异步回调消息类型 MessageTypeShuntAsyncCallback + // MessageTypeUniqueAsync 唯一异步消息类型 + MessageTypeUniqueAsync + + // MessageTypeUniqueAsyncCallback 唯一异步回调消息类型 + MessageTypeUniqueAsyncCallback + + // MessageTypeUniqueShuntAsync 唯一分流异步消息类型 + MessageTypeUniqueShuntAsync + + // MessageTypeUniqueShuntAsyncCallback 唯一分流异步回调消息类型 + MessageTypeUniqueShuntAsyncCallback + // MessageTypeSystem 系统消息类型 MessageTypeSystem ) var messageNames = map[MessageType]string{ - MessageTypePacket: "MessageTypePacket", - MessageTypeError: "MessageTypeError", - MessageTypeTicker: "MessageTypeTicker", - MessageTypeShuntTicker: "MessageTypeShuntTicker", - MessageTypeAsync: "MessageTypeAsync", - MessageTypeAsyncCallback: "MessageTypeAsyncCallback", - MessageTypeShuntAsync: "MessageTypeShuntAsync", - MessageTypeShuntAsyncCallback: "MessageTypeShuntAsyncCallback", - MessageTypeSystem: "MessageTypeSystem", + MessageTypePacket: "MessageTypePacket", + MessageTypeError: "MessageTypeError", + MessageTypeTicker: "MessageTypeTicker", + MessageTypeShuntTicker: "MessageTypeShuntTicker", + MessageTypeAsync: "MessageTypeAsync", + MessageTypeAsyncCallback: "MessageTypeAsyncCallback", + MessageTypeShuntAsync: "MessageTypeShuntAsync", + MessageTypeShuntAsyncCallback: "MessageTypeShuntAsyncCallback", + MessageTypeUniqueAsync: "MessageTypeUniqueAsync", + MessageTypeUniqueAsyncCallback: "MessageTypeUniqueAsyncCallback", + MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync", + MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback", + MessageTypeSystem: "MessageTypeSystem", } const ( @@ -110,7 +124,7 @@ func (slf *Message) MessageType() MessageType { // String 返回消息的字符串表示 func (slf *Message) String() string { - return fmt.Sprintf("[%s] %s", slf.t, super.MarshalJSON(slf.marks)) + return slf.t.String() } // String 返回消息类型的字符串表示 @@ -160,6 +174,30 @@ func (slf *Message) castToShuntAsyncCallbackMessage(conn *Conn, err error, calle return slf } +// castToUniqueAsyncMessage 将消息转换为唯一异步消息 +func (slf *Message) castToUniqueAsyncMessage(unique string, caller func() error, callback func(err error), mark ...log.Field) *Message { + slf.t, slf.name, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeUniqueAsync, unique, caller, callback, mark + return slf +} + +// castToUniqueAsyncCallbackMessage 将消息转换为唯一异步回调消息 +func (slf *Message) castToUniqueAsyncCallbackMessage(unique string, err error, caller func(err error), mark ...log.Field) *Message { + slf.t, slf.name, slf.err, slf.errHandler, slf.marks = MessageTypeUniqueAsyncCallback, unique, err, caller, mark + return slf +} + +// castToUniqueShuntAsyncMessage 将消息转换为唯一分流异步消息 +func (slf *Message) castToUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) *Message { + slf.t, slf.conn, slf.name, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeUniqueShuntAsync, conn, unique, caller, callback, mark + return slf +} + +// castToUniqueShuntAsyncCallbackMessage 将消息转换为唯一分流异步回调消息 +func (slf *Message) castToUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, caller func(err error), mark ...log.Field) *Message { + slf.t, slf.conn, slf.name, slf.err, slf.errHandler, slf.marks = MessageTypeUniqueShuntAsyncCallback, conn, unique, err, caller, mark + return slf +} + // castToSystemMessage 将消息转换为系统消息 func (slf *Message) castToSystemMessage(caller func(), mark ...log.Field) *Message { slf.t, slf.ordinaryHandler, slf.marks = MessageTypeSystem, caller, mark diff --git a/server/server.go b/server/server.go index 4aedd39..6f32284 100644 --- a/server/server.go +++ b/server/server.go @@ -81,27 +81,27 @@ type Server struct { *event // 事件 *runtime // 运行时 *option // 可选项 - network Network // 网络类型 - addr string // 侦听地址 - systemSignal chan os.Signal // 系统信号 - online *concurrent.BalanceMap[string, *Conn] // 在线连接 ginServer *gin.Engine // HTTP模式下的路由器 httpServer *http.Server // HTTP模式下的服务器 grpcServer *grpc.Server // GRPC模式下的服务器 gServer *gNet // TCP或UDP模式下的服务器 - isRunning bool // 是否正在运行 - isShutdown atomic.Bool // 是否已关闭 - closeChannel chan struct{} // 关闭信号 + multiple *MultipleServer // 多服务器模式下的服务器 ants *ants.Pool // 协程池 messagePool *concurrent.Pool[*Message] // 消息池 - messageLock sync.RWMutex // 消息锁 - multiple *MultipleServer // 多服务器模式下的服务器 - multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 - runMode RunMode // 运行模式 - messageCounter atomic.Int64 // 消息计数器 ctx context.Context // 上下文 - dispatchers map[string]*dispatcher // 消息分发器 + online *concurrent.BalanceMap[string, *Conn] // 在线连接 + network Network // 网络类型 + addr string // 侦听地址 + runMode RunMode // 运行模式 + systemSignal chan os.Signal // 系统信号 + closeChannel chan struct{} // 关闭信号 + multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误 + messageLock sync.RWMutex // 消息锁 dispatcherLock sync.RWMutex // 消息分发器锁 + isShutdown atomic.Bool // 是否已关闭 + messageCounter atomic.Int64 // 消息计数器 + isRunning bool // 是否正在运行 + dispatchers map[string]*dispatcher // 消息分发器 } // Run 使用特定地址运行服务器 @@ -387,6 +387,13 @@ func (slf *Server) Run(addr string) error { return nil } +// IsSocket 是否是 Socket 模式 +func (slf *Server) IsSocket() bool { + return slf.network == NetworkTcp || slf.network == NetworkTcp4 || slf.network == NetworkTcp6 || + slf.network == NetworkUdp || slf.network == NetworkUdp4 || slf.network == NetworkUdp6 || + slf.network == NetworkUnix || slf.network == NetworkKcp || slf.network == NetworkWebsocket +} + // RunNone 是 Run("") 的简写,仅适用于运行 NetworkNone 服务器 func (slf *Server) RunNone() error { return slf.Run(str.None) @@ -407,6 +414,18 @@ func (slf *Server) GetOnlineCount() int { return slf.online.Size() } +// GetOnlineBotCount 获取在线机器人数量 +func (slf *Server) GetOnlineBotCount() int { + var count int + slf.online.Range(func(id string, conn *Conn) bool { + if conn.IsBot() { + count++ + } + return true + }) + return count +} + // GetOnline 获取在线连接 func (slf *Server) GetOnline(id string) *Conn { return slf.online.Get(id) @@ -589,18 +608,22 @@ func (slf *Server) pushMessage(message *Message) { break } fallthrough - case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback: + case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback, MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback: var created bool dispatcher, created = slf.useDispatcher(slf.shuntMatcher(message.conn)) if created { go dispatcher.start() } - case MessageTypeSystem, MessageTypeAsync, MessageTypeAsyncCallback, MessageTypeError, MessageTypeTicker: + case MessageTypeSystem, MessageTypeAsync, MessageTypeUniqueAsync, MessageTypeAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeError, MessageTypeTicker: dispatcher, _ = slf.useDispatcher(serverSystemDispatcher) } if dispatcher == nil { return } + if (message.t == MessageTypeUniqueShuntAsync || message.t == MessageTypeUniqueAsync) && dispatcher.unique(message.name) { + slf.messagePool.Release(message) + return + } slf.messageCounter.Add(1) dispatcher.put(message) } @@ -613,13 +636,17 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration message.marks = append(message.marks, log.String(fmt.Sprintf("Other-%d", i+1), s)) } } - log.Warn("Server", log.String("type", "low-message"), log.String("cost", cost.String()), log.String("message", message.String()), log.Stack("stack")) + var fields = make([]log.Field, 0, len(message.marks)+4) + fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String())) + fields = append(fields, message.marks...) + fields = append(fields, log.Stack("stack")) + log.Warn("Server", fields...) slf.OnMessageLowExecEvent(message, cost) } } // dispatchMessage 消息分发 -func (slf *Server) dispatchMessage(msg *Message) { +func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { var ( ctx context.Context cancel context.CancelFunc @@ -637,7 +664,7 @@ func (slf *Server) dispatchMessage(msg *Message) { } present := time.Now() - if msg.t != MessageTypeAsync { + if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync { defer func(msg *Message) { if err := recover(); err != nil { stack := string(debug.Stack()) @@ -647,6 +674,9 @@ func (slf *Server) dispatchMessage(msg *Message) { slf.OnMessageErrorEvent(msg, e) } } + if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback { + dispatcher.antiUnique(msg.name) + } super.Handle(cancel) slf.low(msg, present, time.Millisecond*100) @@ -675,10 +705,13 @@ func (slf *Server) dispatchMessage(msg *Message) { } case MessageTypeTicker, MessageTypeShuntTicker: msg.ordinaryHandler() - case MessageTypeAsync, MessageTypeShuntAsync: + case MessageTypeAsync, MessageTypeShuntAsync, MessageTypeUniqueAsync, MessageTypeUniqueShuntAsync: if err := slf.ants.Submit(func() { defer func() { if err := recover(); err != nil { + if msg.t == MessageTypeUniqueAsync || msg.t == MessageTypeUniqueShuntAsync { + dispatcher.antiUnique(msg.name) + } stack := string(debug.Stack()) log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", stack)) fmt.Println(stack) @@ -700,19 +733,28 @@ func (slf *Server) dispatchMessage(msg *Message) { } if msg.errHandler != nil { if msg.conn == nil { + if msg.t == MessageTypeUniqueAsync { + slf.PushUniqueAsyncCallbackMessage(msg.name, err, msg.errHandler) + return + } slf.PushAsyncCallbackMessage(err, msg.errHandler) return } + if msg.t == MessageTypeUniqueShuntAsync { + slf.PushUniqueShuntAsyncCallbackMessage(msg.conn, msg.name, err, msg.errHandler) + return + } slf.PushShuntAsyncCallbackMessage(msg.conn, err, msg.errHandler) return } + dispatcher.antiUnique(msg.name) if err != nil { log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack()))) } }); err != nil { panic(err) } - case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback: + case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeUniqueShuntAsyncCallback: msg.errHandler(msg.err) case MessageTypeSystem: msg.ordinaryHandler() @@ -796,6 +838,38 @@ func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func() slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...)) } +// PushUniqueAsyncMessage 向服务器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致 +// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息 +func (slf *Server) PushUniqueAsyncMessage(unique string, caller func() error, callback func(err error), mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToUniqueAsyncMessage(unique, caller, callback, mark...)) +} + +// PushUniqueAsyncCallbackMessage 向服务器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 +func (slf *Server) PushUniqueAsyncCallbackMessage(unique string, err error, callback func(err error), mark ...log.Field) { + slf.pushMessage(slf.messagePool.Get().castToUniqueAsyncCallbackMessage(unique, err, callback, mark...)) +} + +// PushUniqueShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致 +// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncMessage 进行转发 +// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息 +func (slf *Server) PushUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) { + if slf.shuntMatcher == nil { + slf.PushUniqueAsyncMessage(unique, caller, callback) + return + } + slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncMessage(conn, unique, caller, callback, mark...)) +} + +// PushUniqueShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致 +// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncCallbackMessage 进行转发 +func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, callback func(err error), mark ...log.Field) { + if slf.shuntMatcher == nil { + slf.PushUniqueAsyncCallbackMessage(unique, err, callback) + return + } + slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncCallbackMessage(conn, unique, err, callback, mark...)) +} + // PushErrorMessage 向服务器中推送 MessageTypeError 消息 // - 通过该函数推送错误消息,当消息触发时将在系统分发器中处理消息 // - 参数 errAction 用于指定错误消息的处理方式,可选值为 MessageErrorActionNone 和 MessageErrorActionShutdown diff --git a/utils/concurrent/pool.go b/utils/concurrent/pool.go index b3aecb6..55c7390 100644 --- a/utils/concurrent/pool.go +++ b/utils/concurrent/pool.go @@ -2,7 +2,6 @@ package concurrent import ( "github.com/kercylan98/minotaur/utils/log" - "go.uber.org/zap" "sync" "time" ) @@ -76,7 +75,7 @@ func (slf *Pool[T]) Get() T { if !slf.silent { now := time.Now().Unix() if now-slf.warn >= 1 { - log.Warn("Pool", log.String("Get", "the number of buffer members is insufficient, consider whether it is due to unreleased or inappropriate buffer size"), zap.Stack("stack")) + log.Warn("Pool", log.String("Get", "the number of buffer members is insufficient, consider whether it is due to unreleased or inappropriate buffer size"), log.Stack("stack")) slf.warn = now } } diff --git a/utils/random/ip.go b/utils/random/ip.go new file mode 100644 index 0000000..952552f --- /dev/null +++ b/utils/random/ip.go @@ -0,0 +1,26 @@ +package random + +import ( + "fmt" + "net" +) + +// NetIP 返回一个随机的IP地址 +func NetIP() net.IP { + return net.IPv4(byte(Int64(0, 255)), byte(Int64(0, 255)), byte(Int64(0, 255)), byte(Int64(0, 255))) +} + +// Port 返回一个随机的端口号 +func Port() int { + return Int(1, 65535) +} + +// IPv4 返回一个随机产生的IPv4地址。 +func IPv4() string { + return fmt.Sprintf("%d.%d.%d.%d", Int(1, 255), Int(0, 255), Int(0, 255), Int(0, 255)) +} + +// IPv4Port 返回一个随机产生的IPv4地址和端口。 +func IPv4Port() string { + return fmt.Sprintf("%d.%d.%d.%d:%d", Int(1, 255), Int(0, 255), Int(0, 255), Int(0, 255), Int(1, 65535)) +} diff --git a/utils/super/retry.go b/utils/super/retry.go index 9e3f835..85cc9d7 100644 --- a/utils/super/retry.go +++ b/utils/super/retry.go @@ -47,8 +47,20 @@ func RetryByRule(f func() error, rule func(count int) time.Duration) error { // - multiplier:延迟时间的乘数,通常为 2 // - randomization:延迟时间的随机化因子,通常为 0.5 func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64) error { + return ConditionalRetryByExponentialBackoff(f, nil, maxRetries, baseDelay, maxDelay, multiplier, randomization) +} + +// ConditionalRetryByExponentialBackoff 该函数与 RetryByExponentialBackoff 类似,但是可以被中断 +// - cond 为中断条件,当 cond 返回 false 时,将会中断重试 +// +// 该函数通常用于在重试过程中,需要中断重试的场景,例如: +// - 用户请求开始游戏,由于网络等情况,进入重试状态。此时用户再次发送开始游戏请求,此时需要中断之前的重试,避免重复进入游戏 +func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64) error { retry := 0 for { + if cond != nil && !cond() { + return fmt.Errorf("interrupted") + } err := f() if err == nil { return nil