vRp.CD2g_test/utils/buffer/ring_unbounded.go

101 lines
1.9 KiB
Go

package buffer
import (
"sync"
)
// NewRingUnbounded creates a concurrency-safe unbounded buffer backed by a
// ring buffer.
//
// bufferSize is used as the capacity of the channel returned by Read. The
// backing ring is always created by passing 1024 to NewRing, independent of
// bufferSize. The goroutine that forwards values from the ring to the channel
// is started (via process) before the buffer is returned.
func NewRingUnbounded[T any](bufferSize int) *RingUnbounded[T] {
	// Value handed to NewRing for the backing ring.
	const ringSize = 1024

	b := new(RingUnbounded[T])
	b.ring = NewRing[T](ringSize)
	b.rc = make(chan T, bufferSize)
	b.closedSignal = make(chan struct{})
	// The condition variable shares the ring mutex so that waiting on it
	// releases the ring for writers.
	b.cond = sync.NewCond(&b.rrm)

	b.process()
	return b
}
// RingUnbounded is an unbounded buffer implemented on top of a ring buffer.
//
// Values passed to Write are stored in the ring and forwarded by a background
// goroutine to the channel returned by Read.
type RingUnbounded[T any] struct {
	// rrm guards ring and is the locker used by cond.
	rrm  sync.Mutex
	ring *Ring[T]
	// cond is signalled by Write and Close to wake the forwarding goroutine.
	cond *sync.Cond

	// rc is the channel handed out by Read.
	rc chan T

	// closedMutex guards closed.
	closedMutex sync.RWMutex
	closed      bool
	// closedSignal is returned by Close and closed by the forwarding
	// goroutine when it terminates.
	closedSignal chan struct{}
}
// Write stores v in the buffer and signals the forwarding goroutine.
//
// If the buffer has already been closed the call returns without storing v.
func (b *RingUnbounded[T]) Write(v T) {
	// The read lock is held for the whole call so that Close cannot flip
	// the closed flag in the middle of a write.
	b.closedMutex.RLock()
	defer b.closedMutex.RUnlock()

	if !b.closed {
		b.rrm.Lock()
		b.ring.Write(v)
		b.cond.Signal()
		b.rrm.Unlock()
	}
}
// Read returns the receive-only channel on which buffered values are
// delivered. The same channel is returned on every call.
func (b *RingUnbounded[T]) Read() <-chan T {
	var out <-chan T = b.rc
	return out
}
// Closed reports whether the buffer has been closed.
func (b *RingUnbounded[T]) Closed() bool {
	b.closedMutex.RLock()
	isClosed := b.closed
	b.closedMutex.RUnlock()
	return isClosed
}
// Close closes the buffer. After closing, no new data is accepted, but data
// that is already buffered can still be read.
//
// The returned channel is the buffer's closed-signal channel; the forwarding
// goroutine closes it when it terminates. Calling Close again returns the
// same channel without any further effect.
func (b *RingUnbounded[T]) Close() <-chan struct{} {
	b.closedMutex.Lock()
	defer b.closedMutex.Unlock()

	if !b.closed {
		b.closed = true

		// Wake the forwarding goroutine in case it is waiting for data,
		// so that it can observe the closed flag.
		b.rrm.Lock()
		b.cond.Signal()
		b.rrm.Unlock()
	}
	return b.closedSignal
}
// process starts the background goroutine that forwards values from the ring
// to the read channel.
//
// Each iteration reads the ring and samples the closed flag inside a single
// critical section, then acts on that consistent snapshot:
//
//   - values were read: they are sent on rc, in order;
//   - nothing was read and the buffer is closed: rc and closedSignal are
//     closed and the goroutine exits;
//   - nothing was read and the buffer is open: the goroutine waits on cond
//     and then starts a fresh iteration.
//
// The ring is always re-read after a wake-up. Acting on the empty result
// obtained before waiting could close rc while a value written just before
// Close is still sitting in the ring.
//
// closedMutex is released before sending on rc, so a slow or absent consumer
// does not block Close or Closed.
func (b *RingUnbounded[T]) process() {
	go func(b *RingUnbounded[T]) {
		for {
			// Lock order (closedMutex, then rrm) matches Write and Close.
			b.closedMutex.RLock()
			b.rrm.Lock()
			vs := b.ring.ReadAll()
			// Write holds the read lock for its whole duration and Close
			// needs the exclusive lock, so if closed is true here every
			// accepted write finished before the ReadAll above.
			closed := b.closed
			b.closedMutex.RUnlock()

			if len(vs) == 0 {
				if closed {
					b.rrm.Unlock()
					close(b.rc)
					close(b.closedSignal)
					return
				}
				// rrm is still held, so neither Write nor Close can signal
				// before Wait has released it; no wake-up can be missed.
				b.cond.Wait()
				b.rrm.Unlock()
				continue
			}
			b.rrm.Unlock()

			// No lock is held while sending; this may block until a
			// consumer receives.
			for _, v := range vs {
				b.rc <- v
			}
		}
	}(b)
}