diff --git a/component/lockstep/constants.go b/component/lockstep/constants.go index b1ed842..9fc2e44 100644 --- a/component/lockstep/constants.go +++ b/component/lockstep/constants.go @@ -1,5 +1,11 @@ package lockstep +import "errors" + const ( tickerFrameName = "LOCKSTEP_FRAME" ) + +var ( + ErrFrameFactorCanNotIsNull = errors.New("frameFactory can not is nil") +) diff --git a/component/lockstep/frame.go b/component/lockstep/frame.go index 95046e0..da8f3d1 100644 --- a/component/lockstep/frame.go +++ b/component/lockstep/frame.go @@ -5,6 +5,8 @@ type Frame[Command any] interface { GetIndex() uint32 // GetCommands 获取这一帧的数据 GetCommands() []Command + // AddCommand 添加命令到这一帧 + AddCommand(command Command) // Marshal 序列化帧数据 Marshal() ([]byte, error) } diff --git a/component/lockstep/lockstep.go b/component/lockstep/lockstep.go index dc655ce..279e8dc 100644 --- a/component/lockstep/lockstep.go +++ b/component/lockstep/lockstep.go @@ -1,56 +1,117 @@ package lockstep import ( - "minotaur/utils/synchronization" "minotaur/utils/timer" + "sync" "time" ) -func New[WriterID comparable, FrameCommand any]() *Lockstep[WriterID, FrameCommand] { +func New[WriterID comparable, FrameCommand any](frameFactory func(frameIndex uint32) Frame[FrameCommand]) *Lockstep[WriterID, FrameCommand] { lockstep := &Lockstep[WriterID, FrameCommand]{ + frameFactory: frameFactory, ticker: timer.GetTicker(30), - writers: synchronization.NewMap[WriterID, Writer[WriterID, FrameCommand]](), - writerCurrentFrame: synchronization.NewMap[WriterID, uint32](), + writers: map[WriterID]Writer[WriterID, FrameCommand]{}, + writerCurrentFrame: map[WriterID]uint32{}, frames: map[uint32]Frame[FrameCommand]{}, } return lockstep } type Lockstep[WriterID comparable, FrameCommand any] struct { + frameFactory func(frameIndex uint32) Frame[FrameCommand] + FrameLimit uint32 // 帧数上限 FrameRate uint32 // 帧率(每秒的帧数) FrameBroadcastInterval uint32 // 帧数广播间隔帧数 FrameOnceLimit uint32 // 每次消息最大容纳帧数 - ticker *timer.Ticker // 定时器 - writers *synchronization.Map[WriterID, Writer[WriterID, FrameCommand]] // 被广播的对象 - writerCurrentFrame *synchronization.Map[WriterID, uint32] // 被广播的对象当前帧 - currentFrame uint32 // 当前帧 - currentClientFrame uint32 // 当前客户端帧数 - frames map[uint32]Frame[FrameCommand] // 所有帧 + ticker *timer.Ticker // 定时器 + writers map[WriterID]Writer[WriterID, FrameCommand] // 被广播的对象 + writerCurrentFrame map[WriterID]uint32 // 被广播的对象当前帧 + currentFrame uint32 // 当前帧 + writerMaxCurrentFrame uint32 // 最大写入器当前帧 + frames map[uint32]Frame[FrameCommand] // 所有帧 + framesRWMutes sync.RWMutex // 所有帧读写锁 } // SetWriter 设置需要被广播的 Writer func (slf *Lockstep[WriterID, FrameCommand]) SetWriter(writer ...Writer[WriterID, FrameCommand]) { for _, w := range writer { - slf.writers.Set(w.GetID(), w) + slf.writers[w.GetID()] = w } } -func (slf *Lockstep[WriterID, FrameCommand]) Run() { +func (slf *Lockstep[WriterID, FrameCommand]) Run() error { + if slf.frameFactory == nil { + return ErrFrameFactorCanNotIsNull + } slf.Release() slf.ticker.Loop(tickerFrameName, timer.Instantly, time.Second/time.Duration(slf.FrameRate), timer.Forever, slf.tick) + return nil +} + +func (slf *Lockstep[WriterID, FrameCommand]) Record(command FrameCommand) { + slf.framesRWMutes.RLock() + frame, exist := slf.frames[slf.currentFrame] + slf.framesRWMutes.RUnlock() + if !exist { + frame = slf.frameFactory(slf.currentFrame) + slf.framesRWMutes.Lock() + slf.frames[slf.currentFrame] = frame + slf.framesRWMutes.Unlock() + } + frame.AddCommand(command) } func (slf *Lockstep[WriterID, FrameCommand]) Release() { slf.ticker.StopTimer(tickerFrameName) - slf.writers.Clear() - slf.writerCurrentFrame.Clear() + for k := range slf.writers { + delete(slf.writers, k) + } + for k := range slf.writerCurrentFrame { + delete(slf.writers, k) + } slf.currentFrame = 0 - slf.currentClientFrame = 0 + slf.framesRWMutes.Lock() for k := range slf.frames { delete(slf.frames, k) } + slf.framesRWMutes.Unlock() +} + +// ReWrite 重写 +func (slf *Lockstep[WriterID, FrameCommand]) ReWrite(writer Writer[WriterID, FrameCommand]) { + if !writer.Healthy() { + return + } + + var writerCurrentFrame uint32 + var frameCounter uint32 + var frames = make(map[uint32]Frame[FrameCommand]) + for ; writerCurrentFrame < slf.currentFrame; writerCurrentFrame++ { + slf.framesRWMutes.RLock() + var frame = slf.frames[writerCurrentFrame] + slf.framesRWMutes.RUnlock() + if frame == nil && writerCurrentFrame != (slf.currentFrame-1) { + continue + } + + frames[frame.GetIndex()] = frame + frameCounter++ + if writerCurrentFrame == slf.currentFrame-1 || (slf.FrameOnceLimit > 0 && frameCounter >= slf.FrameOnceLimit) { + data := writer.Marshal(frames) + // TODO: writer.Write error not handle + _ = writer.Write(data) + frameCounter = 0 + for k := range frames { + delete(frames, k) + } + } + } + slf.writerCurrentFrame[writer.GetID()] = writerCurrentFrame + if writerCurrentFrame > slf.writerMaxCurrentFrame { + slf.writerMaxCurrentFrame = writerCurrentFrame + } } func (slf *Lockstep[WriterID, FrameCommand]) tick() { @@ -61,37 +122,42 @@ func (slf *Lockstep[WriterID, FrameCommand]) tick() { slf.currentFrame++ - if slf.currentFrame-slf.currentClientFrame < slf.FrameBroadcastInterval { + if slf.currentFrame-slf.writerMaxCurrentFrame < slf.FrameBroadcastInterval { return } - slf.writers.RangeSkip(func(id WriterID, writer Writer[WriterID, FrameCommand]) bool { + for id, writer := range slf.writers { if !writer.Healthy() { - return false + continue } + var writerCurrentFrame uint32 var frameCounter uint32 var frames = make(map[uint32]Frame[FrameCommand]) - for i := slf.writerCurrentFrame.Get(id); i < slf.currentFrame; i++ { - var frame = slf.frames[i] - if frame == nil && i != (slf.currentFrame-1) { + for writerCurrentFrame = slf.writerCurrentFrame[id]; writerCurrentFrame < slf.currentFrame; writerCurrentFrame++ { + slf.framesRWMutes.RLock() + var frame = slf.frames[writerCurrentFrame] + slf.framesRWMutes.RUnlock() + if frame == nil && writerCurrentFrame != (slf.currentFrame-1) { continue } frames[frame.GetIndex()] = frame frameCounter++ - if i == slf.currentFrame-1 || frameCounter >= slf.FrameOnceLimit { + if writerCurrentFrame == slf.currentFrame-1 || (slf.FrameOnceLimit > 0 && frameCounter >= slf.FrameOnceLimit) { data := writer.Marshal(frames) - writer.Write(data) + // TODO: writer.Write error not handle + _ = writer.Write(data) frameCounter = 0 for k := range frames { delete(frames, k) } } } - slf.currentClientFrame = slf.currentFrame - - return true - }) + slf.writerCurrentFrame[id] = writerCurrentFrame + if writerCurrentFrame > slf.writerMaxCurrentFrame { + slf.writerMaxCurrentFrame = writerCurrentFrame + } + } } diff --git a/component/lockstep/writer.go b/component/lockstep/writer.go index ba34537..e66082d 100644 --- a/component/lockstep/writer.go +++ b/component/lockstep/writer.go @@ -1,8 +1,13 @@ package lockstep +// Writer 游戏帧写入器,通常实现写入器的对象应该为包含网络连接的玩家 type Writer[ID comparable, FrameCommand any] interface { + // GetID 游戏帧写入器ID GetID() ID + // Healthy 检查写入器状态是否健康,例如离线、网络环境异常等 Healthy() bool + // Marshal 将多帧数据转换为流格式,以对游戏帧写入器进行写入 Marshal(frames map[uint32]Frame[FrameCommand]) []byte + // Write 向游戏帧写入器中写入数据 Write(data []byte) error }