diff --git a/README.md b/README.md index 0f46bae..5bd76b5 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,7 @@ import ( func main() { srv := server.New(server.NetworkWebsocket) - srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet server.Packet) { + srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) { conn.Write(packet) }) if err := srv.Run(":9999"); err != nil { @@ -173,6 +173,7 @@ func main() { wait.Wait() } ``` +在分布式环境中,如果存在类似于多服务器需要同时间刷新配置时,可使用`Cron`表达式设置定时任务。 ### 流操作 可以通过 `stream` 包快速开启对`切片`和`map`的流式操作,例如: diff --git a/game/activity/activity.go b/game/activity/activity.go index 959f54a..3ab45fa 100644 --- a/game/activity/activity.go +++ b/game/activity/activity.go @@ -1,14 +1,11 @@ package activity import ( - "errors" "fmt" "github.com/kercylan98/minotaur/utils/generic" "github.com/kercylan98/minotaur/utils/timer" - "github.com/kercylan98/minotaur/utils/times" "reflect" "sync" - "sync/atomic" "time" ) @@ -25,18 +22,12 @@ var ( ticker *timer.Ticker tickerOnce sync.Once - isDebug atomic.Bool ) func init() { ticker = timer.GetTicker(10) } -// SetDebug 设置是否开启调试模式 -func SetDebug(d bool) { - isDebug.Store(d) -} - // SetTicker 设置自定义定时器,该方法必须在使用活动系统前调用,且只能调用一次 func SetTicker(size int, options ...timer.Option) { tickerOnce.Do(func() { @@ -47,22 +38,31 @@ func SetTicker(size int, options ...timer.Option) { }) } +// LoadGlobalData 加载所有活动全局数据 +func LoadGlobalData(handler func(activityType, activityId, data any)) { + for _, f := range activityGlobalDataLoader { + f(handler) + } +} + +// LoadEntityData 加载所有活动实体数据 +func LoadEntityData(handler func(activityType, activityId, entityId, data any)) { + for _, f := range activityEntityDataLoader { + f(handler) + } +} + // LoadOrRefreshActivity 加载或刷新活动 // - 通常在活动配置刷新时候将活动通过该方法注册或刷新 -func LoadOrRefreshActivity[Type, ID generic.Basic](activityType Type, activityId ID, options ...Option[Type, ID]) error { - register := getControllerRegister(activityType) - if register == nil { - return errors.New("activity type not registered") +func LoadOrRefreshActivity[Type, ID generic.Basic](activityType Type, activityId ID, options ...*Options) error { + register, exist := activityRegister[activityType] + if !exist { + return fmt.Errorf("activity type %v not registered, activity %v registration failed", activityType, activityId) } - act, initFinishCallback := register(activityType, activityId) - activity := act.(*Activity[Type, ID]) - for _, option := range options { - option(activity) - } - initFinishCallback(activity) - if !activity.tl.Check(true, stateLine...) { - return errors.New("activity state timeline is invalid") + activity := register(activityId, initOptions(options...)).(*Activity[Type, ID]) + if !activity.options.Tl.Check(true, stateLine...) { + return fmt.Errorf("activity %v state timeline is invalid", activityId) } stateTrigger := map[byte]func(){ @@ -71,14 +71,15 @@ func LoadOrRefreshActivity[Type, ID generic.Basic](activityType Type, activityId stateEnded: func() { OnEndedEvent(activity); OnExtendedShowStartedEvent(activity) }, stateExtendedShowEnded: func() { OnExtendedShowEndedEvent(activity) }, } + for _, state := range stateLine { - if activity.tl.HasState(state) { - activity.tl.AddTriggerToState(state, stateTrigger[state]) + if activity.options.Tl.HasState(state) { + activity.options.Tl.AddTriggerToState(state, stateTrigger[state]) continue } for next := state; next <= stateLine[len(stateLine)-1]; next++ { - if activity.tl.HasState(next) { - activity.tl.AddTriggerToState(next, stateTrigger[state]) + if activity.options.Tl.HasState(next) { + activity.options.Tl.AddTriggerToState(next, stateTrigger[state]) break } } @@ -88,69 +89,55 @@ func LoadOrRefreshActivity[Type, ID generic.Basic](activityType Type, activityId return nil } -// LoadGlobalData 加载所有活动全局数据 -// - 一般用于持久化活动数据 -func LoadGlobalData(handler func(activityType, activityId, data any)) { - for _, f := range controllerGlobalDataReader { - f(handler) - } -} - -// LoadEntityData 加载所有活动实体数据 -// - 一般用于持久化活动数据 -func LoadEntityData(handler func(activityType, activityId, entityId, data any)) { - for _, f := range controllerEntityDataReader { - f(handler) - } -} - // Activity 活动描述 type Activity[Type, ID generic.Basic] struct { - id ID // 活动 ID - t Type // 活动类型 - state byte // 活动状态 - tl *times.StateLine[byte] // 活动时间线 - loop time.Duration // 活动多久循环一次 - lazy bool // 是否懒加载 - tickerKey string // 定时器 key - retention time.Duration // 活动数据保留时间 - retentionKey string // 保留定时器 key + id ID // 活动 ID + t Type // 活动类型 + options *Options // 活动选项 + state byte // 活动状态 + + lazy bool // 是否懒加载 + tickerKey string // 定时器 key + retention time.Duration // 活动数据保留时间 + retentionKey string // 保留定时器 key mutex sync.RWMutex getLastNewDayTime func() time.Time // 获取上次新的一天的时间 setLastNewDayTime func(time.Time) // 设置上次新的一天的时间 + clearData func() // 清理活动数据 + initializeData func() // 初始化活动数据 } func (slf *Activity[Type, ID]) refresh() { slf.mutex.Lock() defer slf.mutex.Unlock() curr := time.Now() - if slf.state = slf.tl.GetStateByTime(curr); slf.state == stateUpcoming { + if slf.state = slf.options.Tl.GetStateByTime(curr); slf.state == stateUpcoming || (slf.state == stateStarted && !slf.options.Tl.HasState(stateUpcoming)) { ticker.StopTimer(slf.retentionKey) - resetActivityData(slf.t, slf.id) + slf.initializeData() } - for _, f := range slf.tl.GetTriggerByState(slf.state) { + for _, f := range slf.options.Tl.GetTriggerByState(slf.state) { if f != nil { f() } } - next := slf.tl.GetNextTimeByState(slf.state) + next := slf.options.Tl.GetNextTimeByState(slf.state) if !next.IsZero() && next.After(curr) { ticker.After(slf.tickerKey, next.Sub(curr)+time.Millisecond*100, slf.refresh) } else { ticker.StopTimer(slf.tickerKey) ticker.StopTimer(fmt.Sprintf("activity:new_day:%d:%v", reflect.ValueOf(slf.t).Kind(), slf.id)) - if slf.loop > 0 { - slf.tl.Move(slf.loop * 2) - ticker.After(slf.tickerKey, slf.loop+time.Millisecond*100, slf.refresh) + if slf.options.Loop > 0 { + slf.options.Tl.Move(slf.options.Loop * 2) + ticker.After(slf.tickerKey, slf.options.Loop+time.Millisecond*100, slf.refresh) return } if slf.retention > 0 { ticker.After(slf.tickerKey, slf.retention, func() { ticker.StopTimer(slf.retentionKey) - resetActivityData(slf.t, slf.id) + slf.clearData() }) } } diff --git a/game/activity/activity_test.go b/game/activity/activity_test.go index af551db..4ec4006 100644 --- a/game/activity/activity_test.go +++ b/game/activity/activity_test.go @@ -16,8 +16,9 @@ type PlayerData struct { } func TestRegTypeByGlobalData(t *testing.T) { - controller := activity.DefineGlobalDataActivity[int, int, *ActivityData](1, func(activityId int, data *activity.DataMeta[*ActivityData]) { - data.Data.players = append(data.Data.players, "temp") + + controller := activity.DefineGlobalDataActivity[int, int, *ActivityData](1).InitializeGlobalData(func(activityId int, data *activity.DataMeta[*ActivityData]) { + data.Data.players = []string{"1", "2", "3"} }) activity.RegUpcomingEvent(1, func(activityId int) { @@ -48,12 +49,12 @@ func TestRegTypeByGlobalData(t *testing.T) { now := time.Now() - if err := activity.LoadOrRefreshActivity(1, 1, - activity.WithUpcomingTime[int, int](now.Add(1*time.Second)), - activity.WithStartTime[int, int](now.Add(2*times.Second)), - activity.WithEndTime[int, int](now.Add(3*times.Second)), - activity.WithExtendedShowTime[int, int](now.Add(4*times.Second)), - activity.WithLoop[int, int](3*times.Second), + if err := activity.LoadOrRefreshActivity(1, 1, activity.NewOptions(). + WithUpcomingTime(now.Add(1*time.Second)). + WithStartTime(now.Add(2*times.Second)). + WithEndTime(now.Add(3*times.Second)). + WithExtendedShowTime(now.Add(4*times.Second)). + WithLoop(3*times.Second), ); err != nil { t.Fatal(err) } diff --git a/game/activity/controller.go b/game/activity/controller.go index 7231c13..87e74e8 100644 --- a/game/activity/controller.go +++ b/game/activity/controller.go @@ -2,87 +2,87 @@ package activity import ( "github.com/kercylan98/minotaur/utils/generic" + "reflect" "sync" ) type none byte // DefineNoneDataActivity 声明无数据的活动类型 -func DefineNoneDataActivity[Type, ID generic.Basic](activityType Type) NoneDataActivityController[Type, ID, none, none, none] { - return regController(activityType, &Controller[Type, ID, none, none, none]{ +func DefineNoneDataActivity[Type, ID generic.Basic](activityType Type) NoneDataActivityController[Type, ID, *none, none, *none] { + return regController(&Controller[Type, ID, *none, none, *none]{ t: activityType, }) } // DefineGlobalDataActivity 声明拥有全局数据的活动类型 -func DefineGlobalDataActivity[Type, ID generic.Basic, Data any](activityType Type, initializer func(activityId ID, data *DataMeta[Data])) GlobalDataActivityController[Type, ID, Data, none, none] { - return regController(activityType, &Controller[Type, ID, Data, none, none]{ - t: activityType, - globalData: make(map[ID]*DataMeta[Data]), - globalInit: initializer, +func DefineGlobalDataActivity[Type, ID generic.Basic, Data any](activityType Type) GlobalDataActivityController[Type, ID, Data, none, *none] { + return regController(&Controller[Type, ID, Data, none, *none]{ + t: activityType, }) } // DefineEntityDataActivity 声明拥有实体数据的活动类型 -func DefineEntityDataActivity[Type, ID, EntityID generic.Basic, EntityData any](activityType Type, initializer func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) EntityDataActivityController[Type, ID, none, EntityID, EntityData] { - return regController(activityType, &Controller[Type, ID, none, EntityID, EntityData]{ - t: activityType, - entityData: make(map[ID]map[EntityID]*EntityDataMeta[EntityData]), - entityInit: initializer, +func DefineEntityDataActivity[Type, ID, EntityID generic.Basic, EntityData any](activityType Type) EntityDataActivityController[Type, ID, *none, EntityID, EntityData] { + return regController(&Controller[Type, ID, *none, EntityID, EntityData]{ + t: activityType, }) } // DefineGlobalAndEntityDataActivity 声明拥有全局数据和实体数据的活动类型 -func DefineGlobalAndEntityDataActivity[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any](activityType Type, globalInitializer func(activityId ID, data *DataMeta[Data]), entityInitializer func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) GlobalAndEntityDataActivityController[Type, ID, Data, EntityID, EntityData] { - return regController(activityType, &Controller[Type, ID, Data, EntityID, EntityData]{ - t: activityType, - globalData: make(map[ID]*DataMeta[Data]), - entityData: make(map[ID]map[EntityID]*EntityDataMeta[EntityData]), - globalInit: globalInitializer, - entityInit: entityInitializer, +func DefineGlobalAndEntityDataActivity[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any](activityType Type) GlobalAndEntityDataActivityController[Type, ID, Data, EntityID, EntityData] { + return regController(&Controller[Type, ID, Data, EntityID, EntityData]{ + t: activityType, }) } // Controller 活动控制器 type Controller[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any] struct { - t Type // 活动类型 - activities map[ID]*Activity[Type, ID] // 活动 - globalInit func(activityId ID, data *DataMeta[Data]) // 全局初始化器 - entityInit func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData]) // 实体初始化器 - globalDataLoader func(activityId any) // 全局数据加载器 - entityDataLoader func(activityId any, entityId any) // 实体数据加载器 - globalInitializer func(activityType Type, activityId ID, data map[ID]*DataMeta[Data]) // 全局数据初始化器 - entityInitializer func(activityType Type, activityId ID, data map[ID]*DataMeta[Data], entityData map[ID]map[EntityID]*EntityDataMeta[EntityData]) // 实体数据初始化器 - globalData map[ID]*DataMeta[Data] // 全局数据 - entityData map[ID]map[EntityID]*EntityDataMeta[EntityData] // 实体数据 - mutex sync.RWMutex + t Type // 活动类型 + activities map[ID]*Activity[Type, ID] // 活动列表 + globalData map[ID]*DataMeta[Data] // 全局数据 + entityData map[ID]map[EntityID]*EntityDataMeta[EntityData] // 实体数据 + entityTof reflect.Type // 实体数据类型 + globalInit func(activityId ID, data *DataMeta[Data]) // 全局数据初始化函数 + entityInit func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData]) // 实体数据初始化函数 + mutex sync.RWMutex } // GetGlobalData 获取特定活动全局数据 func (slf *Controller[Type, ID, Data, EntityID, EntityData]) GetGlobalData(activityId ID) Data { - slf.globalDataLoader(activityId) slf.mutex.RLock() defer slf.mutex.RUnlock() - return slf.globalData[activityId].Data + global := slf.globalData[activityId] + if slf.globalInit != nil { + global.once.Do(func() { + slf.globalInit(activityId, global) + }) + } + return global.Data } // GetEntityData 获取特定活动实体数据 func (slf *Controller[Type, ID, Data, EntityID, EntityData]) GetEntityData(activityId ID, entityId EntityID) EntityData { - slf.entityDataLoader(activityId, entityId) slf.mutex.RLock() defer slf.mutex.RUnlock() - return slf.entityData[activityId][entityId].Data -} - -// GetAllEntityData 获取特定活动所有实体数据 -func (slf *Controller[Type, ID, Data, EntityID, EntityData]) GetAllEntityData(activityId ID) map[EntityID]EntityData { - var entities = make(map[EntityID]EntityData) - slf.mutex.RLock() - for k, v := range slf.entityData[activityId] { - entities[k] = v.Data + entities, exist := slf.entityData[activityId] + if !exist { + entities = make(map[EntityID]*EntityDataMeta[EntityData]) + slf.entityData[activityId] = entities } - slf.mutex.RUnlock() - return entities + entity, exist := entities[entityId] + if !exist { + entity = &EntityDataMeta[EntityData]{ + Data: reflect.New(slf.entityTof).Interface().(EntityData), + } + entities[entityId] = entity + } + if slf.entityInit != nil { + entity.once.Do(func() { + slf.entityInit(activityId, entityId, entity) + }) + } + return entity.Data } // IsOpen 活动是否开启 @@ -108,7 +108,7 @@ func (slf *Controller[Type, ID, Data, EntityID, EntityData]) IsShow(activityId I } activity.mutex.RLock() defer activity.mutex.RUnlock() - return activity.state == stateUpcoming || (activity.state == stateEnded && activity.tl.HasState(stateExtendedShowEnded)) + return activity.state == stateUpcoming || (activity.state == stateEnded && activity.options.Tl.HasState(stateExtendedShowEnded)) } // IsOpenOrShow 活动是否开启或展示 @@ -122,7 +122,7 @@ func (slf *Controller[Type, ID, Data, EntityID, EntityData]) IsOpenOrShow(activi activity.mutex.RLock() defer activity.mutex.RUnlock() - return activity.state == stateStarted || activity.state == stateUpcoming || (activity.state == stateEnded && activity.tl.HasState(stateExtendedShowEnded)) + return activity.state == stateStarted || activity.state == stateUpcoming || (activity.state == stateEnded && activity.options.Tl.HasState(stateExtendedShowEnded)) } // Refresh 刷新活动 @@ -135,3 +135,24 @@ func (slf *Controller[Type, ID, Data, EntityID, EntityData]) Refresh(activityId } activity.refresh() } + +func (slf *Controller[Type, ID, Data, EntityID, EntityData]) InitializeNoneData(handler func(activityId ID, data *DataMeta[Data])) NoneDataActivityController[Type, ID, Data, EntityID, EntityData] { + slf.globalInit = handler + return slf +} + +func (slf *Controller[Type, ID, Data, EntityID, EntityData]) InitializeGlobalData(handler func(activityId ID, data *DataMeta[Data])) GlobalDataActivityController[Type, ID, Data, EntityID, EntityData] { + slf.globalInit = handler + return slf +} + +func (slf *Controller[Type, ID, Data, EntityID, EntityData]) InitializeEntityData(handler func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) EntityDataActivityController[Type, ID, Data, EntityID, EntityData] { + slf.entityInit = handler + return slf +} + +func (slf *Controller[Type, ID, Data, EntityID, EntityData]) InitializeGlobalAndEntityData(handler func(activityId ID, data *DataMeta[Data]), entityHandler func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) GlobalAndEntityDataActivityController[Type, ID, Data, EntityID, EntityData] { + slf.globalInit = handler + slf.entityInit = entityHandler + return slf +} diff --git a/game/activity/controller_interface.go b/game/activity/controller_interface.go index 7493af5..5b006b4 100644 --- a/game/activity/controller_interface.go +++ b/game/activity/controller_interface.go @@ -16,6 +16,11 @@ type BasicActivityController[Type, ID generic.Basic, Data any, EntityID generic. // NoneDataActivityController 无数据活动控制器 type NoneDataActivityController[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any] interface { BasicActivityController[Type, ID, Data, EntityID, EntityData] + // InitializeNoneData 初始化活动 + // - 该函数提供了一个操作活动数据的入口,可以在该函数中对传入的活动数据进行初始化 + // + // 对于无数据活动,该函数的意义在于,可以在该函数中对活动进行初始化,比如设置活动的状态等,虽然为无数据活动,但是例如活动本身携带的状态数据也是需要加载的 + InitializeNoneData(handler func(activityId ID, data *DataMeta[Data])) NoneDataActivityController[Type, ID, Data, EntityID, EntityData] } // GlobalDataActivityController 全局数据活动控制器 @@ -23,6 +28,9 @@ type GlobalDataActivityController[Type, ID generic.Basic, Data any, EntityID gen BasicActivityController[Type, ID, Data, EntityID, EntityData] // GetGlobalData 获取全局数据 GetGlobalData(activityId ID) Data + // InitializeGlobalData 初始化活动 + // - 该函数提供了一个操作活动数据的入口,可以在该函数中对传入的活动数据进行初始化 + InitializeGlobalData(handler func(activityId ID, data *DataMeta[Data])) GlobalDataActivityController[Type, ID, Data, EntityID, EntityData] } // EntityDataActivityController 实体数据活动控制器 @@ -30,6 +38,9 @@ type EntityDataActivityController[Type, ID generic.Basic, Data any, EntityID gen BasicActivityController[Type, ID, Data, EntityID, EntityData] // GetEntityData 获取实体数据 GetEntityData(activityId ID, entityId EntityID) EntityData + // InitializeEntityData 初始化活动 + // - 该函数提供了一个操作活动数据的入口,可以在该函数中对传入的活动数据进行初始化 + InitializeEntityData(handler func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) EntityDataActivityController[Type, ID, Data, EntityID, EntityData] } // GlobalAndEntityDataActivityController 全局数据和实体数据活动控制器 @@ -39,4 +50,7 @@ type GlobalAndEntityDataActivityController[Type, ID generic.Basic, Data any, Ent GetGlobalData(activityId ID) Data // GetEntityData 获取实体数据 GetEntityData(activityId ID, entityId EntityID) EntityData + // InitializeGlobalAndEntityData 初始化活动 + // - 该函数提供了一个操作活动数据的入口,可以在该函数中对传入的活动数据进行初始化 + InitializeGlobalAndEntityData(handler func(activityId ID, data *DataMeta[Data]), entityHandler func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) GlobalAndEntityDataActivityController[Type, ID, Data, EntityID, EntityData] } diff --git a/game/activity/controller_internal.go b/game/activity/controller_internal.go index c459cd3..5ef7e17 100644 --- a/game/activity/controller_internal.go +++ b/game/activity/controller_internal.go @@ -3,155 +3,105 @@ package activity import ( "fmt" "github.com/kercylan98/minotaur/utils/generic" + "github.com/kercylan98/minotaur/utils/hash" "github.com/kercylan98/minotaur/utils/times" "reflect" - "sync" "time" ) var ( - controllers map[any]any // type -> controller (特定类型活动控制器) - controllerRegisters map[any]func(activityType, activityId any) (act any, optionInitCallback func(activity any)) // type -> register (控制活动注册到特定类型控制器的注册机) - controllerGlobalDataReader []func(handler func(activityType, activityId, data any)) // 活动全局数据读取器 - controllerEntityDataReader []func(handler func(activityType, activityId, entityId, data any)) // 活动实体数据读取器 - controllerReset map[any]func(activityId any) // type -> reset (活动数据重置器) - controllersLock sync.RWMutex + activityRegister map[any]func(activityId any, options *Options) (activity any) // type -> register (控制活动注册到特定类型控制器的注册机) + activityGlobalDataLoader []func(handler func(activityType, activityId, data any)) // 全局数据加载器 + activityEntityDataLoader []func(handler func(activityType, activityId, entityId, data any)) // 实体数据加载器 ) func init() { - controllers = make(map[any]any) - controllerRegisters = make(map[any]func(activityType, activityId any) (act any, optionInitCallback func(activity any))) - controllerGlobalDataReader = make([]func(handler func(activityType, activityId, data any)), 0) - controllerEntityDataReader = make([]func(handler func(activityType, activityId, entityId, data any)), 0) - controllerReset = make(map[any]func(activityId any)) + activityRegister = make(map[any]func(activityId any, options *Options) (activity any)) } -// regController 注册活动类型 -func regController[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any](activityType Type, controller *Controller[Type, ID, Data, EntityID, EntityData]) *Controller[Type, ID, Data, EntityID, EntityData] { - controllersLock.Lock() - defer controllersLock.Unlock() +// regController 注册活动类型控制器 +func regController[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any](controller *Controller[Type, ID, Data, EntityID, EntityData]) *Controller[Type, ID, Data, EntityID, EntityData] { + var entityZero EntityData controller.activities = make(map[ID]*Activity[Type, ID]) - controllerGlobalDataReader = append(controllerGlobalDataReader, func(handler func(activityType, activityId, data any)) { - controller.mutex.RLock() - defer controller.mutex.RUnlock() - for activityId, data := range controller.globalData { - handler(activityType, activityId, data) - } - }) - controllerEntityDataReader = append(controllerEntityDataReader, func(handler func(activityType, activityId, entityId, data any)) { - controller.mutex.RLock() - defer controller.mutex.RUnlock() - for activityId, entities := range controller.entityData { - for entityId, data := range entities { - handler(activityType, activityId, entityId, data) - } - } - }) + controller.globalData = make(map[ID]*DataMeta[Data]) + controller.entityTof = reflect.TypeOf(entityZero) + if controller.entityTof.Kind() == reflect.Pointer { + controller.entityTof = controller.entityTof.Elem() + } - if controller.globalData != nil { - var d Data - var tof = reflect.TypeOf(d) - if tof.Kind() == reflect.Pointer { - tof = tof.Elem() - } - controller.globalDataLoader = func(activityId any) { - controller.mutex.Lock() - defer controller.mutex.Unlock() - var id = activityId.(ID) - if _, exist := controller.globalData[id]; exist { - return - } - data := &DataMeta[Data]{ - Data: reflect.New(tof).Interface().(Data), - } - if controller.globalInit != nil { - controller.globalInit(id, data) - } - controller.globalData[id] = data - } + // 反射类型 + var ( + zero Data + tof = reflect.TypeOf(zero) + ) + if tof.Kind() == reflect.Pointer { + tof = tof.Elem() } - if controller.entityData != nil { - var d Data - var tof = reflect.TypeOf(d) - if tof.Kind() == reflect.Pointer { - tof = tof.Elem() - } - controller.entityDataLoader = func(activityId any, entityId any) { - controller.mutex.Lock() - defer controller.mutex.Unlock() - var id, eid = activityId.(ID), entityId.(EntityID) - entities, exist := controller.entityData[id] - if !exist { - entities = make(map[EntityID]*EntityDataMeta[EntityData]) - controller.entityData[id] = entities - } - if _, exist = entities[eid]; exist { - return - } - data := &EntityDataMeta[EntityData]{ - Data: reflect.New(tof).Interface().(EntityData), - } - if controller.entityInit != nil { - controller.entityInit(id, eid, data) - } - entities[eid] = data - } - } - controllers[activityType] = controller - controllerRegisters[activityType] = func(activityType, activityId any) (act any, optionInitCallback func(activity any)) { - var at, ai = activityType.(Type), activityId.(ID) + + // 活动注册机(注册机内不加载活动数据,仅定义基本活动信息) + activityRegister[controller.t] = func(aid any, options *Options) any { + activityId := aid.(ID) controller.mutex.Lock() - activity, exist := controller.activities[ai] + activity, exist := controller.activities[activityId] if !exist { activity = &Activity[Type, ID]{ - t: at, - id: ai, - tl: times.NewStateLine[byte](stateClosed), - tickerKey: fmt.Sprintf("activity:%d:%v", reflect.ValueOf(at).Kind(), ai), + t: controller.t, + id: activityId, + options: options, + tickerKey: fmt.Sprintf("activity:%d:%v", reflect.ValueOf(controller.t).Kind(), activityId), getLastNewDayTime: func() time.Time { - return controller.globalData[ai].LastNewDay + return controller.globalData[activityId].LastNewDay }, setLastNewDayTime: func(t time.Time) { - controller.globalData[ai].LastNewDay = t + controller.globalData[activityId].LastNewDay = t + }, + clearData: func() { + controller.mutex.Lock() + defer controller.mutex.Unlock() + delete(controller.globalData, activityId) + delete(controller.entityData, activityId) + }, + initializeData: func() { + controller.mutex.Lock() + defer controller.mutex.Unlock() + controller.globalData[activityId] = &DataMeta[Data]{ + Data: reflect.New(tof).Interface().(Data), + } + if controller.entityData == nil { + controller.entityData = make(map[ID]map[EntityID]*EntityDataMeta[EntityData]) + } + controller.entityData[activityId] = make(map[EntityID]*EntityDataMeta[EntityData]) }, } + if activity.options == nil { + activity.options = NewOptions() + } + if activity.options.Tl == nil || activity.options.Tl.GetStateCount() == 0 { + activity.options.Tl = times.NewStateLine[byte](stateClosed) + } + controller.activities[activityId] = activity } controller.mutex.Unlock() - controller.activities[activity.id] = activity - return activity, func(activity any) { - act := activity.(*Activity[Type, ID]) - if !act.lazy { - controller.GetGlobalData(ai) + + // 全局数据加载器 + activityGlobalDataLoader = append(activityGlobalDataLoader, func(handler func(activityType any, activityId any, data any)) { + controller.mutex.RLock() + data := controller.globalData[activityId] + controller.mutex.RUnlock() + handler(controller.t, activityId, data) + }) + + // 实体数据加载器 + activityEntityDataLoader = append(activityEntityDataLoader, func(handler func(activityType any, activityId any, entityId any, data any)) { + controller.mutex.RLock() + entities := hash.Copy(controller.entityData[activityId]) + controller.mutex.RUnlock() + for entityId, data := range entities { + handler(controller.t, activityId, entityId, data) } - if act.retention > 0 { - act.retentionKey = fmt.Sprintf("%s:retention", act.tickerKey) - } - } - } - controllerReset[activityType] = func(activityId any) { - var id = activityId.(ID) - controller.mutex.Lock() - defer controller.mutex.Unlock() - delete(controller.globalData, id) - delete(controller.entityData, id) + }) + return activity } + return controller } - -// getControllerRegister 获取活动类型注册机 -func getControllerRegister[Type generic.Basic](activityType Type) func(activityType, activityId any) (act any, optionInitCallback func(activity any)) { - controllersLock.RLock() - defer controllersLock.RUnlock() - return controllerRegisters[activityType] -} - -// resetActivityData 重置活动数据 -func resetActivityData[Type, ID generic.Basic](activityType Type, activityId ID) { - controllersLock.RLock() - defer controllersLock.RUnlock() - reset, exist := controllerReset[activityType] - if !exist { - return - } - reset(activityId) -} diff --git a/game/activity/data.go b/game/activity/data.go index b3e7c18..0db3e55 100644 --- a/game/activity/data.go +++ b/game/activity/data.go @@ -1,19 +1,20 @@ package activity -import "time" +import ( + "sync" + "time" +) // DataMeta 全局活动数据 type DataMeta[Data any] struct { - Start time.Time `json:"start,omitempty"` // 活动开始时间 - End time.Time `json:"end,omitempty"` // 活动结束时间 - Data Data `json:"data,omitempty"` // 活动数据 - LastNewDay time.Time `json:"lastNewDay,omitempty"` // 上次跨天时间 + once sync.Once + Data Data `json:"data,omitempty"` // 活动数据 + LastNewDay time.Time `json:"last_new_day,omitempty"` // 上次跨天时间 } // EntityDataMeta 活动实体数据 type EntityDataMeta[Data any] struct { - Start time.Time `json:"start,omitempty"` // 对象参与活动时间 - End time.Time `json:"end,omitempty"` // 对象结束活动时间 - Data Data `json:"data,omitempty"` // 活动数据 - LastNewDay time.Time `json:"lastNewDay,omitempty"` // 上次跨天时间 + once sync.Once + Data Data `json:"data,omitempty"` // 活动数据 + LastNewDay time.Time `json:"last_new_day,omitempty"` // 上次跨天时间 } diff --git a/game/activity/internal/example/activities/activity.go b/game/activity/internal/example/activities/activity.go new file mode 100644 index 0000000..baac150 --- /dev/null +++ b/game/activity/internal/example/activities/activity.go @@ -0,0 +1,24 @@ +package activities + +import ( + "github.com/kercylan98/minotaur/game/activity" + "github.com/kercylan98/minotaur/game/activity/internal/example/types" + "github.com/kercylan98/minotaur/utils/super" + "time" +) + +var ( + DemoActivity = activity.DefineEntityDataActivity[int, int, string, *types.DemoActivityData](1).InitializeEntityData(func(activityId int, entityId string, data *activity.EntityDataMeta[*types.DemoActivityData]) { + // 模拟数据库加载 + _ = super.UnmarshalJSON([]byte(`{"last_new_day": "2021-01-01 00:00:00", "data": {"login_num": 3}}`), data) + }) +) + +func init() { + // 模拟配置加载活动 + if err := activity.LoadOrRefreshActivity(1, 1, activity.NewOptions(). + WithStartTime(time.Now().Add(time.Second*3)), + ); err != nil { + panic(err) + } +} diff --git a/game/activity/internal/example/activities/demoactivity/demoactivity.go b/game/activity/internal/example/activities/demoactivity/demoactivity.go new file mode 100644 index 0000000..4c426e2 --- /dev/null +++ b/game/activity/internal/example/activities/demoactivity/demoactivity.go @@ -0,0 +1,15 @@ +package demoactivity + +import ( + "github.com/kercylan98/minotaur/game/activity" + "github.com/kercylan98/minotaur/game/activity/internal/example/activities" + "github.com/kercylan98/minotaur/utils/log" +) + +func init() { + activity.RegStartedEvent(1, onActivityStart) +} + +func onActivityStart(id int) { + log.Info("activity start", log.Int("id", id), log.Any("entity", activities.DemoActivity.GetEntityData(id, "demo_entity"))) +} diff --git a/game/activity/internal/example/main.go b/game/activity/internal/example/main.go new file mode 100644 index 0000000..95101b3 --- /dev/null +++ b/game/activity/internal/example/main.go @@ -0,0 +1,10 @@ +package main + +import ( + _ "github.com/kercylan98/minotaur/game/activity/internal/example/activities/demoactivity" + "time" +) + +func main() { + time.Sleep(time.Second * 5) +} diff --git a/game/activity/internal/example/types/player.go b/game/activity/internal/example/types/player.go new file mode 100644 index 0000000..afb56a0 --- /dev/null +++ b/game/activity/internal/example/types/player.go @@ -0,0 +1,5 @@ +package types + +type DemoActivityData struct { + LoginNum int `json:"login_num"` // 登录次数 +} diff --git a/game/activity/options.go b/game/activity/options.go index 3eecb4d..7b18bed 100644 --- a/game/activity/options.go +++ b/game/activity/options.go @@ -1,56 +1,72 @@ package activity import ( - "github.com/kercylan98/minotaur/utils/generic" + "github.com/kercylan98/minotaur/utils/times" "time" ) -// Option 活动选项 -type Option[Type, ID generic.Basic] func(*Activity[Type, ID]) +// NewOptions 创建活动选项 +func NewOptions() *Options { + return new(Options) +} + +// initOptions 初始化活动选项 +func initOptions(opts ...*Options) *Options { + var opt *Options + if len(opts) > 0 { + opt = opts[0] + } + if opt == nil { + opt = NewOptions() + } + return opt +} + +// Options 活动选项 +type Options struct { + Tl *times.StateLine[byte] // 活动时间线 + Loop time.Duration // 活动循环,时间间隔小于等于 0 表示不循环 +} // WithUpcomingTime 设置活动预告时间 -func WithUpcomingTime[Type, ID generic.Basic](t time.Time) Option[Type, ID] { - return func(activity *Activity[Type, ID]) { - activity.tl.AddState(stateUpcoming, t) +func (slf *Options) WithUpcomingTime(t time.Time) *Options { + if slf.Tl == nil { + slf.Tl = times.NewStateLine[byte](stateClosed) } + slf.Tl.AddState(stateUpcoming, t) + return slf } // WithStartTime 设置活动开始时间 -func WithStartTime[Type, ID generic.Basic](t time.Time) Option[Type, ID] { - return func(activity *Activity[Type, ID]) { - activity.tl.AddState(stateStarted, t) +func (slf *Options) WithStartTime(t time.Time) *Options { + if slf.Tl == nil { + slf.Tl = times.NewStateLine[byte](stateClosed) } + slf.Tl.AddState(stateStarted, t) + return slf } // WithEndTime 设置活动结束时间 -func WithEndTime[Type, ID generic.Basic](t time.Time) Option[Type, ID] { - return func(activity *Activity[Type, ID]) { - activity.tl.AddState(stateEnded, t) +func (slf *Options) WithEndTime(t time.Time) *Options { + if slf.Tl == nil { + slf.Tl = times.NewStateLine[byte](stateClosed) } + slf.Tl.AddState(stateEnded, t) + return slf } // WithExtendedShowTime 设置延长展示时间 -func WithExtendedShowTime[Type, ID generic.Basic](t time.Time) Option[Type, ID] { - return func(activity *Activity[Type, ID]) { - activity.tl.AddState(stateExtendedShowEnded, t) +func (slf *Options) WithExtendedShowTime(t time.Time) *Options { + if slf.Tl == nil { + slf.Tl = times.NewStateLine[byte](stateClosed) } + slf.Tl.AddState(stateExtendedShowEnded, t) + return slf } // WithLoop 设置活动循环,时间间隔小于等于 0 表示不循环 // - 当活动状态展示结束后,会根据该选项设置的时间间隔重新开始 -func WithLoop[Type, ID generic.Basic](interval time.Duration) Option[Type, ID] { - return func(activity *Activity[Type, ID]) { - if interval <= 0 { - interval = 0 - } - activity.loop = interval - } -} - -// WithLazy 设置活动数据懒加载 -// - 该选项仅用于全局数据,默认情况下,活动全局数据会在活动注册时候加载,如果设置了该选项,则会在第一次获取数据时候加载 -func WithLazy[Type, ID generic.Basic](lazy bool) Option[Type, ID] { - return func(activity *Activity[Type, ID]) { - activity.lazy = lazy - } +func (slf *Options) WithLoop(interval time.Duration) *Options { + slf.Loop = interval + return slf } diff --git a/go.mod b/go.mod index 3dadec3..36c7a6d 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/goccy/go-json v0.10.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect + github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jonboulle/clockwork v0.3.0 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect diff --git a/go.sum b/go.sum index 91205e9..db67b8c 100644 --- a/go.sum +++ b/go.sum @@ -76,6 +76,8 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= +github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY= +github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -132,6 +134,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= diff --git a/server/conn.go b/server/conn.go index e474faf..1806047 100644 --- a/server/conn.go +++ b/server/conn.go @@ -296,6 +296,9 @@ func (slf *Conn) init() { }, ) slf.loop = writeloop.NewWriteLoop[*connPacket](slf.pool, func(data *connPacket) error { + if slf.server.runtime.packetWarnSize > 0 && len(data.packet) > slf.server.runtime.packetWarnSize { + log.Warn("Conn.Write", log.String("State", "PacketWarn"), log.String("Reason", "PacketSize"), log.String("ID", slf.GetID()), log.Int("PacketSize", len(data.packet))) + } var err error if slf.delay > 0 || slf.fluctuation > 0 { time.Sleep(random.Duration(int64(slf.delay-slf.fluctuation), int64(slf.delay+slf.fluctuation))) diff --git a/server/constants.go b/server/constants.go index e02720a..888dc8b 100644 --- a/server/constants.go +++ b/server/constants.go @@ -25,6 +25,7 @@ const ( DefaultMessageBufferSize = 1024 DefaultAsyncPoolSize = 256 DefaultWebsocketReadDeadline = 30 * time.Second + DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB ) const ( diff --git a/server/event.go b/server/event.go index 5244ffa..c3c390d 100644 --- a/server/event.go +++ b/server/event.go @@ -239,6 +239,9 @@ func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacket } func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) { + if slf.Server.runtime.packetWarnSize > 0 && len(packet) > slf.Server.runtime.packetWarnSize { + log.Warn("Server", log.String("OnConnectionReceivePacketEvent", fmt.Sprintf("packet size %d > %d", len(packet), slf.Server.runtime.packetWarnSize))) + } slf.connectionReceivePacketEventHandles.RangeValue(func(index int, value ConnectionReceivePacketEventHandle) bool { value(slf.Server, conn, packet) return true diff --git a/server/options.go b/server/options.go index 0dc122e..53a1def 100644 --- a/server/options.go +++ b/server/options.go @@ -41,6 +41,21 @@ type runtime struct { websocketWriteCompression bool // websocket写入压缩 limitLife time.Duration // 限制最大生命周期 shuntMatcher func(conn *Conn) string // 分流匹配器 + packetWarnSize int // 数据包大小警告 +} + +// WithPacketWarnSize 通过数据包大小警告的方式创建服务器,当数据包大小超过指定大小时,将会输出 WARN 类型的日志 +// - 默认值为 DefaultPacketWarnSize +// - 当 size <= 0 时,表示不设置警告 +func WithPacketWarnSize(size int) Option { + return func(srv *Server) { + if size <= 0 { + srv.packetWarnSize = 0 + log.Info("WithPacketWarnSize", log.String("State", "Ignore"), log.String("Reason", "size <= 0")) + return + } + srv.packetWarnSize = size + } } // WithShunt 通过连接数据包分流的方式创建服务器 diff --git a/server/server.go b/server/server.go index 3659512..9626d6d 100644 --- a/server/server.go +++ b/server/server.go @@ -34,6 +34,7 @@ func New(network Network, options ...Option) *Server { server := &Server{ runtime: &runtime{ messagePoolSize: DefaultMessageBufferSize, + packetWarnSize: DefaultPacketWarnSize, }, option: &option{}, network: network, diff --git a/utils/file/file.go b/utils/file/file.go index 97a1e5a..7f6ae98 100644 --- a/utils/file/file.go +++ b/utils/file/file.go @@ -2,6 +2,7 @@ package file import ( "bufio" + "github.com/kercylan98/minotaur/utils/slice" "io" "os" "path/filepath" @@ -149,25 +150,35 @@ func Paths(dir string) []string { return paths } -// ReadLineWithParallel 并行的分行读取文件并行处理,处理过程中会将每一行的内容传入 handlerFunc 中进行处理,当过程中如果产生错误则会发生 panic,过程前发生错误将会返回 error +// ReadLineWithParallelByChannel 并行的分行读取文件并行处理,处理过程中会将每一行的内容传入 handlerFunc 中进行处理 // - 由于是并行处理,所以处理过程中的顺序是不确定的。 -func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string)) error { +// - 可通过 start 参数指定开始读取的位置,如果不指定则从文件开头开始读取。 +func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string), start ...int64) (n int64, err error) { file, err := os.Open(filename) + offset := slice.GetValue(start, 0) if err != nil { - return err + return offset, err } defer func() { _ = file.Close() }() - chunks := FindLineChunks(file, chunkSize) + chunks := FindLineChunksByOffset(file, offset, chunkSize) + var end int64 + var endMutex sync.Mutex var wg sync.WaitGroup for _, chunk := range chunks { wg.Add(1) go func(chunk [2]int64) { defer wg.Done() - r := io.NewSectionReader(file, chunk[0], chunk[1]-chunk[0]) + endMutex.Lock() + e := chunk[1] - chunk[0] + if e > end { + end = e + 1 + } + endMutex.Unlock() + r := io.NewSectionReader(file, chunk[0], e) scanner := bufio.NewScanner(r) for scanner.Scan() { @@ -175,12 +186,12 @@ func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(str } if err := scanner.Err(); err != nil { - panic(err) + return } }(chunk) } wg.Wait() - return nil + return end, nil } // FindLineChunks 查找文件按照每行划分的分块,每个分块的大小将在 chunkSize 和分割后的分块距离行首及行尾的距离中范围内 @@ -188,6 +199,11 @@ func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(str // - 当过程中发生错误将会发生 panic // - 返回值的成员是一个长度为 2 的数组,第一个元素是分块的起始位置,第二个元素是分块的结束位置 func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 { + return FindLineChunksByOffset(file, 0, chunkSize) +} + +// FindLineChunksByOffset 该函数与 FindLineChunks 类似,不同的是该函数可以指定 offset 从指定位置开始读取文件 +func FindLineChunksByOffset(file *os.File, offset, chunkSize int64) [][2]int64 { var chunks [][2]int64 fileSize, err := file.Seek(0, io.SeekEnd) @@ -199,7 +215,7 @@ func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 { panic(err) } - currentPos := int64(0) + currentPos := offset for currentPos < fileSize { start := currentPos if start != 0 { // 不是文件的开头 diff --git a/utils/file/file_test.go b/utils/file/file_test.go index ae45998..3e97aec 100644 --- a/utils/file/file_test.go +++ b/utils/file/file_test.go @@ -5,6 +5,7 @@ import ( "github.com/kercylan98/minotaur/utils/file" "strings" "testing" + "time" ) func TestFilePaths(t *testing.T) { @@ -20,3 +21,15 @@ func TestFilePaths(t *testing.T) { } fmt.Println("total line:", line, "total file:", fileCount) } + +func TestNewIncrementReader(t *testing.T) { + n, _ := file.ReadLineWithParallel(`./test/t.log`, 1*1024*1024*1024, func(s string) { + t.Log(s) + }) + + time.Sleep(time.Second * 3) + n, _ = file.ReadLineWithParallel(`./test/t.log`, 1*1024*1024*1024, func(s string) { + t.Log(s) + }, n) + +} diff --git a/utils/log/survey/analyzer.go b/utils/log/survey/analyzer.go index c626820..59f5cb0 100644 --- a/utils/log/survey/analyzer.go +++ b/utils/log/survey/analyzer.go @@ -12,6 +12,7 @@ type Analyzer struct { vc map[string]int64 // 记录了每个 key 生效的计数数量 repeat map[string]struct{} // 去重信息 subs map[string]*Analyzer + format map[string]func(v any) any // 格式化函数 m sync.Mutex } @@ -30,6 +31,16 @@ func (slf *Analyzer) Sub(key string) *Analyzer { return sub } +// SetFormat 设置格式化函数 +func (slf *Analyzer) SetFormat(key string, format func(v any) any) { + slf.m.Lock() + defer slf.m.Unlock() + if slf.format == nil { + slf.format = make(map[string]func(v any) any) + } + slf.format[key] = format +} + // SetValueIfGreaterThan 设置指定 key 的值,当新值大于旧值时 // - 当已有值不为 float64 时,将会被忽略 func (slf *Analyzer) SetValueIfGreaterThan(key string, value float64) { diff --git a/utils/log/survey/analyzer_test.go b/utils/log/survey/analyzer_test.go index 0eaa05d..689b9a3 100644 --- a/utils/log/survey/analyzer_test.go +++ b/utils/log/survey/analyzer_test.go @@ -6,10 +6,10 @@ import ( "testing" ) -func TestClose(t *testing.T) { +func TestIncrementAnalyze(t *testing.T) { path := `./test/day.2023-09-06.log` - report := survey.Analyze(path, func(analyzer *survey.Analyzer, record survey.R) { + reader := survey.IncrementAnalyze(path, func(analyzer *survey.Analyzer, record survey.R) { switch record.GetString("type") { case "open_conn": analyzer.IncreaseValueNonRepeat("开播人数", record, 1, "live_id") @@ -44,5 +44,11 @@ func TestClose(t *testing.T) { } }) - fmt.Println(report.FilterSub("warzone0009")) + for i := 0; i < 10; i++ { + report, err := reader() + if err != nil { + t.Fatal(err) + } + fmt.Println(report.FilterSub("warzone0009")) + } } diff --git a/utils/log/survey/report.go b/utils/log/survey/report.go index 3cfac94..8b31bb7 100644 --- a/utils/log/survey/report.go +++ b/utils/log/survey/report.go @@ -1,6 +1,7 @@ package survey import ( + "fmt" "github.com/kercylan98/minotaur/utils/super" "strings" ) @@ -9,10 +10,24 @@ func newReport(analyzer *Analyzer) *Report { report := &Report{ analyzer: analyzer, Name: "ROOT", - Values: analyzer.v, + Values: make(map[string]any), Counter: analyzer.vc, Subs: make([]*Report, 0, len(analyzer.subs)), } + for k, v := range analyzer.v { + if format, exist := analyzer.format[k]; exist { + func() { + defer func() { + if err := recover(); err != nil { + panic(fmt.Errorf("format key[%s] value[%v] error: %v", k, v, err)) + } + }() + report.Values[k] = format(v) + }() + continue + } + report.Values[k] = v + } for k, v := range analyzer.subs { sub := newReport(v) sub.Name = k diff --git a/utils/log/survey/survey.go b/utils/log/survey/survey.go index 750c261..8fff7c5 100644 --- a/utils/log/survey/survey.go +++ b/utils/log/survey/survey.go @@ -127,7 +127,7 @@ func Close(names ...string) { // - 适用于外部进程对于日志文件的读取,但是需要注意的是,此时日志文件可能正在被写入,所以可能会读取到错误的数据 func Analyze(filePath string, handle func(analyzer *Analyzer, record R)) *Report { analyzer := new(Analyzer) - err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { + _, err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { handle(analyzer, R(s)) }) if err != nil { @@ -141,7 +141,7 @@ func Analyze(filePath string, handle func(analyzer *Analyzer, record R)) *Report func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R)) *Report { analyzer := new(Analyzer) for _, filePath := range filePaths { - err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { + _, err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { handle(analyzer, R(s)) }) if err != nil { @@ -151,3 +151,19 @@ func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R)) return newReport(analyzer) } + +// IncrementAnalyze 增量分析,返回一个函数,每次调用该函数都会分析文件中新增的内容 +func IncrementAnalyze(filePath string, handle func(analyzer *Analyzer, record R)) func() (*Report, error) { + var analyzer = new(Analyzer) + var offset int64 + return func() (*Report, error) { + var err error + offset, err = file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { + handle(analyzer, R(s)) + }, offset) + if err != nil { + return nil, err + } + return newReport(analyzer), nil + } +} diff --git a/utils/str/str.go b/utils/str/str.go index 1f40bc0..ac016c8 100644 --- a/utils/str/str.go +++ b/utils/str/str.go @@ -1,6 +1,9 @@ package str -import "strings" +import ( + "slices" + "strings" +) const ( None = "" // 空字符串 @@ -194,3 +197,13 @@ func CamelStringBytes(str []byte) []byte { } return camelStr } + +// SortJoin 将多个字符串排序后拼接 +func SortJoin(delimiter string, s ...string) string { + var strList = make([]string, 0, len(s)) + for _, str := range s { + strList = append(strList, str) + } + slices.Sort(strList) + return strings.Join(strList, delimiter) +} diff --git a/utils/timer/scheduler.go b/utils/timer/scheduler.go index ffa6ee5..aac4f81 100644 --- a/utils/timer/scheduler.go +++ b/utils/timer/scheduler.go @@ -1,6 +1,7 @@ package timer import ( + "github.com/gorhill/cronexpr" "reflect" "sync" "time" @@ -26,6 +27,8 @@ type Scheduler struct { ticker *Ticker lock sync.RWMutex + + expr *cronexpr.Expression } // Name 获取调度器名称 @@ -38,9 +41,13 @@ func (slf *Scheduler) Next(prev time.Time) time.Time { slf.lock.RLock() defer slf.lock.RUnlock() - if slf.kill || (slf.total > 0 && slf.trigger > slf.total) { + if slf.kill || (slf.expr != nil && slf.total > 0 && slf.trigger > slf.total) { return time.Time{} } + if slf.expr != nil { + next := slf.expr.Next(prev) + return next + } if slf.trigger == 0 { slf.trigger++ return prev.Add(slf.after) diff --git a/utils/timer/ticker.go b/utils/timer/ticker.go index b3a4b1b..de59f93 100644 --- a/utils/timer/ticker.go +++ b/utils/timer/ticker.go @@ -1,6 +1,7 @@ package timer import ( + "github.com/gorhill/cronexpr" "reflect" "sync" "time" @@ -72,20 +73,33 @@ func (slf *Ticker) GetSchedulers() []string { return names } +// Cron 通过 cron 表达式设置一个调度器,当 cron 表达式错误时,将会引发 panic +func (slf *Ticker) Cron(name, expression string, handleFunc interface{}, args ...interface{}) { + expr := cronexpr.MustParse(expression) + slf.loop(name, 0, 0, expr, 0, handleFunc, args...) +} + // After 设置一个在特定时间后运行一次的调度器 func (slf *Ticker) After(name string, after time.Duration, handleFunc interface{}, args ...interface{}) { - slf.Loop(name, after, timingWheelTick, 1, handleFunc, args...) + slf.loop(name, after, timingWheelTick, nil, 1, handleFunc, args...) } // Loop 设置一个在特定时间后反复运行的调度器 func (slf *Ticker) Loop(name string, after, interval time.Duration, times int, handleFunc interface{}, args ...interface{}) { + slf.loop(name, after, interval, nil, times, handleFunc, args...) +} + +// Loop 设置一个在特定时间后反复运行的调度器 +func (slf *Ticker) loop(name string, after, interval time.Duration, expr *cronexpr.Expression, times int, handleFunc interface{}, args ...interface{}) { slf.StopTimer(name) - if after < timingWheelTick { - after = timingWheelTick - } - if interval < timingWheelTick { - interval = timingWheelTick + if expr == nil { + if after < timingWheelTick { + after = timingWheelTick + } + if interval < timingWheelTick { + interval = timingWheelTick + } } var values = make([]reflect.Value, len(args)) @@ -101,6 +115,7 @@ func (slf *Ticker) Loop(name string, after, interval time.Duration, times int, h cbFunc: reflect.ValueOf(handleFunc), cbArgs: values, ticker: slf, + expr: expr, } slf.lock.Lock() diff --git a/utils/timer/ticker_test.go b/utils/timer/ticker_test.go new file mode 100644 index 0000000..b568f13 --- /dev/null +++ b/utils/timer/ticker_test.go @@ -0,0 +1,25 @@ +package timer_test + +import ( + "github.com/kercylan98/minotaur/utils/timer" + "github.com/kercylan98/minotaur/utils/times" + "testing" + "time" +) + +func TestTicker_Cron(t *testing.T) { + ticker := timer.GetTicker(10) + ticker.After("1_sec", time.Second, func() { + t.Log(time.Now().Format(time.DateTime), "1_sec") + }) + + ticker.Loop("1_sec_loop_3", 0, time.Second, 3, func() { + t.Log(time.Now().Format(time.DateTime), "1_sec_loop_3") + }) + + ticker.Cron("5_sec_cron", "0/5 * * * * * ?", func() { + t.Log(time.Now().Format(time.DateTime), "5_sec_cron") + }) + + time.Sleep(times.Week) +}