From d3e563257f8e72569f76132002c8ea73e5fe39b0 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Sat, 21 Oct 2023 11:15:30 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20lockstep=20=E5=8C=85=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E9=80=BB=E8=BE=91=EF=BC=8C=E5=B8=A7=20id=20?= =?UTF-8?q?=E7=94=B1=20int=20=E6=9B=B4=E6=94=B9=E4=B8=BA=20int64=20?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=EF=BC=8C=E4=BC=98=E5=8C=96=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=AB=9E=E6=80=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/lockstep/lockstep.go | 183 +++++++++++++++++++--------- server/lockstep/lockstep_options.go | 14 ++- server/lockstep/lockstep_test.go | 48 ++++++++ 3 files changed, 184 insertions(+), 61 deletions(-) create mode 100644 server/lockstep/lockstep_test.go diff --git a/server/lockstep/lockstep.go b/server/lockstep/lockstep.go index 7a962b6..4215c91 100644 --- a/server/lockstep/lockstep.go +++ b/server/lockstep/lockstep.go @@ -2,29 +2,30 @@ package lockstep import ( "encoding/json" - "github.com/kercylan98/minotaur/utils/concurrent" + "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/timer" "sync" - "sync/atomic" "time" ) // NewLockstep 创建一个锁步(帧)同步默认实现的组件(Lockstep)进行返回 func NewLockstep[ClientID comparable, Command any](options ...Option[ClientID, Command]) *Lockstep[ClientID, Command] { lockstep := &Lockstep[ClientID, Command]{ - clients: concurrent.NewBalanceMap[ClientID, Client[ClientID]](), - frames: concurrent.NewBalanceMap[int, []Command](), - ticker: timer.GetTicker(10), - frameRate: 15, - serialization: func(frame int, commands []Command) []byte { + currentFrame: -1, + frames: make(map[int64][]Command), + ticker: timer.GetTicker(10), + frameRate: 15, + serialization: func(frame int64, commands []Command) []byte { frameStruct := struct { - Frame int `json:"frame"` + Frame int64 `json:"frame"` Commands []Command `json:"commands"` }{frame, commands} data, _ := json.Marshal(frameStruct) return data }, - clientCurrentFrame: concurrent.NewBalanceMap[ClientID, int](), + clients: make(map[ClientID]Client[ClientID]), + clientFrame: make(map[ClientID]int64), + frameCache: make(map[int64][]byte), } for _, option := range options { option(lockstep) @@ -39,118 +40,184 @@ func NewLockstep[ClientID comparable, Command any](options ...Option[ClientID, C // - 从特定帧开始追帧 // - 兼容各种基于TCP/UDP/Unix的网络类型,可通过客户端实现其他网络类型同步 type Lockstep[ClientID comparable, Command any] struct { - clients *concurrent.BalanceMap[ClientID, Client[ClientID]] // 接受广播的客户端 - frames *concurrent.BalanceMap[int, []Command] // 所有帧指令 - ticker *timer.Ticker // 定时器 - frameMutex sync.Mutex // 帧锁 - currentFrame int // 当前帧 - clientCurrentFrame *concurrent.BalanceMap[ClientID, int] // 客户端当前帧数 - running atomic.Bool + running bool // 运行状态 + runningLock sync.RWMutex // 运行状态锁 + initFrame int64 // 初始帧 + frameRate int64 // 帧率(每秒N帧) + frameLimit int64 // 帧上限 + serialization func(frame int64, commands []Command) []byte // 序列化函数 - frameRate int // 帧率(每秒N帧) - frameLimit int // 帧上限 - serialization func(frame int, commands []Command) []byte // 序列化函数 + clients map[ClientID]Client[ClientID] // 接受广播的客户端 + clientFrame map[ClientID]int64 // 客户端当前帧 + clientLock sync.RWMutex // 客户端锁 + + currentFrame int64 // 当前主要帧 + currentCommands []Command // 当前帧指令 + currentFrameLock sync.RWMutex // 当前主要帧锁 + + frames map[int64][]Command // 所有已经落帧完成的指令 + frameLock sync.RWMutex // 帧锁 + frameCache map[int64][]byte // 帧序列化缓存 + frameCacheLock sync.RWMutex // 帧序列化缓存锁 + ticker *timer.Ticker // 定时器 lockstepStoppedEventHandles []StoppedEventHandle[ClientID, Command] } -// JoinClient 加入客户端到广播队列中 +// JoinClient 将客户端加入到广播队列中,通常在开始广播前使用 +// - 如果客户端在开始广播后加入,将丢失之前的帧数据,如要从特定帧开始追帧请使用 JoinClientWithFrame func (slf *Lockstep[ClientID, Command]) JoinClient(client Client[ClientID]) { - slf.clients.Set(client.GetID(), client) + slf.clientLock.Lock() + defer slf.clientLock.Unlock() + slf.clients[client.GetID()] = client } // JoinClientWithFrame 加入客户端到广播队列中,并从特定帧开始追帧 // - 可用于重连及状态同步、帧同步混用的情况 // - 混用:服务端记录指令时同时做一次状态计算,新客户端加入时直接同步当前状态,之后从特定帧开始广播 -func (slf *Lockstep[ClientID, Command]) JoinClientWithFrame(client Client[ClientID], frameIndex int) { - slf.clients.Set(client.GetID(), client) +func (slf *Lockstep[ClientID, Command]) JoinClientWithFrame(client Client[ClientID], frameIndex int64) { + slf.currentFrameLock.RLock() if frameIndex > slf.currentFrame { frameIndex = slf.currentFrame } - slf.clientCurrentFrame.Set(client.GetID(), frameIndex) + slf.currentFrameLock.RUnlock() + slf.clientLock.Lock() + slf.clients[client.GetID()] = client + slf.clientFrame[client.GetID()] = frameIndex + slf.clientLock.Unlock() + } // LeaveClient 将客户端从广播队列中移除 func (slf *Lockstep[ClientID, Command]) LeaveClient(clientId ClientID) { - slf.clients.Delete(clientId) - slf.clientCurrentFrame.Delete(clientId) + slf.clientLock.Lock() + defer slf.clientLock.Unlock() + delete(slf.clients, clientId) + delete(slf.clientFrame, clientId) } // StartBroadcast 开始广播 // - 在开始广播后将持续按照设定的帧率进行帧数推进,并在每一帧推进时向客户端进行同步,需提前将客户端加入广播队列 JoinClient // - 广播过程中使用 AddCommand 将该帧数据追加到当前帧中 func (slf *Lockstep[ClientID, Command]) StartBroadcast() { - if slf.running.Swap(true) { + slf.runningLock.RLock() + if slf.running { + slf.runningLock.RUnlock() return } + slf.running = true + slf.runningLock.RUnlock() + slf.ticker.Loop("lockstep", timer.Instantly, time.Second/time.Duration(slf.frameRate), timer.Forever, func() { - slf.frameMutex.Lock() - currentFrame := slf.currentFrame - if slf.frameLimit > 0 && currentFrame >= slf.frameLimit { + slf.currentFrameLock.RLock() + if slf.frameLimit > 0 && slf.currentFrame >= slf.frameLimit { + slf.currentFrameLock.RUnlock() slf.StopBroadcast() return } + slf.currentFrameLock.RUnlock() + slf.currentFrameLock.Lock() slf.currentFrame++ - slf.frameMutex.Unlock() + currentFrame := slf.currentFrame + currentCommands := slf.currentCommands + slf.currentCommands = make([]Command, 0, len(currentCommands)) + slf.currentFrameLock.Unlock() - frames := slf.frames.Map() - for clientId, client := range slf.clients.Map() { - var i = slf.clientCurrentFrame.Get(clientId) + slf.frameLock.Lock() + slf.clientLock.RLock() + defer slf.frameLock.Unlock() + defer slf.clientLock.RUnlock() + slf.frames[currentFrame] = currentCommands + + for clientId, client := range slf.clients { + var i = slf.clientFrame[clientId] for ; i < currentFrame; i++ { - client.Write(slf.serialization(i, frames[i])) + cache, exist := slf.frameCache[i] + if !exist { + cache = slf.serialization(i, slf.frames[i]) + slf.frameCache[i] = cache + } + client.Write(cache) } - slf.clientCurrentFrame.Set(clientId, i) - + slf.clientFrame[clientId] = currentFrame } }) } // StopBroadcast 停止广播 func (slf *Lockstep[ClientID, Command]) StopBroadcast() { - if !slf.running.Swap(false) { + slf.runningLock.Lock() + if !slf.running { + slf.runningLock.Unlock() return } + slf.running = false + slf.runningLock.Unlock() + slf.ticker.StopTimer("lockstep") - slf.frameMutex.Lock() + slf.OnLockstepStoppedEvent() - slf.currentFrame = 0 - slf.clientCurrentFrame.Clear() - slf.frames.Clear() - slf.frameMutex.Unlock() + + slf.currentFrameLock.Lock() + defer slf.currentFrameLock.Unlock() + slf.frameCacheLock.Lock() + defer slf.frameCacheLock.Unlock() + slf.frameLock.Lock() + defer slf.frameLock.Unlock() + slf.frameCache = make(map[int64][]byte) + slf.currentCommands = make([]Command, 0) + slf.currentFrame = -1 + slf.clientFrame = make(map[ClientID]int64) + slf.frames = make(map[int64][]Command) +} + +// IsRunning 是否正在广播 +func (slf *Lockstep[ClientID, Command]) IsRunning() bool { + slf.runningLock.RLock() + defer slf.runningLock.RUnlock() + return slf.running } // AddCommand 添加命令到当前帧 func (slf *Lockstep[ClientID, Command]) AddCommand(command Command) { - slf.frames.Atom(func(m map[int][]Command) { - m[slf.currentFrame] = append(m[slf.currentFrame], command) - }) + slf.currentFrameLock.RLock() + defer slf.currentFrameLock.RUnlock() + slf.currentCommands = append(slf.currentCommands, command) } // GetCurrentFrame 获取当前帧 -func (slf *Lockstep[ClientID, Command]) GetCurrentFrame() int { +func (slf *Lockstep[ClientID, Command]) GetCurrentFrame() int64 { + slf.currentFrameLock.RLock() + defer slf.currentFrameLock.RUnlock() return slf.currentFrame } // GetClientCurrentFrame 获取客户端当前帧 -func (slf *Lockstep[ClientID, Command]) GetClientCurrentFrame(clientId ClientID) int { - return slf.clientCurrentFrame.Get(clientId) +func (slf *Lockstep[ClientID, Command]) GetClientCurrentFrame(clientId ClientID) int64 { + slf.clientLock.RLock() + defer slf.clientLock.RUnlock() + return slf.clientFrame[clientId] } // GetFrameLimit 获取帧上限 // - 未设置时将返回0 -func (slf *Lockstep[ClientID, Command]) GetFrameLimit() int { +func (slf *Lockstep[ClientID, Command]) GetFrameLimit() int64 { return slf.frameLimit } -// GetFrames 获取所有帧数据 -func (slf *Lockstep[ClientID, Command]) GetFrames() [][]Command { - var frameMap = slf.frames.Map() - var frames = make([][]Command, len(frameMap)) - for index, commands := range frameMap { - frames[index] = commands - } - return frames +// GetFrames 获取所有落帧完成的数据 +func (slf *Lockstep[ClientID, Command]) GetFrames() map[int64][]Command { + slf.frameLock.RLock() + defer slf.frameLock.RUnlock() + return hash.Copy(slf.frames) +} + +// GetCurrentCommands 获取当前帧还未结束时的所有指令 +func (slf *Lockstep[ClientID, Command]) GetCurrentCommands() []Command { + slf.currentFrameLock.RLock() + defer slf.currentFrameLock.RUnlock() + return slf.currentCommands } // RegLockstepStoppedEvent 当广播停止时将触发被注册的事件处理函数 diff --git a/server/lockstep/lockstep_options.go b/server/lockstep/lockstep_options.go index a29b782..8e77c8b 100644 --- a/server/lockstep/lockstep_options.go +++ b/server/lockstep/lockstep_options.go @@ -4,7 +4,7 @@ type Option[ClientID comparable, Command any] func(lockstep *Lockstep[ClientID, // WithFrameLimit 通过特定逻辑帧上限创建锁步(帧)同步组件 // - 当达到上限时将停止广播 -func WithFrameLimit[ClientID comparable, Command any](frameLimit int) Option[ClientID, Command] { +func WithFrameLimit[ClientID comparable, Command any](frameLimit int64) Option[ClientID, Command] { return func(lockstep *Lockstep[ClientID, Command]) { if frameLimit > 0 { frameLimit = 0 @@ -15,7 +15,7 @@ func WithFrameLimit[ClientID comparable, Command any](frameLimit int) Option[Cli // WithFrameRate 通过特定逻辑帧率创建锁步(帧)同步组件 // - 默认情况下为 15/s -func WithFrameRate[ClientID comparable, Command any](frameRate int) Option[ClientID, Command] { +func WithFrameRate[ClientID comparable, Command any](frameRate int64) Option[ClientID, Command] { return func(lockstep *Lockstep[ClientID, Command]) { lockstep.frameRate = frameRate } @@ -29,8 +29,16 @@ func WithFrameRate[ClientID comparable, Command any](frameRate int) Option[Clien // Frame int `json:"frame"` // Commands []Command `json:"commands"` // } -func WithSerialization[ClientID comparable, Command any](handle func(frame int, commands []Command) []byte) Option[ClientID, Command] { +func WithSerialization[ClientID comparable, Command any](handle func(frame int64, commands []Command) []byte) Option[ClientID, Command] { return func(lockstep *Lockstep[ClientID, Command]) { lockstep.serialization = handle } } + +// WithInitFrame 通过特定的初始帧创建锁步(帧)同步组件 +// - 默认情况下为 0,即第一帧索引为 0 +func WithInitFrame[ClientID comparable, Command any](initFrame int64) Option[ClientID, Command] { + return func(lockstep *Lockstep[ClientID, Command]) { + lockstep.initFrame = initFrame + } +} diff --git a/server/lockstep/lockstep_test.go b/server/lockstep/lockstep_test.go new file mode 100644 index 0000000..4899e30 --- /dev/null +++ b/server/lockstep/lockstep_test.go @@ -0,0 +1,48 @@ +package lockstep_test + +import ( + "fmt" + "github.com/kercylan98/minotaur/server/lockstep" + "github.com/kercylan98/minotaur/utils/log" + "github.com/kercylan98/minotaur/utils/random" + "testing" + "time" +) + +type Cli struct { + id string +} + +func (slf *Cli) GetID() string { + return slf.id +} + +func (slf *Cli) Write(packet []byte, callback ...func(err error)) { + log.Info("write", log.String("id", slf.id), log.String("frame", string(packet))) +} + +func TestNewLockstep(t *testing.T) { + ls := lockstep.NewLockstep[string, int]() + ls.JoinClient(&Cli{id: "player_1"}) + ls.JoinClient(&Cli{id: "player_2"}) + count := 0 + ls.StartBroadcast() + endChan := make(chan bool) + + go func() { + for { + ls.AddCommand(random.Int(1, 9999)) + count++ + if count >= 10 { + break + } + time.Sleep(time.Millisecond * time.Duration(random.Int(10, 200))) + } + ls.StopBroadcast() + endChan <- true + }() + + <-endChan + time.Sleep(time.Second) + fmt.Println("end") +}