From 0e8e530ff326d53ef2afa6b2c802560069f3a0dc Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 16 May 2023 18:04:22 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=86=E6=AE=B5=E9=94=81map?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/synchronization/map.go | 12 +- utils/synchronization/map_segment.go | 261 +++++++++++++++++++++++++++ utils/synchronization/pool.go | 4 +- 3 files changed, 273 insertions(+), 4 deletions(-) create mode 100644 utils/synchronization/map_segment.go diff --git a/utils/synchronization/map.go b/utils/synchronization/map.go index 66ed3a0..d8816d6 100644 --- a/utils/synchronization/map.go +++ b/utils/synchronization/map.go @@ -167,6 +167,10 @@ func (slf *Map[Key, Value]) RangeSkip(handle func(key Key, value Value) bool) { } func (slf *Map[Key, Value]) RangeBreakout(handle func(key Key, value Value) bool) { + slf.rangeBreakout(handle) +} + +func (slf *Map[Key, Value]) rangeBreakout(handle func(key Key, value Value) bool) bool { if !slf.atom { slf.lock.RLock() defer slf.lock.RUnlock() @@ -174,12 +178,17 @@ func (slf *Map[Key, Value]) RangeBreakout(handle func(key Key, value Value) bool for k, v := range slf.data { key, value := k, v if !handle(key, value) { - break + return true } } + return false } func (slf *Map[Key, Value]) RangeFree(handle func(key Key, value Value, skip func(), breakout func())) { + slf.rangeFree(handle) +} + +func (slf *Map[Key, Value]) rangeFree(handle func(key Key, value Value, skip func(), breakout func())) bool { var skipExec, breakoutExec bool var skip = func() { skipExec = true @@ -201,6 +210,7 @@ func (slf *Map[Key, Value]) RangeFree(handle func(key Key, value Value, skip fun break } } + return breakoutExec } func (slf *Map[Key, Value]) Keys() []Key { diff --git a/utils/synchronization/map_segment.go b/utils/synchronization/map_segment.go new file mode 100644 index 0000000..6836b15 --- /dev/null +++ b/utils/synchronization/map_segment.go @@ -0,0 +1,261 @@ +package synchronization + +import ( + "encoding/json" + "github.com/kercylan98/minotaur/utils/hash" + "sync" +) + +func NewMapSegment[Key comparable, value any](segmentCount int) *MapSegment[Key, value] { + ms := &MapSegment[Key, value]{ + segments: map[int]*Map[Key, value]{}, + cache: map[Key]int{}, + consistency: hash.NewConsistency(segmentCount), + } + for i := 0; i < segmentCount; i++ { + ms.consistency.AddNode(i) + ms.segments[i] = NewMap[Key, value]() + } + + return ms +} + +// MapSegment 基于分段锁实现的并发安全的字典数据结构map +type MapSegment[Key comparable, Value any] struct { + segments map[int]*Map[Key, Value] + cache map[Key]int + consistency *hash.Consistency + lock sync.RWMutex +} + +func (slf *MapSegment[Key, Value]) Set(key Key, value Value) { + slf.lock.RLock() + s, exist := slf.cache[key] + slf.lock.RUnlock() + if !exist { + s = slf.consistency.PickNode(key) + slf.lock.Lock() + slf.cache[key] = s + slf.lock.Unlock() + } + slf.segments[s].Set(key, value) +} + +func (slf *MapSegment[Key, Value]) Get(key Key) (value Value) { + slf.lock.RLock() + s, exist := slf.cache[key] + slf.lock.RUnlock() + if !exist { + return value + } + return slf.segments[s].Get(key) +} + +// AtomGetSet 原子方式获取一个值并在之后进行赋值 +func (slf *MapSegment[Key, Value]) AtomGetSet(key Key, handle func(value Value, exist bool) (newValue Value, isSet bool)) { + var value Value + slf.lock.RLock() + s, exist := slf.cache[key] + slf.lock.RUnlock() + if !exist { + if newValue, isSet := handle(value, exist); isSet { + slf.Set(key, newValue) + } + return + } + slf.segments[s].AtomGetSet(key, handle) +} + +func (slf *MapSegment[Key, Value]) Exist(key Key) bool { + slf.lock.RLock() + _, exist := slf.cache[key] + slf.lock.RUnlock() + return exist +} + +func (slf *MapSegment[Key, Value]) GetExist(key Key) (value Value, exist bool) { + slf.lock.RLock() + s, exist := slf.cache[key] + slf.lock.RUnlock() + if !exist { + return value, false + } + return slf.segments[s].GetExist(key) +} + +func (slf *MapSegment[Key, Value]) Length() int { + slf.lock.RLock() + defer slf.lock.RUnlock() + return len(slf.cache) +} + +func (slf *MapSegment[Key, Value]) Delete(key Key) { + slf.lock.Lock() + s, exist := slf.cache[key] + delete(slf.cache, key) + slf.lock.Unlock() + if exist { + slf.segments[s].Delete(key) + } +} + +func (slf *MapSegment[Key, Value]) DeleteGet(key Key) (value Value) { + slf.lock.Lock() + s, exist := slf.cache[key] + delete(slf.cache, key) + slf.lock.Unlock() + if exist { + return slf.segments[s].DeleteGet(key) + } + return +} + +func (slf *MapSegment[Key, Value]) DeleteGetExist(key Key) (value Value, exist bool) { + slf.lock.Lock() + s, exist := slf.cache[key] + delete(slf.cache, key) + slf.lock.Unlock() + if exist { + return slf.segments[s].DeleteGetExist(key) + } + return value, exist +} + +func (slf *MapSegment[Key, Value]) DeleteExist(key Key) bool { + slf.lock.Lock() + s, exist := slf.cache[key] + delete(slf.cache, key) + slf.lock.Unlock() + if exist { + return slf.segments[s].DeleteExist(key) + } + return false +} + +func (slf *MapSegment[Key, Value]) Clear() { + slf.lock.Lock() + for k := range slf.cache { + delete(slf.cache, k) + } + for _, m := range slf.segments { + m.Clear() + } + slf.lock.Unlock() +} + +func (slf *MapSegment[Key, Value]) ClearHandle(handle func(key Key, value Value)) { + slf.lock.Lock() + for k := range slf.cache { + delete(slf.cache, k) + } + for _, m := range slf.segments { + m.ClearHandle(handle) + } + slf.lock.Unlock() +} + +func (slf *MapSegment[Key, Value]) Range(handle func(key Key, value Value)) { + for _, m := range slf.segments { + m.Range(handle) + } +} + +func (slf *MapSegment[Key, Value]) RangeSkip(handle func(key Key, value Value) bool) { + for _, m := range slf.segments { + m.RangeSkip(handle) + } +} + +func (slf *MapSegment[Key, Value]) RangeBreakout(handle func(key Key, value Value) bool) { + for _, m := range slf.segments { + if m.rangeBreakout(handle) { + break + } + } +} + +func (slf *MapSegment[Key, Value]) RangeFree(handle func(key Key, value Value, skip func(), breakout func())) { + for _, m := range slf.segments { + if m.rangeFree(handle) { + break + } + } +} + +func (slf *MapSegment[Key, Value]) Keys() []Key { + var s = make([]Key, 0, len(slf.cache)) + slf.lock.RLock() + for k, _ := range slf.cache { + s = append(s, k) + } + defer slf.lock.RUnlock() + return s +} + +func (slf *MapSegment[Key, Value]) Slice() []Value { + slf.lock.RLock() + var s = make([]Value, 0, len(slf.cache)) + slf.lock.RUnlock() + for _, m := range slf.segments { + s = append(s, m.Slice()...) + } + return s +} + +func (slf *MapSegment[Key, Value]) Map() map[Key]Value { + slf.lock.RLock() + var s = map[Key]Value{} + slf.lock.RUnlock() + for _, m := range slf.segments { + for k, v := range m.Map() { + s[k] = v + } + } + return s +} + +func (slf *MapSegment[Key, Value]) Size() int { + slf.lock.RLock() + defer slf.lock.RUnlock() + return len(slf.cache) +} + +// GetOne 获取一个 +func (slf *MapSegment[Key, Value]) GetOne() (value Value) { + for k, s := range slf.cache { + return slf.segments[s].Get(k) + } + return value +} + +func (slf *MapSegment[Key, Value]) MarshalJSON() ([]byte, error) { + var ms struct { + Segments map[int]*Map[Key, Value] + Cache map[Key]int + SegmentCount int + } + ms.Segments = slf.segments + ms.Cache = slf.cache + ms.SegmentCount = len(slf.segments) + return json.Marshal(ms) +} + +func (slf *MapSegment[Key, Value]) UnmarshalJSON(bytes []byte) error { + var ms struct { + Segments map[int]*Map[Key, Value] + Cache map[Key]int + SegmentCount int + } + if err := json.Unmarshal(bytes, &ms); err != nil { + return err + } + slf.lock.Lock() + slf.consistency = hash.NewConsistency(ms.SegmentCount) + for i := 0; i < ms.SegmentCount; i++ { + slf.consistency.AddNode(i) + } + slf.segments = ms.Segments + slf.cache = ms.Cache + slf.lock.Unlock() + return nil +} diff --git a/utils/synchronization/pool.go b/utils/synchronization/pool.go index 4d4256b..e12bb60 100644 --- a/utils/synchronization/pool.go +++ b/utils/synchronization/pool.go @@ -1,8 +1,6 @@ package synchronization import ( - "github.com/kercylan98/minotaur/utils/log" - "go.uber.org/zap" "sync" ) @@ -39,7 +37,7 @@ func (slf *Pool[T]) Get() T { return data } slf.mutex.Unlock() - log.Warn("Pool", zap.String("Get", "the number of buffer members is insufficient, consider whether it is due to unreleased or inappropriate buffer size")) + //log.Warn("Pool", zap.String("Get", "the number of buffer members is insufficient, consider whether it is due to unreleased or inappropriate buffer size")) return slf.generator() }