Merge branch 'develop'

This commit is contained in:
kercylan98 2023-11-13 16:06:08 +08:00
commit ace6a06910
17 changed files with 485 additions and 68 deletions

View File

@ -18,4 +18,4 @@ jobs:
bump-minor-pre-major: true
bump-patch-for-minor-pre-major: true
changelog-types: '[{"type":"other","section":"Other | 其他更改","hidden":false},{"type":"revert","section":"Reverts | 回退","hidden":false},{"type":"feat","section":"Features | 新特性","hidden":false},{"type":"fix","section":"Bug Fixes | 修复","hidden":false},{"type":"improvement","section":"Feature Improvements | 改进","hidden":false},{"type":"docs","section":"Docs | 文档优化","hidden":false},{"type":"style","section":"Styling | 可读性优化","hidden":false},{"type":"refactor","section":"Code Refactoring | 重构","hidden":false},{"type":"perf","section":"Performance Improvements | 性能优化","hidden":false},{"type":"test","section":"Tests | 新增或优化测试用例","hidden":false},{"type":"build","section":"Build System | 影响构建的修改","hidden":false},{"type":"ci","section":"CI | 更改我们的 CI 配置文件和脚本","hidden":false}]'
release-as: 0.3.0
# release-as: 0.3.0

View File

@ -221,6 +221,80 @@ func main() {
- 模板文件图例:
![exporter-xlsx-template.png](.github/images/exporter-xlsx-template.png)
#### 导出 JSON 文件(可供客户端直接使用,包含索引的配置导出后为键值模式,可直接读取)
```text
Flags:
-e, --exclude string excluded configuration names or display names (comma separated) | 排除的配置名或显示名(英文逗号分隔)
-h, --help help for json
-o, --output string directory path of the output json file | 输出的 json 文件所在目录路径
-p, --prefix string export configuration file name prefix | 导出配置文件名前缀
-t, --type string export server configuration[s] or client configuration[c] | 导出服务端配置[s]还是客户端配置[c]
-f, --xlsx string xlsx file path or directory path | xlsx 文件路径或所在目录路径
```
```shell
expoter.exe json -t s -f xlsx_template.xlsx -o ./output
```
导出结果示例
```json
{
"1": {
"b": {
"Id": 1,
"Count": "b",
"Info": {
"id": 1,
"name": "小明",
"info": {
"lv": 1,
"exp": {
"mux": 10,
"count": 100
}
}
},
"Other": [
{
"id": 1,
"name": "张飞"
},
{
"id": 2,
"name": "刘备"
}
]
}
}
}
```
#### 导出 Golang 文件
```text
Flags:
-e, --exclude string excluded configuration names or display names (comma separated) | 排除的配置名或显示名(英文逗号分隔)
-h, --help help for go
-o, --output string output path | 输出的 go 文件路径
-f, --xlsx string xlsx file path or directory path | xlsx 文件路径或所在目录路径
```
```shell
expoter.exe go -f xlsx_template.xlsx -o ./output
```
使用示例
```go
package main
import (
"fmt"
"config"
)
func main() {
fmt.Println(config.EasyConfig.Id)
}
```
### 持续更新的示例项目
- **[Minotaur-Example](https://github.com/kercylan98/minotaur-example)**

2
go.mod
View File

@ -4,7 +4,7 @@ go 1.21
require (
github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108
github.com/alphadose/haxmap v1.2.0
github.com/alphadose/haxmap v1.3.0
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.9.1
github.com/go-resty/resty/v2 v2.7.0

2
go.sum
View File

@ -4,6 +4,8 @@ github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108 h1:iPugyBI7
github.com/RussellLuo/timingwheel v0.0.0-20220218152713-54845bda3108/go.mod h1:WAMLHwunr1hi3u7OjGV6/VWG9QbdMhGpEKjROiSFd10=
github.com/alphadose/haxmap v1.2.0 h1:noGrAmCE+gNheZ4KpW+sYj9W5uMcO1UAjbAq9XBOAfM=
github.com/alphadose/haxmap v1.2.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
github.com/alphadose/haxmap v1.3.0 h1:C/2LboOnPCZP27GmmSXOcwx360st0P8N0fTJ3voefKc=
github.com/alphadose/haxmap v1.3.0/go.mod h1:rjHw1IAqbxm0S3U5tD16GoKsiAd8FWx5BJ2IYqXwgmM=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=

View File

@ -194,7 +194,7 @@ func (slf *Xlsx) checkFieldInvalid(field pce.DataField) bool {
return true
}
if strings.HasPrefix(field.Name, "#") || strings.HasPrefix(field.Type, "#") {
if strings.HasPrefix(field.Name, "#") || strings.HasPrefix(field.Type, "#") || strings.HasPrefix(field.Desc, "#") {
return true
}

Binary file not shown.

View File

@ -29,6 +29,7 @@ func (slf *Golang) Render(templates ...*pce.TmplStruct) (string, error) {
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/hash"
"sync"
"sync/atomic"
)
type Sign string
@ -41,7 +42,7 @@ func (slf *Golang) Render(templates ...*pce.TmplStruct) (string, error) {
var (
json = jsonIter.ConfigCompatibleWithStandardLibrary
configs map[Sign]any
configs atomic.Pointer[map[Sign]any]
signs = []Sign{
{{- range .Templates}}
{{.Name}}Sign,
@ -188,14 +189,14 @@ func (slf *Golang) Render(templates ...*pce.TmplStruct) (string, error) {
cs[{{.Name}}Sign] = {{.Name}}
{{- end}}
configs = cs
configs.Store(&cs)
}
// GetConfigs 获取所有配置
func GetConfigs() map[Sign]any {
mutex.Lock()
defer mutex.Unlock()
return hash.Copy(configs)
return hash.Copy(*configs.Load())
}
// GetConfigSigns 获取所有配置的标识
@ -207,7 +208,7 @@ func (slf *Golang) Render(templates ...*pce.TmplStruct) (string, error) {
func Sync(handle func(configs map[Sign]any)) {
mutex.Lock()
defer mutex.Unlock()
handle(hash.Copy(configs))
handle(hash.Copy(*configs.Load()))
}
`, slf)
}

69
server/bot.go Normal file
View File

@ -0,0 +1,69 @@
package server
import (
"fmt"
"io"
"sync/atomic"
"time"
)
// NewBot 创建一个机器人,目前仅支持 Socket 服务器
func NewBot(srv *Server, options ...BotOption) *Bot {
if !srv.IsSocket() {
panic(fmt.Errorf("server type[%s] is not socket", srv.network))
}
bot := &Bot{
conn: newBotConn(srv),
}
for _, option := range options {
option(bot)
}
return bot
}
type Bot struct {
conn *Conn
joined atomic.Bool
}
// JoinServer 加入服务器
func (slf *Bot) JoinServer() {
if slf.joined.Swap(true) {
slf.conn.server.OnConnectionClosedEvent(slf.conn, nil)
}
slf.conn.server.OnConnectionOpenedEvent(slf.conn)
}
// LeaveServer 离开服务器
func (slf *Bot) LeaveServer() {
if slf.joined.Swap(false) {
slf.conn.server.OnConnectionClosedEvent(slf.conn, nil)
}
}
// SetNetworkDelay 设置网络延迟和波动范围
// - delay 延迟
// - fluctuation 波动范围
func (slf *Bot) SetNetworkDelay(delay, fluctuation time.Duration) {
slf.conn.delay = delay
slf.conn.fluctuation = fluctuation
}
// SetWriter 设置写入器
func (slf *Bot) SetWriter(writer io.Writer) {
slf.conn.botWriter.Store(&writer)
}
// SendPacket 发送数据包到服务器
func (slf *Bot) SendPacket(packet []byte) {
if slf.conn.server.IsOnline(slf.conn.GetID()) {
slf.conn.server.PushPacketMessage(slf.conn, 0, packet)
}
}
// SendWSPacket 发送 WebSocket 数据包到服务器
func (slf *Bot) SendWSPacket(wst int, packet []byte) {
if slf.conn.server.IsOnline(slf.conn.GetID()) {
slf.conn.server.PushPacketMessage(slf.conn, wst, packet)
}
}

26
server/bot_options.go Normal file
View File

@ -0,0 +1,26 @@
package server
import (
"io"
"time"
)
type BotOption func(bot *Bot)
// WithBotNetworkDelay 设置机器人网络延迟及波动范围
// - delay 延迟
// - fluctuation 波动范围
func WithBotNetworkDelay(delay, fluctuation time.Duration) BotOption {
return func(bot *Bot) {
bot.conn.delay = delay
bot.conn.fluctuation = fluctuation
}
}
// WithBotWriter 设置机器人写入器,默认为 os.Stdout
func WithBotWriter(construction func(bot *Bot) io.Writer) BotOption {
return func(bot *Bot) {
writer := construction(bot)
bot.conn.botWriter.Store(&writer)
}
}

49
server/bot_test.go Normal file
View File

@ -0,0 +1,49 @@
package server_test
import (
"github.com/kercylan98/minotaur/server"
"io"
"testing"
"time"
)
type Writer struct {
t *testing.T
bot *server.Bot
}
func (slf *Writer) Write(p []byte) (n int, err error) {
slf.t.Log(string(p))
switch string(p) {
case "hello":
slf.bot.SendPacket([]byte("world"))
}
return 0, nil
}
func TestNewBot(t *testing.T) {
srv := server.New(server.NetworkWebsocket)
srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) {
t.Logf("connection opened: %s", conn.GetID())
conn.Close()
conn.Write([]byte("hello"))
})
srv.RegConnectionClosedEvent(func(srv *server.Server, conn *server.Conn, err any) {
t.Logf("connection closed: %s", conn.GetID())
})
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
t.Logf("connection %s receive packet: %s", conn.GetID(), string(packet))
conn.Write([]byte("world"))
})
srv.RegStartFinishEvent(func(srv *server.Server) {
bot := server.NewBot(srv, server.WithBotNetworkDelay(100, 20), server.WithBotWriter(func(bot *server.Bot) io.Writer {
return &Writer{t: t, bot: bot}
}))
bot.JoinServer()
time.Sleep(time.Second)
bot.SendPacket([]byte("hello"))
})
srv.Run(":9600")
}

View File

@ -13,11 +13,14 @@ import (
"github.com/kercylan98/minotaur/utils/timer"
"github.com/panjf2000/gnet"
"github.com/xtaci/kcp-go/v5"
"io"
"net"
"net/http"
"os"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"
)
@ -80,18 +83,25 @@ func newWebsocketConn(server *Server, ws *websocket.Conn, ip string) *Conn {
return c
}
// NewEmptyConn 创建一个适用于测试的空连接
func NewEmptyConn(server *Server) *Conn {
// newBotConn 创建一个适用于测试等情况的机器人连接
func newBotConn(server *Server) *Conn {
ip, port := random.NetIP(), random.Port()
var writer io.Writer = os.Stdout
c := &Conn{
ctx: server.ctx,
connection: &connection{
server: server,
remoteAddr: &net.TCPAddr{},
ip: "0.0.0.0:0",
data: map[any]any{},
openTime: time.Now(),
server: server,
remoteAddr: &net.TCPAddr{
IP: ip,
Port: port,
Zone: "",
},
ip: fmt.Sprintf("BOT:%s:%d", ip.String(), port),
data: map[any]any{},
openTime: time.Now(),
},
}
c.botWriter.Store(&writer)
c.init()
return c
}
@ -104,20 +114,23 @@ type Conn struct {
// connection 长久保持的连接
type connection struct {
server *Server
ticker *timer.Ticker
remoteAddr net.Addr
ip string
ws *websocket.Conn
gn gnet.Conn
kcp *kcp.UDPSession
gw func(packet []byte)
data map[any]any
closed bool
pool *concurrent.Pool[*connPacket]
loop *writeloop.WriteLoop[*connPacket]
mu sync.Mutex
openTime time.Time
server *Server
ticker *timer.Ticker
remoteAddr net.Addr
ip string
ws *websocket.Conn
gn gnet.Conn
kcp *kcp.UDPSession
gw func(packet []byte)
data map[any]any
closed bool
pool *concurrent.Pool[*connPacket]
loop *writeloop.WriteLoop[*connPacket]
mu sync.Mutex
openTime time.Time
delay time.Duration
fluctuation time.Duration
botWriter atomic.Pointer[io.Writer]
}
// Ticker 获取定时器
@ -145,8 +158,8 @@ func (slf *Conn) GetWebsocketRequest() *http.Request {
return slf.GetData(wsRequestKey).(*http.Request)
}
// IsEmpty 是否是空连接
func (slf *Conn) IsEmpty() bool {
// IsBot 是否是机器人连接
func (slf *Conn) IsBot() bool {
return slf.ws == nil && slf.gn == nil && slf.kcp == nil && slf.gw == nil
}
@ -158,6 +171,9 @@ func (slf *Conn) RemoteAddr() net.Addr {
// GetID 获取连接ID
// - 为远程地址的字符串形式
func (slf *Conn) GetID() string {
if slf.IsBot() {
return slf.ip
}
return slf.remoteAddr.String()
}
@ -231,6 +247,13 @@ func (slf *Conn) PushAsyncMessage(caller func() error, callback func(err error),
slf.server.PushShuntAsyncMessage(slf, caller, callback, mark...)
}
// PushUniqueAsyncMessage 推送唯一异步消息,该消息将通过 Server.PushUniqueShuntAsyncMessage 函数推送
// - mark 为可选的日志标记,当发生异常时,将会在日志中进行体现
// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息
func (slf *Conn) PushUniqueAsyncMessage(name string, caller func() error, callback func(err error), mark ...log.Field) {
slf.server.PushUniqueShuntAsyncMessage(slf, name, caller, callback, mark...)
}
// Write 向连接中写入数据
// - messageType: websocket模式中指定消息类型
func (slf *Conn) Write(packet []byte, callback ...func(err error)) {
@ -274,6 +297,14 @@ func (slf *Conn) init() {
)
slf.loop = writeloop.NewWriteLoop[*connPacket](slf.pool, func(data *connPacket) error {
var err error
if slf.delay > 0 || slf.fluctuation > 0 {
time.Sleep(random.Duration(int64(slf.delay-slf.fluctuation), int64(slf.delay+slf.fluctuation)))
_, err = (*slf.botWriter.Load()).Write(data.packet)
if data.callback != nil {
data.callback(err)
}
return err
}
if slf.IsWebsocket() {
err = slf.ws.WriteMessage(data.wst, data.packet)
} else {

View File

@ -1,19 +1,35 @@
package server
import "github.com/kercylan98/minotaur/utils/buffer"
import (
"github.com/alphadose/haxmap"
"github.com/kercylan98/minotaur/utils/buffer"
)
var dispatcherUnique = struct{}{}
// generateDispatcher 生成消息分发器
func generateDispatcher(handler func(message *Message)) *dispatcher {
func generateDispatcher(handler func(dispatcher *dispatcher, message *Message)) *dispatcher {
return &dispatcher{
buffer: buffer.NewUnboundedN[*Message](),
handler: handler,
uniques: haxmap.New[string, struct{}](),
}
}
// dispatcher 消息分发器
type dispatcher struct {
buffer *buffer.Unbounded[*Message]
handler func(message *Message)
uniques *haxmap.Map[string, struct{}]
handler func(dispatcher *dispatcher, message *Message)
}
func (slf *dispatcher) unique(name string) bool {
_, loaded := slf.uniques.GetOrSet(name, dispatcherUnique)
return loaded
}
func (slf *dispatcher) antiUnique(name string) {
slf.uniques.Del(name)
}
func (slf *dispatcher) start() {
@ -24,7 +40,7 @@ func (slf *dispatcher) start() {
return
}
slf.buffer.Load()
slf.handler(message)
slf.handler(slf, message)
}
}
}

View File

@ -1,10 +1,8 @@
package server
import (
"fmt"
"github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
)
const (
@ -32,20 +30,36 @@ const (
// MessageTypeShuntAsyncCallback 分流异步回调消息类型
MessageTypeShuntAsyncCallback
// MessageTypeUniqueAsync 唯一异步消息类型
MessageTypeUniqueAsync
// MessageTypeUniqueAsyncCallback 唯一异步回调消息类型
MessageTypeUniqueAsyncCallback
// MessageTypeUniqueShuntAsync 唯一分流异步消息类型
MessageTypeUniqueShuntAsync
// MessageTypeUniqueShuntAsyncCallback 唯一分流异步回调消息类型
MessageTypeUniqueShuntAsyncCallback
// MessageTypeSystem 系统消息类型
MessageTypeSystem
)
var messageNames = map[MessageType]string{
MessageTypePacket: "MessageTypePacket",
MessageTypeError: "MessageTypeError",
MessageTypeTicker: "MessageTypeTicker",
MessageTypeShuntTicker: "MessageTypeShuntTicker",
MessageTypeAsync: "MessageTypeAsync",
MessageTypeAsyncCallback: "MessageTypeAsyncCallback",
MessageTypeShuntAsync: "MessageTypeShuntAsync",
MessageTypeShuntAsyncCallback: "MessageTypeShuntAsyncCallback",
MessageTypeSystem: "MessageTypeSystem",
MessageTypePacket: "MessageTypePacket",
MessageTypeError: "MessageTypeError",
MessageTypeTicker: "MessageTypeTicker",
MessageTypeShuntTicker: "MessageTypeShuntTicker",
MessageTypeAsync: "MessageTypeAsync",
MessageTypeAsyncCallback: "MessageTypeAsyncCallback",
MessageTypeShuntAsync: "MessageTypeShuntAsync",
MessageTypeShuntAsyncCallback: "MessageTypeShuntAsyncCallback",
MessageTypeUniqueAsync: "MessageTypeUniqueAsync",
MessageTypeUniqueAsyncCallback: "MessageTypeUniqueAsyncCallback",
MessageTypeUniqueShuntAsync: "MessageTypeUniqueShuntAsync",
MessageTypeUniqueShuntAsyncCallback: "MessageTypeUniqueShuntAsyncCallback",
MessageTypeSystem: "MessageTypeSystem",
}
const (
@ -110,7 +124,7 @@ func (slf *Message) MessageType() MessageType {
// String 返回消息的字符串表示
func (slf *Message) String() string {
return fmt.Sprintf("[%s] %s", slf.t, super.MarshalJSON(slf.marks))
return slf.t.String()
}
// String 返回消息类型的字符串表示
@ -160,6 +174,30 @@ func (slf *Message) castToShuntAsyncCallbackMessage(conn *Conn, err error, calle
return slf
}
// castToUniqueAsyncMessage 将消息转换为唯一异步消息
func (slf *Message) castToUniqueAsyncMessage(unique string, caller func() error, callback func(err error), mark ...log.Field) *Message {
slf.t, slf.name, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeUniqueAsync, unique, caller, callback, mark
return slf
}
// castToUniqueAsyncCallbackMessage 将消息转换为唯一异步回调消息
func (slf *Message) castToUniqueAsyncCallbackMessage(unique string, err error, caller func(err error), mark ...log.Field) *Message {
slf.t, slf.name, slf.err, slf.errHandler, slf.marks = MessageTypeUniqueAsyncCallback, unique, err, caller, mark
return slf
}
// castToUniqueShuntAsyncMessage 将消息转换为唯一分流异步消息
func (slf *Message) castToUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) *Message {
slf.t, slf.conn, slf.name, slf.exceptionHandler, slf.errHandler, slf.marks = MessageTypeUniqueShuntAsync, conn, unique, caller, callback, mark
return slf
}
// castToUniqueShuntAsyncCallbackMessage 将消息转换为唯一分流异步回调消息
func (slf *Message) castToUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, caller func(err error), mark ...log.Field) *Message {
slf.t, slf.conn, slf.name, slf.err, slf.errHandler, slf.marks = MessageTypeUniqueShuntAsyncCallback, conn, unique, err, caller, mark
return slf
}
// castToSystemMessage 将消息转换为系统消息
func (slf *Message) castToSystemMessage(caller func(), mark ...log.Field) *Message {
slf.t, slf.ordinaryHandler, slf.marks = MessageTypeSystem, caller, mark

View File

@ -81,27 +81,27 @@ type Server struct {
*event // 事件
*runtime // 运行时
*option // 可选项
network Network // 网络类型
addr string // 侦听地址
systemSignal chan os.Signal // 系统信号
online *concurrent.BalanceMap[string, *Conn] // 在线连接
ginServer *gin.Engine // HTTP模式下的路由器
httpServer *http.Server // HTTP模式下的服务器
grpcServer *grpc.Server // GRPC模式下的服务器
gServer *gNet // TCP或UDP模式下的服务器
isRunning bool // 是否正在运行
isShutdown atomic.Bool // 是否已关闭
closeChannel chan struct{} // 关闭信号
multiple *MultipleServer // 多服务器模式下的服务器
ants *ants.Pool // 协程池
messagePool *concurrent.Pool[*Message] // 消息池
messageLock sync.RWMutex // 消息锁
multiple *MultipleServer // 多服务器模式下的服务器
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
runMode RunMode // 运行模式
messageCounter atomic.Int64 // 消息计数器
ctx context.Context // 上下文
dispatchers map[string]*dispatcher // 消息分发器
online *concurrent.BalanceMap[string, *Conn] // 在线连接
network Network // 网络类型
addr string // 侦听地址
runMode RunMode // 运行模式
systemSignal chan os.Signal // 系统信号
closeChannel chan struct{} // 关闭信号
multipleRuntimeErrorChan chan error // 多服务器模式下的运行时错误
messageLock sync.RWMutex // 消息锁
dispatcherLock sync.RWMutex // 消息分发器锁
isShutdown atomic.Bool // 是否已关闭
messageCounter atomic.Int64 // 消息计数器
isRunning bool // 是否正在运行
dispatchers map[string]*dispatcher // 消息分发器
}
// Run 使用特定地址运行服务器
@ -387,6 +387,13 @@ func (slf *Server) Run(addr string) error {
return nil
}
// IsSocket 是否是 Socket 模式
func (slf *Server) IsSocket() bool {
return slf.network == NetworkTcp || slf.network == NetworkTcp4 || slf.network == NetworkTcp6 ||
slf.network == NetworkUdp || slf.network == NetworkUdp4 || slf.network == NetworkUdp6 ||
slf.network == NetworkUnix || slf.network == NetworkKcp || slf.network == NetworkWebsocket
}
// RunNone 是 Run("") 的简写,仅适用于运行 NetworkNone 服务器
func (slf *Server) RunNone() error {
return slf.Run(str.None)
@ -407,6 +414,18 @@ func (slf *Server) GetOnlineCount() int {
return slf.online.Size()
}
// GetOnlineBotCount 获取在线机器人数量
func (slf *Server) GetOnlineBotCount() int {
var count int
slf.online.Range(func(id string, conn *Conn) bool {
if conn.IsBot() {
count++
}
return true
})
return count
}
// GetOnline 获取在线连接
func (slf *Server) GetOnline(id string) *Conn {
return slf.online.Get(id)
@ -589,18 +608,22 @@ func (slf *Server) pushMessage(message *Message) {
break
}
fallthrough
case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback:
case MessageTypeShuntTicker, MessageTypeShuntAsync, MessageTypeShuntAsyncCallback, MessageTypeUniqueShuntAsync, MessageTypeUniqueShuntAsyncCallback:
var created bool
dispatcher, created = slf.useDispatcher(slf.shuntMatcher(message.conn))
if created {
go dispatcher.start()
}
case MessageTypeSystem, MessageTypeAsync, MessageTypeAsyncCallback, MessageTypeError, MessageTypeTicker:
case MessageTypeSystem, MessageTypeAsync, MessageTypeUniqueAsync, MessageTypeAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeError, MessageTypeTicker:
dispatcher, _ = slf.useDispatcher(serverSystemDispatcher)
}
if dispatcher == nil {
return
}
if (message.t == MessageTypeUniqueShuntAsync || message.t == MessageTypeUniqueAsync) && dispatcher.unique(message.name) {
slf.messagePool.Release(message)
return
}
slf.messageCounter.Add(1)
dispatcher.put(message)
}
@ -613,13 +636,17 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration
message.marks = append(message.marks, log.String(fmt.Sprintf("Other-%d", i+1), s))
}
}
log.Warn("Server", log.String("type", "low-message"), log.String("cost", cost.String()), log.String("message", message.String()), log.Stack("stack"))
var fields = make([]log.Field, 0, len(message.marks)+4)
fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String()))
fields = append(fields, message.marks...)
fields = append(fields, log.Stack("stack"))
log.Warn("Server", fields...)
slf.OnMessageLowExecEvent(message, cost)
}
}
// dispatchMessage 消息分发
func (slf *Server) dispatchMessage(msg *Message) {
func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
var (
ctx context.Context
cancel context.CancelFunc
@ -637,7 +664,7 @@ func (slf *Server) dispatchMessage(msg *Message) {
}
present := time.Now()
if msg.t != MessageTypeAsync {
if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync {
defer func(msg *Message) {
if err := recover(); err != nil {
stack := string(debug.Stack())
@ -647,6 +674,9 @@ func (slf *Server) dispatchMessage(msg *Message) {
slf.OnMessageErrorEvent(msg, e)
}
}
if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback {
dispatcher.antiUnique(msg.name)
}
super.Handle(cancel)
slf.low(msg, present, time.Millisecond*100)
@ -675,10 +705,13 @@ func (slf *Server) dispatchMessage(msg *Message) {
}
case MessageTypeTicker, MessageTypeShuntTicker:
msg.ordinaryHandler()
case MessageTypeAsync, MessageTypeShuntAsync:
case MessageTypeAsync, MessageTypeShuntAsync, MessageTypeUniqueAsync, MessageTypeUniqueShuntAsync:
if err := slf.ants.Submit(func() {
defer func() {
if err := recover(); err != nil {
if msg.t == MessageTypeUniqueAsync || msg.t == MessageTypeUniqueShuntAsync {
dispatcher.antiUnique(msg.name)
}
stack := string(debug.Stack())
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", stack))
fmt.Println(stack)
@ -700,19 +733,28 @@ func (slf *Server) dispatchMessage(msg *Message) {
}
if msg.errHandler != nil {
if msg.conn == nil {
if msg.t == MessageTypeUniqueAsync {
slf.PushUniqueAsyncCallbackMessage(msg.name, err, msg.errHandler)
return
}
slf.PushAsyncCallbackMessage(err, msg.errHandler)
return
}
if msg.t == MessageTypeUniqueShuntAsync {
slf.PushUniqueShuntAsyncCallbackMessage(msg.conn, msg.name, err, msg.errHandler)
return
}
slf.PushShuntAsyncCallbackMessage(msg.conn, err, msg.errHandler)
return
}
dispatcher.antiUnique(msg.name)
if err != nil {
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", string(debug.Stack())))
}
}); err != nil {
panic(err)
}
case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback:
case MessageTypeAsyncCallback, MessageTypeShuntAsyncCallback, MessageTypeUniqueAsyncCallback, MessageTypeUniqueShuntAsyncCallback:
msg.errHandler(msg.err)
case MessageTypeSystem:
msg.ordinaryHandler()
@ -796,6 +838,38 @@ func (slf *Server) PushShuntTickerMessage(conn *Conn, name string, caller func()
slf.pushMessage(slf.messagePool.Get().castToShuntTickerMessage(conn, name, caller, mark...))
}
// PushUniqueAsyncMessage 向服务器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致
// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息
func (slf *Server) PushUniqueAsyncMessage(unique string, caller func() error, callback func(err error), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToUniqueAsyncMessage(unique, caller, callback, mark...))
}
// PushUniqueAsyncCallbackMessage 向服务器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
func (slf *Server) PushUniqueAsyncCallbackMessage(unique string, err error, callback func(err error), mark ...log.Field) {
slf.pushMessage(slf.messagePool.Get().castToUniqueAsyncCallbackMessage(unique, err, callback, mark...))
}
// PushUniqueShuntAsyncMessage 向特定分发器中推送 MessageTypeAsync 消息,消息执行与 MessageTypeAsync 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncMessage 进行转发
// - 不同的是当上一个相同的 unique 消息未执行完成时,将会忽略该消息
func (slf *Server) PushUniqueShuntAsyncMessage(conn *Conn, unique string, caller func() error, callback func(err error), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushUniqueAsyncMessage(unique, caller, callback)
return
}
slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncMessage(conn, unique, caller, callback, mark...))
}
// PushUniqueShuntAsyncCallbackMessage 向特定分发器中推送 MessageTypeAsyncCallback 消息,消息执行与 MessageTypeAsyncCallback 一致
// - 需要注意的是,当未指定 WithShunt 时,将会通过 PushUniqueAsyncCallbackMessage 进行转发
func (slf *Server) PushUniqueShuntAsyncCallbackMessage(conn *Conn, unique string, err error, callback func(err error), mark ...log.Field) {
if slf.shuntMatcher == nil {
slf.PushUniqueAsyncCallbackMessage(unique, err, callback)
return
}
slf.pushMessage(slf.messagePool.Get().castToUniqueShuntAsyncCallbackMessage(conn, unique, err, callback, mark...))
}
// PushErrorMessage 向服务器中推送 MessageTypeError 消息
// - 通过该函数推送错误消息,当消息触发时将在系统分发器中处理消息
// - 参数 errAction 用于指定错误消息的处理方式,可选值为 MessageErrorActionNone 和 MessageErrorActionShutdown

View File

@ -2,7 +2,6 @@ package concurrent
import (
"github.com/kercylan98/minotaur/utils/log"
"go.uber.org/zap"
"sync"
"time"
)
@ -76,7 +75,7 @@ func (slf *Pool[T]) Get() T {
if !slf.silent {
now := time.Now().Unix()
if now-slf.warn >= 1 {
log.Warn("Pool", log.String("Get", "the number of buffer members is insufficient, consider whether it is due to unreleased or inappropriate buffer size"), zap.Stack("stack"))
log.Warn("Pool", log.String("Get", "the number of buffer members is insufficient, consider whether it is due to unreleased or inappropriate buffer size"), log.Stack("stack"))
slf.warn = now
}
}

26
utils/random/ip.go Normal file
View File

@ -0,0 +1,26 @@
package random
import (
"fmt"
"net"
)
// NetIP 返回一个随机的IP地址
func NetIP() net.IP {
return net.IPv4(byte(Int64(0, 255)), byte(Int64(0, 255)), byte(Int64(0, 255)), byte(Int64(0, 255)))
}
// Port 返回一个随机的端口号
func Port() int {
return Int(1, 65535)
}
// IPv4 返回一个随机产生的IPv4地址。
func IPv4() string {
return fmt.Sprintf("%d.%d.%d.%d", Int(1, 255), Int(0, 255), Int(0, 255), Int(0, 255))
}
// IPv4Port 返回一个随机产生的IPv4地址和端口。
func IPv4Port() string {
return fmt.Sprintf("%d.%d.%d.%d:%d", Int(1, 255), Int(0, 255), Int(0, 255), Int(0, 255), Int(1, 65535))
}

View File

@ -47,8 +47,20 @@ func RetryByRule(f func() error, rule func(count int) time.Duration) error {
// - multiplier延迟时间的乘数通常为 2
// - randomization延迟时间的随机化因子通常为 0.5
func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64) error {
return ConditionalRetryByExponentialBackoff(f, nil, maxRetries, baseDelay, maxDelay, multiplier, randomization)
}
// ConditionalRetryByExponentialBackoff 该函数与 RetryByExponentialBackoff 类似,但是可以被中断
// - cond 为中断条件,当 cond 返回 false 时,将会中断重试
//
// 该函数通常用于在重试过程中,需要中断重试的场景,例如:
// - 用户请求开始游戏,由于网络等情况,进入重试状态。此时用户再次发送开始游戏请求,此时需要中断之前的重试,避免重复进入游戏
func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64) error {
retry := 0
for {
if cond != nil && !cond() {
return fmt.Errorf("interrupted")
}
err := f()
if err == nil {
return nil