perf: lockstep 包优化同步逻辑,帧 id 由 int 更改为 int64 类型,优化数据竞态问题
This commit is contained in:
parent
9e339065d4
commit
d3e563257f
|
@ -2,29 +2,30 @@ package lockstep
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/kercylan98/minotaur/utils/concurrent"
|
"github.com/kercylan98/minotaur/utils/hash"
|
||||||
"github.com/kercylan98/minotaur/utils/timer"
|
"github.com/kercylan98/minotaur/utils/timer"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewLockstep 创建一个锁步(帧)同步默认实现的组件(Lockstep)进行返回
|
// NewLockstep 创建一个锁步(帧)同步默认实现的组件(Lockstep)进行返回
|
||||||
func NewLockstep[ClientID comparable, Command any](options ...Option[ClientID, Command]) *Lockstep[ClientID, Command] {
|
func NewLockstep[ClientID comparable, Command any](options ...Option[ClientID, Command]) *Lockstep[ClientID, Command] {
|
||||||
lockstep := &Lockstep[ClientID, Command]{
|
lockstep := &Lockstep[ClientID, Command]{
|
||||||
clients: concurrent.NewBalanceMap[ClientID, Client[ClientID]](),
|
currentFrame: -1,
|
||||||
frames: concurrent.NewBalanceMap[int, []Command](),
|
frames: make(map[int64][]Command),
|
||||||
ticker: timer.GetTicker(10),
|
ticker: timer.GetTicker(10),
|
||||||
frameRate: 15,
|
frameRate: 15,
|
||||||
serialization: func(frame int, commands []Command) []byte {
|
serialization: func(frame int64, commands []Command) []byte {
|
||||||
frameStruct := struct {
|
frameStruct := struct {
|
||||||
Frame int `json:"frame"`
|
Frame int64 `json:"frame"`
|
||||||
Commands []Command `json:"commands"`
|
Commands []Command `json:"commands"`
|
||||||
}{frame, commands}
|
}{frame, commands}
|
||||||
data, _ := json.Marshal(frameStruct)
|
data, _ := json.Marshal(frameStruct)
|
||||||
return data
|
return data
|
||||||
},
|
},
|
||||||
clientCurrentFrame: concurrent.NewBalanceMap[ClientID, int](),
|
clients: make(map[ClientID]Client[ClientID]),
|
||||||
|
clientFrame: make(map[ClientID]int64),
|
||||||
|
frameCache: make(map[int64][]byte),
|
||||||
}
|
}
|
||||||
for _, option := range options {
|
for _, option := range options {
|
||||||
option(lockstep)
|
option(lockstep)
|
||||||
|
@ -39,118 +40,184 @@ func NewLockstep[ClientID comparable, Command any](options ...Option[ClientID, C
|
||||||
// - 从特定帧开始追帧
|
// - 从特定帧开始追帧
|
||||||
// - 兼容各种基于TCP/UDP/Unix的网络类型,可通过客户端实现其他网络类型同步
|
// - 兼容各种基于TCP/UDP/Unix的网络类型,可通过客户端实现其他网络类型同步
|
||||||
type Lockstep[ClientID comparable, Command any] struct {
|
type Lockstep[ClientID comparable, Command any] struct {
|
||||||
clients *concurrent.BalanceMap[ClientID, Client[ClientID]] // 接受广播的客户端
|
running bool // 运行状态
|
||||||
frames *concurrent.BalanceMap[int, []Command] // 所有帧指令
|
runningLock sync.RWMutex // 运行状态锁
|
||||||
ticker *timer.Ticker // 定时器
|
initFrame int64 // 初始帧
|
||||||
frameMutex sync.Mutex // 帧锁
|
frameRate int64 // 帧率(每秒N帧)
|
||||||
currentFrame int // 当前帧
|
frameLimit int64 // 帧上限
|
||||||
clientCurrentFrame *concurrent.BalanceMap[ClientID, int] // 客户端当前帧数
|
serialization func(frame int64, commands []Command) []byte // 序列化函数
|
||||||
running atomic.Bool
|
|
||||||
|
|
||||||
frameRate int // 帧率(每秒N帧)
|
clients map[ClientID]Client[ClientID] // 接受广播的客户端
|
||||||
frameLimit int // 帧上限
|
clientFrame map[ClientID]int64 // 客户端当前帧
|
||||||
serialization func(frame int, commands []Command) []byte // 序列化函数
|
clientLock sync.RWMutex // 客户端锁
|
||||||
|
|
||||||
|
currentFrame int64 // 当前主要帧
|
||||||
|
currentCommands []Command // 当前帧指令
|
||||||
|
currentFrameLock sync.RWMutex // 当前主要帧锁
|
||||||
|
|
||||||
|
frames map[int64][]Command // 所有已经落帧完成的指令
|
||||||
|
frameLock sync.RWMutex // 帧锁
|
||||||
|
frameCache map[int64][]byte // 帧序列化缓存
|
||||||
|
frameCacheLock sync.RWMutex // 帧序列化缓存锁
|
||||||
|
ticker *timer.Ticker // 定时器
|
||||||
|
|
||||||
lockstepStoppedEventHandles []StoppedEventHandle[ClientID, Command]
|
lockstepStoppedEventHandles []StoppedEventHandle[ClientID, Command]
|
||||||
}
|
}
|
||||||
|
|
||||||
// JoinClient 加入客户端到广播队列中
|
// JoinClient 将客户端加入到广播队列中,通常在开始广播前使用
|
||||||
|
// - 如果客户端在开始广播后加入,将丢失之前的帧数据,如要从特定帧开始追帧请使用 JoinClientWithFrame
|
||||||
func (slf *Lockstep[ClientID, Command]) JoinClient(client Client[ClientID]) {
|
func (slf *Lockstep[ClientID, Command]) JoinClient(client Client[ClientID]) {
|
||||||
slf.clients.Set(client.GetID(), client)
|
slf.clientLock.Lock()
|
||||||
|
defer slf.clientLock.Unlock()
|
||||||
|
slf.clients[client.GetID()] = client
|
||||||
}
|
}
|
||||||
|
|
||||||
// JoinClientWithFrame 加入客户端到广播队列中,并从特定帧开始追帧
|
// JoinClientWithFrame 加入客户端到广播队列中,并从特定帧开始追帧
|
||||||
// - 可用于重连及状态同步、帧同步混用的情况
|
// - 可用于重连及状态同步、帧同步混用的情况
|
||||||
// - 混用:服务端记录指令时同时做一次状态计算,新客户端加入时直接同步当前状态,之后从特定帧开始广播
|
// - 混用:服务端记录指令时同时做一次状态计算,新客户端加入时直接同步当前状态,之后从特定帧开始广播
|
||||||
func (slf *Lockstep[ClientID, Command]) JoinClientWithFrame(client Client[ClientID], frameIndex int) {
|
func (slf *Lockstep[ClientID, Command]) JoinClientWithFrame(client Client[ClientID], frameIndex int64) {
|
||||||
slf.clients.Set(client.GetID(), client)
|
slf.currentFrameLock.RLock()
|
||||||
if frameIndex > slf.currentFrame {
|
if frameIndex > slf.currentFrame {
|
||||||
frameIndex = slf.currentFrame
|
frameIndex = slf.currentFrame
|
||||||
}
|
}
|
||||||
slf.clientCurrentFrame.Set(client.GetID(), frameIndex)
|
slf.currentFrameLock.RUnlock()
|
||||||
|
slf.clientLock.Lock()
|
||||||
|
slf.clients[client.GetID()] = client
|
||||||
|
slf.clientFrame[client.GetID()] = frameIndex
|
||||||
|
slf.clientLock.Unlock()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// LeaveClient 将客户端从广播队列中移除
|
// LeaveClient 将客户端从广播队列中移除
|
||||||
func (slf *Lockstep[ClientID, Command]) LeaveClient(clientId ClientID) {
|
func (slf *Lockstep[ClientID, Command]) LeaveClient(clientId ClientID) {
|
||||||
slf.clients.Delete(clientId)
|
slf.clientLock.Lock()
|
||||||
slf.clientCurrentFrame.Delete(clientId)
|
defer slf.clientLock.Unlock()
|
||||||
|
delete(slf.clients, clientId)
|
||||||
|
delete(slf.clientFrame, clientId)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartBroadcast 开始广播
|
// StartBroadcast 开始广播
|
||||||
// - 在开始广播后将持续按照设定的帧率进行帧数推进,并在每一帧推进时向客户端进行同步,需提前将客户端加入广播队列 JoinClient
|
// - 在开始广播后将持续按照设定的帧率进行帧数推进,并在每一帧推进时向客户端进行同步,需提前将客户端加入广播队列 JoinClient
|
||||||
// - 广播过程中使用 AddCommand 将该帧数据追加到当前帧中
|
// - 广播过程中使用 AddCommand 将该帧数据追加到当前帧中
|
||||||
func (slf *Lockstep[ClientID, Command]) StartBroadcast() {
|
func (slf *Lockstep[ClientID, Command]) StartBroadcast() {
|
||||||
if slf.running.Swap(true) {
|
slf.runningLock.RLock()
|
||||||
|
if slf.running {
|
||||||
|
slf.runningLock.RUnlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
slf.running = true
|
||||||
|
slf.runningLock.RUnlock()
|
||||||
|
|
||||||
slf.ticker.Loop("lockstep", timer.Instantly, time.Second/time.Duration(slf.frameRate), timer.Forever, func() {
|
slf.ticker.Loop("lockstep", timer.Instantly, time.Second/time.Duration(slf.frameRate), timer.Forever, func() {
|
||||||
|
|
||||||
slf.frameMutex.Lock()
|
slf.currentFrameLock.RLock()
|
||||||
currentFrame := slf.currentFrame
|
if slf.frameLimit > 0 && slf.currentFrame >= slf.frameLimit {
|
||||||
if slf.frameLimit > 0 && currentFrame >= slf.frameLimit {
|
slf.currentFrameLock.RUnlock()
|
||||||
slf.StopBroadcast()
|
slf.StopBroadcast()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
slf.currentFrameLock.RUnlock()
|
||||||
|
slf.currentFrameLock.Lock()
|
||||||
slf.currentFrame++
|
slf.currentFrame++
|
||||||
slf.frameMutex.Unlock()
|
currentFrame := slf.currentFrame
|
||||||
|
currentCommands := slf.currentCommands
|
||||||
|
slf.currentCommands = make([]Command, 0, len(currentCommands))
|
||||||
|
slf.currentFrameLock.Unlock()
|
||||||
|
|
||||||
frames := slf.frames.Map()
|
slf.frameLock.Lock()
|
||||||
for clientId, client := range slf.clients.Map() {
|
slf.clientLock.RLock()
|
||||||
var i = slf.clientCurrentFrame.Get(clientId)
|
defer slf.frameLock.Unlock()
|
||||||
|
defer slf.clientLock.RUnlock()
|
||||||
|
slf.frames[currentFrame] = currentCommands
|
||||||
|
|
||||||
|
for clientId, client := range slf.clients {
|
||||||
|
var i = slf.clientFrame[clientId]
|
||||||
for ; i < currentFrame; i++ {
|
for ; i < currentFrame; i++ {
|
||||||
client.Write(slf.serialization(i, frames[i]))
|
cache, exist := slf.frameCache[i]
|
||||||
|
if !exist {
|
||||||
|
cache = slf.serialization(i, slf.frames[i])
|
||||||
|
slf.frameCache[i] = cache
|
||||||
|
}
|
||||||
|
client.Write(cache)
|
||||||
}
|
}
|
||||||
slf.clientCurrentFrame.Set(clientId, i)
|
slf.clientFrame[clientId] = currentFrame
|
||||||
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// StopBroadcast 停止广播
|
// StopBroadcast 停止广播
|
||||||
func (slf *Lockstep[ClientID, Command]) StopBroadcast() {
|
func (slf *Lockstep[ClientID, Command]) StopBroadcast() {
|
||||||
if !slf.running.Swap(false) {
|
slf.runningLock.Lock()
|
||||||
|
if !slf.running {
|
||||||
|
slf.runningLock.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
slf.running = false
|
||||||
|
slf.runningLock.Unlock()
|
||||||
|
|
||||||
slf.ticker.StopTimer("lockstep")
|
slf.ticker.StopTimer("lockstep")
|
||||||
slf.frameMutex.Lock()
|
|
||||||
slf.OnLockstepStoppedEvent()
|
slf.OnLockstepStoppedEvent()
|
||||||
slf.currentFrame = 0
|
|
||||||
slf.clientCurrentFrame.Clear()
|
slf.currentFrameLock.Lock()
|
||||||
slf.frames.Clear()
|
defer slf.currentFrameLock.Unlock()
|
||||||
slf.frameMutex.Unlock()
|
slf.frameCacheLock.Lock()
|
||||||
|
defer slf.frameCacheLock.Unlock()
|
||||||
|
slf.frameLock.Lock()
|
||||||
|
defer slf.frameLock.Unlock()
|
||||||
|
slf.frameCache = make(map[int64][]byte)
|
||||||
|
slf.currentCommands = make([]Command, 0)
|
||||||
|
slf.currentFrame = -1
|
||||||
|
slf.clientFrame = make(map[ClientID]int64)
|
||||||
|
slf.frames = make(map[int64][]Command)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsRunning 是否正在广播
|
||||||
|
func (slf *Lockstep[ClientID, Command]) IsRunning() bool {
|
||||||
|
slf.runningLock.RLock()
|
||||||
|
defer slf.runningLock.RUnlock()
|
||||||
|
return slf.running
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddCommand 添加命令到当前帧
|
// AddCommand 添加命令到当前帧
|
||||||
func (slf *Lockstep[ClientID, Command]) AddCommand(command Command) {
|
func (slf *Lockstep[ClientID, Command]) AddCommand(command Command) {
|
||||||
slf.frames.Atom(func(m map[int][]Command) {
|
slf.currentFrameLock.RLock()
|
||||||
m[slf.currentFrame] = append(m[slf.currentFrame], command)
|
defer slf.currentFrameLock.RUnlock()
|
||||||
})
|
slf.currentCommands = append(slf.currentCommands, command)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetCurrentFrame 获取当前帧
|
// GetCurrentFrame 获取当前帧
|
||||||
func (slf *Lockstep[ClientID, Command]) GetCurrentFrame() int {
|
func (slf *Lockstep[ClientID, Command]) GetCurrentFrame() int64 {
|
||||||
|
slf.currentFrameLock.RLock()
|
||||||
|
defer slf.currentFrameLock.RUnlock()
|
||||||
return slf.currentFrame
|
return slf.currentFrame
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetClientCurrentFrame 获取客户端当前帧
|
// GetClientCurrentFrame 获取客户端当前帧
|
||||||
func (slf *Lockstep[ClientID, Command]) GetClientCurrentFrame(clientId ClientID) int {
|
func (slf *Lockstep[ClientID, Command]) GetClientCurrentFrame(clientId ClientID) int64 {
|
||||||
return slf.clientCurrentFrame.Get(clientId)
|
slf.clientLock.RLock()
|
||||||
|
defer slf.clientLock.RUnlock()
|
||||||
|
return slf.clientFrame[clientId]
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFrameLimit 获取帧上限
|
// GetFrameLimit 获取帧上限
|
||||||
// - 未设置时将返回0
|
// - 未设置时将返回0
|
||||||
func (slf *Lockstep[ClientID, Command]) GetFrameLimit() int {
|
func (slf *Lockstep[ClientID, Command]) GetFrameLimit() int64 {
|
||||||
return slf.frameLimit
|
return slf.frameLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFrames 获取所有帧数据
|
// GetFrames 获取所有落帧完成的数据
|
||||||
func (slf *Lockstep[ClientID, Command]) GetFrames() [][]Command {
|
func (slf *Lockstep[ClientID, Command]) GetFrames() map[int64][]Command {
|
||||||
var frameMap = slf.frames.Map()
|
slf.frameLock.RLock()
|
||||||
var frames = make([][]Command, len(frameMap))
|
defer slf.frameLock.RUnlock()
|
||||||
for index, commands := range frameMap {
|
return hash.Copy(slf.frames)
|
||||||
frames[index] = commands
|
}
|
||||||
}
|
|
||||||
return frames
|
// GetCurrentCommands 获取当前帧还未结束时的所有指令
|
||||||
|
func (slf *Lockstep[ClientID, Command]) GetCurrentCommands() []Command {
|
||||||
|
slf.currentFrameLock.RLock()
|
||||||
|
defer slf.currentFrameLock.RUnlock()
|
||||||
|
return slf.currentCommands
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegLockstepStoppedEvent 当广播停止时将触发被注册的事件处理函数
|
// RegLockstepStoppedEvent 当广播停止时将触发被注册的事件处理函数
|
||||||
|
|
|
@ -4,7 +4,7 @@ type Option[ClientID comparable, Command any] func(lockstep *Lockstep[ClientID,
|
||||||
|
|
||||||
// WithFrameLimit 通过特定逻辑帧上限创建锁步(帧)同步组件
|
// WithFrameLimit 通过特定逻辑帧上限创建锁步(帧)同步组件
|
||||||
// - 当达到上限时将停止广播
|
// - 当达到上限时将停止广播
|
||||||
func WithFrameLimit[ClientID comparable, Command any](frameLimit int) Option[ClientID, Command] {
|
func WithFrameLimit[ClientID comparable, Command any](frameLimit int64) Option[ClientID, Command] {
|
||||||
return func(lockstep *Lockstep[ClientID, Command]) {
|
return func(lockstep *Lockstep[ClientID, Command]) {
|
||||||
if frameLimit > 0 {
|
if frameLimit > 0 {
|
||||||
frameLimit = 0
|
frameLimit = 0
|
||||||
|
@ -15,7 +15,7 @@ func WithFrameLimit[ClientID comparable, Command any](frameLimit int) Option[Cli
|
||||||
|
|
||||||
// WithFrameRate 通过特定逻辑帧率创建锁步(帧)同步组件
|
// WithFrameRate 通过特定逻辑帧率创建锁步(帧)同步组件
|
||||||
// - 默认情况下为 15/s
|
// - 默认情况下为 15/s
|
||||||
func WithFrameRate[ClientID comparable, Command any](frameRate int) Option[ClientID, Command] {
|
func WithFrameRate[ClientID comparable, Command any](frameRate int64) Option[ClientID, Command] {
|
||||||
return func(lockstep *Lockstep[ClientID, Command]) {
|
return func(lockstep *Lockstep[ClientID, Command]) {
|
||||||
lockstep.frameRate = frameRate
|
lockstep.frameRate = frameRate
|
||||||
}
|
}
|
||||||
|
@ -29,8 +29,16 @@ func WithFrameRate[ClientID comparable, Command any](frameRate int) Option[Clien
|
||||||
// Frame int `json:"frame"`
|
// Frame int `json:"frame"`
|
||||||
// Commands []Command `json:"commands"`
|
// Commands []Command `json:"commands"`
|
||||||
// }
|
// }
|
||||||
func WithSerialization[ClientID comparable, Command any](handle func(frame int, commands []Command) []byte) Option[ClientID, Command] {
|
func WithSerialization[ClientID comparable, Command any](handle func(frame int64, commands []Command) []byte) Option[ClientID, Command] {
|
||||||
return func(lockstep *Lockstep[ClientID, Command]) {
|
return func(lockstep *Lockstep[ClientID, Command]) {
|
||||||
lockstep.serialization = handle
|
lockstep.serialization = handle
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithInitFrame 通过特定的初始帧创建锁步(帧)同步组件
|
||||||
|
// - 默认情况下为 0,即第一帧索引为 0
|
||||||
|
func WithInitFrame[ClientID comparable, Command any](initFrame int64) Option[ClientID, Command] {
|
||||||
|
return func(lockstep *Lockstep[ClientID, Command]) {
|
||||||
|
lockstep.initFrame = initFrame
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
package lockstep_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/kercylan98/minotaur/server/lockstep"
|
||||||
|
"github.com/kercylan98/minotaur/utils/log"
|
||||||
|
"github.com/kercylan98/minotaur/utils/random"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Cli struct {
|
||||||
|
id string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (slf *Cli) GetID() string {
|
||||||
|
return slf.id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (slf *Cli) Write(packet []byte, callback ...func(err error)) {
|
||||||
|
log.Info("write", log.String("id", slf.id), log.String("frame", string(packet)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewLockstep(t *testing.T) {
|
||||||
|
ls := lockstep.NewLockstep[string, int]()
|
||||||
|
ls.JoinClient(&Cli{id: "player_1"})
|
||||||
|
ls.JoinClient(&Cli{id: "player_2"})
|
||||||
|
count := 0
|
||||||
|
ls.StartBroadcast()
|
||||||
|
endChan := make(chan bool)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
ls.AddCommand(random.Int(1, 9999))
|
||||||
|
count++
|
||||||
|
if count >= 10 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond * time.Duration(random.Int(10, 200)))
|
||||||
|
}
|
||||||
|
ls.StopBroadcast()
|
||||||
|
endChan <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
<-endChan
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
fmt.Println("end")
|
||||||
|
}
|
Loading…
Reference in New Issue