From 50f6b1b085887bfc985b33d384cd3a7c3248ef09 Mon Sep 17 00:00:00 2001
From: kercylan98
Date: Wed, 23 Aug 2023 11:12:48 +0800
Subject: [PATCH] =?UTF-8?q?perf:=20survey=20=E5=8C=85=E6=95=B4=E4=BD=93?=
 =?UTF-8?q?=E4=BC=98=E5=8C=96?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Review fix-ups folded into this patch:
- file.ReadLine returns the open error, closes the file and joins the
  fragments of lines longer than the reader buffer
- file.FindLineChunks no longer reads past the end of a file that has no
  trailing line feed
- file.ReadLineWithParallel accepts lines longer than 64 KiB
- survey: the periodic flush keeps running (a ticker replaces the timer that
  was never re-armed) and its goroutine ends on Close
- survey.Close no longer deadlocks by calling Flush while holding the lock
- survey.All / AllWithPath stop passing records once handle returns false
- survey_test writes a record before reading it back
---
 utils/file/file.go              | 138 ++++++++++++++++++++++++++
 utils/log/survey/analyze.go     |  75 +++-----------
 utils/log/survey/logger.go      |   2 +
 utils/log/survey/options.go     |  10 ++
 utils/log/survey/survey.go      | 167 ++++++++++++++++++++++++++++++--
 utils/log/survey/survey_test.go |  12 ++-
 6 files changed, 332 insertions(+), 72 deletions(-)

diff --git a/utils/file/file.go b/utils/file/file.go
index 871eeb4..97a1e5a 100644
--- a/utils/file/file.go
+++ b/utils/file/file.go
@@ -5,6 +5,7 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"sync"
 )
 
 // PathExist 路径是否存在
@@ -81,6 +82,41 @@ func ReadBlockHook(filePath string, bufferSize int, hook func(data []byte)) erro
 	}
 }
 
+// ReadLine reads the file at filePath line by line and passes every line to hook.
+//   - An error raised while opening or reading the file is returned to the caller.
+func ReadLine(filePath string, hook func(line string)) error {
+	f, err := os.Open(filePath)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = f.Close()
+	}()
+	reader := bufio.NewReader(f)
+	var pending []byte // fragments of a line that is longer than the reader's buffer
+	for {
+		fragment, isPrefix, err := reader.ReadLine()
+		if err == io.EOF {
+			if len(pending) > 0 {
+				hook(string(pending))
+			}
+			break
+		}
+		if err != nil {
+			return err
+		}
+		if isPrefix || len(pending) > 0 {
+			pending = append(pending, fragment...)
+			if isPrefix {
+				continue
+			}
+			fragment, pending = pending, pending[:0]
+		}
+		hook(string(fragment))
+	}
+	return nil
+}
+
 // LineCount 统计文件行数
 func LineCount(filePath string) int {
 	file, err := os.Open(filePath)
@@ -126,3 +162,105 @@ func Paths(dir string) []string {
 	}
 	return paths
 }
+
+// ReadLineWithParallel splits the file into line-aligned chunks of roughly chunkSize bytes and
+// reads the chunks in parallel, passing every line to handlerFunc.
+//   - An error raised while opening the file is returned; an error raised while locating the
+//     chunks or while reading them results in a panic.
+//   - handlerFunc is called from several goroutines, so the order of the lines is not deterministic.
+func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string)) error {
+	file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		_ = file.Close()
+	}()
+
+	chunks := FindLineChunks(file, chunkSize)
+	var wg sync.WaitGroup
+	for _, chunk := range chunks {
+		wg.Add(1)
+		go func(chunk [2]int64) {
+			defer wg.Done()
+
+			size := chunk[1] - chunk[0]
+			r := io.NewSectionReader(file, chunk[0], size)
+
+			scanner := bufio.NewScanner(r)
+			// A line can be as long as the whole chunk, which may exceed the scanner's default 64 KiB limit.
+			scanner.Buffer(nil, int(size)+1)
+			for scanner.Scan() {
+				handlerFunc(scanner.Text())
+			}
+
+			if err := scanner.Err(); err != nil {
+				panic(err)
+			}
+		}(chunk)
+	}
+	wg.Wait()
+	return nil
+}
+
+// FindLineChunks splits the file into chunks that start and end on line boundaries. Every chunk
+// covers about chunkSize bytes and is extended to the end of the line it stops in.
+//   - A chunk always holds complete lines, a line is never cut in half.
+//   - A negative chunkSize is treated as 0 (one line per chunk).
+//   - An error raised while seeking or reading the file results in a panic.
+//   - Every element of the result is a pair: the offset of the first byte of the chunk and the
+//     offset just past its last byte (the position of the closing line feed or the file size).
+func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 {
+	var chunks [][2]int64
+
+	fileSize, err := file.Seek(0, io.SeekEnd)
+	if err != nil {
+		panic(err)
+	}
+	_, err = file.Seek(0, io.SeekStart)
+	if err != nil {
+		panic(err)
+	}
+	if chunkSize < 0 {
+		chunkSize = 0
+	}
+
+	b := make([]byte, 1) // reused for every single byte probe
+	currentPos := int64(0)
+	for currentPos < fileSize {
+		start := currentPos
+		if start != 0 { // not the beginning of the file
+			for {
+				if _, err = file.ReadAt(b, start); err != nil {
+					panic(err)
+				}
+				if b[0] == '\n' {
+					start++ // move to the beginning of the next line
+					break
+				}
+				start--
+			}
+		}
+
+		// Advance to the line feed that closes the chunk. Stopping at fileSize keeps a file
+		// without a trailing line feed from being read past its end.
+		end := start + chunkSize
+		for end < fileSize {
+			if _, err = file.ReadAt(b, end); err != nil {
+				panic(err)
+			}
+			if b[0] == '\n' {
+				break
+			}
+			end++
+		}
+		if end > fileSize {
+			end = fileSize
+		}
+
+		chunks = append(chunks, [2]int64{start, end})
+		currentPos = end + 1
+	}
+
+	return chunks
+}
diff --git a/utils/log/survey/analyze.go b/utils/log/survey/analyze.go
index 31302a4..5713853 100644
--- a/utils/log/survey/analyze.go
+++ b/utils/log/survey/analyze.go
@@ -1,68 +1,23 @@
 package survey
 
-import (
-	"bufio"
-	"github.com/kercylan98/minotaur/utils/super"
-	"io"
-	"os"
-	"time"
+import "github.com/tidwall/gjson"
+
+type (
+	// Result is the value returned by R.Get, an alias of gjson.Result.
+	Result = gjson.Result
 )
 
-// All 处理特定记录器特定日期的所有记录,当 handle 返回 false 时停止处理
-func All(name string, t time.Time, handle func(record map[string]any) bool) {
-	logger := survey[name]
-	if logger == nil {
-		return
-	}
-	fp := logger.filePath(t.Format(logger.layout))
-	logger.wl.Lock()
-	defer logger.wl.Unlock()
+// R is a single record written by a logger, kept as the raw text of its line.
+type R string
 
-	f, err := os.Open(fp)
-	if err != nil {
-		return
-	}
-	defer func() {
-		_ = f.Close()
-	}()
-	reader := bufio.NewReader(f)
-	var m = make(map[string]any)
-	for {
-		line, _, err := reader.ReadLine()
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			panic(err)
-		}
-		if err = super.UnmarshalJSON(line[logger.dataLayoutLen:], &m); err != nil {
-			panic(err)
-		}
-		if !handle(m) {
-			break
-		}
-	}
+// Get returns the value stored under key.
+//   - A nested key is written with dots, for example: a.b.c
+//   - More usages are described at https://github.com/tidwall/gjson
+func (slf R) Get(key string) Result {
+	return gjson.Get(string(slf), key)
 }
 
-// Sum 处理特定记录器特定日期的所有记录,根据指定的字段进行汇总
-func Sum(name string, t time.Time, field string) float64 {
-	var res float64
-	All(name, t, func(record map[string]any) bool {
-		v, exist := record[field]
-		if !exist {
-			return true
-		}
-		switch value := v.(type) {
-		case float64:
-			res += value
-		case int:
-			res += float64(value)
-		case int64:
-			res += float64(value)
-		case string:
-			res += super.StringToFloat64(value)
-		}
-		return true
-	})
-	return res
+// String returns the raw text of the record.
+func (slf R) String() string {
+	return string(slf)
 }
diff --git a/utils/log/survey/logger.go b/utils/log/survey/logger.go
index 25a4a7c..5b3a556 100644
--- a/utils/log/survey/logger.go
+++ b/utils/log/survey/logger.go
@@ -7,6 +7,7 @@ import (
 	"os"
 	"path/filepath"
 	"sync"
+	"time"
 )
 
 const (
@@ -28,6 +29,7 @@ type logger struct {
 	layoutLen     int
 	dataLayout    string
 	dataLayoutLen int
+	interval      time.Duration
 }
 
 // flush 将记录器缓冲区的数据写入到文件
diff --git a/utils/log/survey/options.go b/utils/log/survey/options.go
index b2e4b6c..5568301 100644
--- a/utils/log/survey/options.go
+++ b/utils/log/survey/options.go
@@ -1,5 +1,7 @@
 package survey
 
+import "time"
+
 // Option 选项
 type Option func(logger *logger)
 
@@ -11,3 +13,11 @@ func WithLayout(layout string) Option {
 		logger.layoutLen = len(layout)
 	}
 }
+
+// WithFlushInterval sets how often the buffered records are flushed to the log file.
+//   - The default is 3s, it is also used when interval is not positive.
+func WithFlushInterval(interval time.Duration) Option {
+	return func(logger *logger) {
+		logger.interval = interval
+	}
+}
diff --git a/utils/log/survey/survey.go b/utils/log/survey/survey.go
index c9d3a65..f2fba71 100644
--- a/utils/log/survey/survey.go
+++ b/utils/log/survey/survey.go
@@ -2,22 +2,40 @@ package survey
 
 import (
 	"fmt"
+	"github.com/kercylan98/minotaur/utils/file"
 	"github.com/kercylan98/minotaur/utils/log"
 	"github.com/kercylan98/minotaur/utils/super"
 	"path/filepath"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"time"
 )
 
+// survey maps a logger name to its logger, timers maps a flush interval to the channel that
+// stops the flush goroutine of that interval and timerSurvey maps a flush interval to the
+// names of the loggers flushed at that interval. timerSurveyLock is held by Reg, Flush and
+// Close while they touch these maps.
 var (
-	survey = make(map[string]*logger)
+	survey          = make(map[string]*logger)
+	timers          = make(map[time.Duration]chan struct{})
+	timerSurvey     = make(map[time.Duration]map[string]struct{})
+	timerSurveyLock sync.Mutex
 )
 
-// RegSurvey 注册一个运营日志记录器
-func RegSurvey(name, filePath string, options ...Option) {
+// Reg registers an operation survey logger under name, its records are written to filePath
+// with the date inserted in front of the extension.
+//   - It panics when a logger with the same name is already registered.
+//   - Loggers sharing a flush interval are flushed by one goroutine, which runs until the
+//     last of them is closed.
+func Reg(name, filePath string, options ...Option) {
 	fn := filepath.Base(filePath)
 	ext := filepath.Ext(fn)
 	fn = strings.TrimSuffix(fn, ext)
+
+	timerSurveyLock.Lock()
+	defer timerSurveyLock.Unlock()
+
 	_, exist := survey[name]
 	if exist {
 		panic(fmt.Errorf("survey %s already exist", name))
@@ -31,12 +49,43 @@ func RegSurvey(name, filePath string, options ...Option) {
 		layoutLen:     len(time.DateOnly),
 		dataLayout:    time.DateTime,
 		dataLayoutLen: len(time.DateTime) + 3,
+		interval:      time.Second * 3,
 	}
 	for _, option := range options {
 		option(logger)
 	}
+	if logger.interval <= 0 {
+		// time.NewTicker panics on a non-positive duration, fall back to the default.
+		logger.interval = time.Second * 3
+	}
+
+	if _, exist = timers[logger.interval]; !exist {
+		stop := make(chan struct{})
+		timers[logger.interval] = stop
+		timerSurvey[logger.interval] = make(map[string]struct{})
+		go func(interval time.Duration) {
+			// A ticker keeps firing until it is stopped; the goroutine ends as soon as
+			// Close closes the stop channel.
+			ticker := time.NewTicker(interval)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-stop:
+					return
+				case <-ticker.C:
+					timerSurveyLock.Lock()
+					for n := range timerSurvey[interval] {
+						survey[n].flush()
+					}
+					timerSurveyLock.Unlock()
+				}
+			}
+		}(logger.interval)
+	}
+	timerSurvey[logger.interval][name] = struct{}{}
+
 	survey[name] = logger
-	log.Info("Survey", log.String("Action", "RegSurvey"), log.String("Name", name), log.String("FilePath", dir+"/"+fn+".${DATE}"+ext))
+	log.Info("Survey", log.String("Action", "Reg"), log.String("Name", name), log.String("FilePath", dir+"/"+fn+".${DATE}"+ext))
 }
 
 // Record 记录一条运营日志
@@ -48,9 +97,107 @@ func Record(name string, data map[string]any) {
 	logger.writer(fmt.Sprintf("%s - %s\n", time.Now().Format(time.DateTime), super.MarshalJSON(data)))
 }
 
-// Flush 将所有运营日志记录器的缓冲区数据写入到文件
-func Flush() {
-	for _, logger := range survey {
-		logger.flush()
-	}
-}
+// Flush writes the buffered records of the named loggers to their files.
+//   - When no name is given, every registered logger is flushed.
+//   - Names that are not registered are skipped.
+func Flush(names ...string) {
+	timerSurveyLock.Lock()
+	defer timerSurveyLock.Unlock()
+	if len(names) == 0 {
+		for _, logger := range survey {
+			logger.flush()
+		}
+		return
+	}
+	for _, n := range names {
+		l, e := survey[n]
+		if e {
+			l.flush()
+		}
+	}
+}
+
+// Close flushes and unregisters the named loggers, the flush goroutine of an interval is
+// stopped once no logger uses that interval any more.
+//   - When no name is given, every registered logger is closed.
+//   - Names that are not registered are skipped.
+func Close(names ...string) {
+	timerSurveyLock.Lock()
+	defer timerSurveyLock.Unlock()
+	if len(names) == 0 {
+		// Flush must not be called here: it locks timerSurveyLock, which is already held
+		// and is not reentrant.
+		for name := range survey {
+			closeLocked(name)
+		}
+		return
+	}
+	for _, name := range names {
+		closeLocked(name)
+	}
+}
+
+// closeLocked flushes the named logger and removes it from the registry, timerSurveyLock
+// has to be held by the caller.
+func closeLocked(name string) {
+	l, e := survey[name]
+	if !e {
+		return
+	}
+	delete(survey, name)
+	delete(timerSurvey[l.interval], name)
+	if len(timerSurvey[l.interval]) == 0 {
+		delete(timerSurvey, l.interval)
+		if stop, running := timers[l.interval]; running {
+			close(stop)
+			delete(timers, l.interval)
+		}
+	}
+	l.flush()
+}
+
+// All reads every record the named logger wrote on the date of t and passes it to handle.
+//   - Nothing happens when the logger is not registered, an error while reading results in a panic.
+//   - handle is called from several goroutines, it has to take care of concurrency safety itself.
+//   - Once handle returns false no further record is passed to it; because of the parallel
+//     processing, calls that are already running still complete.
+func All(name string, t time.Time, handle func(record R) bool) {
+	timerSurveyLock.Lock()
+	logger := survey[name]
+	timerSurveyLock.Unlock()
+	if logger == nil {
+		return
+	}
+	fp := logger.filePath(t.Format(logger.layout))
+	logger.wl.Lock()
+	defer logger.wl.Unlock()
+	readRecords(fp, handle)
+}
+
+// AllWithPath reads every record of the log file at filePath and passes it to handle, an
+// error while reading results in a panic.
+//   - handle is called from several goroutines, it has to take care of concurrency safety itself.
+//   - Once handle returns false no further record is passed to it; because of the parallel
+//     processing, calls that are already running still complete.
+//   - It is meant for other processes that read a log file. The file may be written to at the
+//     same time, so incomplete data can be read.
+func AllWithPath(filePath string, handle func(record R) bool) {
+	readRecords(filePath, handle)
+}
+
+// readRecords reads the file at filePath in parallel and passes every line to handle as a
+// record until handle returns false, it panics when the file cannot be read.
+func readRecords(filePath string, handle func(record R) bool) {
+	var stopped atomic.Bool
+	err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
+		if stopped.Load() {
+			return
+		}
+		if !handle(R(s)) {
+			stopped.Store(true)
+		}
+	})
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/utils/log/survey/survey_test.go b/utils/log/survey/survey_test.go
index 2a568e7..6947d7a 100644
--- a/utils/log/survey/survey_test.go
+++ b/utils/log/survey/survey_test.go
@@ -4,18 +4,26 @@ import (
 	"fmt"
 	"github.com/kercylan98/minotaur/utils/log/survey"
 	"os"
+	"sync/atomic"
 	"testing"
 	"time"
 )
 
+// TestRecord writes one record, flushes it and reads the log of the current day back.
 func TestRecord(t *testing.T) {
 	_ = os.MkdirAll("./test", os.ModePerm)
-	survey.RegSurvey("GLOBAL_DATA", "./test/global_data.log")
+	survey.Reg("GLOBAL_DATA", "./test/global_data.log")
 	survey.Record("GLOBAL_DATA", map[string]any{
 		"joinTime": time.Now().Unix(),
 		"action":   1,
 	})
 	survey.Flush()
 
-	fmt.Println(survey.Sum("GLOBAL_DATA", time.Now(), "action"))
+	now := time.Now()
+	var sum atomic.Int64
+	survey.All("GLOBAL_DATA", now, func(record survey.R) bool {
+		sum.Add(record.Get("action").Int())
+		return true
+	})
+	fmt.Println("read cost:", time.Since(now), sum.Load())
 }