From 915163d37d49d9cc10989a6da011b50f9f705349 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Fri, 12 May 2023 15:56:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B8=A7=E5=90=8C=E6=AD=A5=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E3=80=81=E7=9B=AE=E5=BD=95=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/components/lockstep.go | 82 ++++++++++++ component/components/lockstep_options.go | 25 ++++ component/lockstep.go | 12 ++ component/lockstep/constants.go | 11 -- component/lockstep/frame.go | 12 -- component/lockstep/lockstep.go | 163 ----------------------- component/lockstep/writer.go | 13 -- component/lockstep_client.go | 7 + utils/random/number.go | 10 +- 9 files changed, 135 insertions(+), 200 deletions(-) create mode 100644 component/components/lockstep.go create mode 100644 component/components/lockstep_options.go create mode 100644 component/lockstep.go delete mode 100644 component/lockstep/constants.go delete mode 100644 component/lockstep/frame.go delete mode 100644 component/lockstep/lockstep.go delete mode 100644 component/lockstep/writer.go create mode 100644 component/lockstep_client.go diff --git a/component/components/lockstep.go b/component/components/lockstep.go new file mode 100644 index 0000000..0c39242 --- /dev/null +++ b/component/components/lockstep.go @@ -0,0 +1,82 @@ +package components + +import ( + "encoding/json" + "github.com/kercylan98/minotaur/component" + "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/synchronization" + "github.com/kercylan98/minotaur/utils/timer" + "go.uber.org/zap" + "sync" + "time" +) + +func NewLockstep[ClientID comparable, Command any](options ...LockstepOption[ClientID, Command]) *Lockstep[ClientID, Command] { + lockstep := &Lockstep[ClientID, Command]{ + clients: synchronization.NewMap[ClientID, component.LockstepClient[ClientID]](), + frames: synchronization.NewMap[int, []Command](), + ticker: timer.GetTicker(10), + frameRate: 15, + serialization: func(frame int, commands []Command) []byte { + frameStruct := struct { + Frame int `json:"frame"` + Commands []Command `json:"commands"` + }{frame, commands} + data, _ := json.Marshal(frameStruct) + return data + }, + clientCurrentFrame: map[ClientID]int{}, + } + for _, option := range options { + option(lockstep) + } + return lockstep +} + +type Lockstep[ClientID comparable, Command any] struct { + clients *synchronization.Map[ClientID, component.LockstepClient[ClientID]] // 接受广播的客户端 + frames *synchronization.Map[int, []Command] // 所有帧指令 + ticker *timer.Ticker // 定时器 + frameMutex sync.Mutex // 帧锁 + currentFrame int // 当前帧 + clientCurrentFrame map[ClientID]int // 客户端当前帧数 + + frameRate int // 帧率(每秒N帧) + serialization func(frame int, commands []Command) []byte // 序列化函数 +} + +func (slf *Lockstep[ClientID, Command]) JoinClient(client component.LockstepClient[ClientID]) { + slf.clients.Set(client.GetID(), client) +} + +func (slf *Lockstep[ClientID, Command]) LeaveClient(clientId ClientID) { + slf.clients.Delete(clientId) + delete(slf.clientCurrentFrame, clientId) +} + +func (slf *Lockstep[ClientID, Command]) StartBroadcast() { + slf.ticker.Loop("lockstep", timer.Instantly, time.Second/time.Duration(slf.frameRate), timer.Forever, func() { + + slf.frameMutex.Lock() + currentFrame := slf.currentFrame + slf.currentFrame++ + slf.frameMutex.Unlock() + + frames := slf.frames.Map() + for clientId, client := range slf.clients.Map() { + for i := slf.clientCurrentFrame[clientId]; i <= currentFrame; i++ { + if err := client.Send(slf.serialization(i, frames[i])); err != nil { + log.Error("Lockstep.StartBroadcast", zap.Any("ClientID", client.GetID()), zap.Int("Frame", i), zap.Error(err)) + break + } + slf.clientCurrentFrame[clientId] = i + } + } + }) +} + +func (slf *Lockstep[ClientID, Command]) AddCommand(command Command) { + slf.frames.AtomGetSet(slf.currentFrame, func(value []Command, exist bool) (newValue []Command, isSet bool) { + return append(value, command), true + }) +} diff --git a/component/components/lockstep_options.go b/component/components/lockstep_options.go new file mode 100644 index 0000000..d9214fa --- /dev/null +++ b/component/components/lockstep_options.go @@ -0,0 +1,25 @@ +package components + +type LockstepOption[ClientID comparable, Command any] func(lockstep *Lockstep[ClientID, Command]) + +// WithLockstepFrameRate 通过特定逻辑帧率创建锁步(帧)同步组件 +// - 默认情况下为 15/s +func WithLockstepFrameRate[ClientID comparable, Command any](frameRate int) LockstepOption[ClientID, Command] { + return func(lockstep *Lockstep[ClientID, Command]) { + lockstep.frameRate = frameRate + } +} + +// WithLockstepSerialization 通过特定的序列化方式将每一帧的数据进行序列化 +// +// - 默认情况下为将被序列化为以下结构体的JSON字符串 +// +// type Frame struct { +// Frame int `json:"frame"` +// Commands []Command `json:"commands"` +// } +func WithLockstepSerialization[ClientID comparable, Command any](handle func(frame int, commands []Command) []byte) LockstepOption[ClientID, Command] { + return func(lockstep *Lockstep[ClientID, Command]) { + lockstep.serialization = handle + } +} diff --git a/component/lockstep.go b/component/lockstep.go new file mode 100644 index 0000000..c267364 --- /dev/null +++ b/component/lockstep.go @@ -0,0 +1,12 @@ +package component + +type Lockstep[ClientID comparable, Command any] interface { + // JoinClient 加入客户端 + JoinClient(client LockstepClient[ClientID]) + // LeaveClient 离开客户端 + LeaveClient(clientId ClientID) + // StartBroadcast 开始广播 + StartBroadcast() + // AddCommand 增加指令 + AddCommand(command Command) +} diff --git a/component/lockstep/constants.go b/component/lockstep/constants.go deleted file mode 100644 index 9fc2e44..0000000 --- a/component/lockstep/constants.go +++ /dev/null @@ -1,11 +0,0 @@ -package lockstep - -import "errors" - -const ( - tickerFrameName = "LOCKSTEP_FRAME" -) - -var ( - ErrFrameFactorCanNotIsNull = errors.New("frameFactory can not is nil") -) diff --git a/component/lockstep/frame.go b/component/lockstep/frame.go deleted file mode 100644 index da8f3d1..0000000 --- a/component/lockstep/frame.go +++ /dev/null @@ -1,12 +0,0 @@ -package lockstep - -type Frame[Command any] interface { - // GetIndex 获取这一帧的索引 - GetIndex() uint32 - // GetCommands 获取这一帧的数据 - GetCommands() []Command - // AddCommand 添加命令到这一帧 - AddCommand(command Command) - // Marshal 序列化帧数据 - Marshal() ([]byte, error) -} diff --git a/component/lockstep/lockstep.go b/component/lockstep/lockstep.go deleted file mode 100644 index 1ca22a9..0000000 --- a/component/lockstep/lockstep.go +++ /dev/null @@ -1,163 +0,0 @@ -package lockstep - -import ( - "github.com/kercylan98/minotaur/utils/timer" - "sync" - "time" -) - -func New[WriterID comparable, FrameCommand any](frameFactory func(frameIndex uint32) Frame[FrameCommand]) *Lockstep[WriterID, FrameCommand] { - lockstep := &Lockstep[WriterID, FrameCommand]{ - frameFactory: frameFactory, - ticker: timer.GetTicker(30), - writers: map[WriterID]Writer[WriterID, FrameCommand]{}, - writerCurrentFrame: map[WriterID]uint32{}, - frames: map[uint32]Frame[FrameCommand]{}, - } - return lockstep -} - -type Lockstep[WriterID comparable, FrameCommand any] struct { - frameFactory func(frameIndex uint32) Frame[FrameCommand] - - FrameLimit uint32 // 帧数上限 - FrameRate uint32 // 帧率(每秒的帧数) - FrameBroadcastInterval uint32 // 帧数广播间隔帧数 - FrameOnceLimit uint32 // 每次消息最大容纳帧数 - - ticker *timer.Ticker // 定时器 - writers map[WriterID]Writer[WriterID, FrameCommand] // 被广播的对象 - writerCurrentFrame map[WriterID]uint32 // 被广播的对象当前帧 - currentFrame uint32 // 当前帧 - writerMaxCurrentFrame uint32 // 最大写入器当前帧 - frames map[uint32]Frame[FrameCommand] // 所有帧 - framesRWMutes sync.RWMutex // 所有帧读写锁 -} - -// SetWriter 设置需要被广播的 Writer -func (slf *Lockstep[WriterID, FrameCommand]) SetWriter(writer ...Writer[WriterID, FrameCommand]) { - for _, w := range writer { - slf.writers[w.GetID()] = w - } -} - -func (slf *Lockstep[WriterID, FrameCommand]) Run() error { - if slf.frameFactory == nil { - return ErrFrameFactorCanNotIsNull - } - slf.Release() - slf.ticker.Loop(tickerFrameName, timer.Instantly, time.Second/time.Duration(slf.FrameRate), timer.Forever, slf.tick) - return nil -} - -func (slf *Lockstep[WriterID, FrameCommand]) Record(command FrameCommand) { - slf.framesRWMutes.RLock() - frame, exist := slf.frames[slf.currentFrame] - slf.framesRWMutes.RUnlock() - if !exist { - frame = slf.frameFactory(slf.currentFrame) - slf.framesRWMutes.Lock() - slf.frames[slf.currentFrame] = frame - slf.framesRWMutes.Unlock() - } - frame.AddCommand(command) -} - -func (slf *Lockstep[WriterID, FrameCommand]) Release() { - slf.ticker.StopTimer(tickerFrameName) - for k := range slf.writers { - delete(slf.writers, k) - } - for k := range slf.writerCurrentFrame { - delete(slf.writers, k) - } - slf.currentFrame = 0 - slf.framesRWMutes.Lock() - for k := range slf.frames { - delete(slf.frames, k) - } - slf.framesRWMutes.Unlock() -} - -// ReWrite 重写 -func (slf *Lockstep[WriterID, FrameCommand]) ReWrite(writer Writer[WriterID, FrameCommand]) { - if !writer.Healthy() { - return - } - - var writerCurrentFrame uint32 - var frameCounter uint32 - var frames = make(map[uint32]Frame[FrameCommand]) - for ; writerCurrentFrame < slf.currentFrame; writerCurrentFrame++ { - slf.framesRWMutes.RLock() - var frame = slf.frames[writerCurrentFrame] - slf.framesRWMutes.RUnlock() - if frame == nil && writerCurrentFrame != (slf.currentFrame-1) { - continue - } - - frames[frame.GetIndex()] = frame - frameCounter++ - if writerCurrentFrame == slf.currentFrame-1 || (slf.FrameOnceLimit > 0 && frameCounter >= slf.FrameOnceLimit) { - data := writer.Marshal(frames) - // TODO: writer.Write error not handle - _ = writer.Write(data) - frameCounter = 0 - for k := range frames { - delete(frames, k) - } - } - } - slf.writerCurrentFrame[writer.GetID()] = writerCurrentFrame - if writerCurrentFrame > slf.writerMaxCurrentFrame { - slf.writerMaxCurrentFrame = writerCurrentFrame - } -} - -func (slf *Lockstep[WriterID, FrameCommand]) tick() { - if slf.FrameLimit > 0 && slf.currentFrame >= slf.FrameLimit { - slf.ticker.StopTimer(tickerFrameName) - return - } - - slf.currentFrame++ - - if slf.currentFrame-slf.writerMaxCurrentFrame < slf.FrameBroadcastInterval { - return - } - - for id, writer := range slf.writers { - - if !writer.Healthy() { - continue - } - - var writerCurrentFrame uint32 - var frameCounter uint32 - var frames = make(map[uint32]Frame[FrameCommand]) - for writerCurrentFrame = slf.writerCurrentFrame[id]; writerCurrentFrame < slf.currentFrame; writerCurrentFrame++ { - slf.framesRWMutes.RLock() - var frame = slf.frames[writerCurrentFrame] - slf.framesRWMutes.RUnlock() - if frame == nil && writerCurrentFrame != (slf.currentFrame-1) { - continue - } - - frames[frame.GetIndex()] = frame - frameCounter++ - if writerCurrentFrame == slf.currentFrame-1 || (slf.FrameOnceLimit > 0 && frameCounter >= slf.FrameOnceLimit) { - data := writer.Marshal(frames) - // TODO: writer.Write error not handle - _ = writer.Write(data) - frameCounter = 0 - for k := range frames { - delete(frames, k) - } - } - } - slf.writerCurrentFrame[id] = writerCurrentFrame - if writerCurrentFrame > slf.writerMaxCurrentFrame { - slf.writerMaxCurrentFrame = writerCurrentFrame - } - } -} diff --git a/component/lockstep/writer.go b/component/lockstep/writer.go deleted file mode 100644 index e66082d..0000000 --- a/component/lockstep/writer.go +++ /dev/null @@ -1,13 +0,0 @@ -package lockstep - -// Writer 游戏帧写入器,通常实现写入器的对象应该为包含网络连接的玩家 -type Writer[ID comparable, FrameCommand any] interface { - // GetID 游戏帧写入器ID - GetID() ID - // Healthy 检查写入器状态是否健康,例如离线、网络环境异常等 - Healthy() bool - // Marshal 将多帧数据转换为流格式,以对游戏帧写入器进行写入 - Marshal(frames map[uint32]Frame[FrameCommand]) []byte - // Write 向游戏帧写入器中写入数据 - Write(data []byte) error -} diff --git a/component/lockstep_client.go b/component/lockstep_client.go new file mode 100644 index 0000000..de6c5f2 --- /dev/null +++ b/component/lockstep_client.go @@ -0,0 +1,7 @@ +package component + +import "github.com/kercylan98/minotaur/game" + +type LockstepClient[ID comparable] interface { + game.Player[ID] +} diff --git a/utils/random/number.go b/utils/random/number.go index a6d5f90..a0e2cfd 100644 --- a/utils/random/number.go +++ b/utils/random/number.go @@ -1,6 +1,9 @@ package random -import "math/rand" +import ( + "math/rand" + "time" +) // Int64 返回一个介于min和max之间的int64类型的随机数。 func Int64(min int64, max int64) int64 { @@ -12,6 +15,11 @@ func Int(min int, max int) int { return int(Int64(int64(min), int64(max))) } +// Duration 返回一个介于min和max之间的的Duration类型的随机数。 +func Duration(min int64, max int64) time.Duration { + return time.Duration(Int64(min, max)) +} + // Float64 返回一个0~1的浮点数 func Float64() float64 { return rand.Float64()