diff --git a/README.md b/README.md index b15afec..dd94c44 100644 --- a/README.md +++ b/README.md @@ -158,7 +158,7 @@ package main import "github.com/kercylan98/minotaur/server" func main() { - srv := server.New(server.NetworkWebsocket, server.WithTicker(50, 10, false)) + srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 50, 10, false)) if err := srv.Run(":9999"); err != nil { panic(err) } diff --git a/examples/internal/ticker-server/main.go b/examples/internal/ticker-server/main.go index 680274a..c7a0df5 100644 --- a/examples/internal/ticker-server/main.go +++ b/examples/internal/ticker-server/main.go @@ -3,7 +3,7 @@ package main import "github.com/kercylan98/minotaur/server" func main() { - srv := server.New(server.NetworkWebsocket, server.WithTicker(50, 10, false)) + srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 50, 10, false)) if err := srv.Run(":9999"); err != nil { panic(err) } diff --git a/go.mod b/go.mod index 42bcad4..14aa3a8 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/xtaci/kcp-go/v5 v5.6.3 go.uber.org/atomic v1.10.0 go.uber.org/zap v1.25.0 - golang.org/x/crypto v0.14.0 + golang.org/x/crypto v0.17.0 google.golang.org/grpc v1.59.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) @@ -62,9 +62,9 @@ require ( golang.org/x/arch v0.3.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.14.0 // indirect - golang.org/x/term v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/term v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 7841553..a26711f 100644 --- a/go.sum +++ b/go.sum @@ -201,8 +201,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= @@ -245,16 +245,16 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211204120058-94396e421777/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/server/conn.go b/server/conn.go index 1e8d784..cdec322 100644 --- a/server/conn.go +++ b/server/conn.go @@ -277,11 +277,11 @@ func (slf *Conn) Write(packet []byte, callback ...func(err error)) { } func (slf *Conn) init() { - if slf.server.ticker != nil { + if slf.server.ticker != nil && slf.server.connTickerSize > 0 { if slf.server.tickerAutonomy { - slf.ticker = timer.GetTicker(slf.server.connTickerSize) + slf.ticker = slf.server.tickerPool.GetTicker(slf.server.connTickerSize) } else { - slf.ticker = timer.GetTicker(slf.server.connTickerSize, timer.WithCaller(func(name string, caller func()) { + slf.ticker = slf.server.tickerPool.GetTicker(slf.server.connTickerSize, timer.WithCaller(func(name string, caller func()) { slf.server.PushShuntTickerMessage(slf, name, caller) })) } @@ -335,6 +335,9 @@ func (slf *Conn) init() { func (slf *Conn) Close(err ...error) { slf.mu.Lock() if slf.closed { + if slf.ticker != nil { + slf.ticker.Release() + } slf.mu.Unlock() return } diff --git a/server/lockstep/lockstep.go b/server/lockstep/lockstep.go index 7218876..640c093 100644 --- a/server/lockstep/lockstep.go +++ b/server/lockstep/lockstep.go @@ -2,7 +2,6 @@ package lockstep import ( "encoding/json" - "github.com/kercylan98/minotaur/utils/timer" "sync" "time" ) @@ -11,7 +10,6 @@ import ( func NewLockstep[ClientID comparable, Command any](options ...Option[ClientID, Command]) *Lockstep[ClientID, Command] { lockstep := &Lockstep[ClientID, Command]{ currentFrame: -1, - ticker: timer.GetTicker(10), frameRate: 15, serialization: func(frame int64, commands []Command) []byte { frameStruct := struct { @@ -55,7 +53,7 @@ type Lockstep[ClientID comparable, Command any] struct { frameCache map[int64][]byte // 帧序列化缓存 frameCacheLock sync.RWMutex // 帧序列化缓存锁 - ticker *timer.Ticker // 定时器 + ticker *time.Ticker // 定时器 lockstepStoppedEventHandles []StoppedEventHandle[ClientID, Command] } @@ -123,46 +121,52 @@ func (slf *Lockstep[ClientID, Command]) StartBroadcast() { return } slf.running = true + if slf.ticker == nil { + slf.ticker = time.NewTicker(time.Second / time.Duration(slf.frameRate)) + } slf.runningLock.Unlock() slf.currentFrame = slf.initFrame - slf.ticker.Loop("lockstep", timer.Instantly, time.Second/time.Duration(slf.frameRate), timer.Forever, func() { - - slf.currentFrameLock.RLock() - if slf.frameLimit > 0 && slf.currentFrame >= slf.frameLimit { - slf.currentFrameLock.RUnlock() - slf.StopBroadcast() - return - } - slf.currentFrameLock.RUnlock() - slf.currentFrameLock.Lock() - slf.currentFrame++ - currentFrame := slf.currentFrame - currentCommands := slf.currentCommands - slf.currentCommands = make([]Command, 0, len(currentCommands)) - slf.currentFrameLock.Unlock() - - slf.clientLock.RLock() - defer slf.clientLock.RUnlock() - slf.frameCacheLock.Lock() - defer slf.frameCacheLock.Unlock() - - for clientId, client := range slf.clients { - var i = slf.clientFrame[clientId] - if i < slf.initFrame { - i = slf.initFrame - } - for ; i < currentFrame; i++ { - cache, exist := slf.frameCache[i] - if !exist { - cache = slf.serialization(i, currentCommands) - slf.frameCache[i] = cache + go func(ls *Lockstep[ClientID, Command]) { + for range ls.ticker.C { + go func(ls *Lockstep[ClientID, Command]) { + ls.currentFrameLock.RLock() + if ls.frameLimit > 0 && ls.currentFrame >= ls.frameLimit { + ls.currentFrameLock.RUnlock() + ls.StopBroadcast() + return } - client.Write(cache) - } - slf.clientFrame[clientId] = currentFrame + ls.currentFrameLock.RUnlock() + ls.currentFrameLock.Lock() + ls.currentFrame++ + currentFrame := ls.currentFrame + currentCommands := ls.currentCommands + ls.currentCommands = make([]Command, 0, len(currentCommands)) + ls.currentFrameLock.Unlock() + + ls.clientLock.RLock() + defer ls.clientLock.RUnlock() + ls.frameCacheLock.Lock() + defer ls.frameCacheLock.Unlock() + + for clientId, client := range ls.clients { + var i = ls.clientFrame[clientId] + if i < ls.initFrame { + i = ls.initFrame + } + for ; i < currentFrame; i++ { + cache, exist := ls.frameCache[i] + if !exist { + cache = ls.serialization(i, currentCommands) + ls.frameCache[i] = cache + } + client.Write(cache) + } + ls.clientFrame[clientId] = currentFrame + } + }(ls) } - }) + }(slf) } // StopBroadcast 停止广播 @@ -175,7 +179,10 @@ func (slf *Lockstep[ClientID, Command]) StopBroadcast() { slf.running = false slf.runningLock.Unlock() - slf.ticker.StopTimer("lockstep") + if slf.ticker != nil { + slf.ticker.Stop() + } + slf.ticker = nil slf.OnLockstepStoppedEvent() diff --git a/server/message.go b/server/message.go index 500ace4..d339566 100644 --- a/server/message.go +++ b/server/message.go @@ -3,6 +3,7 @@ package server import ( "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/super" ) const ( @@ -128,7 +129,17 @@ func (slf *Message) MessageType() MessageType { // String 返回消息的字符串表示 func (slf *Message) String() string { - return slf.t.String() + var info = struct { + Type string `json:"type,omitempty"` + Name string `json:"name,omitempty"` + Packet string `json:"packet,omitempty"` + }{ + Type: slf.t.String(), + Name: slf.name, + Packet: string(slf.packet), + } + + return string(super.MarshalJSON(info)) } // String 返回消息类型的字符串表示 diff --git a/server/multiple.go b/server/multiple.go index 602a0b7..0335a16 100644 --- a/server/multiple.go +++ b/server/multiple.go @@ -3,6 +3,7 @@ package server import ( "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/network" + "github.com/xtaci/kcp-go/v5" "math" "os" "os/signal" @@ -35,8 +36,12 @@ func (slf *MultipleServer) Run() { close(runtimeExceptionChannel) }() var wait sync.WaitGroup + var hasKcp bool for i := 0; i < len(slf.servers); i++ { wait.Add(1) + if slf.servers[i].network == NetworkKcp { + hasKcp = true + } go func(address string, server *Server) { var lock sync.Mutex var startFinish bool @@ -62,6 +67,9 @@ func (slf *MultipleServer) Run() { }(slf.addresses[i], slf.servers[i]) } wait.Wait() + if !hasKcp { + kcp.SystemTimedSched.Close() + } log.Info("Server", log.String(serverMultipleMark, "====================================================================")) ip, _ := network.IP() diff --git a/server/options.go b/server/options.go index 8374a61..61f5f0c 100644 --- a/server/options.go +++ b/server/options.go @@ -32,6 +32,7 @@ type runtime struct { supportMessageTypes map[int]bool // websocket模式下支持的消息类型 certFile, keyFile string // TLS文件 messagePoolSize int // 消息池大小 + tickerPool *timer.Pool // 定时器池 ticker *timer.Ticker // 定时器 tickerAutonomy bool // 定时器是否独立运行 connTickerSize int // 连接定时器大小 @@ -130,17 +131,22 @@ func WithWebsocketReadDeadline(t time.Duration) Option { } // WithTicker 通过定时器创建服务器,为服务器添加定时器功能 +// - poolSize:指定服务器定时器池大小,当池子内的定时器数量超出该值后,多余的定时器在释放时将被回收,该值小于等于 0 时将使用 timer.DefaultTickerPoolSize // - size:服务器定时器时间轮大小 -// - connSize:服务器连接定时器时间轮大小 +// - connSize:服务器连接定时器时间轮大小,当该值小于等于 0 的时候,在新连接建立时将不再为其创建定时器 // - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题) -func WithTicker(size, connSize int, autonomy bool) Option { +func WithTicker(poolSize, size, connSize int, autonomy bool) Option { return func(srv *Server) { + if poolSize <= 0 { + poolSize = timer.DefaultTickerPoolSize + } + srv.tickerPool = timer.NewPool(poolSize) srv.connTickerSize = connSize srv.tickerAutonomy = autonomy if !autonomy { - srv.ticker = timer.GetTicker(size) + srv.ticker = srv.tickerPool.GetTicker(size) } else { - srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) { + srv.ticker = srv.tickerPool.GetTicker(size, timer.WithCaller(func(name string, caller func()) { srv.PushTickerMessage(name, caller) })) } diff --git a/server/server.go b/server/server.go index fe346fc..2a700ec 100644 --- a/server/server.go +++ b/server/server.go @@ -352,6 +352,10 @@ func (slf *Server) Run(addr string) error { return ErrCanNotSupportNetwork } + if slf.multiple == nil && slf.network != NetworkKcp { + kcp.SystemTimedSched.Close() + } + <-messageInitFinish close(messageInitFinish) messageInitFinish = nil @@ -489,6 +493,9 @@ func (slf *Server) shutdown(err error) { log.Error("Server", log.Err(shutdownErr)) } } + if slf.tickerPool != nil { + slf.tickerPool.Release() + } if slf.ticker != nil { slf.ticker.Release() } @@ -596,6 +603,7 @@ func (slf *Server) UseShunt(conn *Conn, name string) { delete(slf.dispatchers, curr.name) } } + slf.currDispatcher[conn.GetID()] = d member, exist := slf.dispatcherMember[name] if !exist { @@ -676,8 +684,8 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration var fields = make([]log.Field, 0, len(message.marks)+4) fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String())) fields = append(fields, message.marks...) - fields = append(fields, log.Stack("stack")) - log.Warn("Server", fields...) + //fields = append(fields, log.Stack("stack")) + log.Warn("ServerLowMessage", fields...) slf.OnMessageLowExecEvent(message, cost) } } @@ -693,7 +701,7 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { go func(ctx context.Context, msg *Message) { select { case <-ctx.Done(): - if err := ctx.Err(); err == context.DeadlineExceeded { + if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) { log.Warn("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("SuspectedDeadlock", msg)) slf.OnDeadlockDetectEvent(msg) } @@ -704,32 +712,39 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { present := time.Now() if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync { defer func(msg *Message) { + super.Handle(cancel) if err := recover(); err != nil { stack := string(debug.Stack()) log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("error", err), log.String("stack", stack)) fmt.Println(stack) - if e, ok := err.(error); ok { - slf.OnMessageErrorEvent(msg, e) + e, ok := err.(error) + if !ok { + e = fmt.Errorf("%v", err) } + slf.OnMessageErrorEvent(msg, e) } if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback { dispatcher.antiUnique(msg.name) } - super.Handle(cancel) slf.low(msg, present, time.Millisecond*100) slf.messageCounter.Add(-1) if !slf.isShutdown.Load() { slf.messagePool.Release(msg) } - }(msg) + } else { + if cancel != nil { + defer cancel() + } } switch msg.t { case MessageTypePacket: - if !slf.OnConnectionPacketPreprocessEvent(msg.conn, msg.packet, func(newPacket []byte) { msg.packet = newPacket }) { + if !slf.OnConnectionPacketPreprocessEvent(msg.conn, msg.packet, func(newPacket []byte) { + msg.packet = newPacket + }) { slf.OnConnectionReceivePacketEvent(msg.conn, msg.packet) } case MessageTypeError: @@ -753,9 +768,11 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { stack := string(debug.Stack()) log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", stack)) fmt.Println(stack) - if e, ok := err.(error); ok { - slf.OnMessageErrorEvent(msg, e) + e, ok := err.(error) + if !ok { + e = fmt.Errorf("%v", err) } + slf.OnMessageErrorEvent(msg, e) } super.Handle(cancel) slf.low(msg, present, time.Second) diff --git a/server/server_test.go b/server/server_test.go index 855280e..eb998a6 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -11,7 +11,7 @@ import ( func TestNew(t *testing.T) { //limiter := rate.NewLimiter(rate.Every(time.Second), 100) - srv := server.New(server.NetworkWebsocket, server.WithMessageBufferSize(1024*1024), server.WithPProf()) + srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 200, 10, false), server.WithMessageBufferSize(1024*1024), server.WithPProf()) //srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool { // t, c := srv.TimeoutContext(time.Second * 5) // defer c() @@ -24,6 +24,9 @@ func TestNew(t *testing.T) { fmt.Println("关闭", conn.GetID(), err, "Count", srv.GetOnlineCount()) }) srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) { + srv.UseShunt(conn, "1") + srv.UseShunt(conn, "2") + srv.UseShunt(conn, "3") //if srv.GetOnlineCount() > 1 { // conn.Close() //} diff --git a/utils/generic/type.go b/utils/generic/type.go index cbcd1c8..ea70ac4 100644 --- a/utils/generic/type.go +++ b/utils/generic/type.go @@ -27,7 +27,12 @@ type Signed interface { // Unsigned 无符号整数类型 type Unsigned interface { - ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr + UnsignedNumber | ~uintptr +} + +// UnsignedNumber 无符号数字类型 +type UnsignedNumber interface { + ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 } // Float 浮点类型 diff --git a/utils/slice/paged_slice.go b/utils/slice/paged_slice.go new file mode 100644 index 0000000..31da7e7 --- /dev/null +++ b/utils/slice/paged_slice.go @@ -0,0 +1,68 @@ +package slice + +// NewPagedSlice 创建一个新的 PagedSlice 实例。 +func NewPagedSlice[T any](pageSize int) *PagedSlice[T] { + return &PagedSlice[T]{ + pages: make([][]T, 0, pageSize), + pageSize: pageSize, + } +} + +// PagedSlice 是一个高效的动态数组,它通过分页管理内存并减少频繁的内存分配来提高性能。 +type PagedSlice[T any] struct { + pages [][]T + pageSize int + len int + lenLast int +} + +// Add 添加一个元素到 PagedSlice 中。 +func (slf *PagedSlice[T]) Add(value T) { + if slf.lenLast == len(slf.pages[len(slf.pages)-1]) { + slf.pages = append(slf.pages, make([]T, slf.pageSize)) + slf.lenLast = 0 + } + + slf.pages[len(slf.pages)-1][slf.lenLast] = value + slf.len++ + slf.lenLast++ +} + +// Get 获取 PagedSlice 中给定索引的元素。 +func (slf *PagedSlice[T]) Get(index int) *T { + if index < 0 || index >= slf.len { + return nil + } + + return &slf.pages[index/slf.pageSize][index%slf.pageSize] +} + +// Set 设置 PagedSlice 中给定索引的元素。 +func (slf *PagedSlice[T]) Set(index int, value T) { + if index < 0 || index >= slf.len { + return + } + + slf.pages[index/slf.pageSize][index%slf.pageSize] = value +} + +// Len 返回 PagedSlice 中元素的数量。 +func (slf *PagedSlice[T]) Len() int { + return slf.len +} + +// Clear 清空 PagedSlice。 +func (slf *PagedSlice[T]) Clear() { + slf.pages = make([][]T, 0) + slf.len = 0 + slf.lenLast = 0 +} + +// Range 迭代 PagedSlice 中的所有元素。 +func (slf *PagedSlice[T]) Range(f func(index int, value T) bool) { + for i := 0; i < slf.len; i++ { + if !f(i, slf.pages[i/slf.pageSize][i%slf.pageSize]) { + return + } + } +} diff --git a/utils/super/bit_set.go b/utils/super/bit_set.go new file mode 100644 index 0000000..a7cce23 --- /dev/null +++ b/utils/super/bit_set.go @@ -0,0 +1,324 @@ +package super + +import ( + "fmt" + "github.com/kercylan98/minotaur/utils/generic" + "math/bits" +) + +// NewBitSet 通过指定的 Bit 位创建一个 BitSet +func NewBitSet[Bit generic.Integer](bits ...Bit) *BitSet[Bit] { + set := &BitSet[Bit]{set: make([]uint64, 0, 1)} + for _, bit := range bits { + set.Set(bit) + } + return set +} + +// BitSet 是一个可以动态增长的比特位集合 +// - 默认情况下将使用 64 位无符号整数来表示比特位,当需要表示的比特位超过 64 位时,将自动增长 +type BitSet[Bit generic.Integer] struct { + set []uint64 // 比特位集合 +} + +// Set 将指定的位 bit 设置为 1 +func (slf *BitSet[Bit]) Set(bit Bit) *BitSet[Bit] { + word := bit >> 6 + for word >= Bit(len(slf.set)) { + slf.set = append(slf.set, 0) + } + slf.set[word] |= 1 << (bit & 0x3f) + return slf +} + +// Del 将指定的位 bit 设置为 0 +func (slf *BitSet[Bit]) Del(bit Bit) *BitSet[Bit] { + word := bit >> 6 + if word < Bit(len(slf.set)) { + slf.set[word] &^= 1 << (bit & 0x3f) + } + return slf +} + +// Shrink 将 BitSet 中的比特位集合缩小到最小 +// - 正常情况下当 BitSet 中的比特位超出 64 位时,将自动增长,当 BitSet 中的比特位数量减少时,可以使用该方法将 BitSet 中的比特位集合缩小到最小 +func (slf *BitSet[Bit]) Shrink() *BitSet[Bit] { + index := len(slf.set) - 1 + if slf.set[index] != 0 { + return slf + } + + for i := index - 1; i >= 0; i-- { + if slf.set[i] != 0 { + slf.set = slf.set[:i+1] + return slf + } + } + return slf +} + +// Cap 返回当前 BitSet 中可以表示的最大比特位数量 +func (slf *BitSet[Bit]) Cap() int { + return len(slf.set) * 64 +} + +// Has 检查指定的位 bit 是否被设置为 1 +func (slf *BitSet[Bit]) Has(bit Bit) bool { + word := bit >> 6 + return word < Bit(len(slf.set)) && slf.set[word]&(1<<(bit&0x3f)) != 0 +} + +// Clear 清空所有的比特位 +func (slf *BitSet[Bit]) Clear() *BitSet[Bit] { + slf.set = nil + return slf +} + +// Len 返回当前 BitSet 中被设置的比特位数量 +func (slf *BitSet[Bit]) Len() int { + var count int + for _, word := range slf.set { + count += bits.OnesCount64(word) + } + return count +} + +// Bits 返回当前 BitSet 中被设置的比特位 +func (slf *BitSet[Bit]) Bits() []Bit { + bits := make([]Bit, 0, slf.Len()) + for i, word := range slf.set { + for j := 0; j < 64; j++ { + if word&(1<= len(slf.set) || slf.set[i]&word != word { + return false + } + } + return true +} + +// ContainsAny 检查当前 BitSet 是否包含另一个 BitSet 中的任意比特位 +func (slf *BitSet[Bit]) ContainsAny(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i < len(slf.set) && slf.set[i]&word != 0 { + return true + } + } + return false +} + +// ContainsAll 检查当前 BitSet 是否包含另一个 BitSet 中的所有比特位 +func (slf *BitSet[Bit]) ContainsAll(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i >= len(slf.set) || slf.set[i]&word != word { + return false + } + } + return true +} + +// Intersect 检查当前 BitSet 是否与另一个 BitSet 有交集 +func (slf *BitSet[Bit]) Intersect(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i < len(slf.set) && slf.set[i]&word != 0 { + return true + } + } + return false +} + +// Union 检查当前 BitSet 是否与另一个 BitSet 有并集 +func (slf *BitSet[Bit]) Union(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i < len(slf.set) && slf.set[i]&word != 0 { + return true + } + } + return false +} + +// Difference 检查当前 BitSet 是否与另一个 BitSet 有差集 +func (slf *BitSet[Bit]) Difference(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i < len(slf.set) && slf.set[i]&word != 0 { + return true + } + } + return false +} + +// SymmetricDifference 检查当前 BitSet 是否与另一个 BitSet 有对称差集 +func (slf *BitSet[Bit]) SymmetricDifference(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i < len(slf.set) && slf.set[i]&word != 0 { + return true + } + } + return false +} + +// Subset 检查当前 BitSet 是否为另一个 BitSet 的子集 +func (slf *BitSet[Bit]) Subset(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i >= len(slf.set) || slf.set[i]&word != word { + return false + } + } + return true +} + +// Superset 检查当前 BitSet 是否为另一个 BitSet 的超集 +func (slf *BitSet[Bit]) Superset(other *BitSet[Bit]) bool { + for i, word := range slf.set { + if i >= len(other.set) || other.set[i]&word != word { + return false + } + } + return true +} + +// Complement 检查当前 BitSet 是否为另一个 BitSet 的补集 +func (slf *BitSet[Bit]) Complement(other *BitSet[Bit]) bool { + for i, word := range slf.set { + if i >= len(other.set) || other.set[i]&word != word { + return false + } + } + return true +} + +// Max 返回当前 BitSet 中最大的比特位 +func (slf *BitSet[Bit]) Max() Bit { + for i := len(slf.set) - 1; i >= 0; i-- { + if slf.set[i] != 0 { + return Bit(i*64 + bits.Len64(slf.set[i]) - 1) + } + } + return 0 +} + +// Min 返回当前 BitSet 中最小的比特位 +func (slf *BitSet[Bit]) Min() Bit { + for i, word := range slf.set { + if word != 0 { + return Bit(i*64 + bits.TrailingZeros64(word)) + } + } + return 0 +} + +// String 返回当前 BitSet 的字符串表示 +func (slf *BitSet[Bit]) String() string { + return fmt.Sprintf("[%v] %v", slf.Len(), slf.Bits()) +} + +// MarshalJSON 实现 json.Marshaler 接口 +func (slf *BitSet[Bit]) MarshalJSON() ([]byte, error) { + return MarshalJSONE(slf.set) +} + +// UnmarshalJSON 实现 json.Unmarshaler 接口 +func (slf *BitSet[Bit]) UnmarshalJSON(data []byte) error { + return UnmarshalJSON(data, &slf.set) +} diff --git a/utils/super/bit_set_test.go b/utils/super/bit_set_test.go new file mode 100644 index 0000000..f654139 --- /dev/null +++ b/utils/super/bit_set_test.go @@ -0,0 +1,33 @@ +package super_test + +import ( + "github.com/kercylan98/minotaur/utils/super" + "testing" +) + +func TestBitSet_Set(t *testing.T) { + bs := super.NewBitSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + bs.Set(11) + bs.Set(12) + bs.Set(13) + t.Log(bs) +} + +func TestBitSet_Del(t *testing.T) { + bs := super.NewBitSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + bs.Del(11) + bs.Del(12) + bs.Del(13) + bs.Del(10) + t.Log(bs) +} + +func TestBitSet_Shrink(t *testing.T) { + bs := super.NewBitSet(63) + t.Log(bs.Cap()) + bs.Set(200) + t.Log(bs.Cap()) + bs.Del(200) + bs.Shrink() + t.Log(bs.Cap()) +} diff --git a/utils/super/retry.go b/utils/super/retry.go index 85cc9d7..bb652fb 100644 --- a/utils/super/retry.go +++ b/utils/super/retry.go @@ -1,6 +1,7 @@ package super import ( + "errors" "fmt" "math" "math/rand" @@ -46,8 +47,9 @@ func RetryByRule(f func() error, rule func(count int) time.Duration) error { // - maxDelay:最大延迟 // - multiplier:延迟时间的乘数,通常为 2 // - randomization:延迟时间的随机化因子,通常为 0.5 -func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64) error { - return ConditionalRetryByExponentialBackoff(f, nil, maxRetries, baseDelay, maxDelay, multiplier, randomization) +// - ignoreErrors:忽略的错误,当 f 返回的错误在 ignoreErrors 中时,将不会进行重试 +func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64, ignoreErrors ...error) error { + return ConditionalRetryByExponentialBackoff(f, nil, maxRetries, baseDelay, maxDelay, multiplier, randomization, ignoreErrors...) } // ConditionalRetryByExponentialBackoff 该函数与 RetryByExponentialBackoff 类似,但是可以被中断 @@ -55,7 +57,7 @@ func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDel // // 该函数通常用于在重试过程中,需要中断重试的场景,例如: // - 用户请求开始游戏,由于网络等情况,进入重试状态。此时用户再次发送开始游戏请求,此时需要中断之前的重试,避免重复进入游戏 -func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64) error { +func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64, ignoreErrors ...error) error { retry := 0 for { if cond != nil && !cond() { @@ -65,9 +67,14 @@ func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxR if err == nil { return nil } + for _, ignore := range ignoreErrors { + if errors.Is(err, ignore) { + return err + } + } if retry >= maxRetries { - return fmt.Errorf("max retries reached: %v", err) + return fmt.Errorf("max retries reached: %w", err) } delay := float64(baseDelay) * math.Pow(multiplier, float64(retry)) diff --git a/utils/timer/constants.go b/utils/timer/constants.go index be07f83..53c6b6e 100644 --- a/utils/timer/constants.go +++ b/utils/timer/constants.go @@ -15,3 +15,7 @@ const ( const ( NoMark = "" // 没有设置标记的定时器 ) + +const ( + DefaultTickerPoolSize = 96 +) diff --git a/utils/timer/pool.go b/utils/timer/pool.go new file mode 100644 index 0000000..dc987ac --- /dev/null +++ b/utils/timer/pool.go @@ -0,0 +1,76 @@ +package timer + +import ( + "fmt" + "sync" + + "github.com/RussellLuo/timingwheel" +) + +// NewPool 创建一个定时器池,当 tickerPoolSize 小于等于 0 时,将会引发 panic,可指定为 DefaultTickerPoolSize +func NewPool(tickerPoolSize int) *Pool { + if tickerPoolSize <= 0 { + panic(fmt.Errorf("timer tickerPoolSize must greater than 0, got: %d", tickerPoolSize)) + } + return &Pool{ + tickerPoolSize: tickerPoolSize, + } +} + +// Pool 定时器池 +type Pool struct { + tickers []*Ticker + lock sync.Mutex + tickerPoolSize int + closed bool +} + +// ChangePoolSize 改变定时器池大小 +// - 当传入的大小小于或等于 0 时,将会返回错误,并且不会发生任何改变 +func (slf *Pool) ChangePoolSize(size int) error { + if size <= 0 { + return fmt.Errorf("timer tickerPoolSize must greater than 0, got: %d", tickerPoolSize) + } + slf.lock.Lock() + defer slf.lock.Unlock() + slf.tickerPoolSize = size + return nil +} + +// GetTicker 获取一个新的定时器 +func (slf *Pool) GetTicker(size int, options ...Option) *Ticker { + slf.lock.Lock() + defer slf.lock.Unlock() + + var ticker *Ticker + if len(slf.tickers) > 0 { + ticker = slf.tickers[0] + slf.tickers = slf.tickers[1:] + return ticker + } + + ticker = &Ticker{ + timer: slf, + wheel: timingwheel.NewTimingWheel(timingWheelTick, int64(size)), + timers: make(map[string]*Scheduler), + } + for _, option := range options { + option(ticker) + } + ticker.wheel.Start() + return ticker +} + +// Release 释放定时器池的资源,释放后由其产生的 Ticker 在 Ticker.Release 后将不再回到池中,而是直接释放 +// - 虽然定时器池已被释放,但是依旧可以产出 Ticker +func (slf *Pool) Release() { + slf.lock.Lock() + defer slf.lock.Unlock() + slf.closed = true + for _, ticker := range slf.tickers { + ticker.wheel.Stop() + } + slf.tickers = nil + slf.tickerPoolSize = 0 + return +} diff --git a/utils/timer/ticker.go b/utils/timer/ticker.go index de59f93..337419c 100644 --- a/utils/timer/ticker.go +++ b/utils/timer/ticker.go @@ -11,10 +11,11 @@ import ( // Ticker 定时器 type Ticker struct { - timer *Timer + timer *Pool wheel *timingwheel.TimingWheel timers map[string]*Scheduler lock sync.RWMutex + handle func(name string, caller func()) mark string } @@ -25,7 +26,7 @@ func (slf *Ticker) Mark() string { return slf.mark } -// Release 释放定时器,并将定时器重新放回 Timer 池中 +// Release 释放定时器,并将定时器重新放回 Pool 池中 func (slf *Ticker) Release() { slf.timer.lock.Lock() defer slf.timer.lock.Unlock() @@ -38,7 +39,11 @@ func (slf *Ticker) Release() { } slf.lock.Unlock() - slf.timer.tickers = append(slf.timer.tickers, slf) + if len(slf.timer.tickers) < tickerPoolSize && !slf.timer.closed { + slf.timer.tickers = append(slf.timer.tickers, slf) + } else { + slf.wheel.Stop() + } } // StopTimer 停止特定名称的调度器 @@ -79,6 +84,28 @@ func (slf *Ticker) Cron(name, expression string, handleFunc interface{}, args .. slf.loop(name, 0, 0, expr, 0, handleFunc, args...) } +// CronByInstantly 与 Cron 相同,但是会立即执行一次 +func (slf *Ticker) CronByInstantly(name, expression string, handleFunc interface{}, args ...interface{}) { + func(name, expression string, handleFunc interface{}, args ...interface{}) { + var values = make([]reflect.Value, len(args)) + for i, v := range args { + values[i] = reflect.ValueOf(v) + } + f := reflect.ValueOf(handleFunc) + slf.lock.RLock() + defer slf.lock.RUnlock() + if slf.handle != nil { + slf.handle(name, func() { + f.Call(values) + }) + } else { + f.Call(values) + } + }(name, expression, handleFunc, args...) + + slf.Cron(name, expression, handleFunc, args...) +} + // After 设置一个在特定时间后运行一次的调度器 func (slf *Ticker) After(name string, after time.Duration, handleFunc interface{}, args ...interface{}) { slf.loop(name, after, timingWheelTick, nil, 1, handleFunc, args...) diff --git a/utils/timer/timer.go b/utils/timer/timer.go index 3c9f754..e20d806 100644 --- a/utils/timer/timer.go +++ b/utils/timer/timer.go @@ -1,41 +1,17 @@ package timer -import ( - "sync" - - "github.com/RussellLuo/timingwheel" +var ( + tickerPoolSize = DefaultTickerPoolSize + standardPool = NewPool(tickerPoolSize) ) -var timer = new(Timer) +// SetPoolSize 设置标准池定时器池大小 +// - 默认值为 DefaultTickerPoolSize,当定时器池中的定时器不足时,会自动创建新的定时器,当定时器释放时,会将多余的定时器进行释放,否则将放入定时器池中 +func SetPoolSize(size int) { + _ = standardPool.ChangePoolSize(size) +} +// GetTicker 获取标准池中的一个定时器 func GetTicker(size int, options ...Option) *Ticker { - return timer.NewTicker(size, options...) -} - -type Timer struct { - tickers []*Ticker - lock sync.Mutex -} - -func (slf *Timer) NewTicker(size int, options ...Option) *Ticker { - slf.lock.Lock() - defer slf.lock.Unlock() - - var ticker *Ticker - if len(slf.tickers) > 0 { - ticker = slf.tickers[0] - slf.tickers = slf.tickers[1:] - return ticker - } - - ticker = &Ticker{ - timer: slf, - wheel: timingwheel.NewTimingWheel(timingWheelTick, int64(size)), - timers: make(map[string]*Scheduler), - } - for _, option := range options { - option(ticker) - } - ticker.wheel.Start() - return ticker + return standardPool.GetTicker(size, options...) }