From 12619b5fa4008a803eb2f62947732de7ad9b0d7f Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 12 Dec 2023 10:52:01 +0800 Subject: [PATCH 01/20] =?UTF-8?q?feat:=20timer.Ticker=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=20CronByInstantly=20=E5=87=BD=E6=95=B0=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=9C=A8=E8=AE=BE=E7=BD=AE=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=89=8D=E5=85=88=E6=89=A7=E8=A1=8C=E4=B8=80=E6=AC=A1?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/timer/ticker.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/utils/timer/ticker.go b/utils/timer/ticker.go index de59f93..4b267a3 100644 --- a/utils/timer/ticker.go +++ b/utils/timer/ticker.go @@ -79,6 +79,25 @@ func (slf *Ticker) Cron(name, expression string, handleFunc interface{}, args .. slf.loop(name, 0, 0, expr, 0, handleFunc, args...) } +// CronByInstantly 与 Cron 相同,但是会立即执行一次 +func (slf *Ticker) CronByInstantly(name, expression string, handleFunc interface{}, args ...interface{}) { + var values = make([]reflect.Value, len(args)) + for i, v := range args { + values[i] = reflect.ValueOf(v) + } + f := reflect.ValueOf(handleFunc) + slf.lock.RLock() + defer slf.lock.RUnlock() + if slf.handle != nil { + slf.handle(name, func() { + f.Call(values) + }) + } else { + f.Call(values) + } + slf.Cron(name, expression, handleFunc, args...) +} + // After 设置一个在特定时间后运行一次的调度器 func (slf *Ticker) After(name string, after time.Duration, handleFunc interface{}, args ...interface{}) { slf.loop(name, after, timingWheelTick, nil, 1, handleFunc, args...) From 5714a437cca056df300bb1ee9133d974dd0473fe Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 12 Dec 2023 10:52:51 +0800 Subject: [PATCH 02/20] =?UTF-8?q?feat:=20super.RetryByExponentialBackoff?= =?UTF-8?q?=20=E5=92=8C=20super.ConditionalRetryByExponentialBackoff=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E8=AE=BE=E7=BD=AE=E5=BF=BD=E7=95=A5=E7=9A=84?= =?UTF-8?q?=E9=94=99=E8=AF=AF=EF=BC=8C=E5=BD=93=E8=BF=94=E5=9B=9E=E5=BF=BD?= =?UTF-8?q?=E7=95=A5=E7=9A=84=E9=94=99=E8=AF=AF=E6=97=B6=E5=B0=86=E4=B8=8D?= =?UTF-8?q?=E5=86=8D=E8=BF=9B=E8=A1=8C=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/super/retry.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/utils/super/retry.go b/utils/super/retry.go index 85cc9d7..bb652fb 100644 --- a/utils/super/retry.go +++ b/utils/super/retry.go @@ -1,6 +1,7 @@ package super import ( + "errors" "fmt" "math" "math/rand" @@ -46,8 +47,9 @@ func RetryByRule(f func() error, rule func(count int) time.Duration) error { // - maxDelay:最大延迟 // - multiplier:延迟时间的乘数,通常为 2 // - randomization:延迟时间的随机化因子,通常为 0.5 -func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64) error { - return ConditionalRetryByExponentialBackoff(f, nil, maxRetries, baseDelay, maxDelay, multiplier, randomization) +// - ignoreErrors:忽略的错误,当 f 返回的错误在 ignoreErrors 中时,将不会进行重试 +func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64, ignoreErrors ...error) error { + return ConditionalRetryByExponentialBackoff(f, nil, maxRetries, baseDelay, maxDelay, multiplier, randomization, ignoreErrors...) } // ConditionalRetryByExponentialBackoff 该函数与 RetryByExponentialBackoff 类似,但是可以被中断 @@ -55,7 +57,7 @@ func RetryByExponentialBackoff(f func() error, maxRetries int, baseDelay, maxDel // // 该函数通常用于在重试过程中,需要中断重试的场景,例如: // - 用户请求开始游戏,由于网络等情况,进入重试状态。此时用户再次发送开始游戏请求,此时需要中断之前的重试,避免重复进入游戏 -func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64) error { +func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxRetries int, baseDelay, maxDelay time.Duration, multiplier, randomization float64, ignoreErrors ...error) error { retry := 0 for { if cond != nil && !cond() { @@ -65,9 +67,14 @@ func ConditionalRetryByExponentialBackoff(f func() error, cond func() bool, maxR if err == nil { return nil } + for _, ignore := range ignoreErrors { + if errors.Is(err, ignore) { + return err + } + } if retry >= maxRetries { - return fmt.Errorf("max retries reached: %v", err) + return fmt.Errorf("max retries reached: %w", err) } delay := float64(baseDelay) * math.Pow(multiplier, float64(retry)) From 8a8610f756a6e17934c0ea5d908edccdffab5ee5 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 12 Dec 2023 11:50:58 +0800 Subject: [PATCH 03/20] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20timer.Ticker?= =?UTF-8?q?=20=E7=9A=84=20CronByInstantly=20=E5=87=BD=E6=95=B0=E5=AF=BC?= =?UTF-8?q?=E8=87=B4=E7=9A=84=E6=AD=BB=E9=94=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/timer/ticker.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/utils/timer/ticker.go b/utils/timer/ticker.go index 4b267a3..1892181 100644 --- a/utils/timer/ticker.go +++ b/utils/timer/ticker.go @@ -81,20 +81,23 @@ func (slf *Ticker) Cron(name, expression string, handleFunc interface{}, args .. // CronByInstantly 与 Cron 相同,但是会立即执行一次 func (slf *Ticker) CronByInstantly(name, expression string, handleFunc interface{}, args ...interface{}) { - var values = make([]reflect.Value, len(args)) - for i, v := range args { - values[i] = reflect.ValueOf(v) - } - f := reflect.ValueOf(handleFunc) - slf.lock.RLock() - defer slf.lock.RUnlock() - if slf.handle != nil { - slf.handle(name, func() { + func(name, expression string, handleFunc interface{}, args ...interface{}) { + var values = make([]reflect.Value, len(args)) + for i, v := range args { + values[i] = reflect.ValueOf(v) + } + f := reflect.ValueOf(handleFunc) + slf.lock.RLock() + defer slf.lock.RUnlock() + if slf.handle != nil { + slf.handle(name, func() { + f.Call(values) + }) + } else { f.Call(values) - }) - } else { - f.Call(values) - } + } + }(name, expression, handleFunc, args...) + slf.Cron(name, expression, handleFunc, args...) } From 38cc3129ba15d4ff7f06782c74d990834bbc0471 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 13 Dec 2023 17:07:57 +0800 Subject: [PATCH 04/20] =?UTF-8?q?feat:=20super=20=E5=8C=85=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E6=AF=94=E7=89=B9=E6=8E=A9=E7=A0=81=E7=B1=BB=E5=9E=8B?= =?UTF-8?q?=20BitMask=EF=BC=8C=E5=8F=AF=E9=80=9A=E8=BF=87=20super.Mask=20?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E5=88=9B=E5=BB=BA=E3=80=82=E8=AF=A5=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=E5=8F=AF=E6=9B=BF=E4=BB=A3=20super.Permission?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/super/mask.go | 96 ++++++++++++++++++++++++++++++++++++++++ utils/super/mask_test.go | 16 +++++++ 2 files changed, 112 insertions(+) create mode 100644 utils/super/mask.go create mode 100644 utils/super/mask_test.go diff --git a/utils/super/mask.go b/utils/super/mask.go new file mode 100644 index 0000000..982ab43 --- /dev/null +++ b/utils/super/mask.go @@ -0,0 +1,96 @@ +package super + +import ( + "github.com/kercylan98/minotaur/utils/generic" + "math/bits" + "strconv" +) + +// BitMask 使用泛型类型 Bit 表示一个比特掩码,Bit 必须是整数类型。 +type BitMask[Bit generic.Integer] uint64 + +// Mask 创建一个新的 BitMask,包含给定的比特位。 +// 参数 bits 是一个可变参数列表,表示要设置的比特位。 +// - 使用按位或运算符 (|=) 将 bit 位置的比特设置为 1。 +func Mask[Bit generic.Integer](bits ...Bit) BitMask[Bit] { + var mask Bit + for _, bit := range bits { + mask |= 1 << bit + } + return BitMask[Bit](mask) +} + +// Matches 检查当前 BitMask 是否与另一个 BitMask 完全匹配。 +func (slf *BitMask[Bit]) Matches(bits BitMask[Bit]) bool { + return *slf == bits +} + +// Contains 检查当前 BitMask 是否包含另一个 BitMask。 +// - 使用按位与运算符 (&) 计算两个 BitMask 的公共部分,然后与 bits 进行比较。 +func (slf *BitMask[Bit]) Contains(bits BitMask[Bit]) bool { + return (*slf & bits) == bits +} + +// Has 检查当前 BitMask 是否包含特定的比特位。 +// - 使用按位与运算符 (&) 检查 bit 位置的比特是否为 1。 +func (slf *BitMask[Bit]) Has(bits ...Bit) bool { + for _, bit := range bits { + if *slf&(1< Date: Wed, 13 Dec 2023 19:37:45 +0800 Subject: [PATCH 05/20] =?UTF-8?q?feat:=20slice=20=E5=8C=85=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20PagedSlice=20=E7=BB=93=E6=9E=84=EF=BC=8C=E5=AE=83?= =?UTF-8?q?=E9=80=9A=E8=BF=87=E5=88=86=E9=A1=B5=E7=AE=A1=E7=90=86=E5=86=85?= =?UTF-8?q?=E5=AD=98=E5=B9=B6=E5=87=8F=E5=B0=91=E9=A2=91=E7=B9=81=E7=9A=84?= =?UTF-8?q?=E5=86=85=E5=AD=98=E5=88=86=E9=85=8D=E6=9D=A5=E6=8F=90=E9=AB=98?= =?UTF-8?q?=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/slice/paged_slice.go | 68 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 utils/slice/paged_slice.go diff --git a/utils/slice/paged_slice.go b/utils/slice/paged_slice.go new file mode 100644 index 0000000..31da7e7 --- /dev/null +++ b/utils/slice/paged_slice.go @@ -0,0 +1,68 @@ +package slice + +// NewPagedSlice 创建一个新的 PagedSlice 实例。 +func NewPagedSlice[T any](pageSize int) *PagedSlice[T] { + return &PagedSlice[T]{ + pages: make([][]T, 0, pageSize), + pageSize: pageSize, + } +} + +// PagedSlice 是一个高效的动态数组,它通过分页管理内存并减少频繁的内存分配来提高性能。 +type PagedSlice[T any] struct { + pages [][]T + pageSize int + len int + lenLast int +} + +// Add 添加一个元素到 PagedSlice 中。 +func (slf *PagedSlice[T]) Add(value T) { + if slf.lenLast == len(slf.pages[len(slf.pages)-1]) { + slf.pages = append(slf.pages, make([]T, slf.pageSize)) + slf.lenLast = 0 + } + + slf.pages[len(slf.pages)-1][slf.lenLast] = value + slf.len++ + slf.lenLast++ +} + +// Get 获取 PagedSlice 中给定索引的元素。 +func (slf *PagedSlice[T]) Get(index int) *T { + if index < 0 || index >= slf.len { + return nil + } + + return &slf.pages[index/slf.pageSize][index%slf.pageSize] +} + +// Set 设置 PagedSlice 中给定索引的元素。 +func (slf *PagedSlice[T]) Set(index int, value T) { + if index < 0 || index >= slf.len { + return + } + + slf.pages[index/slf.pageSize][index%slf.pageSize] = value +} + +// Len 返回 PagedSlice 中元素的数量。 +func (slf *PagedSlice[T]) Len() int { + return slf.len +} + +// Clear 清空 PagedSlice。 +func (slf *PagedSlice[T]) Clear() { + slf.pages = make([][]T, 0) + slf.len = 0 + slf.lenLast = 0 +} + +// Range 迭代 PagedSlice 中的所有元素。 +func (slf *PagedSlice[T]) Range(f func(index int, value T) bool) { + for i := 0; i < slf.len; i++ { + if !f(i, slf.pages[i/slf.pageSize][i%slf.pageSize]) { + return + } + } +} From 05c65e9efdc44a416fe7293d3f407072044c6d8f Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Fri, 15 Dec 2023 16:21:03 +0800 Subject: [PATCH 06/20] =?UTF-8?q?feat:=20=E7=A7=BB=E9=99=A4=20super.BitMas?= =?UTF-8?q?k=20=E4=BB=A5=20super.BitSet=20=E6=9B=BF=E4=BB=A3=EF=BC=8Csuper?= =?UTF-8?q?.BitSet=20=E6=98=AF=E4=B8=80=E4=B8=AA=E5=8F=AF=E5=8A=A8?= =?UTF-8?q?=E6=80=81=E5=A2=9E=E9=95=BF=E7=9A=84=E6=AF=94=E7=89=B9=E4=BD=8D?= =?UTF-8?q?=E9=9B=86=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/super/bit_set.go | 324 ++++++++++++++++++++++++++++++++++++ utils/super/bit_set_test.go | 33 ++++ utils/super/mask.go | 96 ----------- utils/super/mask_test.go | 16 -- 4 files changed, 357 insertions(+), 112 deletions(-) create mode 100644 utils/super/bit_set.go create mode 100644 utils/super/bit_set_test.go delete mode 100644 utils/super/mask.go delete mode 100644 utils/super/mask_test.go diff --git a/utils/super/bit_set.go b/utils/super/bit_set.go new file mode 100644 index 0000000..a7cce23 --- /dev/null +++ b/utils/super/bit_set.go @@ -0,0 +1,324 @@ +package super + +import ( + "fmt" + "github.com/kercylan98/minotaur/utils/generic" + "math/bits" +) + +// NewBitSet 通过指定的 Bit 位创建一个 BitSet +func NewBitSet[Bit generic.Integer](bits ...Bit) *BitSet[Bit] { + set := &BitSet[Bit]{set: make([]uint64, 0, 1)} + for _, bit := range bits { + set.Set(bit) + } + return set +} + +// BitSet 是一个可以动态增长的比特位集合 +// - 默认情况下将使用 64 位无符号整数来表示比特位,当需要表示的比特位超过 64 位时,将自动增长 +type BitSet[Bit generic.Integer] struct { + set []uint64 // 比特位集合 +} + +// Set 将指定的位 bit 设置为 1 +func (slf *BitSet[Bit]) Set(bit Bit) *BitSet[Bit] { + word := bit >> 6 + for word >= Bit(len(slf.set)) { + slf.set = append(slf.set, 0) + } + slf.set[word] |= 1 << (bit & 0x3f) + return slf +} + +// Del 将指定的位 bit 设置为 0 +func (slf *BitSet[Bit]) Del(bit Bit) *BitSet[Bit] { + word := bit >> 6 + if word < Bit(len(slf.set)) { + slf.set[word] &^= 1 << (bit & 0x3f) + } + return slf +} + +// Shrink 将 BitSet 中的比特位集合缩小到最小 +// - 正常情况下当 BitSet 中的比特位超出 64 位时,将自动增长,当 BitSet 中的比特位数量减少时,可以使用该方法将 BitSet 中的比特位集合缩小到最小 +func (slf *BitSet[Bit]) Shrink() *BitSet[Bit] { + index := len(slf.set) - 1 + if slf.set[index] != 0 { + return slf + } + + for i := index - 1; i >= 0; i-- { + if slf.set[i] != 0 { + slf.set = slf.set[:i+1] + return slf + } + } + return slf +} + +// Cap 返回当前 BitSet 中可以表示的最大比特位数量 +func (slf *BitSet[Bit]) Cap() int { + return len(slf.set) * 64 +} + +// Has 检查指定的位 bit 是否被设置为 1 +func (slf *BitSet[Bit]) Has(bit Bit) bool { + word := bit >> 6 + return word < Bit(len(slf.set)) && slf.set[word]&(1<<(bit&0x3f)) != 0 +} + +// Clear 清空所有的比特位 +func (slf *BitSet[Bit]) Clear() *BitSet[Bit] { + slf.set = nil + return slf +} + +// Len 返回当前 BitSet 中被设置的比特位数量 +func (slf *BitSet[Bit]) Len() int { + var count int + for _, word := range slf.set { + count += bits.OnesCount64(word) + } + return count +} + +// Bits 返回当前 BitSet 中被设置的比特位 +func (slf *BitSet[Bit]) Bits() []Bit { + bits := make([]Bit, 0, slf.Len()) + for i, word := range slf.set { + for j := 0; j < 64; j++ { + if word&(1<= len(slf.set) || slf.set[i]&word != word { + return false + } + } + return true +} + +// ContainsAny 检查当前 BitSet 是否包含另一个 BitSet 中的任意比特位 +func (slf *BitSet[Bit]) ContainsAny(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i < len(slf.set) && slf.set[i]&word != 0 { + return true + } + } + return false +} + +// ContainsAll 检查当前 BitSet 是否包含另一个 BitSet 中的所有比特位 +func (slf *BitSet[Bit]) ContainsAll(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i >= len(slf.set) || slf.set[i]&word != word { + return false + } + } + return true +} + +// Intersect 检查当前 BitSet 是否与另一个 BitSet 有交集 +func (slf *BitSet[Bit]) Intersect(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i < len(slf.set) && slf.set[i]&word != 0 { + return true + } + } + return false +} + +// Union 检查当前 BitSet 是否与另一个 BitSet 有并集 +func (slf *BitSet[Bit]) Union(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i < len(slf.set) && slf.set[i]&word != 0 { + return true + } + } + return false +} + +// Difference 检查当前 BitSet 是否与另一个 BitSet 有差集 +func (slf *BitSet[Bit]) Difference(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i < len(slf.set) && slf.set[i]&word != 0 { + return true + } + } + return false +} + +// SymmetricDifference 检查当前 BitSet 是否与另一个 BitSet 有对称差集 +func (slf *BitSet[Bit]) SymmetricDifference(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i < len(slf.set) && slf.set[i]&word != 0 { + return true + } + } + return false +} + +// Subset 检查当前 BitSet 是否为另一个 BitSet 的子集 +func (slf *BitSet[Bit]) Subset(other *BitSet[Bit]) bool { + for i, word := range other.set { + if i >= len(slf.set) || slf.set[i]&word != word { + return false + } + } + return true +} + +// Superset 检查当前 BitSet 是否为另一个 BitSet 的超集 +func (slf *BitSet[Bit]) Superset(other *BitSet[Bit]) bool { + for i, word := range slf.set { + if i >= len(other.set) || other.set[i]&word != word { + return false + } + } + return true +} + +// Complement 检查当前 BitSet 是否为另一个 BitSet 的补集 +func (slf *BitSet[Bit]) Complement(other *BitSet[Bit]) bool { + for i, word := range slf.set { + if i >= len(other.set) || other.set[i]&word != word { + return false + } + } + return true +} + +// Max 返回当前 BitSet 中最大的比特位 +func (slf *BitSet[Bit]) Max() Bit { + for i := len(slf.set) - 1; i >= 0; i-- { + if slf.set[i] != 0 { + return Bit(i*64 + bits.Len64(slf.set[i]) - 1) + } + } + return 0 +} + +// Min 返回当前 BitSet 中最小的比特位 +func (slf *BitSet[Bit]) Min() Bit { + for i, word := range slf.set { + if word != 0 { + return Bit(i*64 + bits.TrailingZeros64(word)) + } + } + return 0 +} + +// String 返回当前 BitSet 的字符串表示 +func (slf *BitSet[Bit]) String() string { + return fmt.Sprintf("[%v] %v", slf.Len(), slf.Bits()) +} + +// MarshalJSON 实现 json.Marshaler 接口 +func (slf *BitSet[Bit]) MarshalJSON() ([]byte, error) { + return MarshalJSONE(slf.set) +} + +// UnmarshalJSON 实现 json.Unmarshaler 接口 +func (slf *BitSet[Bit]) UnmarshalJSON(data []byte) error { + return UnmarshalJSON(data, &slf.set) +} diff --git a/utils/super/bit_set_test.go b/utils/super/bit_set_test.go new file mode 100644 index 0000000..f654139 --- /dev/null +++ b/utils/super/bit_set_test.go @@ -0,0 +1,33 @@ +package super_test + +import ( + "github.com/kercylan98/minotaur/utils/super" + "testing" +) + +func TestBitSet_Set(t *testing.T) { + bs := super.NewBitSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + bs.Set(11) + bs.Set(12) + bs.Set(13) + t.Log(bs) +} + +func TestBitSet_Del(t *testing.T) { + bs := super.NewBitSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + bs.Del(11) + bs.Del(12) + bs.Del(13) + bs.Del(10) + t.Log(bs) +} + +func TestBitSet_Shrink(t *testing.T) { + bs := super.NewBitSet(63) + t.Log(bs.Cap()) + bs.Set(200) + t.Log(bs.Cap()) + bs.Del(200) + bs.Shrink() + t.Log(bs.Cap()) +} diff --git a/utils/super/mask.go b/utils/super/mask.go deleted file mode 100644 index 982ab43..0000000 --- a/utils/super/mask.go +++ /dev/null @@ -1,96 +0,0 @@ -package super - -import ( - "github.com/kercylan98/minotaur/utils/generic" - "math/bits" - "strconv" -) - -// BitMask 使用泛型类型 Bit 表示一个比特掩码,Bit 必须是整数类型。 -type BitMask[Bit generic.Integer] uint64 - -// Mask 创建一个新的 BitMask,包含给定的比特位。 -// 参数 bits 是一个可变参数列表,表示要设置的比特位。 -// - 使用按位或运算符 (|=) 将 bit 位置的比特设置为 1。 -func Mask[Bit generic.Integer](bits ...Bit) BitMask[Bit] { - var mask Bit - for _, bit := range bits { - mask |= 1 << bit - } - return BitMask[Bit](mask) -} - -// Matches 检查当前 BitMask 是否与另一个 BitMask 完全匹配。 -func (slf *BitMask[Bit]) Matches(bits BitMask[Bit]) bool { - return *slf == bits -} - -// Contains 检查当前 BitMask 是否包含另一个 BitMask。 -// - 使用按位与运算符 (&) 计算两个 BitMask 的公共部分,然后与 bits 进行比较。 -func (slf *BitMask[Bit]) Contains(bits BitMask[Bit]) bool { - return (*slf & bits) == bits -} - -// Has 检查当前 BitMask 是否包含特定的比特位。 -// - 使用按位与运算符 (&) 检查 bit 位置的比特是否为 1。 -func (slf *BitMask[Bit]) Has(bits ...Bit) bool { - for _, bit := range bits { - if *slf&(1< Date: Tue, 19 Dec 2023 15:59:56 +0800 Subject: [PATCH 07/20] =?UTF-8?q?feat:=20generic=20=E5=8C=85=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20Unsigned=20=E8=A1=A8=E7=A4=BA=E6=97=A0=E7=AC=A6?= =?UTF-8?q?=E5=8F=B7=E6=95=B4=E6=95=B0=E7=9A=84=E7=BA=A6=E6=9D=9F=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/generic/type.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/utils/generic/type.go b/utils/generic/type.go index cbcd1c8..ea70ac4 100644 --- a/utils/generic/type.go +++ b/utils/generic/type.go @@ -27,7 +27,12 @@ type Signed interface { // Unsigned 无符号整数类型 type Unsigned interface { - ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr + UnsignedNumber | ~uintptr +} + +// UnsignedNumber 无符号数字类型 +type UnsignedNumber interface { + ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 } // Float 浮点类型 From ba24b09c71afba891b888bc51308ba9e4503c325 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 19 Dec 2023 16:00:45 +0800 Subject: [PATCH 08/20] =?UTF-8?q?style:=20=E7=A7=BB=E9=99=A4=20server=20?= =?UTF-8?q?=E6=85=A2=E6=B6=88=E6=81=AF=E6=97=A0=E6=84=8F=E4=B9=89=E7=9A=84?= =?UTF-8?q?=E5=A0=86=E6=A0=88=E4=BF=A1=E6=81=AF=EF=BC=8C=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E7=9A=84=20String=20=E5=87=BD=E6=95=B0?= =?UTF-8?q?=E8=BF=94=E5=9B=9E=E7=9A=84=E4=B8=8D=E5=86=8D=E6=98=AF=E7=AE=80?= =?UTF-8?q?=E5=8D=95=E7=9A=84=E6=B6=88=E6=81=AF=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/message.go | 13 ++++++++++++- server/server.go | 4 ++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/server/message.go b/server/message.go index 500ace4..d339566 100644 --- a/server/message.go +++ b/server/message.go @@ -3,6 +3,7 @@ package server import ( "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/super" ) const ( @@ -128,7 +129,17 @@ func (slf *Message) MessageType() MessageType { // String 返回消息的字符串表示 func (slf *Message) String() string { - return slf.t.String() + var info = struct { + Type string `json:"type,omitempty"` + Name string `json:"name,omitempty"` + Packet string `json:"packet,omitempty"` + }{ + Type: slf.t.String(), + Name: slf.name, + Packet: string(slf.packet), + } + + return string(super.MarshalJSON(info)) } // String 返回消息类型的字符串表示 diff --git a/server/server.go b/server/server.go index fe346fc..5134858 100644 --- a/server/server.go +++ b/server/server.go @@ -676,8 +676,8 @@ func (slf *Server) low(message *Message, present time.Time, expect time.Duration var fields = make([]log.Field, 0, len(message.marks)+4) fields = append(fields, log.String("type", messageNames[message.t]), log.String("cost", cost.String()), log.String("message", message.String())) fields = append(fields, message.marks...) - fields = append(fields, log.Stack("stack")) - log.Warn("Server", fields...) + //fields = append(fields, log.Stack("stack")) + log.Warn("ServerLowMessage", fields...) slf.OnMessageLowExecEvent(message, cost) } } From 34a680e710c1925aac94cc68d8aa800f9cee3a65 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 19 Dec 2023 16:16:51 +0800 Subject: [PATCH 09/20] other: Russh vulnerable to Prefix Truncation Attack against ChaCha20-Poly1305 and Encrypt-then-MAC #7 --- go.mod | 8 ++++---- go.sum | 6 ++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 42bcad4..14aa3a8 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/xtaci/kcp-go/v5 v5.6.3 go.uber.org/atomic v1.10.0 go.uber.org/zap v1.25.0 - golang.org/x/crypto v0.14.0 + golang.org/x/crypto v0.17.0 google.golang.org/grpc v1.59.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) @@ -62,9 +62,9 @@ require ( golang.org/x/arch v0.3.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.14.0 // indirect - golang.org/x/term v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/term v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 7841553..9ed10af 100644 --- a/go.sum +++ b/go.sum @@ -203,6 +203,8 @@ golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= @@ -247,14 +249,18 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= From 82ecb983971710fcc02b64e17bca13254bc799d0 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 20 Dec 2023 09:48:03 +0800 Subject: [PATCH 10/20] =?UTF-8?q?other:=20=E4=BC=98=E5=8C=96=20server=20?= =?UTF-8?q?=E5=8C=85=E9=83=A8=E5=88=86=20error=20=E7=9A=84=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/server.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/server.go b/server/server.go index 5134858..532840b 100644 --- a/server/server.go +++ b/server/server.go @@ -693,7 +693,7 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { go func(ctx context.Context, msg *Message) { select { case <-ctx.Done(): - if err := ctx.Err(); err == context.DeadlineExceeded { + if err := ctx.Err(); errors.Is(err, context.DeadlineExceeded) { log.Warn("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("SuspectedDeadlock", msg)) slf.OnDeadlockDetectEvent(msg) } @@ -708,9 +708,7 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { stack := string(debug.Stack()) log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("error", err), log.String("stack", stack)) fmt.Println(stack) - if e, ok := err.(error); ok { - slf.OnMessageErrorEvent(msg, e) - } + slf.OnMessageErrorEvent(msg, fmt.Errorf("%v", err)) } if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback { dispatcher.antiUnique(msg.name) @@ -729,7 +727,9 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { switch msg.t { case MessageTypePacket: - if !slf.OnConnectionPacketPreprocessEvent(msg.conn, msg.packet, func(newPacket []byte) { msg.packet = newPacket }) { + if !slf.OnConnectionPacketPreprocessEvent(msg.conn, msg.packet, func(newPacket []byte) { + msg.packet = newPacket + }) { slf.OnConnectionReceivePacketEvent(msg.conn, msg.packet) } case MessageTypeError: From e60017c0ebe7d762a388dc1d51bee5f2306b16d5 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 20 Dec 2023 10:46:01 +0800 Subject: [PATCH 11/20] =?UTF-8?q?other:=20=E4=BC=98=E5=8C=96=20server=20?= =?UTF-8?q?=E5=8C=85=E6=B6=88=E6=81=AF=E5=88=86=E5=8F=91=20cancel=20?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/server.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/server/server.go b/server/server.go index 532840b..f8a8ed3 100644 --- a/server/server.go +++ b/server/server.go @@ -702,28 +702,31 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { } present := time.Now() - if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync { - defer func(msg *Message) { + defer func(msg *Message) { + super.Handle(cancel) + if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync { if err := recover(); err != nil { stack := string(debug.Stack()) log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("error", err), log.String("stack", stack)) fmt.Println(stack) - slf.OnMessageErrorEvent(msg, fmt.Errorf("%v", err)) + e, ok := err.(error) + if !ok { + e = fmt.Errorf("%v", err) + } + slf.OnMessageErrorEvent(msg, e) } if msg.t == MessageTypeUniqueAsyncCallback || msg.t == MessageTypeUniqueShuntAsyncCallback { dispatcher.antiUnique(msg.name) } - super.Handle(cancel) slf.low(msg, present, time.Millisecond*100) slf.messageCounter.Add(-1) if !slf.isShutdown.Load() { slf.messagePool.Release(msg) } - - }(msg) - } + } + }(msg) switch msg.t { case MessageTypePacket: @@ -753,9 +756,11 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { stack := string(debug.Stack()) log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.Any("error", err), log.String("stack", stack)) fmt.Println(stack) - if e, ok := err.(error); ok { - slf.OnMessageErrorEvent(msg, e) + e, ok := err.(error) + if !ok { + e = fmt.Errorf("%v", err) } + slf.OnMessageErrorEvent(msg, e) } super.Handle(cancel) slf.low(msg, present, time.Second) From 7e092293301628c28be4191eacc623d23978d955 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 20 Dec 2023 11:24:51 +0800 Subject: [PATCH 12/20] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20server.Server?= =?UTF-8?q?=20=E5=9C=A8=E4=BD=BF=E7=94=A8=20UseShunt=20=E5=87=BD=E6=95=B0?= =?UTF-8?q?=E6=97=B6=E7=94=B1=E4=BA=8E=E6=9C=AA=E8=AE=B0=E5=BD=95=E5=BD=93?= =?UTF-8?q?=E5=89=8D=E5=88=86=E5=8F=91=E5=99=A8=E5=AF=BC=E8=87=B4=E7=9A=84?= =?UTF-8?q?=E5=86=85=E5=AD=98=E6=B3=84=E6=BC=8F=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.sum | 10 ++-------- server/server.go | 1 + server/server_test.go | 3 +++ 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/go.sum b/go.sum index 9ed10af..a26711f 100644 --- a/go.sum +++ b/go.sum @@ -201,8 +201,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -247,19 +245,15 @@ golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211204120058-94396e421777/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/server/server.go b/server/server.go index f8a8ed3..48c4c3f 100644 --- a/server/server.go +++ b/server/server.go @@ -596,6 +596,7 @@ func (slf *Server) UseShunt(conn *Conn, name string) { delete(slf.dispatchers, curr.name) } } + slf.currDispatcher[conn.GetID()] = d member, exist := slf.dispatcherMember[name] if !exist { diff --git a/server/server_test.go b/server/server_test.go index 855280e..a79a03c 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -24,6 +24,9 @@ func TestNew(t *testing.T) { fmt.Println("关闭", conn.GetID(), err, "Count", srv.GetOnlineCount()) }) srv.RegConnectionOpenedEvent(func(srv *server.Server, conn *server.Conn) { + srv.UseShunt(conn, "1") + srv.UseShunt(conn, "2") + srv.UseShunt(conn, "3") //if srv.GetOnlineCount() > 1 { // conn.Close() //} From 4d72e8cbcba656dca7a3938275e9bd01d3116015 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 20 Dec 2023 12:21:43 +0800 Subject: [PATCH 13/20] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20server=20?= =?UTF-8?q?=E5=8C=85=E6=9C=AA=E4=BD=BF=E7=94=A8=20KCP=20=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8=E6=97=B6=E4=BC=9A=E6=9C=89=E9=A2=9D=E5=A4=96=E7=9A=84?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E5=99=A8=E6=8D=9F=E8=80=97=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/multiple.go | 8 ++++++++ server/server.go | 4 ++++ server/server_test.go | 2 +- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/server/multiple.go b/server/multiple.go index 602a0b7..0335a16 100644 --- a/server/multiple.go +++ b/server/multiple.go @@ -3,6 +3,7 @@ package server import ( "github.com/kercylan98/minotaur/utils/log" "github.com/kercylan98/minotaur/utils/network" + "github.com/xtaci/kcp-go/v5" "math" "os" "os/signal" @@ -35,8 +36,12 @@ func (slf *MultipleServer) Run() { close(runtimeExceptionChannel) }() var wait sync.WaitGroup + var hasKcp bool for i := 0; i < len(slf.servers); i++ { wait.Add(1) + if slf.servers[i].network == NetworkKcp { + hasKcp = true + } go func(address string, server *Server) { var lock sync.Mutex var startFinish bool @@ -62,6 +67,9 @@ func (slf *MultipleServer) Run() { }(slf.addresses[i], slf.servers[i]) } wait.Wait() + if !hasKcp { + kcp.SystemTimedSched.Close() + } log.Info("Server", log.String(serverMultipleMark, "====================================================================")) ip, _ := network.IP() diff --git a/server/server.go b/server/server.go index 48c4c3f..098337b 100644 --- a/server/server.go +++ b/server/server.go @@ -352,6 +352,10 @@ func (slf *Server) Run(addr string) error { return ErrCanNotSupportNetwork } + if slf.multiple == nil && slf.network != NetworkKcp { + kcp.SystemTimedSched.Close() + } + <-messageInitFinish close(messageInitFinish) messageInitFinish = nil diff --git a/server/server_test.go b/server/server_test.go index a79a03c..d06a3cc 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -11,7 +11,7 @@ import ( func TestNew(t *testing.T) { //limiter := rate.NewLimiter(rate.Every(time.Second), 100) - srv := server.New(server.NetworkWebsocket, server.WithMessageBufferSize(1024*1024), server.WithPProf()) + srv := server.New(server.NetworkWebsocket, server.WithTicker(200, 10, false), server.WithMessageBufferSize(1024*1024), server.WithPProf()) //srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool { // t, c := srv.TimeoutContext(time.Second * 5) // defer c() From 508e30fb5bf7c5915db90065523e76a9ed3852ef Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 20 Dec 2023 16:57:54 +0800 Subject: [PATCH 14/20] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20timer.Ticker?= =?UTF-8?q?=20=E5=92=8C=20lockstep=20=E5=8C=85=E5=AD=98=E5=9C=A8=E7=9A=84?= =?UTF-8?q?=E5=86=85=E5=AD=98=E6=B3=84=E6=BC=8F=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/conn.go | 5 ++++- server/lockstep/lockstep.go | 8 +++++++- utils/timer/ticker.go | 6 +++++- utils/timer/timer.go | 13 +++++++++++++ 4 files changed, 29 insertions(+), 3 deletions(-) diff --git a/server/conn.go b/server/conn.go index 1e8d784..c6fa0e3 100644 --- a/server/conn.go +++ b/server/conn.go @@ -277,7 +277,7 @@ func (slf *Conn) Write(packet []byte, callback ...func(err error)) { } func (slf *Conn) init() { - if slf.server.ticker != nil { + if slf.server.ticker != nil && slf.server.connTickerSize > 0 { if slf.server.tickerAutonomy { slf.ticker = timer.GetTicker(slf.server.connTickerSize) } else { @@ -335,6 +335,9 @@ func (slf *Conn) init() { func (slf *Conn) Close(err ...error) { slf.mu.Lock() if slf.closed { + if slf.ticker != nil { + slf.ticker.Release() + } slf.mu.Unlock() return } diff --git a/server/lockstep/lockstep.go b/server/lockstep/lockstep.go index 7218876..10ac55d 100644 --- a/server/lockstep/lockstep.go +++ b/server/lockstep/lockstep.go @@ -123,6 +123,9 @@ func (slf *Lockstep[ClientID, Command]) StartBroadcast() { return } slf.running = true + if slf.ticker == nil { + slf.ticker = timer.GetTicker(10) + } slf.runningLock.Unlock() slf.currentFrame = slf.initFrame @@ -175,7 +178,10 @@ func (slf *Lockstep[ClientID, Command]) StopBroadcast() { slf.running = false slf.runningLock.Unlock() - slf.ticker.StopTimer("lockstep") + if slf.ticker != nil { + slf.ticker.Release() + } + slf.ticker = nil slf.OnLockstepStoppedEvent() diff --git a/utils/timer/ticker.go b/utils/timer/ticker.go index 1892181..470498d 100644 --- a/utils/timer/ticker.go +++ b/utils/timer/ticker.go @@ -38,7 +38,11 @@ func (slf *Ticker) Release() { } slf.lock.Unlock() - slf.timer.tickers = append(slf.timer.tickers, slf) + if len(slf.timer.tickers) < tickerPoolSize { + slf.timer.tickers = append(slf.timer.tickers, slf) + } else { + slf.wheel.Stop() + } } // StopTimer 停止特定名称的调度器 diff --git a/utils/timer/timer.go b/utils/timer/timer.go index 3c9f754..c778b92 100644 --- a/utils/timer/timer.go +++ b/utils/timer/timer.go @@ -6,8 +6,21 @@ import ( "github.com/RussellLuo/timingwheel" ) +var tickerPoolSize = 96 + var timer = new(Timer) +// SetTickerPoolSize 设置定时器池大小 +// - 默认值为 96,当定时器池中的定时器不足时,会自动创建新的定时器,当定时器释放时,会将多余的定时器进行释放,否则将放入定时器池中 +func SetTickerPoolSize(size int) { + if size <= 0 { + panic("ticker pool size must be greater than 0") + } + timer.lock.Lock() + defer timer.lock.Unlock() + tickerPoolSize = size +} + func GetTicker(size int, options ...Option) *Ticker { return timer.NewTicker(size, options...) } From 9038bfc2b529934866555a57dd36fae5d788ad04 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Thu, 21 Dec 2023 14:08:21 +0800 Subject: [PATCH 15/20] =?UTF-8?q?perf:=20=E7=A7=BB=E9=99=A4=20lockstep=20?= =?UTF-8?q?=E5=AF=B9=20timer.Ticket=20=E7=9A=84=E4=BE=9D=E8=B5=96=EF=BC=8C?= =?UTF-8?q?=E6=9B=B4=E6=94=B9=E4=B8=BA=20time.Ticker=20=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=EF=BC=8C=E5=87=8F=E5=B0=91=E4=B8=8D=E5=BF=85=E8=A6=81=E7=9A=84?= =?UTF-8?q?=E8=B5=84=E6=BA=90=E5=8D=A0=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/lockstep/lockstep.go | 81 +++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/server/lockstep/lockstep.go b/server/lockstep/lockstep.go index 10ac55d..640c093 100644 --- a/server/lockstep/lockstep.go +++ b/server/lockstep/lockstep.go @@ -2,7 +2,6 @@ package lockstep import ( "encoding/json" - "github.com/kercylan98/minotaur/utils/timer" "sync" "time" ) @@ -11,7 +10,6 @@ import ( func NewLockstep[ClientID comparable, Command any](options ...Option[ClientID, Command]) *Lockstep[ClientID, Command] { lockstep := &Lockstep[ClientID, Command]{ currentFrame: -1, - ticker: timer.GetTicker(10), frameRate: 15, serialization: func(frame int64, commands []Command) []byte { frameStruct := struct { @@ -55,7 +53,7 @@ type Lockstep[ClientID comparable, Command any] struct { frameCache map[int64][]byte // 帧序列化缓存 frameCacheLock sync.RWMutex // 帧序列化缓存锁 - ticker *timer.Ticker // 定时器 + ticker *time.Ticker // 定时器 lockstepStoppedEventHandles []StoppedEventHandle[ClientID, Command] } @@ -124,48 +122,51 @@ func (slf *Lockstep[ClientID, Command]) StartBroadcast() { } slf.running = true if slf.ticker == nil { - slf.ticker = timer.GetTicker(10) + slf.ticker = time.NewTicker(time.Second / time.Duration(slf.frameRate)) } slf.runningLock.Unlock() slf.currentFrame = slf.initFrame - slf.ticker.Loop("lockstep", timer.Instantly, time.Second/time.Duration(slf.frameRate), timer.Forever, func() { - - slf.currentFrameLock.RLock() - if slf.frameLimit > 0 && slf.currentFrame >= slf.frameLimit { - slf.currentFrameLock.RUnlock() - slf.StopBroadcast() - return - } - slf.currentFrameLock.RUnlock() - slf.currentFrameLock.Lock() - slf.currentFrame++ - currentFrame := slf.currentFrame - currentCommands := slf.currentCommands - slf.currentCommands = make([]Command, 0, len(currentCommands)) - slf.currentFrameLock.Unlock() - - slf.clientLock.RLock() - defer slf.clientLock.RUnlock() - slf.frameCacheLock.Lock() - defer slf.frameCacheLock.Unlock() - - for clientId, client := range slf.clients { - var i = slf.clientFrame[clientId] - if i < slf.initFrame { - i = slf.initFrame - } - for ; i < currentFrame; i++ { - cache, exist := slf.frameCache[i] - if !exist { - cache = slf.serialization(i, currentCommands) - slf.frameCache[i] = cache + go func(ls *Lockstep[ClientID, Command]) { + for range ls.ticker.C { + go func(ls *Lockstep[ClientID, Command]) { + ls.currentFrameLock.RLock() + if ls.frameLimit > 0 && ls.currentFrame >= ls.frameLimit { + ls.currentFrameLock.RUnlock() + ls.StopBroadcast() + return } - client.Write(cache) - } - slf.clientFrame[clientId] = currentFrame + ls.currentFrameLock.RUnlock() + ls.currentFrameLock.Lock() + ls.currentFrame++ + currentFrame := ls.currentFrame + currentCommands := ls.currentCommands + ls.currentCommands = make([]Command, 0, len(currentCommands)) + ls.currentFrameLock.Unlock() + + ls.clientLock.RLock() + defer ls.clientLock.RUnlock() + ls.frameCacheLock.Lock() + defer ls.frameCacheLock.Unlock() + + for clientId, client := range ls.clients { + var i = ls.clientFrame[clientId] + if i < ls.initFrame { + i = ls.initFrame + } + for ; i < currentFrame; i++ { + cache, exist := ls.frameCache[i] + if !exist { + cache = ls.serialization(i, currentCommands) + ls.frameCache[i] = cache + } + client.Write(cache) + } + ls.clientFrame[clientId] = currentFrame + } + }(ls) } - }) + }(slf) } // StopBroadcast 停止广播 @@ -179,7 +180,7 @@ func (slf *Lockstep[ClientID, Command]) StopBroadcast() { slf.runningLock.Unlock() if slf.ticker != nil { - slf.ticker.Release() + slf.ticker.Stop() } slf.ticker = nil From 2ff7db96d274b0c039830702f5a5d4365a658e5c Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Thu, 21 Dec 2023 14:09:17 +0800 Subject: [PATCH 16/20] =?UTF-8?q?other:=20=E4=BC=98=E5=8C=96=20server=20?= =?UTF-8?q?=E5=8C=85=E6=B6=88=E6=81=AF=E5=88=86=E5=8F=91=E6=97=B6=E5=AF=B9?= =?UTF-8?q?=E4=BA=8E=20cancel=20=E7=9A=84=E5=A4=84=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/server.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/server/server.go b/server/server.go index 098337b..d057225 100644 --- a/server/server.go +++ b/server/server.go @@ -707,9 +707,9 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { } present := time.Now() - defer func(msg *Message) { - super.Handle(cancel) - if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync { + if msg.t != MessageTypeAsync && msg.t != MessageTypeUniqueAsync && msg.t != MessageTypeShuntAsync && msg.t != MessageTypeUniqueShuntAsync { + defer func(msg *Message) { + super.Handle(cancel) if err := recover(); err != nil { stack := string(debug.Stack()) log.Error("Server", log.String("MessageType", messageNames[msg.t]), log.String("Info", msg.String()), log.Any("error", err), log.String("stack", stack)) @@ -730,8 +730,12 @@ func (slf *Server) dispatchMessage(dispatcher *dispatcher, msg *Message) { if !slf.isShutdown.Load() { slf.messagePool.Release(msg) } + }(msg) + } else { + if cancel != nil { + defer cancel() } - }(msg) + } switch msg.t { case MessageTypePacket: From 1ae1c8d65c6f9ef11a8455cf1f13354dea624fc7 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Thu, 21 Dec 2023 14:22:18 +0800 Subject: [PATCH 17/20] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=20timer=20?= =?UTF-8?q?=E5=8C=85=E7=9A=84=20GetTicker=20=E8=8E=B7=E5=8F=96=E5=88=B0?= =?UTF-8?q?=E7=9A=84=E4=B8=BA=E5=86=85=E7=BD=AE=E5=AE=9A=E6=97=B6=E5=99=A8?= =?UTF-8?q?=E6=B1=A0=E4=B8=AD=E7=9A=84=E5=AE=9A=E6=97=B6=E5=99=A8=EF=BC=8C?= =?UTF-8?q?=E5=8F=AF=E9=80=9A=E8=BF=87=20timer.NewTimer=20=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E5=AE=9A=E6=97=B6=E5=99=A8=E6=B1=A0=E5=8F=A6=E8=A1=8C?= =?UTF-8?q?=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/timer/constants.go | 4 ++++ utils/timer/timer.go | 46 ++++++++++++++++++++++++++++------------ 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/utils/timer/constants.go b/utils/timer/constants.go index be07f83..53c6b6e 100644 --- a/utils/timer/constants.go +++ b/utils/timer/constants.go @@ -15,3 +15,7 @@ const ( const ( NoMark = "" // 没有设置标记的定时器 ) + +const ( + DefaultTickerPoolSize = 96 +) diff --git a/utils/timer/timer.go b/utils/timer/timer.go index c778b92..2a410e4 100644 --- a/utils/timer/timer.go +++ b/utils/timer/timer.go @@ -1,35 +1,55 @@ package timer import ( + "fmt" "sync" "github.com/RussellLuo/timingwheel" ) -var tickerPoolSize = 96 - -var timer = new(Timer) +var ( + tickerPoolSize = DefaultTickerPoolSize + standardTimer = NewTimer(tickerPoolSize) +) // SetTickerPoolSize 设置定时器池大小 -// - 默认值为 96,当定时器池中的定时器不足时,会自动创建新的定时器,当定时器释放时,会将多余的定时器进行释放,否则将放入定时器池中 +// - 默认值为 DefaultTickerPoolSize,当定时器池中的定时器不足时,会自动创建新的定时器,当定时器释放时,会将多余的定时器进行释放,否则将放入定时器池中 func SetTickerPoolSize(size int) { - if size <= 0 { - panic("ticker pool size must be greater than 0") - } - timer.lock.Lock() - defer timer.lock.Unlock() - tickerPoolSize = size + _ = standardTimer.ChangeTickerPoolSize(size) } func GetTicker(size int, options ...Option) *Ticker { - return timer.NewTicker(size, options...) + return standardTimer.NewTicker(size, options...) +} + +func NewTimer(tickerPoolSize int) *Timer { + if tickerPoolSize <= 0 { + panic(fmt.Errorf("timer tickerPoolSize must greater than 0, got: %d", tickerPoolSize)) + } + return &Timer{ + tickerPoolSize: tickerPoolSize, + } } type Timer struct { - tickers []*Ticker - lock sync.Mutex + tickers []*Ticker + lock sync.Mutex + tickerPoolSize int } +// ChangeTickerPoolSize 改变定时器池大小 +// - 当传入的大小小于或等于 0 时,将会返回错误,并且不会发生任何改变 +func (slf *Timer) ChangeTickerPoolSize(size int) error { + if size <= 0 { + return fmt.Errorf("timer tickerPoolSize must greater than 0, got: %d", tickerPoolSize) + } + slf.lock.Lock() + defer slf.lock.Unlock() + slf.tickerPoolSize = size + return nil +} + +// NewTicker 获取一个新的定时器 func (slf *Timer) NewTicker(size int, options ...Option) *Ticker { slf.lock.Lock() defer slf.lock.Unlock() From 50181c7ecbcd2f9d37c9a3ceceed88dc6143444f Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Thu, 21 Dec 2023 14:37:17 +0800 Subject: [PATCH 18/20] =?UTF-8?q?style:=20=E4=BF=AE=E6=94=B9=20timer.Timer?= =?UTF-8?q?=20=E5=90=8D=E5=AD=97=E4=B8=BA=20timer.Pool?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/timer/pool.go | 61 ++++++++++++++++++++++++++++++++++++++ utils/timer/ticker.go | 5 ++-- utils/timer/timer.go | 69 ++++--------------------------------------- 3 files changed, 70 insertions(+), 65 deletions(-) create mode 100644 utils/timer/pool.go diff --git a/utils/timer/pool.go b/utils/timer/pool.go new file mode 100644 index 0000000..390d3fa --- /dev/null +++ b/utils/timer/pool.go @@ -0,0 +1,61 @@ +package timer + +import ( + "fmt" + "sync" + + "github.com/RussellLuo/timingwheel" +) + +// NewPool 创建一个定时器池,当 tickerPoolSize 小于等于 0 时,将会引发 panic,可指定为 DefaultTickerPoolSize +func NewPool(tickerPoolSize int) *Pool { + if tickerPoolSize <= 0 { + panic(fmt.Errorf("timer tickerPoolSize must greater than 0, got: %d", tickerPoolSize)) + } + return &Pool{ + tickerPoolSize: tickerPoolSize, + } +} + +// Pool 定时器池 +type Pool struct { + tickers []*Ticker + lock sync.Mutex + tickerPoolSize int +} + +// ChangePoolSize 改变定时器池大小 +// - 当传入的大小小于或等于 0 时,将会返回错误,并且不会发生任何改变 +func (slf *Pool) ChangePoolSize(size int) error { + if size <= 0 { + return fmt.Errorf("timer tickerPoolSize must greater than 0, got: %d", tickerPoolSize) + } + slf.lock.Lock() + defer slf.lock.Unlock() + slf.tickerPoolSize = size + return nil +} + +// GetTicker 获取一个新的定时器 +func (slf *Pool) GetTicker(size int, options ...Option) *Ticker { + slf.lock.Lock() + defer slf.lock.Unlock() + + var ticker *Ticker + if len(slf.tickers) > 0 { + ticker = slf.tickers[0] + slf.tickers = slf.tickers[1:] + return ticker + } + + ticker = &Ticker{ + timer: slf, + wheel: timingwheel.NewTimingWheel(timingWheelTick, int64(size)), + timers: make(map[string]*Scheduler), + } + for _, option := range options { + option(ticker) + } + ticker.wheel.Start() + return ticker +} diff --git a/utils/timer/ticker.go b/utils/timer/ticker.go index 470498d..4e05840 100644 --- a/utils/timer/ticker.go +++ b/utils/timer/ticker.go @@ -11,10 +11,11 @@ import ( // Ticker 定时器 type Ticker struct { - timer *Timer + timer *Pool wheel *timingwheel.TimingWheel timers map[string]*Scheduler lock sync.RWMutex + handle func(name string, caller func()) mark string } @@ -25,7 +26,7 @@ func (slf *Ticker) Mark() string { return slf.mark } -// Release 释放定时器,并将定时器重新放回 Timer 池中 +// Release 释放定时器,并将定时器重新放回 Pool 池中 func (slf *Ticker) Release() { slf.timer.lock.Lock() defer slf.timer.lock.Unlock() diff --git a/utils/timer/timer.go b/utils/timer/timer.go index 2a410e4..e20d806 100644 --- a/utils/timer/timer.go +++ b/utils/timer/timer.go @@ -1,74 +1,17 @@ package timer -import ( - "fmt" - "sync" - - "github.com/RussellLuo/timingwheel" -) - var ( tickerPoolSize = DefaultTickerPoolSize - standardTimer = NewTimer(tickerPoolSize) + standardPool = NewPool(tickerPoolSize) ) -// SetTickerPoolSize 设置定时器池大小 +// SetPoolSize 设置标准池定时器池大小 // - 默认值为 DefaultTickerPoolSize,当定时器池中的定时器不足时,会自动创建新的定时器,当定时器释放时,会将多余的定时器进行释放,否则将放入定时器池中 -func SetTickerPoolSize(size int) { - _ = standardTimer.ChangeTickerPoolSize(size) +func SetPoolSize(size int) { + _ = standardPool.ChangePoolSize(size) } +// GetTicker 获取标准池中的一个定时器 func GetTicker(size int, options ...Option) *Ticker { - return standardTimer.NewTicker(size, options...) -} - -func NewTimer(tickerPoolSize int) *Timer { - if tickerPoolSize <= 0 { - panic(fmt.Errorf("timer tickerPoolSize must greater than 0, got: %d", tickerPoolSize)) - } - return &Timer{ - tickerPoolSize: tickerPoolSize, - } -} - -type Timer struct { - tickers []*Ticker - lock sync.Mutex - tickerPoolSize int -} - -// ChangeTickerPoolSize 改变定时器池大小 -// - 当传入的大小小于或等于 0 时,将会返回错误,并且不会发生任何改变 -func (slf *Timer) ChangeTickerPoolSize(size int) error { - if size <= 0 { - return fmt.Errorf("timer tickerPoolSize must greater than 0, got: %d", tickerPoolSize) - } - slf.lock.Lock() - defer slf.lock.Unlock() - slf.tickerPoolSize = size - return nil -} - -// NewTicker 获取一个新的定时器 -func (slf *Timer) NewTicker(size int, options ...Option) *Ticker { - slf.lock.Lock() - defer slf.lock.Unlock() - - var ticker *Ticker - if len(slf.tickers) > 0 { - ticker = slf.tickers[0] - slf.tickers = slf.tickers[1:] - return ticker - } - - ticker = &Ticker{ - timer: slf, - wheel: timingwheel.NewTimingWheel(timingWheelTick, int64(size)), - timers: make(map[string]*Scheduler), - } - for _, option := range options { - option(ticker) - } - ticker.wheel.Start() - return ticker + return standardPool.GetTicker(size, options...) } From ae98963ecc168f099d03e7c379736c2d567b6ace Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Thu, 21 Dec 2023 14:43:55 +0800 Subject: [PATCH 19/20] =?UTF-8?q?feat:=20timer.Pool=20=E6=96=B0=E5=A2=9E?= =?UTF-8?q?=20Release=20=E5=87=BD=E6=95=B0=EF=BC=8C=E5=8F=AF=E4=B8=BB?= =?UTF-8?q?=E5=8A=A8=E9=87=8A=E6=94=BE=E6=B1=A0=E4=B8=AD=E7=9A=84=E6=89=80?= =?UTF-8?q?=E6=9C=89=E5=AE=9A=E6=97=B6=E5=99=A8=E5=8F=8A=E6=B1=A0=E5=AD=90?= =?UTF-8?q?=E6=9C=AC=E8=BA=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/timer/pool.go | 15 +++++++++++++++ utils/timer/ticker.go | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/utils/timer/pool.go b/utils/timer/pool.go index 390d3fa..dc987ac 100644 --- a/utils/timer/pool.go +++ b/utils/timer/pool.go @@ -22,6 +22,7 @@ type Pool struct { tickers []*Ticker lock sync.Mutex tickerPoolSize int + closed bool } // ChangePoolSize 改变定时器池大小 @@ -59,3 +60,17 @@ func (slf *Pool) GetTicker(size int, options ...Option) *Ticker { ticker.wheel.Start() return ticker } + +// Release 释放定时器池的资源,释放后由其产生的 Ticker 在 Ticker.Release 后将不再回到池中,而是直接释放 +// - 虽然定时器池已被释放,但是依旧可以产出 Ticker +func (slf *Pool) Release() { + slf.lock.Lock() + defer slf.lock.Unlock() + slf.closed = true + for _, ticker := range slf.tickers { + ticker.wheel.Stop() + } + slf.tickers = nil + slf.tickerPoolSize = 0 + return +} diff --git a/utils/timer/ticker.go b/utils/timer/ticker.go index 4e05840..337419c 100644 --- a/utils/timer/ticker.go +++ b/utils/timer/ticker.go @@ -39,7 +39,7 @@ func (slf *Ticker) Release() { } slf.lock.Unlock() - if len(slf.timer.tickers) < tickerPoolSize { + if len(slf.timer.tickers) < tickerPoolSize && !slf.timer.closed { slf.timer.tickers = append(slf.timer.tickers, slf) } else { slf.wheel.Stop() From 4f3b4eb1d5a0051e7d57217cb07c0c973359d82c Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Thu, 21 Dec 2023 14:43:59 +0800 Subject: [PATCH 20/20] =?UTF-8?q?other:=20=E4=BF=AE=E6=94=B9=20server.With?= =?UTF-8?q?Ticker=20=E5=B0=86=E4=B8=8D=E5=86=8D=E4=BD=BF=E7=94=A8=E6=A0=87?= =?UTF-8?q?=E5=87=86=E6=B1=A0=E7=9A=84=E5=AE=9A=E6=97=B6=E5=99=A8=EF=BC=8C?= =?UTF-8?q?=E8=80=8C=E6=98=AF=E8=87=AA=E8=A1=8C=E7=BB=B4=E6=8A=A4=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E5=99=A8=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- examples/internal/ticker-server/main.go | 2 +- server/conn.go | 4 ++-- server/options.go | 14 ++++++++++---- server/server.go | 3 +++ server/server_test.go | 2 +- 6 files changed, 18 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index b15afec..dd94c44 100644 --- a/README.md +++ b/README.md @@ -158,7 +158,7 @@ package main import "github.com/kercylan98/minotaur/server" func main() { - srv := server.New(server.NetworkWebsocket, server.WithTicker(50, 10, false)) + srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 50, 10, false)) if err := srv.Run(":9999"); err != nil { panic(err) } diff --git a/examples/internal/ticker-server/main.go b/examples/internal/ticker-server/main.go index 680274a..c7a0df5 100644 --- a/examples/internal/ticker-server/main.go +++ b/examples/internal/ticker-server/main.go @@ -3,7 +3,7 @@ package main import "github.com/kercylan98/minotaur/server" func main() { - srv := server.New(server.NetworkWebsocket, server.WithTicker(50, 10, false)) + srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 50, 10, false)) if err := srv.Run(":9999"); err != nil { panic(err) } diff --git a/server/conn.go b/server/conn.go index c6fa0e3..cdec322 100644 --- a/server/conn.go +++ b/server/conn.go @@ -279,9 +279,9 @@ func (slf *Conn) Write(packet []byte, callback ...func(err error)) { func (slf *Conn) init() { if slf.server.ticker != nil && slf.server.connTickerSize > 0 { if slf.server.tickerAutonomy { - slf.ticker = timer.GetTicker(slf.server.connTickerSize) + slf.ticker = slf.server.tickerPool.GetTicker(slf.server.connTickerSize) } else { - slf.ticker = timer.GetTicker(slf.server.connTickerSize, timer.WithCaller(func(name string, caller func()) { + slf.ticker = slf.server.tickerPool.GetTicker(slf.server.connTickerSize, timer.WithCaller(func(name string, caller func()) { slf.server.PushShuntTickerMessage(slf, name, caller) })) } diff --git a/server/options.go b/server/options.go index 8374a61..61f5f0c 100644 --- a/server/options.go +++ b/server/options.go @@ -32,6 +32,7 @@ type runtime struct { supportMessageTypes map[int]bool // websocket模式下支持的消息类型 certFile, keyFile string // TLS文件 messagePoolSize int // 消息池大小 + tickerPool *timer.Pool // 定时器池 ticker *timer.Ticker // 定时器 tickerAutonomy bool // 定时器是否独立运行 connTickerSize int // 连接定时器大小 @@ -130,17 +131,22 @@ func WithWebsocketReadDeadline(t time.Duration) Option { } // WithTicker 通过定时器创建服务器,为服务器添加定时器功能 +// - poolSize:指定服务器定时器池大小,当池子内的定时器数量超出该值后,多余的定时器在释放时将被回收,该值小于等于 0 时将使用 timer.DefaultTickerPoolSize // - size:服务器定时器时间轮大小 -// - connSize:服务器连接定时器时间轮大小 +// - connSize:服务器连接定时器时间轮大小,当该值小于等于 0 的时候,在新连接建立时将不再为其创建定时器 // - autonomy:定时器是否独立运行(独立运行的情况下不会作为服务器消息运行,会导致并发问题) -func WithTicker(size, connSize int, autonomy bool) Option { +func WithTicker(poolSize, size, connSize int, autonomy bool) Option { return func(srv *Server) { + if poolSize <= 0 { + poolSize = timer.DefaultTickerPoolSize + } + srv.tickerPool = timer.NewPool(poolSize) srv.connTickerSize = connSize srv.tickerAutonomy = autonomy if !autonomy { - srv.ticker = timer.GetTicker(size) + srv.ticker = srv.tickerPool.GetTicker(size) } else { - srv.ticker = timer.GetTicker(size, timer.WithCaller(func(name string, caller func()) { + srv.ticker = srv.tickerPool.GetTicker(size, timer.WithCaller(func(name string, caller func()) { srv.PushTickerMessage(name, caller) })) } diff --git a/server/server.go b/server/server.go index d057225..2a700ec 100644 --- a/server/server.go +++ b/server/server.go @@ -493,6 +493,9 @@ func (slf *Server) shutdown(err error) { log.Error("Server", log.Err(shutdownErr)) } } + if slf.tickerPool != nil { + slf.tickerPool.Release() + } if slf.ticker != nil { slf.ticker.Release() } diff --git a/server/server_test.go b/server/server_test.go index d06a3cc..eb998a6 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -11,7 +11,7 @@ import ( func TestNew(t *testing.T) { //limiter := rate.NewLimiter(rate.Every(time.Second), 100) - srv := server.New(server.NetworkWebsocket, server.WithTicker(200, 10, false), server.WithMessageBufferSize(1024*1024), server.WithPProf()) + srv := server.New(server.NetworkWebsocket, server.WithTicker(-1, 200, 10, false), server.WithMessageBufferSize(1024*1024), server.WithPProf()) //srv.RegMessageExecBeforeEvent(func(srv *server.Server, message *server.Message) bool { // t, c := srv.TimeoutContext(time.Second * 5) // defer c()