锁步(帧)同步优化
This commit is contained in:
parent
e6d2eabb65
commit
7be8fe29fc
|
@ -1,5 +1,11 @@
|
||||||
package lockstep
|
package lockstep
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
const (
|
const (
|
||||||
tickerFrameName = "LOCKSTEP_FRAME"
|
tickerFrameName = "LOCKSTEP_FRAME"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrFrameFactorCanNotIsNull = errors.New("frameFactory can not is nil")
|
||||||
|
)
|
||||||
|
|
|
@ -5,6 +5,8 @@ type Frame[Command any] interface {
|
||||||
GetIndex() uint32
|
GetIndex() uint32
|
||||||
// GetCommands 获取这一帧的数据
|
// GetCommands 获取这一帧的数据
|
||||||
GetCommands() []Command
|
GetCommands() []Command
|
||||||
|
// AddCommand 添加命令到这一帧
|
||||||
|
AddCommand(command Command)
|
||||||
// Marshal 序列化帧数据
|
// Marshal 序列化帧数据
|
||||||
Marshal() ([]byte, error)
|
Marshal() ([]byte, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,56 +1,117 @@
|
||||||
package lockstep
|
package lockstep
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"minotaur/utils/synchronization"
|
|
||||||
"minotaur/utils/timer"
|
"minotaur/utils/timer"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func New[WriterID comparable, FrameCommand any]() *Lockstep[WriterID, FrameCommand] {
|
func New[WriterID comparable, FrameCommand any](frameFactory func(frameIndex uint32) Frame[FrameCommand]) *Lockstep[WriterID, FrameCommand] {
|
||||||
lockstep := &Lockstep[WriterID, FrameCommand]{
|
lockstep := &Lockstep[WriterID, FrameCommand]{
|
||||||
|
frameFactory: frameFactory,
|
||||||
ticker: timer.GetTicker(30),
|
ticker: timer.GetTicker(30),
|
||||||
writers: synchronization.NewMap[WriterID, Writer[WriterID, FrameCommand]](),
|
writers: map[WriterID]Writer[WriterID, FrameCommand]{},
|
||||||
writerCurrentFrame: synchronization.NewMap[WriterID, uint32](),
|
writerCurrentFrame: map[WriterID]uint32{},
|
||||||
frames: map[uint32]Frame[FrameCommand]{},
|
frames: map[uint32]Frame[FrameCommand]{},
|
||||||
}
|
}
|
||||||
return lockstep
|
return lockstep
|
||||||
}
|
}
|
||||||
|
|
||||||
type Lockstep[WriterID comparable, FrameCommand any] struct {
|
type Lockstep[WriterID comparable, FrameCommand any] struct {
|
||||||
|
frameFactory func(frameIndex uint32) Frame[FrameCommand]
|
||||||
|
|
||||||
FrameLimit uint32 // 帧数上限
|
FrameLimit uint32 // 帧数上限
|
||||||
FrameRate uint32 // 帧率(每秒的帧数)
|
FrameRate uint32 // 帧率(每秒的帧数)
|
||||||
FrameBroadcastInterval uint32 // 帧数广播间隔帧数
|
FrameBroadcastInterval uint32 // 帧数广播间隔帧数
|
||||||
FrameOnceLimit uint32 // 每次消息最大容纳帧数
|
FrameOnceLimit uint32 // 每次消息最大容纳帧数
|
||||||
|
|
||||||
ticker *timer.Ticker // 定时器
|
ticker *timer.Ticker // 定时器
|
||||||
writers *synchronization.Map[WriterID, Writer[WriterID, FrameCommand]] // 被广播的对象
|
writers map[WriterID]Writer[WriterID, FrameCommand] // 被广播的对象
|
||||||
writerCurrentFrame *synchronization.Map[WriterID, uint32] // 被广播的对象当前帧
|
writerCurrentFrame map[WriterID]uint32 // 被广播的对象当前帧
|
||||||
currentFrame uint32 // 当前帧
|
currentFrame uint32 // 当前帧
|
||||||
currentClientFrame uint32 // 当前客户端帧数
|
writerMaxCurrentFrame uint32 // 最大写入器当前帧
|
||||||
frames map[uint32]Frame[FrameCommand] // 所有帧
|
frames map[uint32]Frame[FrameCommand] // 所有帧
|
||||||
|
framesRWMutes sync.RWMutex // 所有帧读写锁
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetWriter 设置需要被广播的 Writer
|
// SetWriter 设置需要被广播的 Writer
|
||||||
func (slf *Lockstep[WriterID, FrameCommand]) SetWriter(writer ...Writer[WriterID, FrameCommand]) {
|
func (slf *Lockstep[WriterID, FrameCommand]) SetWriter(writer ...Writer[WriterID, FrameCommand]) {
|
||||||
for _, w := range writer {
|
for _, w := range writer {
|
||||||
slf.writers.Set(w.GetID(), w)
|
slf.writers[w.GetID()] = w
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *Lockstep[WriterID, FrameCommand]) Run() {
|
func (slf *Lockstep[WriterID, FrameCommand]) Run() error {
|
||||||
|
if slf.frameFactory == nil {
|
||||||
|
return ErrFrameFactorCanNotIsNull
|
||||||
|
}
|
||||||
slf.Release()
|
slf.Release()
|
||||||
slf.ticker.Loop(tickerFrameName, timer.Instantly, time.Second/time.Duration(slf.FrameRate), timer.Forever, slf.tick)
|
slf.ticker.Loop(tickerFrameName, timer.Instantly, time.Second/time.Duration(slf.FrameRate), timer.Forever, slf.tick)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (slf *Lockstep[WriterID, FrameCommand]) Record(command FrameCommand) {
|
||||||
|
slf.framesRWMutes.RLock()
|
||||||
|
frame, exist := slf.frames[slf.currentFrame]
|
||||||
|
slf.framesRWMutes.RUnlock()
|
||||||
|
if !exist {
|
||||||
|
frame = slf.frameFactory(slf.currentFrame)
|
||||||
|
slf.framesRWMutes.Lock()
|
||||||
|
slf.frames[slf.currentFrame] = frame
|
||||||
|
slf.framesRWMutes.Unlock()
|
||||||
|
}
|
||||||
|
frame.AddCommand(command)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *Lockstep[WriterID, FrameCommand]) Release() {
|
func (slf *Lockstep[WriterID, FrameCommand]) Release() {
|
||||||
slf.ticker.StopTimer(tickerFrameName)
|
slf.ticker.StopTimer(tickerFrameName)
|
||||||
slf.writers.Clear()
|
for k := range slf.writers {
|
||||||
slf.writerCurrentFrame.Clear()
|
delete(slf.writers, k)
|
||||||
|
}
|
||||||
|
for k := range slf.writerCurrentFrame {
|
||||||
|
delete(slf.writers, k)
|
||||||
|
}
|
||||||
slf.currentFrame = 0
|
slf.currentFrame = 0
|
||||||
slf.currentClientFrame = 0
|
slf.framesRWMutes.Lock()
|
||||||
for k := range slf.frames {
|
for k := range slf.frames {
|
||||||
delete(slf.frames, k)
|
delete(slf.frames, k)
|
||||||
}
|
}
|
||||||
|
slf.framesRWMutes.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReWrite 重写
|
||||||
|
func (slf *Lockstep[WriterID, FrameCommand]) ReWrite(writer Writer[WriterID, FrameCommand]) {
|
||||||
|
if !writer.Healthy() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var writerCurrentFrame uint32
|
||||||
|
var frameCounter uint32
|
||||||
|
var frames = make(map[uint32]Frame[FrameCommand])
|
||||||
|
for ; writerCurrentFrame < slf.currentFrame; writerCurrentFrame++ {
|
||||||
|
slf.framesRWMutes.RLock()
|
||||||
|
var frame = slf.frames[writerCurrentFrame]
|
||||||
|
slf.framesRWMutes.RUnlock()
|
||||||
|
if frame == nil && writerCurrentFrame != (slf.currentFrame-1) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
frames[frame.GetIndex()] = frame
|
||||||
|
frameCounter++
|
||||||
|
if writerCurrentFrame == slf.currentFrame-1 || (slf.FrameOnceLimit > 0 && frameCounter >= slf.FrameOnceLimit) {
|
||||||
|
data := writer.Marshal(frames)
|
||||||
|
// TODO: writer.Write error not handle
|
||||||
|
_ = writer.Write(data)
|
||||||
|
frameCounter = 0
|
||||||
|
for k := range frames {
|
||||||
|
delete(frames, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
slf.writerCurrentFrame[writer.GetID()] = writerCurrentFrame
|
||||||
|
if writerCurrentFrame > slf.writerMaxCurrentFrame {
|
||||||
|
slf.writerMaxCurrentFrame = writerCurrentFrame
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *Lockstep[WriterID, FrameCommand]) tick() {
|
func (slf *Lockstep[WriterID, FrameCommand]) tick() {
|
||||||
|
@ -61,37 +122,42 @@ func (slf *Lockstep[WriterID, FrameCommand]) tick() {
|
||||||
|
|
||||||
slf.currentFrame++
|
slf.currentFrame++
|
||||||
|
|
||||||
if slf.currentFrame-slf.currentClientFrame < slf.FrameBroadcastInterval {
|
if slf.currentFrame-slf.writerMaxCurrentFrame < slf.FrameBroadcastInterval {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
slf.writers.RangeSkip(func(id WriterID, writer Writer[WriterID, FrameCommand]) bool {
|
for id, writer := range slf.writers {
|
||||||
|
|
||||||
if !writer.Healthy() {
|
if !writer.Healthy() {
|
||||||
return false
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var writerCurrentFrame uint32
|
||||||
var frameCounter uint32
|
var frameCounter uint32
|
||||||
var frames = make(map[uint32]Frame[FrameCommand])
|
var frames = make(map[uint32]Frame[FrameCommand])
|
||||||
for i := slf.writerCurrentFrame.Get(id); i < slf.currentFrame; i++ {
|
for writerCurrentFrame = slf.writerCurrentFrame[id]; writerCurrentFrame < slf.currentFrame; writerCurrentFrame++ {
|
||||||
var frame = slf.frames[i]
|
slf.framesRWMutes.RLock()
|
||||||
if frame == nil && i != (slf.currentFrame-1) {
|
var frame = slf.frames[writerCurrentFrame]
|
||||||
|
slf.framesRWMutes.RUnlock()
|
||||||
|
if frame == nil && writerCurrentFrame != (slf.currentFrame-1) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
frames[frame.GetIndex()] = frame
|
frames[frame.GetIndex()] = frame
|
||||||
frameCounter++
|
frameCounter++
|
||||||
if i == slf.currentFrame-1 || frameCounter >= slf.FrameOnceLimit {
|
if writerCurrentFrame == slf.currentFrame-1 || (slf.FrameOnceLimit > 0 && frameCounter >= slf.FrameOnceLimit) {
|
||||||
data := writer.Marshal(frames)
|
data := writer.Marshal(frames)
|
||||||
writer.Write(data)
|
// TODO: writer.Write error not handle
|
||||||
|
_ = writer.Write(data)
|
||||||
frameCounter = 0
|
frameCounter = 0
|
||||||
for k := range frames {
|
for k := range frames {
|
||||||
delete(frames, k)
|
delete(frames, k)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
slf.currentClientFrame = slf.currentFrame
|
slf.writerCurrentFrame[id] = writerCurrentFrame
|
||||||
|
if writerCurrentFrame > slf.writerMaxCurrentFrame {
|
||||||
return true
|
slf.writerMaxCurrentFrame = writerCurrentFrame
|
||||||
})
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,13 @@
|
||||||
package lockstep
|
package lockstep
|
||||||
|
|
||||||
|
// Writer 游戏帧写入器,通常实现写入器的对象应该为包含网络连接的玩家
|
||||||
type Writer[ID comparable, FrameCommand any] interface {
|
type Writer[ID comparable, FrameCommand any] interface {
|
||||||
|
// GetID 游戏帧写入器ID
|
||||||
GetID() ID
|
GetID() ID
|
||||||
|
// Healthy 检查写入器状态是否健康,例如离线、网络环境异常等
|
||||||
Healthy() bool
|
Healthy() bool
|
||||||
|
// Marshal 将多帧数据转换为流格式,以对游戏帧写入器进行写入
|
||||||
Marshal(frames map[uint32]Frame[FrameCommand]) []byte
|
Marshal(frames map[uint32]Frame[FrameCommand]) []byte
|
||||||
|
// Write 向游戏帧写入器中写入数据
|
||||||
Write(data []byte) error
|
Write(data []byte) error
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue