perf: survey 包整体优化

This commit is contained in:
kercylan98 2023-08-23 11:12:48 +08:00
parent e962009eff
commit 50f6b1b085
6 changed files with 269 additions and 75 deletions

View File

@ -5,6 +5,7 @@ import (
"io"
"os"
"path/filepath"
"sync"
)
// PathExist 路径是否存在
@ -81,6 +82,27 @@ func ReadBlockHook(filePath string, bufferSize int, hook func(data []byte)) erro
}
}
// ReadLine 分行读取文件
// - 将filePath路径对应的文件数据并将读到的每一行传入hook函数中当过程中如果产生错误则会返回error。
func ReadLine(filePath string, hook func(line string)) error {
f, err := os.Open(filePath)
if err != nil {
panic(err)
}
reader := bufio.NewReader(f)
for {
line, _, err := reader.ReadLine()
if err == io.EOF {
break
}
if err != nil {
return err
}
hook(string(line))
}
return nil
}
// LineCount 统计文件行数
func LineCount(filePath string) int {
file, err := os.Open(filePath)
@ -126,3 +148,93 @@ func Paths(dir string) []string {
}
return paths
}
// ReadLineWithParallel reads the file line by line and handles every line with
// handlerFunc concurrently. An error raised while processing causes a panic;
// an error before processing starts is returned instead.
//   - Because processing is parallel, the order in which lines are handled is undefined.
func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string)) error {
	f, err := os.Open(filename)
	if err != nil {
		return err
	}
	defer func() {
		_ = f.Close()
	}()

	var wait sync.WaitGroup
	for _, c := range FindLineChunks(f, chunkSize) {
		wait.Add(1)
		// each chunk holds only whole lines, so chunks can be scanned independently
		go func(begin, stop int64) {
			defer wait.Done()
			section := bufio.NewScanner(io.NewSectionReader(f, begin, stop-begin))
			for section.Scan() {
				handlerFunc(section.Text())
			}
			if scanErr := section.Err(); scanErr != nil {
				panic(scanErr)
			}
		}(c[0], c[1])
	}
	wait.Wait()
	return nil
}
// FindLineChunks 查找文件按照每行划分的分块,每个分块的大小将在 chunkSize 和分割后的分块距离行首及行尾的距离中范围内
// - 使用该函数得到的分块是完整的行,不会出现行被分割的情况
// - 当过程中发生错误将会发生 panic
// - 返回值的成员是一个长度为 2 的数组,第一个元素是分块的起始位置,第二个元素是分块的结束位置
func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 {
var chunks [][2]int64
fileSize, err := file.Seek(0, io.SeekEnd)
if err != nil {
panic(err)
}
_, err = file.Seek(0, io.SeekStart)
if err != nil {
panic(err)
}
currentPos := int64(0)
for currentPos < fileSize {
start := currentPos
if start != 0 { // 不是文件的开头
for {
b := make([]byte, 1)
if _, err = file.ReadAt(b, start); err != nil {
panic(err)
}
if b[0] == '\n' {
start++ // 移动到下一行的开始
break
}
start--
}
}
end := start + chunkSize
if end < fileSize { // 不是文件的末尾
for {
b := make([]byte, 1)
if _, err = file.ReadAt(b, end); err != nil {
panic(err)
}
if b[0] == '\n' {
break
}
end++
}
} else {
end = fileSize
}
chunks = append(chunks, [2]int64{start, end})
currentPos = end + 1
}
return chunks
}

View File

@ -1,68 +1,21 @@
package survey
import (
"bufio"
"github.com/kercylan98/minotaur/utils/super"
"io"
"os"
"time"
import "github.com/tidwall/gjson"
type (
	// Result is re-exported from gjson so callers of R.Get do not need to
	// import github.com/tidwall/gjson themselves.
	Result = gjson.Result
)
// All 处理特定记录器特定日期的所有记录,当 handle 返回 false 时停止处理
func All(name string, t time.Time, handle func(record map[string]any) bool) {
logger := survey[name]
if logger == nil {
return
}
fp := logger.filePath(t.Format(logger.layout))
logger.wl.Lock()
defer logger.wl.Unlock()
// R is a single data record captured by a logger, kept in its raw string form.
type R string
f, err := os.Open(fp)
if err != nil {
return
}
defer func() {
_ = f.Close()
}()
reader := bufio.NewReader(f)
var m = make(map[string]any)
for {
line, _, err := reader.ReadLine()
if err == io.EOF {
break
}
if err != nil {
panic(err)
}
if err = super.UnmarshalJSON(line[logger.dataLayoutLen:], &m); err != nil {
panic(err)
}
if !handle(m) {
break
}
}
// Get returns the value stored under key in this record.
//   - Nested keys are separated by '.', e.g. "a.b.c".
//   - See https://github.com/tidwall/gjson for the full query syntax.
func (slf R) Get(key string) Result {
	raw := string(slf)
	return gjson.Get(raw, key)
}
// Sum 处理特定记录器特定日期的所有记录,根据指定的字段进行汇总
func Sum(name string, t time.Time, field string) float64 {
var res float64
All(name, t, func(record map[string]any) bool {
v, exist := record[field]
if !exist {
return true
}
switch value := v.(type) {
case float64:
res += value
case int:
res += float64(value)
case int64:
res += float64(value)
case string:
res += super.StringToFloat64(value)
}
return true
})
return res
// String returns the record's raw underlying line unchanged.
func (slf R) String() string {
	raw := slf
	return string(raw)
}

View File

@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"sync"
"time"
)
const (
@ -28,6 +29,7 @@ type logger struct {
layoutLen int
dataLayout string
dataLayoutLen int
interval time.Duration
}
// flush 将记录器缓冲区的数据写入到文件

View File

@ -1,5 +1,7 @@
package survey
import "time"
// Option customizes a logger at registration time (functional-option pattern).
type Option func(logger *logger)
@ -11,3 +13,11 @@ func WithLayout(layout string) Option {
logger.layoutLen = len(layout)
}
}
// WithFlushInterval sets how often buffered log data is flushed to file.
//   - Defaults to 3s.
func WithFlushInterval(interval time.Duration) Option {
	return func(l *logger) {
		l.interval = interval
	}
}

View File

@ -2,22 +2,31 @@ package survey
import (
"fmt"
"github.com/kercylan98/minotaur/utils/file"
"github.com/kercylan98/minotaur/utils/log"
"github.com/kercylan98/minotaur/utils/super"
"path/filepath"
"strings"
"sync"
"time"
)
var (
survey = make(map[string]*logger)
survey = make(map[string]*logger)
timers = make(map[time.Duration]*time.Timer)
timerSurvey = make(map[time.Duration]map[string]struct{})
timerSurveyLock sync.Mutex
)
// RegSurvey 注册一个运营日志记录器
func RegSurvey(name, filePath string, options ...Option) {
// Reg 注册一个运营日志记录器
func Reg(name, filePath string, options ...Option) {
fn := filepath.Base(filePath)
ext := filepath.Ext(fn)
fn = strings.TrimSuffix(fn, ext)
timerSurveyLock.Lock()
defer timerSurveyLock.Unlock()
_, exist := survey[name]
if exist {
panic(fmt.Errorf("survey %s already exist", name))
@ -31,12 +40,35 @@ func RegSurvey(name, filePath string, options ...Option) {
layoutLen: len(time.DateOnly),
dataLayout: time.DateTime,
dataLayoutLen: len(time.DateTime) + 3,
interval: time.Second * 3,
}
for _, option := range options {
option(logger)
}
_, exist = timers[logger.interval]
if !exist {
t := time.NewTimer(logger.interval)
timers[logger.interval] = t
timerSurvey[logger.interval] = make(map[string]struct{})
go func(interval time.Duration) {
for {
<-t.C
timerSurveyLock.Lock()
for n := range timerSurvey[interval] {
survey[n].flush()
}
timerSurveyLock.Unlock()
if !t.Reset(interval) {
break
}
}
}(logger.interval)
}
timerSurvey[logger.interval][name] = struct{}{}
survey[name] = logger
log.Info("Survey", log.String("Action", "RegSurvey"), log.String("Name", name), log.String("FilePath", dir+"/"+fn+".${DATE}"+ext))
log.Info("Survey", log.String("Action", "Reg"), log.String("Name", name), log.String("FilePath", dir+"/"+fn+".${DATE}"+ext))
}
// Record 记录一条运营日志
@ -48,9 +80,79 @@ func Record(name string, data map[string]any) {
logger.writer(fmt.Sprintf("%s - %s\n", time.Now().Format(time.DateTime), super.MarshalJSON(data)))
}
// Flush 将所有运营日志记录器的缓冲区数据写入到文件
func Flush() {
for _, logger := range survey {
logger.flush()
// Flush writes the buffered data of the named loggers to their files.
//   - When no names are given, every registered logger is flushed.
func Flush(names ...string) {
	timerSurveyLock.Lock()
	defer timerSurveyLock.Unlock()
	if len(names) == 0 {
		for _, l := range survey {
			l.flush()
		}
		return
	}
	for _, name := range names {
		if l, ok := survey[name]; ok {
			l.flush()
		}
	}
}
// Close shuts down the named survey loggers (all loggers when names is empty):
// their flush timers are stopped and any buffered data is written to file.
func Close(names ...string) {
	timerSurveyLock.Lock()
	defer timerSurveyLock.Unlock()
	if len(names) == 0 {
		for _, timer := range timers {
			timer.Stop()
		}
		// flush inline: the original called Flush() here, which re-acquires
		// timerSurveyLock — sync.Mutex is not reentrant, so that deadlocks
		for _, l := range survey {
			l.flush()
		}
		return
	}
	for _, name := range names {
		l, exist := survey[name]
		if !exist {
			continue
		}
		delete(survey, name)
		delete(timerSurvey[l.interval], name)
		// drop the shared timer once no logger uses this interval anymore
		if len(timerSurvey[l.interval]) == 0 {
			delete(timerSurvey, l.interval)
			timers[l.interval].Stop()
			delete(timers, l.interval)
		}
		l.flush()
	}
}
// All processes every record of the named logger for the given date; panics on
// errors raised while reading.
//   - handle is invoked in parallel, so callers must ensure their own concurrency safety.
//   - NOTE(review): the bool returned by handle is ignored here — the parallel scan
//     cannot stop early; confirm no caller relies on returning false to abort.
func All(name string, t time.Time, handle func(record R) bool) {
	// look up the logger under the registry lock; released before the
	// (potentially long) file scan so registration is not blocked meanwhile
	timerSurveyLock.Lock()
	logger := survey[name]
	timerSurveyLock.Unlock()
	if logger == nil {
		return
	}
	fp := logger.filePath(t.Format(logger.layout))
	// hold the logger's write lock for the whole read so writes cannot interleave
	logger.wl.Lock()
	defer logger.wl.Unlock()
	// 1 GiB chunk size: files smaller than this are effectively read as one chunk
	err := file.ReadLineWithParallel(fp, 1*1024*1024*1024, func(s string) {
		handle(R(s))
	})
	if err != nil {
		panic(err)
	}
}
// AllWithPath processes every record of the log file at filePath; panics on
// errors raised while reading.
//   - handle is invoked in parallel, so callers must ensure their own concurrency safety.
//   - Intended for external processes reading a log file; note the file may still
//     be written to concurrently, so partially written data can be observed.
func AllWithPath(filePath string, handle func(record R) bool) {
	const chunk = 1 * 1024 * 1024 * 1024
	readErr := file.ReadLineWithParallel(filePath, chunk, func(line string) {
		handle(R(line))
	})
	if readErr != nil {
		panic(readErr)
	}
}

View File

@ -4,18 +4,33 @@ import (
"fmt"
"github.com/kercylan98/minotaur/utils/log/survey"
"os"
"sync/atomic"
"testing"
"time"
)
func TestRecord(t *testing.T) {
_ = os.MkdirAll("./test", os.ModePerm)
survey.RegSurvey("GLOBAL_DATA", "./test/global_data.log")
survey.Record("GLOBAL_DATA", map[string]any{
"joinTime": time.Now().Unix(),
"action": 1,
survey.Reg("GLOBAL_DATA", "./test/global_data.log")
now := time.Now()
//for i := 0; i < 100000000; i++ {
// survey.Record("GLOBAL_DATA", map[string]any{
// "joinTime": time.Now().Unix(),
// "action": random.Int64(1, 999),
// })
// // 每500w flush一次
// if i%5000000 == 0 {
// survey.Flush()
// }
//}
//survey.Flush()
//
var i atomic.Int64
survey.All("GLOBAL_DATA", time.Now(), func(record survey.R) bool {
i.Add(record.Get("action").Int())
return true
})
survey.Flush()
fmt.Println(survey.Sum("GLOBAL_DATA", time.Now(), "action"))
fmt.Println("write cost:", time.Since(now), i.Load())
}
// Line: 30000001, time: 1.45s