Merge branch 'develop'

This commit is contained in:
kercylan98 2023-11-28 12:10:03 +08:00
commit a1ec303fc9
29 changed files with 508 additions and 298 deletions

View File

@ -78,7 +78,7 @@ import (
func main() {
srv := server.New(server.NetworkWebsocket)
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet server.Packet) {
srv.RegConnectionReceivePacketEvent(func(srv *server.Server, conn *server.Conn, packet []byte) {
conn.Write(packet)
})
if err := srv.Run(":9999"); err != nil {
@ -173,6 +173,7 @@ func main() {
wait.Wait()
}
```
在分布式环境中,如果存在类似于多服务器需要同时间刷新配置时,可使用`Cron`表达式设置定时任务。
### 流操作
可以通过 `stream` 包快速开启对`切片`和`map`的流式操作,例如:

View File

@ -1,14 +1,11 @@
package activity
import (
"errors"
"fmt"
"github.com/kercylan98/minotaur/utils/generic"
"github.com/kercylan98/minotaur/utils/timer"
"github.com/kercylan98/minotaur/utils/times"
"reflect"
"sync"
"sync/atomic"
"time"
)
@ -25,18 +22,12 @@ var (
ticker *timer.Ticker
tickerOnce sync.Once
isDebug atomic.Bool
)
func init() {
ticker = timer.GetTicker(10)
}
// SetDebug 设置是否开启调试模式
func SetDebug(d bool) {
isDebug.Store(d)
}
// SetTicker 设置自定义定时器,该方法必须在使用活动系统前调用,且只能调用一次
func SetTicker(size int, options ...timer.Option) {
tickerOnce.Do(func() {
@ -47,22 +38,31 @@ func SetTicker(size int, options ...timer.Option) {
})
}
// LoadGlobalData 加载所有活动全局数据
func LoadGlobalData(handler func(activityType, activityId, data any)) {
for _, f := range activityGlobalDataLoader {
f(handler)
}
}
// LoadEntityData 加载所有活动实体数据
func LoadEntityData(handler func(activityType, activityId, entityId, data any)) {
for _, f := range activityEntityDataLoader {
f(handler)
}
}
// LoadOrRefreshActivity 加载或刷新活动
// - 通常在活动配置刷新时候将活动通过该方法注册或刷新
func LoadOrRefreshActivity[Type, ID generic.Basic](activityType Type, activityId ID, options ...Option[Type, ID]) error {
register := getControllerRegister(activityType)
if register == nil {
return errors.New("activity type not registered")
func LoadOrRefreshActivity[Type, ID generic.Basic](activityType Type, activityId ID, options ...*Options) error {
register, exist := activityRegister[activityType]
if !exist {
return fmt.Errorf("activity type %v not registered, activity %v registration failed", activityType, activityId)
}
act, initFinishCallback := register(activityType, activityId)
activity := act.(*Activity[Type, ID])
for _, option := range options {
option(activity)
}
initFinishCallback(activity)
if !activity.tl.Check(true, stateLine...) {
return errors.New("activity state timeline is invalid")
activity := register(activityId, initOptions(options...)).(*Activity[Type, ID])
if !activity.options.Tl.Check(true, stateLine...) {
return fmt.Errorf("activity %v state timeline is invalid", activityId)
}
stateTrigger := map[byte]func(){
@ -71,14 +71,15 @@ func LoadOrRefreshActivity[Type, ID generic.Basic](activityType Type, activityId
stateEnded: func() { OnEndedEvent(activity); OnExtendedShowStartedEvent(activity) },
stateExtendedShowEnded: func() { OnExtendedShowEndedEvent(activity) },
}
for _, state := range stateLine {
if activity.tl.HasState(state) {
activity.tl.AddTriggerToState(state, stateTrigger[state])
if activity.options.Tl.HasState(state) {
activity.options.Tl.AddTriggerToState(state, stateTrigger[state])
continue
}
for next := state; next <= stateLine[len(stateLine)-1]; next++ {
if activity.tl.HasState(next) {
activity.tl.AddTriggerToState(next, stateTrigger[state])
if activity.options.Tl.HasState(next) {
activity.options.Tl.AddTriggerToState(next, stateTrigger[state])
break
}
}
@ -88,69 +89,55 @@ func LoadOrRefreshActivity[Type, ID generic.Basic](activityType Type, activityId
return nil
}
// LoadGlobalData 加载所有活动全局数据
// - 一般用于持久化活动数据
func LoadGlobalData(handler func(activityType, activityId, data any)) {
for _, f := range controllerGlobalDataReader {
f(handler)
}
}
// LoadEntityData 加载所有活动实体数据
// - 一般用于持久化活动数据
func LoadEntityData(handler func(activityType, activityId, entityId, data any)) {
for _, f := range controllerEntityDataReader {
f(handler)
}
}
// Activity 活动描述
type Activity[Type, ID generic.Basic] struct {
id ID // 活动 ID
t Type // 活动类型
state byte // 活动状态
tl *times.StateLine[byte] // 活动时间线
loop time.Duration // 活动多久循环一次
lazy bool // 是否懒加载
tickerKey string // 定时器 key
retention time.Duration // 活动数据保留时间
retentionKey string // 保留定时器 key
id ID // 活动 ID
t Type // 活动类型
options *Options // 活动选项
state byte // 活动状态
lazy bool // 是否懒加载
tickerKey string // 定时器 key
retention time.Duration // 活动数据保留时间
retentionKey string // 保留定时器 key
mutex sync.RWMutex
getLastNewDayTime func() time.Time // 获取上次新的一天的时间
setLastNewDayTime func(time.Time) // 设置上次新的一天的时间
clearData func() // 清理活动数据
initializeData func() // 初始化活动数据
}
func (slf *Activity[Type, ID]) refresh() {
slf.mutex.Lock()
defer slf.mutex.Unlock()
curr := time.Now()
if slf.state = slf.tl.GetStateByTime(curr); slf.state == stateUpcoming {
if slf.state = slf.options.Tl.GetStateByTime(curr); slf.state == stateUpcoming || (slf.state == stateStarted && !slf.options.Tl.HasState(stateUpcoming)) {
ticker.StopTimer(slf.retentionKey)
resetActivityData(slf.t, slf.id)
slf.initializeData()
}
for _, f := range slf.tl.GetTriggerByState(slf.state) {
for _, f := range slf.options.Tl.GetTriggerByState(slf.state) {
if f != nil {
f()
}
}
next := slf.tl.GetNextTimeByState(slf.state)
next := slf.options.Tl.GetNextTimeByState(slf.state)
if !next.IsZero() && next.After(curr) {
ticker.After(slf.tickerKey, next.Sub(curr)+time.Millisecond*100, slf.refresh)
} else {
ticker.StopTimer(slf.tickerKey)
ticker.StopTimer(fmt.Sprintf("activity:new_day:%d:%v", reflect.ValueOf(slf.t).Kind(), slf.id))
if slf.loop > 0 {
slf.tl.Move(slf.loop * 2)
ticker.After(slf.tickerKey, slf.loop+time.Millisecond*100, slf.refresh)
if slf.options.Loop > 0 {
slf.options.Tl.Move(slf.options.Loop * 2)
ticker.After(slf.tickerKey, slf.options.Loop+time.Millisecond*100, slf.refresh)
return
}
if slf.retention > 0 {
ticker.After(slf.tickerKey, slf.retention, func() {
ticker.StopTimer(slf.retentionKey)
resetActivityData(slf.t, slf.id)
slf.clearData()
})
}
}

View File

@ -16,8 +16,9 @@ type PlayerData struct {
}
func TestRegTypeByGlobalData(t *testing.T) {
controller := activity.DefineGlobalDataActivity[int, int, *ActivityData](1, func(activityId int, data *activity.DataMeta[*ActivityData]) {
data.Data.players = append(data.Data.players, "temp")
controller := activity.DefineGlobalDataActivity[int, int, *ActivityData](1).InitializeGlobalData(func(activityId int, data *activity.DataMeta[*ActivityData]) {
data.Data.players = []string{"1", "2", "3"}
})
activity.RegUpcomingEvent(1, func(activityId int) {
@ -48,12 +49,12 @@ func TestRegTypeByGlobalData(t *testing.T) {
now := time.Now()
if err := activity.LoadOrRefreshActivity(1, 1,
activity.WithUpcomingTime[int, int](now.Add(1*time.Second)),
activity.WithStartTime[int, int](now.Add(2*times.Second)),
activity.WithEndTime[int, int](now.Add(3*times.Second)),
activity.WithExtendedShowTime[int, int](now.Add(4*times.Second)),
activity.WithLoop[int, int](3*times.Second),
if err := activity.LoadOrRefreshActivity(1, 1, activity.NewOptions().
WithUpcomingTime(now.Add(1*time.Second)).
WithStartTime(now.Add(2*times.Second)).
WithEndTime(now.Add(3*times.Second)).
WithExtendedShowTime(now.Add(4*times.Second)).
WithLoop(3*times.Second),
); err != nil {
t.Fatal(err)
}

View File

@ -2,87 +2,87 @@ package activity
import (
"github.com/kercylan98/minotaur/utils/generic"
"reflect"
"sync"
)
type none byte
// DefineNoneDataActivity 声明无数据的活动类型
func DefineNoneDataActivity[Type, ID generic.Basic](activityType Type) NoneDataActivityController[Type, ID, none, none, none] {
return regController(activityType, &Controller[Type, ID, none, none, none]{
func DefineNoneDataActivity[Type, ID generic.Basic](activityType Type) NoneDataActivityController[Type, ID, *none, none, *none] {
return regController(&Controller[Type, ID, *none, none, *none]{
t: activityType,
})
}
// DefineGlobalDataActivity 声明拥有全局数据的活动类型
func DefineGlobalDataActivity[Type, ID generic.Basic, Data any](activityType Type, initializer func(activityId ID, data *DataMeta[Data])) GlobalDataActivityController[Type, ID, Data, none, none] {
return regController(activityType, &Controller[Type, ID, Data, none, none]{
t: activityType,
globalData: make(map[ID]*DataMeta[Data]),
globalInit: initializer,
func DefineGlobalDataActivity[Type, ID generic.Basic, Data any](activityType Type) GlobalDataActivityController[Type, ID, Data, none, *none] {
return regController(&Controller[Type, ID, Data, none, *none]{
t: activityType,
})
}
// DefineEntityDataActivity 声明拥有实体数据的活动类型
func DefineEntityDataActivity[Type, ID, EntityID generic.Basic, EntityData any](activityType Type, initializer func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) EntityDataActivityController[Type, ID, none, EntityID, EntityData] {
return regController(activityType, &Controller[Type, ID, none, EntityID, EntityData]{
t: activityType,
entityData: make(map[ID]map[EntityID]*EntityDataMeta[EntityData]),
entityInit: initializer,
func DefineEntityDataActivity[Type, ID, EntityID generic.Basic, EntityData any](activityType Type) EntityDataActivityController[Type, ID, *none, EntityID, EntityData] {
return regController(&Controller[Type, ID, *none, EntityID, EntityData]{
t: activityType,
})
}
// DefineGlobalAndEntityDataActivity 声明拥有全局数据和实体数据的活动类型
func DefineGlobalAndEntityDataActivity[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any](activityType Type, globalInitializer func(activityId ID, data *DataMeta[Data]), entityInitializer func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) GlobalAndEntityDataActivityController[Type, ID, Data, EntityID, EntityData] {
return regController(activityType, &Controller[Type, ID, Data, EntityID, EntityData]{
t: activityType,
globalData: make(map[ID]*DataMeta[Data]),
entityData: make(map[ID]map[EntityID]*EntityDataMeta[EntityData]),
globalInit: globalInitializer,
entityInit: entityInitializer,
func DefineGlobalAndEntityDataActivity[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any](activityType Type) GlobalAndEntityDataActivityController[Type, ID, Data, EntityID, EntityData] {
return regController(&Controller[Type, ID, Data, EntityID, EntityData]{
t: activityType,
})
}
// Controller 活动控制器
type Controller[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any] struct {
t Type // 活动类型
activities map[ID]*Activity[Type, ID] // 活动
globalInit func(activityId ID, data *DataMeta[Data]) // 全局初始化器
entityInit func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData]) // 实体初始化器
globalDataLoader func(activityId any) // 全局数据加载器
entityDataLoader func(activityId any, entityId any) // 实体数据加载器
globalInitializer func(activityType Type, activityId ID, data map[ID]*DataMeta[Data]) // 全局数据初始化器
entityInitializer func(activityType Type, activityId ID, data map[ID]*DataMeta[Data], entityData map[ID]map[EntityID]*EntityDataMeta[EntityData]) // 实体数据初始化器
globalData map[ID]*DataMeta[Data] // 全局数据
entityData map[ID]map[EntityID]*EntityDataMeta[EntityData] // 实体数据
mutex sync.RWMutex
t Type // 活动类型
activities map[ID]*Activity[Type, ID] // 活动列表
globalData map[ID]*DataMeta[Data] // 全局数据
entityData map[ID]map[EntityID]*EntityDataMeta[EntityData] // 实体数据
entityTof reflect.Type // 实体数据类型
globalInit func(activityId ID, data *DataMeta[Data]) // 全局数据初始化函数
entityInit func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData]) // 实体数据初始化函数
mutex sync.RWMutex
}
// GetGlobalData 获取特定活动全局数据
func (slf *Controller[Type, ID, Data, EntityID, EntityData]) GetGlobalData(activityId ID) Data {
slf.globalDataLoader(activityId)
slf.mutex.RLock()
defer slf.mutex.RUnlock()
return slf.globalData[activityId].Data
global := slf.globalData[activityId]
if slf.globalInit != nil {
global.once.Do(func() {
slf.globalInit(activityId, global)
})
}
return global.Data
}
// GetEntityData 获取特定活动实体数据
func (slf *Controller[Type, ID, Data, EntityID, EntityData]) GetEntityData(activityId ID, entityId EntityID) EntityData {
slf.entityDataLoader(activityId, entityId)
slf.mutex.RLock()
defer slf.mutex.RUnlock()
return slf.entityData[activityId][entityId].Data
}
// GetAllEntityData 获取特定活动所有实体数据
func (slf *Controller[Type, ID, Data, EntityID, EntityData]) GetAllEntityData(activityId ID) map[EntityID]EntityData {
var entities = make(map[EntityID]EntityData)
slf.mutex.RLock()
for k, v := range slf.entityData[activityId] {
entities[k] = v.Data
entities, exist := slf.entityData[activityId]
if !exist {
entities = make(map[EntityID]*EntityDataMeta[EntityData])
slf.entityData[activityId] = entities
}
slf.mutex.RUnlock()
return entities
entity, exist := entities[entityId]
if !exist {
entity = &EntityDataMeta[EntityData]{
Data: reflect.New(slf.entityTof).Interface().(EntityData),
}
entities[entityId] = entity
}
if slf.entityInit != nil {
entity.once.Do(func() {
slf.entityInit(activityId, entityId, entity)
})
}
return entity.Data
}
// IsOpen 活动是否开启
@ -108,7 +108,7 @@ func (slf *Controller[Type, ID, Data, EntityID, EntityData]) IsShow(activityId I
}
activity.mutex.RLock()
defer activity.mutex.RUnlock()
return activity.state == stateUpcoming || (activity.state == stateEnded && activity.tl.HasState(stateExtendedShowEnded))
return activity.state == stateUpcoming || (activity.state == stateEnded && activity.options.Tl.HasState(stateExtendedShowEnded))
}
// IsOpenOrShow 活动是否开启或展示
@ -122,7 +122,7 @@ func (slf *Controller[Type, ID, Data, EntityID, EntityData]) IsOpenOrShow(activi
activity.mutex.RLock()
defer activity.mutex.RUnlock()
return activity.state == stateStarted || activity.state == stateUpcoming || (activity.state == stateEnded && activity.tl.HasState(stateExtendedShowEnded))
return activity.state == stateStarted || activity.state == stateUpcoming || (activity.state == stateEnded && activity.options.Tl.HasState(stateExtendedShowEnded))
}
// Refresh 刷新活动
@ -135,3 +135,24 @@ func (slf *Controller[Type, ID, Data, EntityID, EntityData]) Refresh(activityId
}
activity.refresh()
}
func (slf *Controller[Type, ID, Data, EntityID, EntityData]) InitializeNoneData(handler func(activityId ID, data *DataMeta[Data])) NoneDataActivityController[Type, ID, Data, EntityID, EntityData] {
slf.globalInit = handler
return slf
}
func (slf *Controller[Type, ID, Data, EntityID, EntityData]) InitializeGlobalData(handler func(activityId ID, data *DataMeta[Data])) GlobalDataActivityController[Type, ID, Data, EntityID, EntityData] {
slf.globalInit = handler
return slf
}
func (slf *Controller[Type, ID, Data, EntityID, EntityData]) InitializeEntityData(handler func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) EntityDataActivityController[Type, ID, Data, EntityID, EntityData] {
slf.entityInit = handler
return slf
}
func (slf *Controller[Type, ID, Data, EntityID, EntityData]) InitializeGlobalAndEntityData(handler func(activityId ID, data *DataMeta[Data]), entityHandler func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) GlobalAndEntityDataActivityController[Type, ID, Data, EntityID, EntityData] {
slf.globalInit = handler
slf.entityInit = entityHandler
return slf
}

View File

@ -16,6 +16,11 @@ type BasicActivityController[Type, ID generic.Basic, Data any, EntityID generic.
// NoneDataActivityController 无数据活动控制器
type NoneDataActivityController[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any] interface {
BasicActivityController[Type, ID, Data, EntityID, EntityData]
// InitializeNoneData 初始化活动
// - 该函数提供了一个操作活动数据的入口,可以在该函数中对传入的活动数据进行初始化
//
// 对于无数据活动,该函数的意义在于,可以在该函数中对活动进行初始化,比如设置活动的状态等,虽然为无数据活动,但是例如活动本身携带的状态数据也是需要加载的
InitializeNoneData(handler func(activityId ID, data *DataMeta[Data])) NoneDataActivityController[Type, ID, Data, EntityID, EntityData]
}
// GlobalDataActivityController 全局数据活动控制器
@ -23,6 +28,9 @@ type GlobalDataActivityController[Type, ID generic.Basic, Data any, EntityID gen
BasicActivityController[Type, ID, Data, EntityID, EntityData]
// GetGlobalData 获取全局数据
GetGlobalData(activityId ID) Data
// InitializeGlobalData 初始化活动
// - 该函数提供了一个操作活动数据的入口,可以在该函数中对传入的活动数据进行初始化
InitializeGlobalData(handler func(activityId ID, data *DataMeta[Data])) GlobalDataActivityController[Type, ID, Data, EntityID, EntityData]
}
// EntityDataActivityController 实体数据活动控制器
@ -30,6 +38,9 @@ type EntityDataActivityController[Type, ID generic.Basic, Data any, EntityID gen
BasicActivityController[Type, ID, Data, EntityID, EntityData]
// GetEntityData 获取实体数据
GetEntityData(activityId ID, entityId EntityID) EntityData
// InitializeEntityData 初始化活动
// - 该函数提供了一个操作活动数据的入口,可以在该函数中对传入的活动数据进行初始化
InitializeEntityData(handler func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) EntityDataActivityController[Type, ID, Data, EntityID, EntityData]
}
// GlobalAndEntityDataActivityController 全局数据和实体数据活动控制器
@ -39,4 +50,7 @@ type GlobalAndEntityDataActivityController[Type, ID generic.Basic, Data any, Ent
GetGlobalData(activityId ID) Data
// GetEntityData 获取实体数据
GetEntityData(activityId ID, entityId EntityID) EntityData
// InitializeGlobalAndEntityData 初始化活动
// - 该函数提供了一个操作活动数据的入口,可以在该函数中对传入的活动数据进行初始化
InitializeGlobalAndEntityData(handler func(activityId ID, data *DataMeta[Data]), entityHandler func(activityId ID, entityId EntityID, data *EntityDataMeta[EntityData])) GlobalAndEntityDataActivityController[Type, ID, Data, EntityID, EntityData]
}

View File

@ -3,155 +3,105 @@ package activity
import (
"fmt"
"github.com/kercylan98/minotaur/utils/generic"
"github.com/kercylan98/minotaur/utils/hash"
"github.com/kercylan98/minotaur/utils/times"
"reflect"
"sync"
"time"
)
var (
controllers map[any]any // type -> controller (特定类型活动控制器)
controllerRegisters map[any]func(activityType, activityId any) (act any, optionInitCallback func(activity any)) // type -> register (控制活动注册到特定类型控制器的注册机)
controllerGlobalDataReader []func(handler func(activityType, activityId, data any)) // 活动全局数据读取器
controllerEntityDataReader []func(handler func(activityType, activityId, entityId, data any)) // 活动实体数据读取器
controllerReset map[any]func(activityId any) // type -> reset (活动数据重置器)
controllersLock sync.RWMutex
activityRegister map[any]func(activityId any, options *Options) (activity any) // type -> register (控制活动注册到特定类型控制器的注册机)
activityGlobalDataLoader []func(handler func(activityType, activityId, data any)) // 全局数据加载器
activityEntityDataLoader []func(handler func(activityType, activityId, entityId, data any)) // 实体数据加载器
)
func init() {
controllers = make(map[any]any)
controllerRegisters = make(map[any]func(activityType, activityId any) (act any, optionInitCallback func(activity any)))
controllerGlobalDataReader = make([]func(handler func(activityType, activityId, data any)), 0)
controllerEntityDataReader = make([]func(handler func(activityType, activityId, entityId, data any)), 0)
controllerReset = make(map[any]func(activityId any))
activityRegister = make(map[any]func(activityId any, options *Options) (activity any))
}
// regController 注册活动类型
func regController[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any](activityType Type, controller *Controller[Type, ID, Data, EntityID, EntityData]) *Controller[Type, ID, Data, EntityID, EntityData] {
controllersLock.Lock()
defer controllersLock.Unlock()
// regController 注册活动类型控制器
func regController[Type, ID generic.Basic, Data any, EntityID generic.Basic, EntityData any](controller *Controller[Type, ID, Data, EntityID, EntityData]) *Controller[Type, ID, Data, EntityID, EntityData] {
var entityZero EntityData
controller.activities = make(map[ID]*Activity[Type, ID])
controllerGlobalDataReader = append(controllerGlobalDataReader, func(handler func(activityType, activityId, data any)) {
controller.mutex.RLock()
defer controller.mutex.RUnlock()
for activityId, data := range controller.globalData {
handler(activityType, activityId, data)
}
})
controllerEntityDataReader = append(controllerEntityDataReader, func(handler func(activityType, activityId, entityId, data any)) {
controller.mutex.RLock()
defer controller.mutex.RUnlock()
for activityId, entities := range controller.entityData {
for entityId, data := range entities {
handler(activityType, activityId, entityId, data)
}
}
})
controller.globalData = make(map[ID]*DataMeta[Data])
controller.entityTof = reflect.TypeOf(entityZero)
if controller.entityTof.Kind() == reflect.Pointer {
controller.entityTof = controller.entityTof.Elem()
}
if controller.globalData != nil {
var d Data
var tof = reflect.TypeOf(d)
if tof.Kind() == reflect.Pointer {
tof = tof.Elem()
}
controller.globalDataLoader = func(activityId any) {
controller.mutex.Lock()
defer controller.mutex.Unlock()
var id = activityId.(ID)
if _, exist := controller.globalData[id]; exist {
return
}
data := &DataMeta[Data]{
Data: reflect.New(tof).Interface().(Data),
}
if controller.globalInit != nil {
controller.globalInit(id, data)
}
controller.globalData[id] = data
}
// 反射类型
var (
zero Data
tof = reflect.TypeOf(zero)
)
if tof.Kind() == reflect.Pointer {
tof = tof.Elem()
}
if controller.entityData != nil {
var d Data
var tof = reflect.TypeOf(d)
if tof.Kind() == reflect.Pointer {
tof = tof.Elem()
}
controller.entityDataLoader = func(activityId any, entityId any) {
controller.mutex.Lock()
defer controller.mutex.Unlock()
var id, eid = activityId.(ID), entityId.(EntityID)
entities, exist := controller.entityData[id]
if !exist {
entities = make(map[EntityID]*EntityDataMeta[EntityData])
controller.entityData[id] = entities
}
if _, exist = entities[eid]; exist {
return
}
data := &EntityDataMeta[EntityData]{
Data: reflect.New(tof).Interface().(EntityData),
}
if controller.entityInit != nil {
controller.entityInit(id, eid, data)
}
entities[eid] = data
}
}
controllers[activityType] = controller
controllerRegisters[activityType] = func(activityType, activityId any) (act any, optionInitCallback func(activity any)) {
var at, ai = activityType.(Type), activityId.(ID)
// 活动注册机(注册机内不加载活动数据,仅定义基本活动信息)
activityRegister[controller.t] = func(aid any, options *Options) any {
activityId := aid.(ID)
controller.mutex.Lock()
activity, exist := controller.activities[ai]
activity, exist := controller.activities[activityId]
if !exist {
activity = &Activity[Type, ID]{
t: at,
id: ai,
tl: times.NewStateLine[byte](stateClosed),
tickerKey: fmt.Sprintf("activity:%d:%v", reflect.ValueOf(at).Kind(), ai),
t: controller.t,
id: activityId,
options: options,
tickerKey: fmt.Sprintf("activity:%d:%v", reflect.ValueOf(controller.t).Kind(), activityId),
getLastNewDayTime: func() time.Time {
return controller.globalData[ai].LastNewDay
return controller.globalData[activityId].LastNewDay
},
setLastNewDayTime: func(t time.Time) {
controller.globalData[ai].LastNewDay = t
controller.globalData[activityId].LastNewDay = t
},
clearData: func() {
controller.mutex.Lock()
defer controller.mutex.Unlock()
delete(controller.globalData, activityId)
delete(controller.entityData, activityId)
},
initializeData: func() {
controller.mutex.Lock()
defer controller.mutex.Unlock()
controller.globalData[activityId] = &DataMeta[Data]{
Data: reflect.New(tof).Interface().(Data),
}
if controller.entityData == nil {
controller.entityData = make(map[ID]map[EntityID]*EntityDataMeta[EntityData])
}
controller.entityData[activityId] = make(map[EntityID]*EntityDataMeta[EntityData])
},
}
if activity.options == nil {
activity.options = NewOptions()
}
if activity.options.Tl == nil || activity.options.Tl.GetStateCount() == 0 {
activity.options.Tl = times.NewStateLine[byte](stateClosed)
}
controller.activities[activityId] = activity
}
controller.mutex.Unlock()
controller.activities[activity.id] = activity
return activity, func(activity any) {
act := activity.(*Activity[Type, ID])
if !act.lazy {
controller.GetGlobalData(ai)
// 全局数据加载器
activityGlobalDataLoader = append(activityGlobalDataLoader, func(handler func(activityType any, activityId any, data any)) {
controller.mutex.RLock()
data := controller.globalData[activityId]
controller.mutex.RUnlock()
handler(controller.t, activityId, data)
})
// 实体数据加载器
activityEntityDataLoader = append(activityEntityDataLoader, func(handler func(activityType any, activityId any, entityId any, data any)) {
controller.mutex.RLock()
entities := hash.Copy(controller.entityData[activityId])
controller.mutex.RUnlock()
for entityId, data := range entities {
handler(controller.t, activityId, entityId, data)
}
if act.retention > 0 {
act.retentionKey = fmt.Sprintf("%s:retention", act.tickerKey)
}
}
}
controllerReset[activityType] = func(activityId any) {
var id = activityId.(ID)
controller.mutex.Lock()
defer controller.mutex.Unlock()
delete(controller.globalData, id)
delete(controller.entityData, id)
})
return activity
}
return controller
}
// getControllerRegister 获取活动类型注册机
func getControllerRegister[Type generic.Basic](activityType Type) func(activityType, activityId any) (act any, optionInitCallback func(activity any)) {
controllersLock.RLock()
defer controllersLock.RUnlock()
return controllerRegisters[activityType]
}
// resetActivityData 重置活动数据
func resetActivityData[Type, ID generic.Basic](activityType Type, activityId ID) {
controllersLock.RLock()
defer controllersLock.RUnlock()
reset, exist := controllerReset[activityType]
if !exist {
return
}
reset(activityId)
}

View File

@ -1,19 +1,20 @@
package activity
import "time"
import (
"sync"
"time"
)
// DataMeta 全局活动数据
type DataMeta[Data any] struct {
Start time.Time `json:"start,omitempty"` // 活动开始时间
End time.Time `json:"end,omitempty"` // 活动结束时间
Data Data `json:"data,omitempty"` // 活动数据
LastNewDay time.Time `json:"lastNewDay,omitempty"` // 上次跨天时间
once sync.Once
Data Data `json:"data,omitempty"` // 活动数据
LastNewDay time.Time `json:"last_new_day,omitempty"` // 上次跨天时间
}
// EntityDataMeta 活动实体数据
type EntityDataMeta[Data any] struct {
Start time.Time `json:"start,omitempty"` // 对象参与活动时间
End time.Time `json:"end,omitempty"` // 对象结束活动时间
Data Data `json:"data,omitempty"` // 活动数据
LastNewDay time.Time `json:"lastNewDay,omitempty"` // 上次跨天时间
once sync.Once
Data Data `json:"data,omitempty"` // 活动数据
LastNewDay time.Time `json:"last_new_day,omitempty"` // 上次跨天时间
}

View File

@ -0,0 +1,24 @@
package activities
import (
"github.com/kercylan98/minotaur/game/activity"
"github.com/kercylan98/minotaur/game/activity/internal/example/types"
"github.com/kercylan98/minotaur/utils/super"
"time"
)
var (
DemoActivity = activity.DefineEntityDataActivity[int, int, string, *types.DemoActivityData](1).InitializeEntityData(func(activityId int, entityId string, data *activity.EntityDataMeta[*types.DemoActivityData]) {
// 模拟数据库加载
_ = super.UnmarshalJSON([]byte(`{"last_new_day": "2021-01-01 00:00:00", "data": {"login_num": 3}}`), data)
})
)
func init() {
// 模拟配置加载活动
if err := activity.LoadOrRefreshActivity(1, 1, activity.NewOptions().
WithStartTime(time.Now().Add(time.Second*3)),
); err != nil {
panic(err)
}
}

View File

@ -0,0 +1,15 @@
package demoactivity
import (
"github.com/kercylan98/minotaur/game/activity"
"github.com/kercylan98/minotaur/game/activity/internal/example/activities"
"github.com/kercylan98/minotaur/utils/log"
)
func init() {
activity.RegStartedEvent(1, onActivityStart)
}
func onActivityStart(id int) {
log.Info("activity start", log.Int("id", id), log.Any("entity", activities.DemoActivity.GetEntityData(id, "demo_entity")))
}

View File

@ -0,0 +1,10 @@
package main
import (
_ "github.com/kercylan98/minotaur/game/activity/internal/example/activities/demoactivity"
"time"
)
func main() {
time.Sleep(time.Second * 5)
}

View File

@ -0,0 +1,5 @@
package types
type DemoActivityData struct {
LoginNum int `json:"login_num"` // 登录次数
}

View File

@ -1,56 +1,72 @@
package activity
import (
"github.com/kercylan98/minotaur/utils/generic"
"github.com/kercylan98/minotaur/utils/times"
"time"
)
// Option 活动选项
type Option[Type, ID generic.Basic] func(*Activity[Type, ID])
// NewOptions 创建活动选项
func NewOptions() *Options {
return new(Options)
}
// initOptions 初始化活动选项
func initOptions(opts ...*Options) *Options {
var opt *Options
if len(opts) > 0 {
opt = opts[0]
}
if opt == nil {
opt = NewOptions()
}
return opt
}
// Options 活动选项
type Options struct {
Tl *times.StateLine[byte] // 活动时间线
Loop time.Duration // 活动循环,时间间隔小于等于 0 表示不循环
}
// WithUpcomingTime 设置活动预告时间
func WithUpcomingTime[Type, ID generic.Basic](t time.Time) Option[Type, ID] {
return func(activity *Activity[Type, ID]) {
activity.tl.AddState(stateUpcoming, t)
func (slf *Options) WithUpcomingTime(t time.Time) *Options {
if slf.Tl == nil {
slf.Tl = times.NewStateLine[byte](stateClosed)
}
slf.Tl.AddState(stateUpcoming, t)
return slf
}
// WithStartTime 设置活动开始时间
func WithStartTime[Type, ID generic.Basic](t time.Time) Option[Type, ID] {
return func(activity *Activity[Type, ID]) {
activity.tl.AddState(stateStarted, t)
func (slf *Options) WithStartTime(t time.Time) *Options {
if slf.Tl == nil {
slf.Tl = times.NewStateLine[byte](stateClosed)
}
slf.Tl.AddState(stateStarted, t)
return slf
}
// WithEndTime 设置活动结束时间
func WithEndTime[Type, ID generic.Basic](t time.Time) Option[Type, ID] {
return func(activity *Activity[Type, ID]) {
activity.tl.AddState(stateEnded, t)
func (slf *Options) WithEndTime(t time.Time) *Options {
if slf.Tl == nil {
slf.Tl = times.NewStateLine[byte](stateClosed)
}
slf.Tl.AddState(stateEnded, t)
return slf
}
// WithExtendedShowTime 设置延长展示时间
func WithExtendedShowTime[Type, ID generic.Basic](t time.Time) Option[Type, ID] {
return func(activity *Activity[Type, ID]) {
activity.tl.AddState(stateExtendedShowEnded, t)
func (slf *Options) WithExtendedShowTime(t time.Time) *Options {
if slf.Tl == nil {
slf.Tl = times.NewStateLine[byte](stateClosed)
}
slf.Tl.AddState(stateExtendedShowEnded, t)
return slf
}
// WithLoop 设置活动循环,时间间隔小于等于 0 表示不循环
// - 当活动状态展示结束后,会根据该选项设置的时间间隔重新开始
func WithLoop[Type, ID generic.Basic](interval time.Duration) Option[Type, ID] {
return func(activity *Activity[Type, ID]) {
if interval <= 0 {
interval = 0
}
activity.loop = interval
}
}
// WithLazy 设置活动数据懒加载
// - 该选项仅用于全局数据,默认情况下,活动全局数据会在活动注册时候加载,如果设置了该选项,则会在第一次获取数据时候加载
func WithLazy[Type, ID generic.Basic](lazy bool) Option[Type, ID] {
return func(activity *Activity[Type, ID]) {
activity.lazy = lazy
}
func (slf *Options) WithLoop(interval time.Duration) *Options {
slf.Loop = interval
return slf
}

1
go.mod
View File

@ -36,6 +36,7 @@ require (
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gopherjs/gopherjs v1.17.2 // indirect
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/jtolds/gls v4.20.0+incompatible // indirect

4
go.sum
View File

@ -76,6 +76,8 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g=
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
@ -132,6 +134,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=

View File

@ -296,6 +296,9 @@ func (slf *Conn) init() {
},
)
slf.loop = writeloop.NewWriteLoop[*connPacket](slf.pool, func(data *connPacket) error {
if slf.server.runtime.packetWarnSize > 0 && len(data.packet) > slf.server.runtime.packetWarnSize {
log.Warn("Conn.Write", log.String("State", "PacketWarn"), log.String("Reason", "PacketSize"), log.String("ID", slf.GetID()), log.Int("PacketSize", len(data.packet)))
}
var err error
if slf.delay > 0 || slf.fluctuation > 0 {
time.Sleep(random.Duration(int64(slf.delay-slf.fluctuation), int64(slf.delay+slf.fluctuation)))

View File

@ -25,6 +25,7 @@ const (
DefaultMessageBufferSize = 1024
DefaultAsyncPoolSize = 256
DefaultWebsocketReadDeadline = 30 * time.Second
DefaultPacketWarnSize = 1024 * 1024 * 1 // 1MB
)
const (

View File

@ -239,6 +239,9 @@ func (slf *event) RegConnectionReceivePacketEvent(handle ConnectionReceivePacket
}
func (slf *event) OnConnectionReceivePacketEvent(conn *Conn, packet []byte) {
if slf.Server.runtime.packetWarnSize > 0 && len(packet) > slf.Server.runtime.packetWarnSize {
log.Warn("Server", log.String("OnConnectionReceivePacketEvent", fmt.Sprintf("packet size %d > %d", len(packet), slf.Server.runtime.packetWarnSize)))
}
slf.connectionReceivePacketEventHandles.RangeValue(func(index int, value ConnectionReceivePacketEventHandle) bool {
value(slf.Server, conn, packet)
return true

View File

@ -41,6 +41,21 @@ type runtime struct {
websocketWriteCompression bool // websocket写入压缩
limitLife time.Duration // 限制最大生命周期
shuntMatcher func(conn *Conn) string // 分流匹配器
packetWarnSize int // 数据包大小警告
}
// WithPacketWarnSize 通过数据包大小警告的方式创建服务器,当数据包大小超过指定大小时,将会输出 WARN 类型的日志
// - 默认值为 DefaultPacketWarnSize
// - 当 size <= 0 时,表示不设置警告
func WithPacketWarnSize(size int) Option {
return func(srv *Server) {
if size <= 0 {
srv.packetWarnSize = 0
log.Info("WithPacketWarnSize", log.String("State", "Ignore"), log.String("Reason", "size <= 0"))
return
}
srv.packetWarnSize = size
}
}
// WithShunt 通过连接数据包分流的方式创建服务器

View File

@ -34,6 +34,7 @@ func New(network Network, options ...Option) *Server {
server := &Server{
runtime: &runtime{
messagePoolSize: DefaultMessageBufferSize,
packetWarnSize: DefaultPacketWarnSize,
},
option: &option{},
network: network,

View File

@ -2,6 +2,7 @@ package file
import (
"bufio"
"github.com/kercylan98/minotaur/utils/slice"
"io"
"os"
"path/filepath"
@ -149,25 +150,35 @@ func Paths(dir string) []string {
return paths
}
// ReadLineWithParallel 并行的分行读取文件并行处理,处理过程中会将每一行的内容传入 handlerFunc 中进行处理,当过程中如果产生错误则会发生 panic过程前发生错误将会返回 error
// ReadLineWithParallelByChannel 并行的分行读取文件并行处理,处理过程中会将每一行的内容传入 handlerFunc 中进行处理
// - 由于是并行处理,所以处理过程中的顺序是不确定的。
func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string)) error {
// - 可通过 start 参数指定开始读取的位置,如果不指定则从文件开头开始读取。
func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string), start ...int64) (n int64, err error) {
file, err := os.Open(filename)
offset := slice.GetValue(start, 0)
if err != nil {
return err
return offset, err
}
defer func() {
_ = file.Close()
}()
chunks := FindLineChunks(file, chunkSize)
chunks := FindLineChunksByOffset(file, offset, chunkSize)
var end int64
var endMutex sync.Mutex
var wg sync.WaitGroup
for _, chunk := range chunks {
wg.Add(1)
go func(chunk [2]int64) {
defer wg.Done()
r := io.NewSectionReader(file, chunk[0], chunk[1]-chunk[0])
endMutex.Lock()
e := chunk[1] - chunk[0]
if e > end {
end = e + 1
}
endMutex.Unlock()
r := io.NewSectionReader(file, chunk[0], e)
scanner := bufio.NewScanner(r)
for scanner.Scan() {
@ -175,12 +186,12 @@ func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(str
}
if err := scanner.Err(); err != nil {
panic(err)
return
}
}(chunk)
}
wg.Wait()
return nil
return end, nil
}
// FindLineChunks 查找文件按照每行划分的分块,每个分块的大小将在 chunkSize 和分割后的分块距离行首及行尾的距离中范围内
@ -188,6 +199,11 @@ func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(str
// - 当过程中发生错误将会发生 panic
// - 返回值的成员是一个长度为 2 的数组,第一个元素是分块的起始位置,第二个元素是分块的结束位置
func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 {
return FindLineChunksByOffset(file, 0, chunkSize)
}
// FindLineChunksByOffset 该函数与 FindLineChunks 类似,不同的是该函数可以指定 offset 从指定位置开始读取文件
func FindLineChunksByOffset(file *os.File, offset, chunkSize int64) [][2]int64 {
var chunks [][2]int64
fileSize, err := file.Seek(0, io.SeekEnd)
@ -199,7 +215,7 @@ func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 {
panic(err)
}
currentPos := int64(0)
currentPos := offset
for currentPos < fileSize {
start := currentPos
if start != 0 { // 不是文件的开头

View File

@ -5,6 +5,7 @@ import (
"github.com/kercylan98/minotaur/utils/file"
"strings"
"testing"
"time"
)
func TestFilePaths(t *testing.T) {
@ -20,3 +21,15 @@ func TestFilePaths(t *testing.T) {
}
fmt.Println("total line:", line, "total file:", fileCount)
}
func TestNewIncrementReader(t *testing.T) {
n, _ := file.ReadLineWithParallel(`./test/t.log`, 1*1024*1024*1024, func(s string) {
t.Log(s)
})
time.Sleep(time.Second * 3)
n, _ = file.ReadLineWithParallel(`./test/t.log`, 1*1024*1024*1024, func(s string) {
t.Log(s)
}, n)
}

View File

@ -12,6 +12,7 @@ type Analyzer struct {
vc map[string]int64 // 记录了每个 key 生效的计数数量
repeat map[string]struct{} // 去重信息
subs map[string]*Analyzer
format map[string]func(v any) any // 格式化函数
m sync.Mutex
}
@ -30,6 +31,16 @@ func (slf *Analyzer) Sub(key string) *Analyzer {
return sub
}
// SetFormat 设置格式化函数
func (slf *Analyzer) SetFormat(key string, format func(v any) any) {
slf.m.Lock()
defer slf.m.Unlock()
if slf.format == nil {
slf.format = make(map[string]func(v any) any)
}
slf.format[key] = format
}
// SetValueIfGreaterThan 设置指定 key 的值,当新值大于旧值时
// - 当已有值不为 float64 时,将会被忽略
func (slf *Analyzer) SetValueIfGreaterThan(key string, value float64) {

View File

@ -6,10 +6,10 @@ import (
"testing"
)
func TestClose(t *testing.T) {
func TestIncrementAnalyze(t *testing.T) {
path := `./test/day.2023-09-06.log`
report := survey.Analyze(path, func(analyzer *survey.Analyzer, record survey.R) {
reader := survey.IncrementAnalyze(path, func(analyzer *survey.Analyzer, record survey.R) {
switch record.GetString("type") {
case "open_conn":
analyzer.IncreaseValueNonRepeat("开播人数", record, 1, "live_id")
@ -44,5 +44,11 @@ func TestClose(t *testing.T) {
}
})
fmt.Println(report.FilterSub("warzone0009"))
for i := 0; i < 10; i++ {
report, err := reader()
if err != nil {
t.Fatal(err)
}
fmt.Println(report.FilterSub("warzone0009"))
}
}

View File

@ -1,6 +1,7 @@
package survey
import (
"fmt"
"github.com/kercylan98/minotaur/utils/super"
"strings"
)
@ -9,10 +10,24 @@ func newReport(analyzer *Analyzer) *Report {
report := &Report{
analyzer: analyzer,
Name: "ROOT",
Values: analyzer.v,
Values: make(map[string]any),
Counter: analyzer.vc,
Subs: make([]*Report, 0, len(analyzer.subs)),
}
for k, v := range analyzer.v {
if format, exist := analyzer.format[k]; exist {
func() {
defer func() {
if err := recover(); err != nil {
panic(fmt.Errorf("format key[%s] value[%v] error: %v", k, v, err))
}
}()
report.Values[k] = format(v)
}()
continue
}
report.Values[k] = v
}
for k, v := range analyzer.subs {
sub := newReport(v)
sub.Name = k

View File

@ -127,7 +127,7 @@ func Close(names ...string) {
// - 适用于外部进程对于日志文件的读取,但是需要注意的是,此时日志文件可能正在被写入,所以可能会读取到错误的数据
func Analyze(filePath string, handle func(analyzer *Analyzer, record R)) *Report {
analyzer := new(Analyzer)
err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
_, err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
handle(analyzer, R(s))
})
if err != nil {
@ -141,7 +141,7 @@ func Analyze(filePath string, handle func(analyzer *Analyzer, record R)) *Report
func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R)) *Report {
analyzer := new(Analyzer)
for _, filePath := range filePaths {
err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
_, err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
handle(analyzer, R(s))
})
if err != nil {
@ -151,3 +151,19 @@ func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R))
return newReport(analyzer)
}
// IncrementAnalyze 增量分析,返回一个函数,每次调用该函数都会分析文件中新增的内容
func IncrementAnalyze(filePath string, handle func(analyzer *Analyzer, record R)) func() (*Report, error) {
var analyzer = new(Analyzer)
var offset int64
return func() (*Report, error) {
var err error
offset, err = file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
handle(analyzer, R(s))
}, offset)
if err != nil {
return nil, err
}
return newReport(analyzer), nil
}
}

View File

@ -1,6 +1,9 @@
package str
import "strings"
import (
"slices"
"strings"
)
const (
None = "" // 空字符串
@ -194,3 +197,13 @@ func CamelStringBytes(str []byte) []byte {
}
return camelStr
}
// SortJoin 将多个字符串排序后拼接
func SortJoin(delimiter string, s ...string) string {
var strList = make([]string, 0, len(s))
for _, str := range s {
strList = append(strList, str)
}
slices.Sort(strList)
return strings.Join(strList, delimiter)
}

View File

@ -1,6 +1,7 @@
package timer
import (
"github.com/gorhill/cronexpr"
"reflect"
"sync"
"time"
@ -26,6 +27,8 @@ type Scheduler struct {
ticker *Ticker
lock sync.RWMutex
expr *cronexpr.Expression
}
// Name 获取调度器名称
@ -38,9 +41,13 @@ func (slf *Scheduler) Next(prev time.Time) time.Time {
slf.lock.RLock()
defer slf.lock.RUnlock()
if slf.kill || (slf.total > 0 && slf.trigger > slf.total) {
if slf.kill || (slf.expr != nil && slf.total > 0 && slf.trigger > slf.total) {
return time.Time{}
}
if slf.expr != nil {
next := slf.expr.Next(prev)
return next
}
if slf.trigger == 0 {
slf.trigger++
return prev.Add(slf.after)

View File

@ -1,6 +1,7 @@
package timer
import (
"github.com/gorhill/cronexpr"
"reflect"
"sync"
"time"
@ -72,20 +73,33 @@ func (slf *Ticker) GetSchedulers() []string {
return names
}
// Cron 通过 cron 表达式设置一个调度器,当 cron 表达式错误时,将会引发 panic
func (slf *Ticker) Cron(name, expression string, handleFunc interface{}, args ...interface{}) {
expr := cronexpr.MustParse(expression)
slf.loop(name, 0, 0, expr, 0, handleFunc, args...)
}
// After 设置一个在特定时间后运行一次的调度器
func (slf *Ticker) After(name string, after time.Duration, handleFunc interface{}, args ...interface{}) {
slf.Loop(name, after, timingWheelTick, 1, handleFunc, args...)
slf.loop(name, after, timingWheelTick, nil, 1, handleFunc, args...)
}
// Loop 设置一个在特定时间后反复运行的调度器
func (slf *Ticker) Loop(name string, after, interval time.Duration, times int, handleFunc interface{}, args ...interface{}) {
slf.loop(name, after, interval, nil, times, handleFunc, args...)
}
// Loop 设置一个在特定时间后反复运行的调度器
func (slf *Ticker) loop(name string, after, interval time.Duration, expr *cronexpr.Expression, times int, handleFunc interface{}, args ...interface{}) {
slf.StopTimer(name)
if after < timingWheelTick {
after = timingWheelTick
}
if interval < timingWheelTick {
interval = timingWheelTick
if expr == nil {
if after < timingWheelTick {
after = timingWheelTick
}
if interval < timingWheelTick {
interval = timingWheelTick
}
}
var values = make([]reflect.Value, len(args))
@ -101,6 +115,7 @@ func (slf *Ticker) Loop(name string, after, interval time.Duration, times int, h
cbFunc: reflect.ValueOf(handleFunc),
cbArgs: values,
ticker: slf,
expr: expr,
}
slf.lock.Lock()

View File

@ -0,0 +1,25 @@
package timer_test
import (
"github.com/kercylan98/minotaur/utils/timer"
"github.com/kercylan98/minotaur/utils/times"
"testing"
"time"
)
func TestTicker_Cron(t *testing.T) {
ticker := timer.GetTicker(10)
ticker.After("1_sec", time.Second, func() {
t.Log(time.Now().Format(time.DateTime), "1_sec")
})
ticker.Loop("1_sec_loop_3", 0, time.Second, 3, func() {
t.Log(time.Now().Format(time.DateTime), "1_sec_loop_3")
})
ticker.Cron("5_sec_cron", "0/5 * * * * * ?", func() {
t.Log(time.Now().Format(time.DateTime), "5_sec_cron")
})
time.Sleep(times.Week)
}