帧同步重构、目录调整

This commit is contained in:
kercylan98 2023-05-12 15:56:23 +08:00
parent 39dc61aef7
commit 915163d37d
9 changed files with 135 additions and 200 deletions

View File

@ -0,0 +1,82 @@
package components
import (
"encoding/json"
"github.com/kercylan98/minotaur/component"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/synchronization"
"github.com/kercylan98/minotaur/utils/timer"
"go.uber.org/zap"
"sync"
"time"
)
func NewLockstep[ClientID comparable, Command any](options ...LockstepOption[ClientID, Command]) *Lockstep[ClientID, Command] {
lockstep := &Lockstep[ClientID, Command]{
clients: synchronization.NewMap[ClientID, component.LockstepClient[ClientID]](),
frames: synchronization.NewMap[int, []Command](),
ticker: timer.GetTicker(10),
frameRate: 15,
serialization: func(frame int, commands []Command) []byte {
frameStruct := struct {
Frame int `json:"frame"`
Commands []Command `json:"commands"`
}{frame, commands}
data, _ := json.Marshal(frameStruct)
return data
},
clientCurrentFrame: map[ClientID]int{},
}
for _, option := range options {
option(lockstep)
}
return lockstep
}
type Lockstep[ClientID comparable, Command any] struct {
clients *synchronization.Map[ClientID, component.LockstepClient[ClientID]] // 接受广播的客户端
frames *synchronization.Map[int, []Command] // 所有帧指令
ticker *timer.Ticker // 定时器
frameMutex sync.Mutex // 帧锁
currentFrame int // 当前帧
clientCurrentFrame map[ClientID]int // 客户端当前帧数
frameRate int // 帧率每秒N帧
serialization func(frame int, commands []Command) []byte // 序列化函数
}
func (slf *Lockstep[ClientID, Command]) JoinClient(client component.LockstepClient[ClientID]) {
slf.clients.Set(client.GetID(), client)
}
func (slf *Lockstep[ClientID, Command]) LeaveClient(clientId ClientID) {
slf.clients.Delete(clientId)
delete(slf.clientCurrentFrame, clientId)
}
func (slf *Lockstep[ClientID, Command]) StartBroadcast() {
slf.ticker.Loop("lockstep", timer.Instantly, time.Second/time.Duration(slf.frameRate), timer.Forever, func() {
slf.frameMutex.Lock()
currentFrame := slf.currentFrame
slf.currentFrame++
slf.frameMutex.Unlock()
frames := slf.frames.Map()
for clientId, client := range slf.clients.Map() {
for i := slf.clientCurrentFrame[clientId]; i <= currentFrame; i++ {
if err := client.Send(slf.serialization(i, frames[i])); err != nil {
log.Error("Lockstep.StartBroadcast", zap.Any("ClientID", client.GetID()), zap.Int("Frame", i), zap.Error(err))
break
}
slf.clientCurrentFrame[clientId] = i
}
}
})
}
func (slf *Lockstep[ClientID, Command]) AddCommand(command Command) {
slf.frames.AtomGetSet(slf.currentFrame, func(value []Command, exist bool) (newValue []Command, isSet bool) {
return append(value, command), true
})
}

View File

@ -0,0 +1,25 @@
package components
type LockstepOption[ClientID comparable, Command any] func(lockstep *Lockstep[ClientID, Command])
// WithLockstepFrameRate 通过特定逻辑帧率创建锁步(帧)同步组件
// - 默认情况下为 15/s
func WithLockstepFrameRate[ClientID comparable, Command any](frameRate int) LockstepOption[ClientID, Command] {
return func(lockstep *Lockstep[ClientID, Command]) {
lockstep.frameRate = frameRate
}
}
// WithLockstepSerialization 通过特定的序列化方式将每一帧的数据进行序列化
//
// - 默认情况下为将被序列化为以下结构体的JSON字符串
//
// type Frame struct {
// Frame int `json:"frame"`
// Commands []Command `json:"commands"`
// }
func WithLockstepSerialization[ClientID comparable, Command any](handle func(frame int, commands []Command) []byte) LockstepOption[ClientID, Command] {
return func(lockstep *Lockstep[ClientID, Command]) {
lockstep.serialization = handle
}
}

12
component/lockstep.go Normal file
View File

@ -0,0 +1,12 @@
package component
type Lockstep[ClientID comparable, Command any] interface {
// JoinClient 加入客户端
JoinClient(client LockstepClient[ClientID])
// LeaveClient 离开客户端
LeaveClient(clientId ClientID)
// StartBroadcast 开始广播
StartBroadcast()
// AddCommand 增加指令
AddCommand(command Command)
}

View File

@ -1,11 +0,0 @@
package lockstep
import "errors"
const (
tickerFrameName = "LOCKSTEP_FRAME"
)
var (
ErrFrameFactorCanNotIsNull = errors.New("frameFactory can not is nil")
)

View File

@ -1,12 +0,0 @@
package lockstep
type Frame[Command any] interface {
// GetIndex 获取这一帧的索引
GetIndex() uint32
// GetCommands 获取这一帧的数据
GetCommands() []Command
// AddCommand 添加命令到这一帧
AddCommand(command Command)
// Marshal 序列化帧数据
Marshal() ([]byte, error)
}

View File

@ -1,163 +0,0 @@
package lockstep
import (
"github.com/kercylan98/minotaur/utils/timer"
"sync"
"time"
)
func New[WriterID comparable, FrameCommand any](frameFactory func(frameIndex uint32) Frame[FrameCommand]) *Lockstep[WriterID, FrameCommand] {
lockstep := &Lockstep[WriterID, FrameCommand]{
frameFactory: frameFactory,
ticker: timer.GetTicker(30),
writers: map[WriterID]Writer[WriterID, FrameCommand]{},
writerCurrentFrame: map[WriterID]uint32{},
frames: map[uint32]Frame[FrameCommand]{},
}
return lockstep
}
type Lockstep[WriterID comparable, FrameCommand any] struct {
frameFactory func(frameIndex uint32) Frame[FrameCommand]
FrameLimit uint32 // 帧数上限
FrameRate uint32 // 帧率(每秒的帧数)
FrameBroadcastInterval uint32 // 帧数广播间隔帧数
FrameOnceLimit uint32 // 每次消息最大容纳帧数
ticker *timer.Ticker // 定时器
writers map[WriterID]Writer[WriterID, FrameCommand] // 被广播的对象
writerCurrentFrame map[WriterID]uint32 // 被广播的对象当前帧
currentFrame uint32 // 当前帧
writerMaxCurrentFrame uint32 // 最大写入器当前帧
frames map[uint32]Frame[FrameCommand] // 所有帧
framesRWMutes sync.RWMutex // 所有帧读写锁
}
// SetWriter 设置需要被广播的 Writer
func (slf *Lockstep[WriterID, FrameCommand]) SetWriter(writer ...Writer[WriterID, FrameCommand]) {
for _, w := range writer {
slf.writers[w.GetID()] = w
}
}
func (slf *Lockstep[WriterID, FrameCommand]) Run() error {
if slf.frameFactory == nil {
return ErrFrameFactorCanNotIsNull
}
slf.Release()
slf.ticker.Loop(tickerFrameName, timer.Instantly, time.Second/time.Duration(slf.FrameRate), timer.Forever, slf.tick)
return nil
}
func (slf *Lockstep[WriterID, FrameCommand]) Record(command FrameCommand) {
slf.framesRWMutes.RLock()
frame, exist := slf.frames[slf.currentFrame]
slf.framesRWMutes.RUnlock()
if !exist {
frame = slf.frameFactory(slf.currentFrame)
slf.framesRWMutes.Lock()
slf.frames[slf.currentFrame] = frame
slf.framesRWMutes.Unlock()
}
frame.AddCommand(command)
}
func (slf *Lockstep[WriterID, FrameCommand]) Release() {
slf.ticker.StopTimer(tickerFrameName)
for k := range slf.writers {
delete(slf.writers, k)
}
for k := range slf.writerCurrentFrame {
delete(slf.writers, k)
}
slf.currentFrame = 0
slf.framesRWMutes.Lock()
for k := range slf.frames {
delete(slf.frames, k)
}
slf.framesRWMutes.Unlock()
}
// ReWrite 重写
func (slf *Lockstep[WriterID, FrameCommand]) ReWrite(writer Writer[WriterID, FrameCommand]) {
if !writer.Healthy() {
return
}
var writerCurrentFrame uint32
var frameCounter uint32
var frames = make(map[uint32]Frame[FrameCommand])
for ; writerCurrentFrame < slf.currentFrame; writerCurrentFrame++ {
slf.framesRWMutes.RLock()
var frame = slf.frames[writerCurrentFrame]
slf.framesRWMutes.RUnlock()
if frame == nil && writerCurrentFrame != (slf.currentFrame-1) {
continue
}
frames[frame.GetIndex()] = frame
frameCounter++
if writerCurrentFrame == slf.currentFrame-1 || (slf.FrameOnceLimit > 0 && frameCounter >= slf.FrameOnceLimit) {
data := writer.Marshal(frames)
// TODO: writer.Write error not handle
_ = writer.Write(data)
frameCounter = 0
for k := range frames {
delete(frames, k)
}
}
}
slf.writerCurrentFrame[writer.GetID()] = writerCurrentFrame
if writerCurrentFrame > slf.writerMaxCurrentFrame {
slf.writerMaxCurrentFrame = writerCurrentFrame
}
}
func (slf *Lockstep[WriterID, FrameCommand]) tick() {
if slf.FrameLimit > 0 && slf.currentFrame >= slf.FrameLimit {
slf.ticker.StopTimer(tickerFrameName)
return
}
slf.currentFrame++
if slf.currentFrame-slf.writerMaxCurrentFrame < slf.FrameBroadcastInterval {
return
}
for id, writer := range slf.writers {
if !writer.Healthy() {
continue
}
var writerCurrentFrame uint32
var frameCounter uint32
var frames = make(map[uint32]Frame[FrameCommand])
for writerCurrentFrame = slf.writerCurrentFrame[id]; writerCurrentFrame < slf.currentFrame; writerCurrentFrame++ {
slf.framesRWMutes.RLock()
var frame = slf.frames[writerCurrentFrame]
slf.framesRWMutes.RUnlock()
if frame == nil && writerCurrentFrame != (slf.currentFrame-1) {
continue
}
frames[frame.GetIndex()] = frame
frameCounter++
if writerCurrentFrame == slf.currentFrame-1 || (slf.FrameOnceLimit > 0 && frameCounter >= slf.FrameOnceLimit) {
data := writer.Marshal(frames)
// TODO: writer.Write error not handle
_ = writer.Write(data)
frameCounter = 0
for k := range frames {
delete(frames, k)
}
}
}
slf.writerCurrentFrame[id] = writerCurrentFrame
if writerCurrentFrame > slf.writerMaxCurrentFrame {
slf.writerMaxCurrentFrame = writerCurrentFrame
}
}
}

View File

@ -1,13 +0,0 @@
package lockstep
// Writer 游戏帧写入器,通常实现写入器的对象应该为包含网络连接的玩家
type Writer[ID comparable, FrameCommand any] interface {
// GetID 游戏帧写入器ID
GetID() ID
// Healthy 检查写入器状态是否健康,例如离线、网络环境异常等
Healthy() bool
// Marshal 将多帧数据转换为流格式,以对游戏帧写入器进行写入
Marshal(frames map[uint32]Frame[FrameCommand]) []byte
// Write 向游戏帧写入器中写入数据
Write(data []byte) error
}

View File

@ -0,0 +1,7 @@
package component
import "github.com/kercylan98/minotaur/game"
type LockstepClient[ID comparable] interface {
game.Player[ID]
}

View File

@ -1,6 +1,9 @@
package random
import "math/rand"
import (
"math/rand"
"time"
)
// Int64 返回一个介于min和max之间的int64类型的随机数。
func Int64(min int64, max int64) int64 {
@ -12,6 +15,11 @@ func Int(min int, max int) int {
return int(Int64(int64(min), int64(max)))
}
// Duration 返回一个介于min和max之间的的Duration类型的随机数。
func Duration(min int64, max int64) time.Duration {
return time.Duration(Int64(min, max))
}
// Float64 返回一个0~1的浮点数
func Float64() float64 {
return rand.Float64()