Files
vRp.CD2g_test/utils/unbounded/ring.go

95 lines
1.6 KiB
Go

package unbounded
import (
"github.com/kercylan98/minotaur/utils/buffer"
"github.com/pkg/errors"
"golang.org/x/net/context"
"sync"
)
// NewRing 创建一个并发安全的基于环形缓冲区实现的无界缓冲区
func NewRing[T any](ctx context.Context) *Ring[T] {
r := &Ring[T]{
ctx: ctx,
ring: buffer.NewRing[T](1024),
ch: make(chan T, 1024),
}
r.cond = sync.NewCond(&r.rw)
go r.run()
return r
}
// Ring 是并发安全的,基于环形缓冲区实现的无界缓冲区
type Ring[T any] struct {
ctx context.Context
ring *buffer.Ring[T]
rw sync.RWMutex
cond *sync.Cond
ch chan T
closed bool
}
// Put 将数据放入缓冲区
func (r *Ring[T]) Put(v ...T) error {
if len(v) == 0 {
return nil
}
r.rw.Lock()
if r.closed {
r.rw.Unlock()
return errors.New("unbounded ring is closed")
}
for _, t := range v {
r.ring.Write(t)
}
r.rw.Unlock()
r.cond.Signal()
return nil
}
// Get 获取可接收消息的读取通道
func (r *Ring[T]) Get() <-chan T {
return r.ch
}
// Close 关闭缓冲区
func (r *Ring[T]) Close() {
r.rw.RLock()
r.closed = true
r.rw.RUnlock()
r.cond.Signal()
}
// IsClosed 是否已关闭
func (r *Ring[T]) IsClosed() bool {
r.rw.RLock()
defer r.rw.RUnlock()
return r.closed
}
func (r *Ring[T]) run() {
for {
select {
case <-r.ctx.Done():
r.Close()
default:
r.rw.Lock()
if r.ring.IsEmpty() {
if r.closed { // 如果已关闭并且没有数据,则关闭通道
close(r.ch)
r.rw.Unlock()
return
}
// 等待数据
r.cond.Wait()
}
vs := r.ring.ReadAll()
r.rw.Unlock()
for _, v := range vs {
r.ch <- v
}
}
}
}