diff --git a/component/lockstep/constants.go b/component/lockstep/constants.go new file mode 100644 index 0000000..b1ed842 --- /dev/null +++ b/component/lockstep/constants.go @@ -0,0 +1,5 @@ +package lockstep + +const ( + tickerFrameName = "LOCKSTEP_FRAME" +) diff --git a/component/lockstep/frame.go b/component/lockstep/frame.go new file mode 100644 index 0000000..95046e0 --- /dev/null +++ b/component/lockstep/frame.go @@ -0,0 +1,10 @@ +package lockstep + +type Frame[Command any] interface { + // GetIndex 获取这一帧的索引 + GetIndex() uint32 + // GetCommands 获取这一帧的数据 + GetCommands() []Command + // Marshal 序列化帧数据 + Marshal() ([]byte, error) +} diff --git a/component/lockstep/lockstep.go b/component/lockstep/lockstep.go new file mode 100644 index 0000000..dc655ce --- /dev/null +++ b/component/lockstep/lockstep.go @@ -0,0 +1,97 @@ +package lockstep + +import ( + "minotaur/utils/synchronization" + "minotaur/utils/timer" + "time" +) + +func New[WriterID comparable, FrameCommand any]() *Lockstep[WriterID, FrameCommand] { + lockstep := &Lockstep[WriterID, FrameCommand]{ + ticker: timer.GetTicker(30), + writers: synchronization.NewMap[WriterID, Writer[WriterID, FrameCommand]](), + writerCurrentFrame: synchronization.NewMap[WriterID, uint32](), + frames: map[uint32]Frame[FrameCommand]{}, + } + return lockstep +} + +type Lockstep[WriterID comparable, FrameCommand any] struct { + FrameLimit uint32 // 帧数上限 + FrameRate uint32 // 帧率(每秒的帧数) + FrameBroadcastInterval uint32 // 帧数广播间隔帧数 + FrameOnceLimit uint32 // 每次消息最大容纳帧数 + + ticker *timer.Ticker // 定时器 + writers *synchronization.Map[WriterID, Writer[WriterID, FrameCommand]] // 被广播的对象 + writerCurrentFrame *synchronization.Map[WriterID, uint32] // 被广播的对象当前帧 + currentFrame uint32 // 当前帧 + currentClientFrame uint32 // 当前客户端帧数 + frames map[uint32]Frame[FrameCommand] // 所有帧 +} + +// SetWriter 设置需要被广播的 Writer +func (slf *Lockstep[WriterID, FrameCommand]) SetWriter(writer ...Writer[WriterID, FrameCommand]) { + for _, w := range writer { + slf.writers.Set(w.GetID(), w) + } +} + +func (slf *Lockstep[WriterID, FrameCommand]) Run() { + slf.Release() + slf.ticker.Loop(tickerFrameName, timer.Instantly, time.Second/time.Duration(slf.FrameRate), timer.Forever, slf.tick) +} + +func (slf *Lockstep[WriterID, FrameCommand]) Release() { + slf.ticker.StopTimer(tickerFrameName) + slf.writers.Clear() + slf.writerCurrentFrame.Clear() + slf.currentFrame = 0 + slf.currentClientFrame = 0 + for k := range slf.frames { + delete(slf.frames, k) + } +} + +func (slf *Lockstep[WriterID, FrameCommand]) tick() { + if slf.FrameLimit > 0 && slf.currentFrame >= slf.FrameLimit { + slf.ticker.StopTimer(tickerFrameName) + return + } + + slf.currentFrame++ + + if slf.currentFrame-slf.currentClientFrame < slf.FrameBroadcastInterval { + return + } + + slf.writers.RangeSkip(func(id WriterID, writer Writer[WriterID, FrameCommand]) bool { + + if !writer.Healthy() { + return false + } + + var frameCounter uint32 + var frames = make(map[uint32]Frame[FrameCommand]) + for i := slf.writerCurrentFrame.Get(id); i < slf.currentFrame; i++ { + var frame = slf.frames[i] + if frame == nil && i != (slf.currentFrame-1) { + continue + } + + frames[frame.GetIndex()] = frame + frameCounter++ + if i == slf.currentFrame-1 || frameCounter >= slf.FrameOnceLimit { + data := writer.Marshal(frames) + writer.Write(data) + frameCounter = 0 + for k := range frames { + delete(frames, k) + } + } + } + slf.currentClientFrame = slf.currentFrame + + return true + }) +} diff --git a/component/lockstep/writer.go b/component/lockstep/writer.go new file mode 100644 index 0000000..ba34537 --- /dev/null +++ b/component/lockstep/writer.go @@ -0,0 +1,8 @@ +package lockstep + +type Writer[ID comparable, FrameCommand any] interface { + GetID() ID + Healthy() bool + Marshal(frames map[uint32]Frame[FrameCommand]) []byte + Write(data []byte) error +} diff --git a/main.go b/main.go index 5d1eb8f..caca767 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,8 @@ package main -import "minotaur/server" +import ( + "minotaur/server" +) func main() { s := server.New(server.NetworkKcp) diff --git a/utils/synchronization/map.go b/utils/synchronization/map.go index e67d81b..2dc2762 100644 --- a/utils/synchronization/map.go +++ b/utils/synchronization/map.go @@ -8,6 +8,7 @@ func NewMap[Key comparable, value any]() *Map[Key, value] { } } +// Map 并发安全的字典数据结构 type Map[Key comparable, Value any] struct { lock sync.RWMutex data map[Key]Value @@ -78,6 +79,14 @@ func (slf *Map[Key, Value]) DeleteExist(key Key) bool { return true } +func (slf *Map[Key, Value]) Clear() { + slf.lock.Lock() + defer slf.lock.Unlock() + for k := range slf.data { + delete(slf.data, k) + } +} + func (slf *Map[Key, Value]) Range(handle func(key Key, value Value)) { slf.lock.RLock() defer slf.lock.RUnlock()