refactor: 优化 survey,移除 All 函数,新增 Flusher 接口,可自行实现其他持久化方式
This commit is contained in:
parent
c6f8c19086
commit
d9ba1bc85c
|
@ -0,0 +1,9 @@
|
|||
package survey
|
||||
|
||||
// Flusher 用于刷新缓冲区的接口
|
||||
type Flusher interface {
|
||||
// Flush 将缓冲区的数据持久化
|
||||
Flush(records []string)
|
||||
// Info 返回当前刷新器的信息
|
||||
Info() string
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package survey
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/utils/log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// NewFileFlusher 创建一个文件刷新器
|
||||
// - layout 为日志文件名的时间戳格式 (默认为 time.DateOnly)
|
||||
func NewFileFlusher(filePath string, layout ...string) *FileFlusher {
|
||||
fn := filepath.Base(filePath)
|
||||
ext := filepath.Ext(fn)
|
||||
fn = strings.TrimSuffix(fn, ext)
|
||||
dir := filepath.Dir(filePath)
|
||||
fl := &FileFlusher{
|
||||
dir: dir,
|
||||
fn: fn,
|
||||
fe: ext,
|
||||
layout: time.DateOnly,
|
||||
layoutLen: len(time.DateOnly),
|
||||
}
|
||||
if len(layout) > 0 {
|
||||
fl.layout = layout[0]
|
||||
fl.layoutLen = len(fl.layout)
|
||||
}
|
||||
return fl
|
||||
}
|
||||
|
||||
type FileFlusher struct {
|
||||
dir string
|
||||
fn string
|
||||
fe string
|
||||
layout string
|
||||
layoutLen int
|
||||
}
|
||||
|
||||
func (slf *FileFlusher) Flush(records []string) {
|
||||
var (
|
||||
file *os.File
|
||||
writer *bufio.Writer
|
||||
err error
|
||||
last string
|
||||
)
|
||||
for _, data := range records {
|
||||
tick := data[0:slf.layoutLen]
|
||||
if tick != last {
|
||||
if file != nil {
|
||||
_ = writer.Flush()
|
||||
_ = file.Close()
|
||||
}
|
||||
fp := filepath.Join(slf.dir, fmt.Sprintf("%s.%s%s", slf.fn, tick, slf.fe))
|
||||
file, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
log.Fatal("Survey", log.String("Action", "DateSwitch"), log.String("FilePath", fp), log.Err(err))
|
||||
return
|
||||
}
|
||||
writer = bufio.NewWriterSize(file, 1024*10240)
|
||||
last = tick
|
||||
}
|
||||
_, _ = writer.WriteString(data)
|
||||
}
|
||||
_ = writer.Flush()
|
||||
_ = file.Close()
|
||||
}
|
||||
|
||||
func (slf *FileFlusher) Info() string {
|
||||
return fmt.Sprintf("%s/%s.${DATE}%s", slf.dir, slf.fn, slf.fe)
|
||||
}
|
|
@ -1,28 +1,17 @@
|
|||
package survey
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"github.com/kercylan98/minotaur/utils/log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// logger 用于埋点数据的运营日志记录器
|
||||
type logger struct {
|
||||
bl sync.Mutex // writer lock
|
||||
wl sync.Mutex // flush lock
|
||||
dir string
|
||||
fn string
|
||||
fe string
|
||||
bs []string
|
||||
layout string
|
||||
layoutLen int
|
||||
dataLayout string
|
||||
dataLayoutLen int
|
||||
interval time.Duration
|
||||
bl sync.Mutex // writer lock
|
||||
wl sync.Mutex // flush lock
|
||||
bs []string
|
||||
interval time.Duration
|
||||
flusher Flusher
|
||||
}
|
||||
|
||||
// flush 将记录器缓冲区的数据写入到文件
|
||||
|
@ -39,33 +28,7 @@ func (slf *logger) flush() {
|
|||
|
||||
slf.wl.Lock()
|
||||
defer slf.wl.Unlock()
|
||||
|
||||
var (
|
||||
file *os.File
|
||||
writer *bufio.Writer
|
||||
err error
|
||||
last string
|
||||
)
|
||||
for _, data := range ds {
|
||||
tick := data[0:slf.layoutLen]
|
||||
if tick != last {
|
||||
if file != nil {
|
||||
_ = writer.Flush()
|
||||
_ = file.Close()
|
||||
}
|
||||
fp := slf.filePath(tick)
|
||||
file, err = os.OpenFile(fp, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
log.Fatal("Survey", log.String("Action", "DateSwitch"), log.String("FilePath", fp), log.Err(err))
|
||||
return
|
||||
}
|
||||
writer = bufio.NewWriterSize(file, 1024*10240)
|
||||
last = tick
|
||||
}
|
||||
_, _ = writer.WriteString(data)
|
||||
}
|
||||
_ = writer.Flush()
|
||||
_ = file.Close()
|
||||
slf.flusher.Flush(ds)
|
||||
}
|
||||
|
||||
// writer 写入数据到记录器缓冲区
|
||||
|
@ -77,8 +40,3 @@ func (slf *logger) writer(d string) {
|
|||
slf.flush()
|
||||
}
|
||||
}
|
||||
|
||||
// filePath 获取文件路径
|
||||
func (slf *logger) filePath(t string) string {
|
||||
return filepath.Join(slf.dir, fmt.Sprintf("%s.%s%s", slf.fn, t, slf.fe))
|
||||
}
|
||||
|
|
|
@ -5,15 +5,6 @@ import "time"
|
|||
// Option 选项
|
||||
type Option func(logger *logger)
|
||||
|
||||
// WithLayout 设置日志文件名的时间戳格式
|
||||
// - 默认为 time.DateOnly
|
||||
func WithLayout(layout string) Option {
|
||||
return func(logger *logger) {
|
||||
logger.layout = layout
|
||||
logger.layoutLen = len(layout)
|
||||
}
|
||||
}
|
||||
|
||||
// WithFlushInterval 设置日志文件刷新间隔
|
||||
// - 默认为 3s,当日志文件刷新间隔 <= 0 时,将会在每次写入日志时刷新日志文件
|
||||
func WithFlushInterval(interval time.Duration) Option {
|
||||
|
|
|
@ -5,8 +5,6 @@ import (
|
|||
"github.com/kercylan98/minotaur/utils/file"
|
||||
"github.com/kercylan98/minotaur/utils/log"
|
||||
"github.com/kercylan98/minotaur/utils/super"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
@ -19,10 +17,7 @@ var (
|
|||
)
|
||||
|
||||
// Reg 注册一个运营日志记录器
|
||||
func Reg(name, filePath string, options ...Option) {
|
||||
fn := filepath.Base(filePath)
|
||||
ext := filepath.Ext(fn)
|
||||
fn = strings.TrimSuffix(fn, ext)
|
||||
func Reg(name string, flusher Flusher, options ...Option) {
|
||||
|
||||
timerSurveyLock.Lock()
|
||||
defer timerSurveyLock.Unlock()
|
||||
|
@ -31,16 +26,9 @@ func Reg(name, filePath string, options ...Option) {
|
|||
if exist {
|
||||
panic(fmt.Errorf("survey %s already exist", name))
|
||||
}
|
||||
dir := filepath.Dir(filePath)
|
||||
logger := &logger{
|
||||
dir: dir,
|
||||
fn: fn,
|
||||
fe: ext,
|
||||
layout: time.DateOnly,
|
||||
layoutLen: len(time.DateOnly),
|
||||
dataLayout: time.DateTime,
|
||||
dataLayoutLen: len(time.DateTime) + 3,
|
||||
interval: time.Second * 3,
|
||||
flusher: flusher,
|
||||
interval: time.Second * 3,
|
||||
}
|
||||
for _, option := range options {
|
||||
option(logger)
|
||||
|
@ -68,7 +56,7 @@ func Reg(name, filePath string, options ...Option) {
|
|||
}
|
||||
|
||||
survey[name] = logger
|
||||
log.Info("Survey", log.String("Action", "Reg"), log.String("Name", name), log.String("FilePath", dir+"/"+fn+".${DATE}"+ext))
|
||||
log.Info("Survey", log.String("Action", "Reg"), log.String("Name", name), log.String("Info", logger.flusher.Info()))
|
||||
}
|
||||
|
||||
// Record 记录一条运营日志
|
||||
|
@ -125,26 +113,6 @@ func Close(names ...string) {
|
|||
}
|
||||
}
|
||||
|
||||
// All 处理特定记录器特定日期的所有记录,当发生错误时,会发生 panic
|
||||
// - handle 为并行执行的,需要自行处理并发安全
|
||||
func All(name string, t time.Time, handle func(record R)) {
|
||||
timerSurveyLock.Lock()
|
||||
logger := survey[name]
|
||||
timerSurveyLock.Unlock()
|
||||
if logger == nil {
|
||||
return
|
||||
}
|
||||
fp := logger.filePath(t.Format(logger.layout))
|
||||
logger.wl.Lock()
|
||||
defer logger.wl.Unlock()
|
||||
err := file.ReadLineWithParallel(fp, 1*1024*1024*1024, func(s string) {
|
||||
handle(R(s))
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// AllWithPath 处理特定记录器特定日期的所有记录,当发生错误时,会发生 panic
|
||||
// - handle 为并行执行的,需要自行处理并发安全
|
||||
// - 适用于外部进程对于日志文件的读取,但是需要注意的是,此时日志文件可能正在被写入,所以可能会读取到错误的数据
|
||||
|
|
Loading…
Reference in New Issue