From 4f5327a41084171990528796276c7fadea49ca85 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Wed, 31 May 2023 19:34:39 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E4=B8=8A=E6=8A=A5=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E5=B0=81=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- reporter/buried.go | 72 ++++++++++++++++++++++++++++ reporter/hit_operation_set.go | 22 +++++++++ reporter/options.go | 10 ++++ reporter/reporter.go | 52 ++++++++++++++++++++ reporter/strategy.go | 90 +++++++++++++++++++++++++++++++++++ 5 files changed, 246 insertions(+) create mode 100644 reporter/buried.go create mode 100644 reporter/hit_operation_set.go create mode 100644 reporter/options.go create mode 100644 reporter/reporter.go create mode 100644 reporter/strategy.go diff --git a/reporter/buried.go b/reporter/buried.go new file mode 100644 index 0000000..e56c260 --- /dev/null +++ b/reporter/buried.go @@ -0,0 +1,72 @@ +package reporter + +type BuriedHitHandle[Data any] func(actions *HitOperationSet[Data]) +type BuriedReportingHandle[Data any] func(name string, data Data) error + +// NewBuried 新建数据埋点 +// - 通常数据埋点应该在全局创建唯一埋点实例进行使用 +// - 需要指定数据埋点名称,由 reporter 包中的全局上报器统一调度上报行为 +// - hitHandle:当埋点命中时将执行该函数,该函数适用于数据处理及持久化等行为,不应该在该函数中执行上报操作 +// - reportingHandle:当数据上报时将执行该函数 +// +// 数据埋点必须注册后才会生效 +func NewBuried[Data any](name string, hitHandle BuriedHitHandle[Data], reportingHandle BuriedReportingHandle[Data], options ...BuriedOption[Data]) *Buried[Data] { + if hitHandle == nil { + panic("data buried point without hit processing function") + } + if reportingHandle == nil { + panic("data buried point without reporting processing function") + } + buried := &Buried[Data]{ + name: name, + operationSet: &HitOperationSet[Data]{}, + hitHandle: hitHandle, + reportingHandle: reportingHandle, + } + for _, option := range options { + option(buried) + } + return buried +} + +// Buried 埋点 +type Buried[Data any] struct { + name string + operationSet *HitOperationSet[Data] + hitHandle BuriedHitHandle[Data] + reportingHandle BuriedReportingHandle[Data] + errorHandle func(buried *Buried[Data], err error) +} + +// GetName 获取埋点数据名称 +func (slf *Buried[Data]) GetName() string { + return slf.name +} + +// Hit 命中数据埋点 +func (slf *Buried[Data]) Hit(data Data) { + slf.operationSet.input = data + slf.hitHandle(slf.operationSet) +} + +// Reporting 上报数据埋点 +func (slf *Buried[Data]) Reporting() error { + return slf.reportingHandle(slf.name, slf.operationSet.GetData()) +} + +// Register 注册到上报器 +// - 注册数据埋点必须要指定最少一个策略 +// - 相同策略可注册多个( ReportStrategyInstantly 除外) +func (slf *Buried[Data]) Register(strategies ...ReportStrategy[Data]) *Buried[Data] { + if len(strategies) == 0 { + panic("invalid data buried without any strategy") + } + if registerBuried.Exist(slf.name) { + panic("repeated data buried point registration") + } + registerBuried.Set(slf.name, true) + for _, strategy := range strategies { + strategy(slf) + } + return slf +} diff --git a/reporter/hit_operation_set.go b/reporter/hit_operation_set.go new file mode 100644 index 0000000..22d1a1f --- /dev/null +++ b/reporter/hit_operation_set.go @@ -0,0 +1,22 @@ +package reporter + +// HitOperationSet 命中操作集 +type HitOperationSet[Data any] struct { + input Data + data Data +} + +// GetInput 获取新输入的数据 +func (slf *HitOperationSet[Data]) GetInput() Data { + return slf.input +} + +// GetData 获取当前数据 +func (slf *HitOperationSet[Data]) GetData() Data { + return slf.data +} + +// SetData 设置当前数据 +func (slf *HitOperationSet[Data]) SetData(data Data) { + slf.data = data +} diff --git a/reporter/options.go b/reporter/options.go new file mode 100644 index 0000000..cf89867 --- /dev/null +++ b/reporter/options.go @@ -0,0 +1,10 @@ +package reporter + +type BuriedOption[Data any] func(buried *Buried[Data]) + +// WithErrorHandle 通过包含上报错误处理函数的方式创建数据埋点 +func WithErrorHandle[Data any](handle func(buried *Buried[Data], err error)) BuriedOption[Data] { + return func(buried *Buried[Data]) { + buried.errorHandle = handle + } +} diff --git a/reporter/reporter.go b/reporter/reporter.go new file mode 100644 index 0000000..cc17b63 --- /dev/null +++ b/reporter/reporter.go @@ -0,0 +1,52 @@ +package reporter + +import ( + "github.com/kercylan98/minotaur/utils/synchronization" + "github.com/kercylan98/minotaur/utils/timer" +) + +var ( + ticker *timer.Ticker // 定时器 + tickerIsDefault bool + registerBuried *synchronization.Map[string, bool] // 已注册的数据埋点 + disableBuried *synchronization.Map[string, bool] // 已排除的数据埋点 +) + +func init() { + ticker = timer.GetTicker(50) + tickerIsDefault = true + registerBuried = synchronization.NewMap[string, bool]() + disableBuried = synchronization.NewMap[string, bool]() +} + +// UseTicker 使用特定定时器取代默认上报定时器 +func UseTicker(t *timer.Ticker) { + if t == nil { + return + } + if tickerIsDefault { + tickerIsDefault = false + ticker.Release() + ticker = t + } +} + +// DisableBuried 禁用特定数据埋点 +func DisableBuried[Data any](buried *Buried[Data]) { + DisableBuriedWithName(buried.GetName()) +} + +// DisableBuriedWithName 禁用特定名称的数据埋点 +func DisableBuriedWithName(name string) { + disableBuried.Set(name, true) +} + +// EnableBuried 启用特定数据埋点 +func EnableBuried[Data any](buried *Buried[Data]) { + EnableBuriedWithName(buried.GetName()) +} + +// EnableBuriedWithName 启用特定名称的数据埋点 +func EnableBuriedWithName(name string) { + disableBuried.Delete(name) +} diff --git a/reporter/strategy.go b/reporter/strategy.go new file mode 100644 index 0000000..885e423 --- /dev/null +++ b/reporter/strategy.go @@ -0,0 +1,90 @@ +package reporter + +import ( + "fmt" + "github.com/kercylan98/minotaur/utils/timer" + "time" +) + +type ReportStrategy[Data any] func(buried *Buried[Data]) + +// ReportStrategyInstantly 命中时将立即上报 +func ReportStrategyInstantly[Data any]() ReportStrategy[Data] { + return func(buried *Buried[Data]) { + oldHitHandle := buried.hitHandle + buried.hitHandle = func(actions *HitOperationSet[Data]) { + oldHitHandle(actions) + ticker.After(fmt.Sprintf("ReportStrategyInstantly_%s", buried.GetName()), timer.Instantly, func() { + if disableBuried.Get(buried.GetName()) { + return + } + if err := buried.Reporting(); err != nil { + if buried.errorHandle != nil { + buried.errorHandle(buried, err) + } + } + }) + } + } +} + +// ReportStrategyAfter 命中后一段时间后上报 +// - 该模式下如果连续在时间范围内命中,将仅上报最后一次结果 +func ReportStrategyAfter[Data any](t time.Duration) ReportStrategy[Data] { + return func(buried *Buried[Data]) { + oldHitHandle := buried.hitHandle + buried.hitHandle = func(actions *HitOperationSet[Data]) { + oldHitHandle(actions) + ticker.After(fmt.Sprintf("ReportStrategyAfter_%s_%d", buried.GetName(), t.Milliseconds()), t, func() { + if disableBuried.Get(buried.GetName()) { + return + } + if err := buried.Reporting(); err != nil { + if buried.errorHandle != nil { + buried.errorHandle(buried, err) + } + } + }) + } + } +} + +// ReportStrategyLoop 循环上报 +func ReportStrategyLoop[Data any](t time.Duration) ReportStrategy[Data] { + return func(buried *Buried[Data]) { + ticker.Loop(fmt.Sprintf("ReportStrategyLoop_%s_%d", buried.GetName(), t.Milliseconds()), timer.Instantly, t, timer.Forever, func() { + if disableBuried.Get(buried.GetName()) { + return + } + if err := buried.Reporting(); err != nil { + if buried.errorHandle != nil { + buried.errorHandle(buried, err) + } + } + }) + } +} + +// ReportStrategyFixedTime 将在每天的固定时间上报 +func ReportStrategyFixedTime[Data any](hour, min, sec int) ReportStrategy[Data] { + return func(buried *Buried[Data]) { + now := time.Now() + current := now.Unix() + next := time.Date(now.Year(), now.Month(), now.Day(), hour, min, sec, 0, now.Location()) + target := next.Unix() + if current >= target { + next = next.AddDate(0, 0, 1) + target = next.Unix() + } + ticker.Loop(fmt.Sprintf("ReportStrategyFixedTime_%s_%d_%d_%d", buried.GetName(), hour, min, sec), time.Duration(target-current)*time.Second, 24*time.Hour, -1, func() { + if disableBuried.Get(buried.GetName()) { + return + } + if err := buried.Reporting(); err != nil { + if buried.errorHandle != nil { + buried.errorHandle(buried, err) + } + } + }) + } +}