Merge branch 'develop'

This commit is contained in:
kercylan98 2023-12-21 14:47:35 +08:00
commit de70e85665
20 changed files with 689 additions and 114 deletions

View File

@ -158,7 +158,7 @@ package main
import "github.com/kercylan98/minotaur/server"
func main() {
srv := server.New(server.NetworkWebsocket, server.WithTicker(50, 10, false))
srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 50, 10, false))
if err := srv.Run(":9999"); err != nil {
panic(err)
}

View File

@ -3,7 +3,7 @@ package main
import "github.com/kercylan98/minotaur/server"
func main() {
srv := server.New(server.NetworkWebsocket, server.WithTicker(50, 10, false))
srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 50, 10, false))
if err := srv.Run(":9999"); err != nil {
panic(err)
}

8
go.mod
View File

@ -21,7 +21,7 @@ require (
github.com/xtaci/kcp-go/v5 v5.6.3
go.uber.org/atomic v1.10.0
go.uber.org/zap v1.25.0
golang.org/x/crypto v0.14.0
golang.org/x/crypto v0.17.0
google.golang.org/grpc v1.59.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
@ -62,9 +62,9 @@ require (
golang.org/x/arch v0.3.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect

16
go.sum
View File

@ -201,8 +201,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k=
@ -245,16 +245,16 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211204120058-94396e421777/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=

View File

@ -277,11 +277,11 @@ func (slf *Conn) Write(packet []byte, callback ...func(err error)) {
}
func (slf *Conn) init() {
if slf.server.ticker != nil {
if slf.server.ticker != nil && slf.server.connTickerSize > 0 {
if slf.server.tickerAutonomy {
slf.ticker = timer.GetTicker(slf.server.connTickerSize)
slf.ticker = slf.server.tickerPool.GetTicker(slf.server.connTickerSize)
} else {
slf.ticker = timer.GetTicker(slf.server.connTickerSize, timer.WithCaller(func(name string, caller func()) {
slf.ticker = slf.server.tickerPool.GetTicker(slf.server.connTickerSize, timer.WithCaller(func(name string, caller func()) {
slf.server.PushShuntTickerMessage(slf, name, caller)
}))
}
@ -335,6 +335,9 @@ func (slf *Conn) init() {
func (slf *Conn) Close(err ...error) {
slf.mu.Lock()
if slf.closed {
if slf.ticker != nil {
slf.ticker.Release()
}
slf.mu.Unlock()
return
}

View File

@ -2,7 +2,6 @@ package lockstep
import (
"encoding/json"
"github.com/kercylan98/minotaur/utils/timer"
"sync"
"time"
)
@ -11,7 +10,6 @@ import (
func NewLockstep[ClientID comparable, Command any](options ...Option[ClientID, Command]) *Lockstep[ClientID, Command] {
lockstep := &Lockstep[ClientID, Command]{
currentFrame: -1,
ticker: timer.GetTicker(10),
frameRate: 15,
serialization: func(frame int64, commands []Command) []byte {
frameStruct := struct {
@ -55,7 +53,7 @@ type Lockstep[ClientID comparable, Command any] struct {
frameCache map[int64][]byte // 帧序列化缓存
frameCacheLock sync.RWMutex // 帧序列化缓存锁
ticker *timer.Ticker // 定时器
ticker *time.Ticker // 定时器
lockstepStoppedEventHandles []StoppedEventHandle[ClientID, Command]
}
@ -123,46 +121,52 @@ func (slf *Lockstep[ClientID, Command]) StartBroadcast() {
return
}
slf.running = true
if slf.ticker == nil {
slf.ticker = time.NewTicker(time.Second / time.Duration(slf.frameRate))
}
slf.runningLock.Unlock()
slf.currentFrame = slf.initFrame
slf.ticker.Loop("lockstep", timer.Instantly, time.Second/time.Duration(slf.frameRate), timer.Forever, func() {
slf.currentFrameLock.RLock()
if slf.frameLimit > 0 && slf.currentFrame >= slf.frameLimit {
slf.currentFrameLock.RUnlock()
slf.StopBroadcast()
return
}
slf.currentFrameLock.RUnlock()
slf.currentFrameLock.Lock()
slf.currentFrame++
currentFrame := slf.currentFrame
currentCommands := slf.currentCommands
slf.currentCommands = make([]Command, 0, len(currentCommands))
slf.currentFrameLock.Unlock()
slf.clientLock.RLock()
defer slf.clientLock.RUnlock()
slf.frameCacheLock.Lock()
defer slf.frameCacheLock.Unlock()
for clientId, client := range slf.clients {
var i = slf.clientFrame[clientId]
if i < slf.initFrame {
i = slf.initFrame
}
for ; i < currentFrame; i++ {
cache, exist := slf.frameCache[i]
if !exist {
cache = slf.serialization(i, currentCommands)
slf.frameCache[i] = cache
go func(ls *Lockstep[ClientID, Command]) {
for range ls.ticker.C {
go func(ls *Lockstep[ClientID, Command]) {
ls.currentFrameLock.RLock()
if ls.frameLimit > 0 && ls.currentFrame >= ls.frameLimit {
ls.currentFrameLock.RUnlock()
ls.StopBroadcast()
return
}
client.Write(cache)
}
slf.clientFrame[clientId] = currentFrame
ls.currentFrameLock.RUnlock()
ls.currentFrameLock.Lock()
ls.currentFrame++
currentFrame := ls.currentFrame
currentCommands := ls.currentCommands
ls.currentCommands = make([]Command, 0, len(currentCommands))
ls.currentFrameLock.Unlock()
ls.clientLock.RLock()
defer ls.clientLock.RUnlock()
ls.frameCacheLock.Lock()
defer ls.frameCacheLock.Unlock()
for clientId, client := range ls.clients {
var i = ls.clientFrame[clientId]
if i < ls.initFrame {
i = ls.initFrame
}
for ; i < currentFrame; i++ {
cache, exist := ls.frameCache[i]
if !exist {
cache = ls.serialization(i, currentCommands)
ls.frameCache[i] = cache
}
client.Write(cache)
}
ls.clientFrame[clientId] = currentFrame
}
}(ls)
}
})
}(slf)
}
// StopBroadcast 停止广播
@ -175,7 +179,10 @@ func (slf *Lockstep[ClientID, Command]) StopBroadcast() {
slf.running = false
slf.runningLock.Unlock()
slf.ticker.StopTimer("lockstep")
if slf.ticker != nil {
slf.ticker.Stop()
}
slf.ticker = nil
slf.OnLockstepStoppedEvent()

View File

@ -3,6 +3,7 @@ package server
import (
"github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
)
const (
@ -128,7 +129,17 @@ func (slf *Message) MessageType() MessageType {
// String 返回消息的字符串表示
func (slf *Message) String() string {
return slf.t.String()
var info = struct {
Type string `json:"type,omitempty"`
Name string `json:"name,omitempty"`
Packet string `json:"packet,omitempty"`
}{
Type: slf.t.String(),
Name: slf.name,
Packet: string(slf.packet),
}
return string(super.MarshalJSON(info))
}
// String 返回消息类型的字符串表示

View File

@ -3,6 +3,7 @@ package server
import (
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/network"
"github.com/xtaci/kcp-go/v5"
"math"
"os"
"os/signal"
@ -35,8 +36,12 @@ func (slf *MultipleServer) Run() {
close(runtimeExceptionChannel)
}()
var wait sync.WaitGroup
var hasKcp bool
for i := 0; i < len(slf.servers); i++ {
wait.Add(1)
if slf.servers[i].network == NetworkKcp {
hasKcp = true
}
go func(address string, server *Server) {
var lock sync.Mutex
var startFinish bool
@ -62,6 +67,9 @@ func (slf *MultipleServer) Run() {
}(slf.addresses[i], slf.servers[i])
}
wait.Wait()
if !hasKcp {
kcp.SystemTimedSched.Close()
}
log.Info("Server", log.String(serverMultipleMark, "===================================================================="))
ip, _ := network.IP()

View File

@ -32,6 +32,7 @@ type runtime struct {
supportMessageTypes map[int]bool // websocket模式下支持的消息类型
certFile, keyFile string // TLS文件
messagePoolSize int // 消息池大小
tickerPool *timer.Pool // 定时器池
ticker *timer.Ticker // 定时器
tickerAutonomy bool // 定时器是否独立运行
connTickerSize int // 连接定时器大小
@ -130,17 +131,22 @@ func WithWebsocketReadDeadline(t time.Duration) Option {
}
// WithTicker 通过定时器创建服务器,为服务器添加定时器功能
// - poolSize指定服务器定时器池大小当池子内的定时器数量超出该值后多余的定时器在释放时将被回收该值小于等于 0 时将使用 timer.DefaultTickerPoolSize
// - size服务器定时器时间轮大小
// - connSize服务器连接定时器时间轮大小
// - connSize服务器连接定时器时间轮大小,当该值小于等于 0 的时候,在新连接建立时将不再为其创建定时器
// - autonomy定时器是否独立运行独立运行的情况下不会作为服务器消息运行会导致并发问题
func WithTicker(size, connSize int, autonomy bool) Option {
func WithTicker(poolSize, size, connSize int, autonomy bool) Option {
return func(srv *Server) {
if poolSize <= 0 {
poolSize = timer.DefaultTickerPoolSize
}
srv.tickerPool = timer.NewPool(poolSize)
srv.connTickerSize = connSize
srv.tickerAutonomy = autonomy
if !autonomy {
srv.ticker = timer.GetTicker(size)
srv.ticker = srv.tickerPool.GetTicker(size)
} else {
srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) {
srv.ticker = srv.tickerPool.GetTicker(size, timer.WithCaller(func(name string, caller func()) {
srv.PushTickerMessage(name, caller)
}))
}

View File

@ -352,6 +352,10 @@ func (slf *Server) Run(addr string) error {
return ErrCanNotSupportNetwork
}
if slf.multiple == nil && slf.network != NetworkKcp {
kcp.SystemTimedSched.Close()
}
<-messageInitFinish
close(messageInitFinish)
messageInitFinish = nil
@ -489,6 +493,9 @@ func (slf *Server) shutdown(err error) {
log.Error("Server", log.Err(shutdownErr))
}
}
if slf.tickerPool != nil {
slf.tickerPool.Release()
}
if slf.ticker != nil {
slf.ticker.Release()
}
@ -596,6 +603,7 @@ func (slf *Server) UseShunt(conn *Conn, name string) {
delete(slf.dispatchers, curr.name)
}
}
slf.currDispatcher[conn.GetID()] = d
member, exist := slf.dispatcherMember[name]
if !exist {
@ -676,8 +684,8 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration
var fields = make([]log.Field, 0, len(message.marks)+4)
fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String()))
fields = append(fields, message.marks...)
fields = append(fields, log.Stack("stack"))
log.Warn("Server", fields...)
//fields = append(fields, log.Stack("stack"))
log.Warn("ServerLowMessage", fields...)
slf.OnMessageLowExecEvent(message, cost)
}
}
@ -693,7 +701,7 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
go func(ctx context.Context, msg *Message) {
select {
case <-ctx.Done():
if err := ctx.Err(); err == context.DeadlineExceeded {
if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) {
log.Warn("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("SuspectedDeadlock", msg))
slf.OnDeadlockDetectEvent(msg)
}
@ -704,32 +712,39 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
present := time.Now()
if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync {
defer func(msg *Message) {
super.Handle(cancel)
if err := recover(); err != nil {
stack := string(debug.Stack())
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("error", err), log.String("stack", stack))
fmt.Println(stack)
if e, ok := err.(error); ok {
slf.OnMessageErrorEvent(msg, e)
e, ok := err.(error)
if !ok {
e = fmt.Errorf("%v", err)
}
slf.OnMessageErrorEvent(msg, e)
}
if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback {
dispatcher.antiUnique(msg.name)
}
super.Handle(cancel)
slf.low(msg, present, time.Millisecond*100)
slf.messageCounter.Add(-1)
if !slf.isShutdown.Load() {
slf.messagePool.Release(msg)
}
}(msg)
} else {
if cancel != nil {
defer cancel()
}
}
switch msg.t {
case MessageTypePacket:
if !slf.OnConnectionPacketPreprocessEvent(msg.conn, msg.packet, func(newPacket []byte) { msg.packet = newPacket }) {
if !slf.OnConnectionPacketPreprocessEvent(msg.conn, msg.packet, func(newPacket []byte) {
msg.packet = newPacket
}) {
slf.OnConnectionReceivePacketEvent(msg.conn, msg.packet)
}
case MessageTypeError:
@ -753,9 +768,11 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) {
stack := string(debug.Stack())
log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", stack))
fmt.Println(stack)
if e, ok := err.(error); ok {
slf.OnMessageErrorEvent(msg, e)
e, ok := err.(error)
if !ok {
e = fmt.Errorf("%v", err)
}
slf.OnMessageErrorEvent(msg, e)
}
super.Handle(cancel)
slf.low(msg, present, time.Second)

View File

@ -11,7 +11,7 @@ import (
func TestNew(t *testing.T) {
//limiter := rate.NewLimiter(rate.Every(time.Second), 100)
srv := server.New(server.NetworkWebsocket, server.WithMessageBufferSize(1024*1024), server.WithPProf())
srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 200, 10, false), server.WithMessageBufferSize(1024*1024), server.WithPProf())
//srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool {
// t, c := srv.TimeoutContext(time.Second * 5)
// defer c()
@ -24,6 +24,9 @@ func TestNew(t *testing.T) {
fmt.Println("关闭", conn.GetID(), err, "Count", srv.GetOnlineCount())
})
srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) {
srv.UseShunt(conn, "1")
srv.UseShunt(conn, "2")
srv.UseShunt(conn, "3")
//if srv.GetOnlineCount() > 1 {
// conn.Close()
//}

View File

@ -27,7 +27,12 @@ type Signed interface {
// Unsigned 无符号整数类型
type Unsigned interface {
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr
UnsignedNumber | ~uintptr
}
// UnsignedNumber 无符号数字类型
type UnsignedNumber interface {
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64
}
// Float 浮点类型

View File

@ -0,0 +1,68 @@
package slice
// NewPagedSlice 创建一个新的 PagedSlice 实例。
func NewPagedSlice[T any](pageSize int) *PagedSlice[T] {
return &PagedSlice[T]{
pages: make([][]T, 0, pageSize),
pageSize: pageSize,
}
}
// PagedSlice 是一个高效的动态数组,它通过分页管理内存并减少频繁的内存分配来提高性能。
type PagedSlice[T any] struct {
pages [][]T
pageSize int
len int
lenLast int
}
// Add 添加一个元素到 PagedSlice 中。
func (slf *PagedSlice[T]) Add(value T) {
if slf.lenLast == len(slf.pages[len(slf.pages)-1]) {
slf.pages = append(slf.pages, make([]T, slf.pageSize))
slf.lenLast = 0
}
slf.pages[len(slf.pages)-1][slf.lenLast] = value
slf.len++
slf.lenLast++
}
// Get 获取 PagedSlice 中给定索引的元素。
func (slf *PagedSlice[T]) Get(index int) *T {
if index < 0 || index >= slf.len {
return nil
}
return &slf.pages[index/slf.pageSize][index%slf.pageSize]
}
// Set 设置 PagedSlice 中给定索引的元素。
func (slf *PagedSlice[T]) Set(index int, value T) {
if index < 0 || index >= slf.len {
return
}
slf.pages[index/slf.pageSize][index%slf.pageSize] = value
}
// Len 返回 PagedSlice 中元素的数量。
func (slf *PagedSlice[T]) Len() int {
return slf.len
}
// Clear 清空 PagedSlice。
func (slf *PagedSlice[T]) Clear() {
slf.pages = make([][]T, 0)
slf.len = 0
slf.lenLast = 0
}
// Range 迭代 PagedSlice 中的所有元素。
func (slf *PagedSlice[T]) Range(f func(index int, value T) bool) {
for i := 0; i < slf.len; i++ {
if !f(i, slf.pages[i/slf.pageSize][i%slf.pageSize]) {
return
}
}
}

324
utils/super/bit_set.go Normal file
View File

@ -0,0 +1,324 @@
package super
import (
"fmt"
"github.com/kercylan98/minotaur/utils/generic"
"math/bits"
)
// NewBitSet 通过指定的 Bit 位创建一个 BitSet
func NewBitSet[Bit generic.Integer](bits ...Bit) *BitSet[Bit] {
set := &BitSet[Bit]{set: make([]uint64, 0, 1)}
for _, bit := range bits {
set.Set(bit)
}
return set
}
// BitSet 是一个可以动态增长的比特位集合
// - 默认情况下将使用 64 位无符号整数来表示比特位,当需要表示的比特位超过 64 位时,将自动增长
type BitSet[Bit generic.Integer] struct {
set []uint64 // 比特位集合
}
// Set 将指定的位 bit 设置为 1
func (slf *BitSet[Bit]) Set(bit Bit) *BitSet[Bit] {
word := bit >> 6
for word >= Bit(len(slf.set)) {
slf.set = append(slf.set, 0)
}
slf.set[word] |= 1 << (bit & 0x3f)
return slf
}
// Del 将指定的位 bit 设置为 0
func (slf *BitSet[Bit]) Del(bit Bit) *BitSet[Bit] {
word := bit >> 6
if word < Bit(len(slf.set)) {
slf.set[word] &^= 1 << (bit & 0x3f)
}
return slf
}
// Shrink 将 BitSet 中的比特位集合缩小到最小
// - 正常情况下当 BitSet 中的比特位超出 64 位时,将自动增长,当 BitSet 中的比特位数量减少时,可以使用该方法将 BitSet 中的比特位集合缩小到最小
func (slf *BitSet[Bit]) Shrink() *BitSet[Bit] {
index := len(slf.set) - 1
if slf.set[index] != 0 {
return slf
}
for i := index - 1; i >= 0; i-- {
if slf.set[i] != 0 {
slf.set = slf.set[:i+1]
return slf
}
}
return slf
}
// Cap 返回当前 BitSet 中可以表示的最大比特位数量
func (slf *BitSet[Bit]) Cap() int {
return len(slf.set) * 64
}
// Has 检查指定的位 bit 是否被设置为 1
func (slf *BitSet[Bit]) Has(bit Bit) bool {
word := bit >> 6
return word < Bit(len(slf.set)) && slf.set[word]&(1<<(bit&0x3f)) != 0
}
// Clear 清空所有的比特位
func (slf *BitSet[Bit]) Clear() *BitSet[Bit] {
slf.set = nil
return slf
}
// Len 返回当前 BitSet 中被设置的比特位数量
func (slf *BitSet[Bit]) Len() int {
var count int
for _, word := range slf.set {
count += bits.OnesCount64(word)
}
return count
}
// Bits 返回当前 BitSet 中被设置的比特位
func (slf *BitSet[Bit]) Bits() []Bit {
bits := make([]Bit, 0, slf.Len())
for i, word := range slf.set {
for j := 0; j < 64; j++ {
if word&(1<<j) != 0 {
bits = append(bits, Bit(i*64+j))
}
}
}
return bits
}
// Reverse 反转当前 BitSet 中的所有比特位
func (slf *BitSet[Bit]) Reverse() *BitSet[Bit] {
for i, word := range slf.set {
slf.set[i] = bits.Reverse64(word)
}
return slf
}
// Not 返回当前 BitSet 中所有比特位的反转
func (slf *BitSet[Bit]) Not() *BitSet[Bit] {
for i, word := range slf.set {
slf.set[i] = ^word
}
return slf
}
// And 将当前 BitSet 与另一个 BitSet 进行按位与运算
func (slf *BitSet[Bit]) And(other *BitSet[Bit]) *BitSet[Bit] {
for i, word := range other.set {
if i < len(slf.set) {
slf.set[i] &= word
} else {
slf.set = append(slf.set, word)
}
}
return slf
}
// Or 将当前 BitSet 与另一个 BitSet 进行按位或运算
func (slf *BitSet[Bit]) Or(other *BitSet[Bit]) *BitSet[Bit] {
for i, word := range other.set {
if i < len(slf.set) {
slf.set[i] |= word
} else {
slf.set = append(slf.set, word)
}
}
return slf
}
// Xor 将当前 BitSet 与另一个 BitSet 进行按位异或运算
func (slf *BitSet[Bit]) Xor(other *BitSet[Bit]) *BitSet[Bit] {
for i, word := range other.set {
if i < len(slf.set) {
slf.set[i] ^= word
} else {
slf.set = append(slf.set, word)
}
}
return slf
}
// Sub 将当前 BitSet 与另一个 BitSet 进行按位减运算
func (slf *BitSet[Bit]) Sub(other *BitSet[Bit]) *BitSet[Bit] {
for i, word := range other.set {
if i < len(slf.set) {
slf.set[i] &^= word
}
}
return slf
}
// IsZero 检查当前 BitSet 是否为空
func (slf *BitSet[Bit]) IsZero() bool {
for _, word := range slf.set {
if word != 0 {
return false
}
}
return true
}
// Clone 返回当前 BitSet 的副本
func (slf *BitSet[Bit]) Clone() *BitSet[Bit] {
other := &BitSet[Bit]{set: make([]uint64, len(slf.set))}
copy(other.set, slf.set)
return other
}
// Equal 检查当前 BitSet 是否与另一个 BitSet 相等
func (slf *BitSet[Bit]) Equal(other *BitSet[Bit]) bool {
if len(slf.set) != len(other.set) {
return false
}
for i, word := range slf.set {
if word != other.set[i] {
return false
}
}
return true
}
// Contains 检查当前 BitSet 是否包含另一个 BitSet
func (slf *BitSet[Bit]) Contains(other *BitSet[Bit]) bool {
for i, word := range other.set {
if i >= len(slf.set) || slf.set[i]&word != word {
return false
}
}
return true
}
// ContainsAny 检查当前 BitSet 是否包含另一个 BitSet 中的任意比特位
func (slf *BitSet[Bit]) ContainsAny(other *BitSet[Bit]) bool {
for i, word := range other.set {
if i < len(slf.set) && slf.set[i]&word != 0 {
return true
}
}
return false
}
// ContainsAll 检查当前 BitSet 是否包含另一个 BitSet 中的所有比特位
func (slf *BitSet[Bit]) ContainsAll(other *BitSet[Bit]) bool {
for i, word := range other.set {
if i >= len(slf.set) || slf.set[i]&word != word {
return false
}
}
return true
}
// Intersect 检查当前 BitSet 是否与另一个 BitSet 有交集
func (slf *BitSet[Bit]) Intersect(other *BitSet[Bit]) bool {
for i, word := range other.set {
if i < len(slf.set) && slf.set[i]&word != 0 {
return true
}
}
return false
}
// Union 检查当前 BitSet 是否与另一个 BitSet 有并集
func (slf *BitSet[Bit]) Union(other *BitSet[Bit]) bool {
for i, word := range other.set {
if i < len(slf.set) && slf.set[i]&word != 0 {
return true
}
}
return false
}
// Difference 检查当前 BitSet 是否与另一个 BitSet 有差集
func (slf *BitSet[Bit]) Difference(other *BitSet[Bit]) bool {
for i, word := range other.set {
if i < len(slf.set) && slf.set[i]&word != 0 {
return true
}
}
return false
}
// SymmetricDifference 检查当前 BitSet 是否与另一个 BitSet 有对称差集
func (slf *BitSet[Bit]) SymmetricDifference(other *BitSet[Bit]) bool {
for i, word := range other.set {
if i < len(slf.set) && slf.set[i]&word != 0 {
return true
}
}
return false
}
// Subset 检查当前 BitSet 是否为另一个 BitSet 的子集
func (slf *BitSet[Bit]) Subset(other *BitSet[Bit]) bool {
for i, word := range other.set {
if i >= len(slf.set) || slf.set[i]&word != word {
return false
}
}
return true
}
// Superset 检查当前 BitSet 是否为另一个 BitSet 的超集
func (slf *BitSet[Bit]) Superset(other *BitSet[Bit]) bool {
for i, word := range slf.set {
if i >= len(other.set) || other.set[i]&word != word {
return false
}
}
return true
}
// Complement 检查当前 BitSet 是否为另一个 BitSet 的补集
func (slf *BitSet[Bit]) Complement(other *BitSet[Bit]) bool {
for i, word := range slf.set {
if i >= len(other.set) || other.set[i]&word != word {
return false
}
}
return true
}
// Max 返回当前 BitSet 中最大的比特位
func (slf *BitSet[Bit]) Max() Bit {
for i := len(slf.set) - 1; i >= 0; i-- {
if slf.set[i] != 0 {
return Bit(i*64 + bits.Len64(slf.set[i]) - 1)
}
}
return 0
}
// Min 返回当前 BitSet 中最小的比特位
func (slf *BitSet[Bit]) Min() Bit {
for i, word := range slf.set {
if word != 0 {
return Bit(i*64 + bits.TrailingZeros64(word))
}
}
return 0
}
// String 返回当前 BitSet 的字符串表示
func (slf *BitSet[Bit]) String() string {
return fmt.Sprintf("[%v] %v", slf.Len(), slf.Bits())
}
// MarshalJSON 实现 json.Marshaler 接口
func (slf *BitSet[Bit]) MarshalJSON() ([]byte, error) {
return MarshalJSONE(slf.set)
}
// UnmarshalJSON 实现 json.Unmarshaler 接口
func (slf *BitSet[Bit]) UnmarshalJSON(data []byte) error {
return UnmarshalJSON(data, &slf.set)
}

View File

@ -0,0 +1,33 @@
package super_test
import (
"github.com/kercylan98/minotaur/utils/super"
"testing"
)
func TestBitSet_Set(t *testing.T) {
bs := super.NewBitSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
bs.Set(11)
bs.Set(12)
bs.Set(13)
t.Log(bs)
}
func TestBitSet_Del(t *testing.T) {
bs := super.NewBitSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
bs.Del(11)
bs.Del(12)
bs.Del(13)
bs.Del(10)
t.Log(bs)
}
func TestBitSet_Shrink(t *testing.T) {
bs := super.NewBitSet(63)
t.Log(bs.Cap())
bs.Set(200)
t.Log(bs.Cap())
bs.Del(200)
bs.Shrink()
t.Log(bs.Cap())
}

View File

@ -1,6 +1,7 @@
package super
import (
"errors"
"fmt"
"math"
"math/rand"
@ -46,8 +47,9 @@ func RetryByRule(f func() error, rule func(count int) time.Duration) error {
// - maxDelay最大延迟
// - multiplier延迟时间的乘数通常为 2
// - randomization延迟时间的随机化因子通常为 0.5
func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64) error {
return ConditionalRetryByExponentialBackoff(f, nil, maxRetries, baseDelay, maxDelay, multiplier, randomization)
// - ignoreErrors忽略的错误当 f 返回的错误在 ignoreErrors 中时,将不会进行重试
func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64, ignoreErrors ...error) error {
return ConditionalRetryByExponentialBackoff(f, nil, maxRetries, baseDelay, maxDelay, multiplier, randomization, ignoreErrors...)
}
// ConditionalRetryByExponentialBackoff 该函数与 RetryByExponentialBackoff 类似,但是可以被中断
@ -55,7 +57,7 @@ func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDel
//
// 该函数通常用于在重试过程中,需要中断重试的场景,例如:
// - 用户请求开始游戏,由于网络等情况,进入重试状态。此时用户再次发送开始游戏请求,此时需要中断之前的重试,避免重复进入游戏
func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64) error {
func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64, ignoreErrors ...error) error {
retry := 0
for {
if cond != nil && !cond() {
@ -65,9 +67,14 @@ func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxR
if err == nil {
return nil
}
for _, ignore := range ignoreErrors {
if errors.Is(err, ignore) {
return err
}
}
if retry >= maxRetries {
return fmt.Errorf("max retries reached: %v", err)
return fmt.Errorf("max retries reached: %w", err)
}
delay := float64(baseDelay) * math.Pow(multiplier, float64(retry))

View File

@ -15,3 +15,7 @@ const (
const (
NoMark = "" // 没有设置标记的定时器
)
const (
DefaultTickerPoolSize = 96
)

76
utils/timer/pool.go Normal file
View File

@ -0,0 +1,76 @@
package timer
import (
"fmt"
"sync"
"github.com/RussellLuo/timingwheel"
)
// NewPool 创建一个定时器池,当 tickerPoolSize 小于等于 0 时,将会引发 panic可指定为 DefaultTickerPoolSize
func NewPool(tickerPoolSize int) *Pool {
if tickerPoolSize <= 0 {
panic(fmt.Errorf("timer tickerPoolSize must greater than 0, got: %d", tickerPoolSize))
}
return &Pool{
tickerPoolSize: tickerPoolSize,
}
}
// Pool 定时器池
type Pool struct {
tickers []*Ticker
lock sync.Mutex
tickerPoolSize int
closed bool
}
// ChangePoolSize 改变定时器池大小
// - 当传入的大小小于或等于 0 时,将会返回错误,并且不会发生任何改变
func (slf *Pool) ChangePoolSize(size int) error {
if size <= 0 {
return fmt.Errorf("timer tickerPoolSize must greater than 0, got: %d", tickerPoolSize)
}
slf.lock.Lock()
defer slf.lock.Unlock()
slf.tickerPoolSize = size
return nil
}
// GetTicker 获取一个新的定时器
func (slf *Pool) GetTicker(size int, options ...Option) *Ticker {
slf.lock.Lock()
defer slf.lock.Unlock()
var ticker *Ticker
if len(slf.tickers) > 0 {
ticker = slf.tickers[0]
slf.tickers = slf.tickers[1:]
return ticker
}
ticker = &Ticker{
timer: slf,
wheel: timingwheel.NewTimingWheel(timingWheelTick, int64(size)),
timers: make(map[string]*Scheduler),
}
for _, option := range options {
option(ticker)
}
ticker.wheel.Start()
return ticker
}
// Release 释放定时器池的资源,释放后由其产生的 Ticker 在 Ticker.Release 后将不再回到池中,而是直接释放
// - 虽然定时器池已被释放,但是依旧可以产出 Ticker
func (slf *Pool) Release() {
slf.lock.Lock()
defer slf.lock.Unlock()
slf.closed = true
for _, ticker := range slf.tickers {
ticker.wheel.Stop()
}
slf.tickers = nil
slf.tickerPoolSize = 0
return
}

View File

@ -11,10 +11,11 @@ import (
// Ticker 定时器
type Ticker struct {
timer *Timer
timer *Pool
wheel *timingwheel.TimingWheel
timers map[string]*Scheduler
lock sync.RWMutex
handle func(name string, caller func())
mark string
}
@ -25,7 +26,7 @@ func (slf *Ticker) Mark() string {
return slf.mark
}
// Release 释放定时器,并将定时器重新放回 Timer 池中
// Release 释放定时器,并将定时器重新放回 Pool 池中
func (slf *Ticker) Release() {
slf.timer.lock.Lock()
defer slf.timer.lock.Unlock()
@ -38,7 +39,11 @@ func (slf *Ticker) Release() {
}
slf.lock.Unlock()
slf.timer.tickers = append(slf.timer.tickers, slf)
if len(slf.timer.tickers) < tickerPoolSize && !slf.timer.closed {
slf.timer.tickers = append(slf.timer.tickers, slf)
} else {
slf.wheel.Stop()
}
}
// StopTimer 停止特定名称的调度器
@ -79,6 +84,28 @@ func (slf *Ticker) Cron(name, expression string, handleFunc interface{}, args ..
slf.loop(name, 0, 0, expr, 0, handleFunc, args...)
}
// CronByInstantly 与 Cron 相同,但是会立即执行一次
func (slf *Ticker) CronByInstantly(name, expression string, handleFunc interface{}, args ...interface{}) {
func(name, expression string, handleFunc interface{}, args ...interface{}) {
var values = make([]reflect.Value, len(args))
for i, v := range args {
values[i] = reflect.ValueOf(v)
}
f := reflect.ValueOf(handleFunc)
slf.lock.RLock()
defer slf.lock.RUnlock()
if slf.handle != nil {
slf.handle(name, func() {
f.Call(values)
})
} else {
f.Call(values)
}
}(name, expression, handleFunc, args...)
slf.Cron(name, expression, handleFunc, args...)
}
// After 设置一个在特定时间后运行一次的调度器
func (slf *Ticker) After(name string, after time.Duration, handleFunc interface{}, args ...interface{}) {
slf.loop(name, after, timingWheelTick, nil, 1, handleFunc, args...)

View File

@ -1,41 +1,17 @@
package timer
import (
"sync"
"github.com/RussellLuo/timingwheel"
var (
tickerPoolSize = DefaultTickerPoolSize
standardPool = NewPool(tickerPoolSize)
)
var timer = new(Timer)
// SetPoolSize 设置标准池定时器池大小
// - 默认值为 DefaultTickerPoolSize当定时器池中的定时器不足时会自动创建新的定时器当定时器释放时会将多余的定时器进行释放否则将放入定时器池中
func SetPoolSize(size int) {
_ = standardPool.ChangePoolSize(size)
}
// GetTicker 获取标准池中的一个定时器
func GetTicker(size int, options ...Option) *Ticker {
return timer.NewTicker(size, options...)
}
type Timer struct {
tickers []*Ticker
lock sync.Mutex
}
func (slf *Timer) NewTicker(size int, options ...Option) *Ticker {
slf.lock.Lock()
defer slf.lock.Unlock()
var ticker *Ticker
if len(slf.tickers) > 0 {
ticker = slf.tickers[0]
slf.tickers = slf.tickers[1:]
return ticker
}
ticker = &Ticker{
timer: slf,
wheel: timingwheel.NewTimingWheel(timingWheelTick, int64(size)),
timers: make(map[string]*Scheduler),
}
for _, option := range options {
option(ticker)
}
ticker.wheel.Start()
return ticker
return standardPool.GetTicker(size, options...)
}