数据上报功能封装
This commit is contained in:
parent
e026d38b0c
commit
4f5327a410
|
@ -0,0 +1,72 @@
|
|||
package reporter
|
||||
|
||||
type BuriedHitHandle[Data any] func(actions *HitOperationSet[Data])
|
||||
type BuriedReportingHandle[Data any] func(name string, data Data) error
|
||||
|
||||
// NewBuried 新建数据埋点
|
||||
// - 通常数据埋点应该在全局创建唯一埋点实例进行使用
|
||||
// - 需要指定数据埋点名称,由 reporter 包中的全局上报器统一调度上报行为
|
||||
// - hitHandle:当埋点命中时将执行该函数,该函数适用于数据处理及持久化等行为,不应该在该函数中执行上报操作
|
||||
// - reportingHandle:当数据上报时将执行该函数
|
||||
//
|
||||
// 数据埋点必须注册后才会生效
|
||||
func NewBuried[Data any](name string, hitHandle BuriedHitHandle[Data], reportingHandle BuriedReportingHandle[Data], options ...BuriedOption[Data]) *Buried[Data] {
|
||||
if hitHandle == nil {
|
||||
panic("data buried point without hit processing function")
|
||||
}
|
||||
if reportingHandle == nil {
|
||||
panic("data buried point without reporting processing function")
|
||||
}
|
||||
buried := &Buried[Data]{
|
||||
name: name,
|
||||
operationSet: &HitOperationSet[Data]{},
|
||||
hitHandle: hitHandle,
|
||||
reportingHandle: reportingHandle,
|
||||
}
|
||||
for _, option := range options {
|
||||
option(buried)
|
||||
}
|
||||
return buried
|
||||
}
|
||||
|
||||
// Buried 埋点
|
||||
type Buried[Data any] struct {
|
||||
name string
|
||||
operationSet *HitOperationSet[Data]
|
||||
hitHandle BuriedHitHandle[Data]
|
||||
reportingHandle BuriedReportingHandle[Data]
|
||||
errorHandle func(buried *Buried[Data], err error)
|
||||
}
|
||||
|
||||
// GetName 获取埋点数据名称
|
||||
func (slf *Buried[Data]) GetName() string {
|
||||
return slf.name
|
||||
}
|
||||
|
||||
// Hit 命中数据埋点
|
||||
func (slf *Buried[Data]) Hit(data Data) {
|
||||
slf.operationSet.input = data
|
||||
slf.hitHandle(slf.operationSet)
|
||||
}
|
||||
|
||||
// Reporting 上报数据埋点
|
||||
func (slf *Buried[Data]) Reporting() error {
|
||||
return slf.reportingHandle(slf.name, slf.operationSet.GetData())
|
||||
}
|
||||
|
||||
// Register 注册到上报器
|
||||
// - 注册数据埋点必须要指定最少一个策略
|
||||
// - 相同策略可注册多个( ReportStrategyInstantly 除外)
|
||||
func (slf *Buried[Data]) Register(strategies ...ReportStrategy[Data]) *Buried[Data] {
|
||||
if len(strategies) == 0 {
|
||||
panic("invalid data buried without any strategy")
|
||||
}
|
||||
if registerBuried.Exist(slf.name) {
|
||||
panic("repeated data buried point registration")
|
||||
}
|
||||
registerBuried.Set(slf.name, true)
|
||||
for _, strategy := range strategies {
|
||||
strategy(slf)
|
||||
}
|
||||
return slf
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package reporter
|
||||
|
||||
// HitOperationSet 命中操作集
|
||||
type HitOperationSet[Data any] struct {
|
||||
input Data
|
||||
data Data
|
||||
}
|
||||
|
||||
// GetInput 获取新输入的数据
|
||||
func (slf *HitOperationSet[Data]) GetInput() Data {
|
||||
return slf.input
|
||||
}
|
||||
|
||||
// GetData 获取当前数据
|
||||
func (slf *HitOperationSet[Data]) GetData() Data {
|
||||
return slf.data
|
||||
}
|
||||
|
||||
// SetData 设置当前数据
|
||||
func (slf *HitOperationSet[Data]) SetData(data Data) {
|
||||
slf.data = data
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
package reporter
|
||||
|
||||
type BuriedOption[Data any] func(buried *Buried[Data])
|
||||
|
||||
// WithErrorHandle 通过包含上报错误处理函数的方式创建数据埋点
|
||||
func WithErrorHandle[Data any](handle func(buried *Buried[Data], err error)) BuriedOption[Data] {
|
||||
return func(buried *Buried[Data]) {
|
||||
buried.errorHandle = handle
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package reporter
|
||||
|
||||
import (
|
||||
"github.com/kercylan98/minotaur/utils/synchronization"
|
||||
"github.com/kercylan98/minotaur/utils/timer"
|
||||
)
|
||||
|
||||
var (
|
||||
ticker *timer.Ticker // 定时器
|
||||
tickerIsDefault bool
|
||||
registerBuried *synchronization.Map[string, bool] // 已注册的数据埋点
|
||||
disableBuried *synchronization.Map[string, bool] // 已排除的数据埋点
|
||||
)
|
||||
|
||||
func init() {
|
||||
ticker = timer.GetTicker(50)
|
||||
tickerIsDefault = true
|
||||
registerBuried = synchronization.NewMap[string, bool]()
|
||||
disableBuried = synchronization.NewMap[string, bool]()
|
||||
}
|
||||
|
||||
// UseTicker 使用特定定时器取代默认上报定时器
|
||||
func UseTicker(t *timer.Ticker) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
if tickerIsDefault {
|
||||
tickerIsDefault = false
|
||||
ticker.Release()
|
||||
ticker = t
|
||||
}
|
||||
}
|
||||
|
||||
// DisableBuried 禁用特定数据埋点
|
||||
func DisableBuried[Data any](buried *Buried[Data]) {
|
||||
DisableBuriedWithName(buried.GetName())
|
||||
}
|
||||
|
||||
// DisableBuriedWithName 禁用特定名称的数据埋点
|
||||
func DisableBuriedWithName(name string) {
|
||||
disableBuried.Set(name, true)
|
||||
}
|
||||
|
||||
// EnableBuried 启用特定数据埋点
|
||||
func EnableBuried[Data any](buried *Buried[Data]) {
|
||||
EnableBuriedWithName(buried.GetName())
|
||||
}
|
||||
|
||||
// EnableBuriedWithName 启用特定名称的数据埋点
|
||||
func EnableBuriedWithName(name string) {
|
||||
disableBuried.Delete(name)
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
package reporter
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/utils/timer"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ReportStrategy[Data any] func(buried *Buried[Data])
|
||||
|
||||
// ReportStrategyInstantly 命中时将立即上报
|
||||
func ReportStrategyInstantly[Data any]() ReportStrategy[Data] {
|
||||
return func(buried *Buried[Data]) {
|
||||
oldHitHandle := buried.hitHandle
|
||||
buried.hitHandle = func(actions *HitOperationSet[Data]) {
|
||||
oldHitHandle(actions)
|
||||
ticker.After(fmt.Sprintf("ReportStrategyInstantly_%s", buried.GetName()), timer.Instantly, func() {
|
||||
if disableBuried.Get(buried.GetName()) {
|
||||
return
|
||||
}
|
||||
if err := buried.Reporting(); err != nil {
|
||||
if buried.errorHandle != nil {
|
||||
buried.errorHandle(buried, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ReportStrategyAfter 命中后一段时间后上报
|
||||
// - 该模式下如果连续在时间范围内命中,将仅上报最后一次结果
|
||||
func ReportStrategyAfter[Data any](t time.Duration) ReportStrategy[Data] {
|
||||
return func(buried *Buried[Data]) {
|
||||
oldHitHandle := buried.hitHandle
|
||||
buried.hitHandle = func(actions *HitOperationSet[Data]) {
|
||||
oldHitHandle(actions)
|
||||
ticker.After(fmt.Sprintf("ReportStrategyAfter_%s_%d", buried.GetName(), t.Milliseconds()), t, func() {
|
||||
if disableBuried.Get(buried.GetName()) {
|
||||
return
|
||||
}
|
||||
if err := buried.Reporting(); err != nil {
|
||||
if buried.errorHandle != nil {
|
||||
buried.errorHandle(buried, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ReportStrategyLoop 循环上报
|
||||
func ReportStrategyLoop[Data any](t time.Duration) ReportStrategy[Data] {
|
||||
return func(buried *Buried[Data]) {
|
||||
ticker.Loop(fmt.Sprintf("ReportStrategyLoop_%s_%d", buried.GetName(), t.Milliseconds()), timer.Instantly, t, timer.Forever, func() {
|
||||
if disableBuried.Get(buried.GetName()) {
|
||||
return
|
||||
}
|
||||
if err := buried.Reporting(); err != nil {
|
||||
if buried.errorHandle != nil {
|
||||
buried.errorHandle(buried, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ReportStrategyFixedTime 将在每天的固定时间上报
|
||||
func ReportStrategyFixedTime[Data any](hour, min, sec int) ReportStrategy[Data] {
|
||||
return func(buried *Buried[Data]) {
|
||||
now := time.Now()
|
||||
current := now.Unix()
|
||||
next := time.Date(now.Year(), now.Month(), now.Day(), hour, min, sec, 0, now.Location())
|
||||
target := next.Unix()
|
||||
if current >= target {
|
||||
next = next.AddDate(0, 0, 1)
|
||||
target = next.Unix()
|
||||
}
|
||||
ticker.Loop(fmt.Sprintf("ReportStrategyFixedTime_%s_%d_%d_%d", buried.GetName(), hour, min, sec), time.Duration(target-current)*time.Second, 24*time.Hour, -1, func() {
|
||||
if disableBuried.Get(buried.GetName()) {
|
||||
return
|
||||
}
|
||||
if err := buried.Reporting(); err != nil {
|
||||
if buried.errorHandle != nil {
|
||||
buried.errorHandle(buried, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue