移除数据上报功能,设计不合理
This commit is contained in:
parent
b0b64f4f55
commit
6def300d4a
|
@ -1,5 +0,0 @@
|
||||||
# 跨进程上报方案
|
|
||||||
|
|
||||||
跨进程上报通常指主进程和上报进程分离的情况下进行数据上报。
|
|
||||||
|
|
||||||
可通过 `reportingHandle` 将主进程数据传输到上报进程进行处理,或者将数据传输到`Redis`或者消息中间件后,由上报进程进行处理。
|
|
|
@ -1,72 +0,0 @@
|
||||||
package reporter
|
|
||||||
|
|
||||||
type BuriedHitHandle[Data any] func(actions *HitOperationSet[Data])
|
|
||||||
type BuriedReportingHandle[Data any] func(name string, data Data) error
|
|
||||||
|
|
||||||
// NewBuried 新建数据埋点
|
|
||||||
// - 通常数据埋点应该在全局创建唯一埋点实例进行使用
|
|
||||||
// - 需要指定数据埋点名称,由 reporter 包中的全局上报器统一调度上报行为
|
|
||||||
// - hitHandle:当埋点命中时将执行该函数,该函数适用于数据处理及持久化等行为,不应该在该函数中执行上报操作
|
|
||||||
// - reportingHandle:当数据上报时将执行该函数
|
|
||||||
//
|
|
||||||
// 数据埋点必须注册后才会生效
|
|
||||||
func NewBuried[Data any](name string, hitHandle BuriedHitHandle[Data], reportingHandle BuriedReportingHandle[Data], options ...BuriedOption[Data]) *Buried[Data] {
|
|
||||||
if hitHandle == nil {
|
|
||||||
panic("data buried point without hit processing function")
|
|
||||||
}
|
|
||||||
if reportingHandle == nil {
|
|
||||||
panic("data buried point without reporting processing function")
|
|
||||||
}
|
|
||||||
buried := &Buried[Data]{
|
|
||||||
name: name,
|
|
||||||
operationSet: &HitOperationSet[Data]{},
|
|
||||||
hitHandle: hitHandle,
|
|
||||||
reportingHandle: reportingHandle,
|
|
||||||
}
|
|
||||||
for _, option := range options {
|
|
||||||
option(buried)
|
|
||||||
}
|
|
||||||
return buried
|
|
||||||
}
|
|
||||||
|
|
||||||
// Buried 埋点
|
|
||||||
type Buried[Data any] struct {
|
|
||||||
name string
|
|
||||||
operationSet *HitOperationSet[Data]
|
|
||||||
hitHandle BuriedHitHandle[Data]
|
|
||||||
reportingHandle BuriedReportingHandle[Data]
|
|
||||||
errorHandle func(buried *Buried[Data], err error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetName 获取埋点数据名称
|
|
||||||
func (slf *Buried[Data]) GetName() string {
|
|
||||||
return slf.name
|
|
||||||
}
|
|
||||||
|
|
||||||
// Hit 命中数据埋点
|
|
||||||
func (slf *Buried[Data]) Hit(data Data) {
|
|
||||||
slf.operationSet.input = data
|
|
||||||
slf.hitHandle(slf.operationSet)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reporting 上报数据埋点
|
|
||||||
func (slf *Buried[Data]) Reporting() error {
|
|
||||||
return slf.reportingHandle(slf.name, slf.operationSet.GetData())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register 注册到上报器
|
|
||||||
// - 注册数据埋点必须要指定最少一个策略
|
|
||||||
// - 相同策略可注册多个( ReportStrategyInstantly 除外)
|
|
||||||
func (slf *Buried[Data]) Register(strategies ...ReportStrategy[Data]) *Buried[Data] {
|
|
||||||
if len(strategies) == 0 {
|
|
||||||
panic("invalid data buried without any strategy")
|
|
||||||
}
|
|
||||||
if registerBuried.Exist(slf.name) {
|
|
||||||
panic("repeated data buried point registration")
|
|
||||||
}
|
|
||||||
registerBuried.Set(slf.name, true)
|
|
||||||
for _, strategy := range strategies {
|
|
||||||
strategy(slf)
|
|
||||||
}
|
|
||||||
return slf
|
|
||||||
}
|
|
|
@ -1,22 +0,0 @@
|
||||||
package reporter
|
|
||||||
|
|
||||||
// HitOperationSet 命中操作集
|
|
||||||
type HitOperationSet[Data any] struct {
|
|
||||||
input Data
|
|
||||||
data Data
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetInput 获取新输入的数据
|
|
||||||
func (slf *HitOperationSet[Data]) GetInput() Data {
|
|
||||||
return slf.input
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetData 获取当前数据
|
|
||||||
func (slf *HitOperationSet[Data]) GetData() Data {
|
|
||||||
return slf.data
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetData 设置当前数据
|
|
||||||
func (slf *HitOperationSet[Data]) SetData(data Data) {
|
|
||||||
slf.data = data
|
|
||||||
}
|
|
|
@ -1,10 +0,0 @@
|
||||||
package reporter
|
|
||||||
|
|
||||||
type BuriedOption[Data any] func(buried *Buried[Data])
|
|
||||||
|
|
||||||
// WithErrorHandle 通过包含上报错误处理函数的方式创建数据埋点
|
|
||||||
func WithErrorHandle[Data any](handle func(buried *Buried[Data], err error)) BuriedOption[Data] {
|
|
||||||
return func(buried *Buried[Data]) {
|
|
||||||
buried.errorHandle = handle
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,52 +0,0 @@
|
||||||
package reporter
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/kercylan98/minotaur/utils/synchronization"
|
|
||||||
"github.com/kercylan98/minotaur/utils/timer"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
ticker *timer.Ticker // 定时器
|
|
||||||
tickerIsDefault bool
|
|
||||||
registerBuried *synchronization.Map[string, bool] // 已注册的数据埋点
|
|
||||||
disableBuried *synchronization.Map[string, bool] // 已排除的数据埋点
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
ticker = timer.GetTicker(50)
|
|
||||||
tickerIsDefault = true
|
|
||||||
registerBuried = synchronization.NewMap[string, bool]()
|
|
||||||
disableBuried = synchronization.NewMap[string, bool]()
|
|
||||||
}
|
|
||||||
|
|
||||||
// UseTicker 使用特定定时器取代默认上报定时器
|
|
||||||
func UseTicker(t *timer.Ticker) {
|
|
||||||
if t == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if tickerIsDefault {
|
|
||||||
tickerIsDefault = false
|
|
||||||
ticker.Release()
|
|
||||||
ticker = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// DisableBuried 禁用特定数据埋点
|
|
||||||
func DisableBuried[Data any](buried *Buried[Data]) {
|
|
||||||
DisableBuriedWithName(buried.GetName())
|
|
||||||
}
|
|
||||||
|
|
||||||
// DisableBuriedWithName 禁用特定名称的数据埋点
|
|
||||||
func DisableBuriedWithName(name string) {
|
|
||||||
disableBuried.Set(name, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
// EnableBuried 启用特定数据埋点
|
|
||||||
func EnableBuried[Data any](buried *Buried[Data]) {
|
|
||||||
EnableBuriedWithName(buried.GetName())
|
|
||||||
}
|
|
||||||
|
|
||||||
// EnableBuriedWithName 启用特定名称的数据埋点
|
|
||||||
func EnableBuriedWithName(name string) {
|
|
||||||
disableBuried.Delete(name)
|
|
||||||
}
|
|
|
@ -1,90 +0,0 @@
|
||||||
package reporter
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"github.com/kercylan98/minotaur/utils/timer"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ReportStrategy[Data any] func(buried *Buried[Data])
|
|
||||||
|
|
||||||
// ReportStrategyInstantly 命中时将立即上报
|
|
||||||
func ReportStrategyInstantly[Data any]() ReportStrategy[Data] {
|
|
||||||
return func(buried *Buried[Data]) {
|
|
||||||
oldHitHandle := buried.hitHandle
|
|
||||||
buried.hitHandle = func(actions *HitOperationSet[Data]) {
|
|
||||||
oldHitHandle(actions)
|
|
||||||
ticker.After(fmt.Sprintf("ReportStrategyInstantly_%s", buried.GetName()), timer.Instantly, func() {
|
|
||||||
if disableBuried.Get(buried.GetName()) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := buried.Reporting(); err != nil {
|
|
||||||
if buried.errorHandle != nil {
|
|
||||||
buried.errorHandle(buried, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReportStrategyAfter 命中后一段时间后上报
|
|
||||||
// - 该模式下如果连续在时间范围内命中,将仅上报最后一次结果
|
|
||||||
func ReportStrategyAfter[Data any](t time.Duration) ReportStrategy[Data] {
|
|
||||||
return func(buried *Buried[Data]) {
|
|
||||||
oldHitHandle := buried.hitHandle
|
|
||||||
buried.hitHandle = func(actions *HitOperationSet[Data]) {
|
|
||||||
oldHitHandle(actions)
|
|
||||||
ticker.After(fmt.Sprintf("ReportStrategyAfter_%s_%d", buried.GetName(), t.Milliseconds()), t, func() {
|
|
||||||
if disableBuried.Get(buried.GetName()) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := buried.Reporting(); err != nil {
|
|
||||||
if buried.errorHandle != nil {
|
|
||||||
buried.errorHandle(buried, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReportStrategyLoop 循环上报
|
|
||||||
func ReportStrategyLoop[Data any](t time.Duration) ReportStrategy[Data] {
|
|
||||||
return func(buried *Buried[Data]) {
|
|
||||||
ticker.Loop(fmt.Sprintf("ReportStrategyLoop_%s_%d", buried.GetName(), t.Milliseconds()), timer.Instantly, t, timer.Forever, func() {
|
|
||||||
if disableBuried.Get(buried.GetName()) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := buried.Reporting(); err != nil {
|
|
||||||
if buried.errorHandle != nil {
|
|
||||||
buried.errorHandle(buried, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReportStrategyFixedTime 将在每天的固定时间上报
|
|
||||||
func ReportStrategyFixedTime[Data any](hour, min, sec int) ReportStrategy[Data] {
|
|
||||||
return func(buried *Buried[Data]) {
|
|
||||||
now := time.Now()
|
|
||||||
current := now.Unix()
|
|
||||||
next := time.Date(now.Year(), now.Month(), now.Day(), hour, min, sec, 0, now.Location())
|
|
||||||
target := next.Unix()
|
|
||||||
if current >= target {
|
|
||||||
next = next.AddDate(0, 0, 1)
|
|
||||||
target = next.Unix()
|
|
||||||
}
|
|
||||||
ticker.Loop(fmt.Sprintf("ReportStrategyFixedTime_%s_%d_%d_%d", buried.GetName(), hour, min, sec), time.Duration(target-current)*time.Second, 24*time.Hour, -1, func() {
|
|
||||||
if disableBuried.Get(buried.GetName()) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := buried.Reporting(); err != nil {
|
|
||||||
if buried.errorHandle != nil {
|
|
||||||
buried.errorHandle(buried, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue