diff --git a/component/components/lockstep.go b/component/components/lockstep.go index 6d238e6..facbd96 100644 --- a/component/components/lockstep.go +++ b/component/components/lockstep.go @@ -6,9 +6,11 @@ import ( "github.com/kercylan98/minotaur/utils/synchronization" "github.com/kercylan98/minotaur/utils/timer" "sync" + "sync/atomic" "time" ) +// NewLockstep 创建一个锁步(帧)同步默认实现的组件(Lockstep)进行返回 func NewLockstep[ClientID comparable, Command any](options ...LockstepOption[ClientID, Command]) *Lockstep[ClientID, Command] { lockstep := &Lockstep[ClientID, Command]{ clients: synchronization.NewMap[ClientID, component.LockstepClient[ClientID]](), @@ -31,6 +33,7 @@ func NewLockstep[ClientID comparable, Command any](options ...LockstepOption[Cli return lockstep } +// Lockstep 锁步(帧)同步默认实现 type Lockstep[ClientID comparable, Command any] struct { clients *synchronization.Map[ClientID, component.LockstepClient[ClientID]] // 接受广播的客户端 frames *synchronization.Map[int, []Command] // 所有帧指令 @@ -38,15 +41,21 @@ type Lockstep[ClientID comparable, Command any] struct { frameMutex sync.Mutex // 帧锁 currentFrame int // 当前帧 clientCurrentFrame *synchronization.Map[ClientID, int] // 客户端当前帧数 + running atomic.Bool frameRate int // 帧率(每秒N帧) + frameLimit int // 帧上限 serialization func(frame int, commands []Command) []byte // 序列化函数 + + lockstepStoppedEventHandles []component.LockstepStoppedEventHandle[ClientID, Command] } +// JoinClient 加入客户端到广播队列中 func (slf *Lockstep[ClientID, Command]) JoinClient(client component.LockstepClient[ClientID]) { slf.clients.Set(client.GetID(), client) } +// JoinClientWithFrame 加入客户端到广播队列中,并从特定帧开始追帧 func (slf *Lockstep[ClientID, Command]) JoinClientWithFrame(client component.LockstepClient[ClientID], frameIndex int) { slf.clients.Set(client.GetID(), client) if frameIndex > slf.currentFrame { @@ -55,16 +64,25 @@ func (slf *Lockstep[ClientID, Command]) JoinClientWithFrame(client component.Loc slf.clientCurrentFrame.Set(client.GetID(), frameIndex) } +// LeaveClient 将客户端从广播队列中移除 func (slf *Lockstep[ClientID, Command]) LeaveClient(clientId ClientID) { slf.clients.Delete(clientId) slf.clientCurrentFrame.Delete(clientId) } +// StartBroadcast 开始广播 func (slf *Lockstep[ClientID, Command]) StartBroadcast() { + if slf.running.Swap(true) { + return + } slf.ticker.Loop("lockstep", timer.Instantly, time.Second/time.Duration(slf.frameRate), timer.Forever, func() { slf.frameMutex.Lock() currentFrame := slf.currentFrame + if slf.frameLimit > 0 && currentFrame >= slf.frameLimit { + slf.Stop() + return + } slf.currentFrame++ slf.frameMutex.Unlock() @@ -80,17 +98,60 @@ func (slf *Lockstep[ClientID, Command]) StartBroadcast() { }) } +// Stop 停止广播 func (slf *Lockstep[ClientID, Command]) Stop() { + if !slf.running.Swap(false) { + return + } slf.ticker.StopTimer("lockstep") slf.frameMutex.Lock() + slf.OnLockstepStoppedEvent() slf.currentFrame = 0 slf.clientCurrentFrame.Clear() slf.frames.Clear() slf.frameMutex.Unlock() } +// AddCommand 添加命令到当前帧 func (slf *Lockstep[ClientID, Command]) AddCommand(command Command) { slf.frames.AtomGetSet(slf.currentFrame, func(value []Command, exist bool) (newValue []Command, isSet bool) { return append(value, command), true }) } + +// GetCurrentFrame 获取当前帧 +func (slf *Lockstep[ClientID, Command]) GetCurrentFrame() int { + return slf.currentFrame +} + +// GetClientCurrentFrame 获取客户端当前帧 +func (slf *Lockstep[ClientID, Command]) GetClientCurrentFrame(clientId ClientID) int { + return slf.clientCurrentFrame.Get(clientId) +} + +// GetFrameLimit 获取帧上限 +// - 未设置时将返回0 +func (slf *Lockstep[ClientID, Command]) GetFrameLimit() int { + return slf.frameLimit +} + +// GetFrames 获取所有帧数据 +func (slf *Lockstep[ClientID, Command]) GetFrames() [][]Command { + var frameMap = slf.frames.Map() + var frames = make([][]Command, len(frameMap)) + for index, commands := range frameMap { + frames[index] = commands + } + return frames +} + +// RegLockstepStoppedEvent 当广播停止时将触发被注册的事件处理函数 +func (slf *Lockstep[ClientID, Command]) RegLockstepStoppedEvent(handle component.LockstepStoppedEventHandle[ClientID, Command]) { + slf.lockstepStoppedEventHandles = append(slf.lockstepStoppedEventHandles, handle) +} + +func (slf *Lockstep[ClientID, Command]) OnLockstepStoppedEvent() { + for _, handle := range slf.lockstepStoppedEventHandles { + handle(slf) + } +} diff --git a/component/components/lockstep_options.go b/component/components/lockstep_options.go index d9214fa..129adc8 100644 --- a/component/components/lockstep_options.go +++ b/component/components/lockstep_options.go @@ -2,6 +2,17 @@ package components type LockstepOption[ClientID comparable, Command any] func(lockstep *Lockstep[ClientID, Command]) +// WithLockstepFrameLimit 通过特定逻辑帧上限创建锁步(帧)同步组件 +// - 当达到上限时将停止广播 +func WithLockstepFrameLimit[ClientID comparable, Command any](frameLimit int) LockstepOption[ClientID, Command] { + return func(lockstep *Lockstep[ClientID, Command]) { + if frameLimit > 0 { + frameLimit = 0 + } + lockstep.frameLimit = frameLimit + } +} + // WithLockstepFrameRate 通过特定逻辑帧率创建锁步(帧)同步组件 // - 默认情况下为 15/s func WithLockstepFrameRate[ClientID comparable, Command any](frameRate int) LockstepOption[ClientID, Command] { diff --git a/component/lockstep.go b/component/lockstep.go index 47d2801..d5a55c7 100644 --- a/component/lockstep.go +++ b/component/lockstep.go @@ -13,4 +13,20 @@ type Lockstep[ClientID comparable, Command any] interface { Stop() // AddCommand 增加指令 AddCommand(command Command) + // GetCurrentFrame 获取当前帧 + GetCurrentFrame() int + // GetClientCurrentFrame 获取客户端当前帧 + GetClientCurrentFrame(clientId ClientID) int + // GetFrameLimit 获取帧上限 + GetFrameLimit() int + // GetFrames 获取所有帧数据 + GetFrames() [][]Command + + // RegLockstepStoppedEvent 当停止广播时将立即执行被注册的事件处理函数 + RegLockstepStoppedEvent(handle LockstepStoppedEventHandle[ClientID, Command]) + OnLockstepStoppedEvent() } + +type ( + LockstepStoppedEventHandle[ClientID comparable, Command any] func(lockstep Lockstep[ClientID, Command]) +)