From 9f27da2dce15847139011d178447bf7c19783fa6 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Tue, 28 Nov 2023 12:09:03 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=B8=BA=20survey=20=E5=8C=85=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=A2=9E=E9=87=8F=E8=AF=BB=E5=8F=96=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E5=B9=B6=E6=94=B9=E5=96=84=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 此提交扩展了文件读取功能,通过在utils/file/file.go中的ReadLineWithParallel和FindLineChunks函数包含一个'start'参数,实现从特定位置的增量读读取。另外,当扫描器遇到错误时,通过消除panic表达式并直接返回,改善了utils/file/file.go中的错误处理,使得函数能够继续处理。utils/log/survey/survey.go也应用了来自utils/file/file.go的这些改进,将旧的功能替换为新添加的增量读取功能。 --- utils/file/file.go | 5 +++-- utils/log/survey/analyzer_test.go | 12 +++++++++--- utils/log/survey/survey.go | 16 ++++++++++++++++ 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/utils/file/file.go b/utils/file/file.go index cbe6a60..7f6ae98 100644 --- a/utils/file/file.go +++ b/utils/file/file.go @@ -155,14 +155,15 @@ func Paths(dir string) []string { // - 可通过 start 参数指定开始读取的位置,如果不指定则从文件开头开始读取。 func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string), start ...int64) (n int64, err error) { file, err := os.Open(filename) + offset := slice.GetValue(start, 0) if err != nil { - return 0, err + return offset, err } defer func() { _ = file.Close() }() - chunks := FindLineChunksByOffset(file, slice.GetValue(start, 0), chunkSize) + chunks := FindLineChunksByOffset(file, offset, chunkSize) var end int64 var endMutex sync.Mutex var wg sync.WaitGroup diff --git a/utils/log/survey/analyzer_test.go b/utils/log/survey/analyzer_test.go index 0eaa05d..689b9a3 100644 --- a/utils/log/survey/analyzer_test.go +++ b/utils/log/survey/analyzer_test.go @@ -6,10 +6,10 @@ import ( "testing" ) -func TestClose(t *testing.T) { +func TestIncrementAnalyze(t *testing.T) { path := `./test/day.2023-09-06.log` - report := survey.Analyze(path, func(analyzer *survey.Analyzer, record survey.R) { + reader := survey.IncrementAnalyze(path, func(analyzer *survey.Analyzer, record survey.R) { switch record.GetString("type") { case "open_conn": analyzer.IncreaseValueNonRepeat("开播人数", record, 1, "live_id") @@ -44,5 +44,11 @@ func TestClose(t *testing.T) { } }) - fmt.Println(report.FilterSub("warzone0009")) + for i := 0; i < 10; i++ { + report, err := reader() + if err != nil { + t.Fatal(err) + } + fmt.Println(report.FilterSub("warzone0009")) + } } diff --git a/utils/log/survey/survey.go b/utils/log/survey/survey.go index 181286f..8fff7c5 100644 --- a/utils/log/survey/survey.go +++ b/utils/log/survey/survey.go @@ -151,3 +151,19 @@ func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R)) return newReport(analyzer) } + +// IncrementAnalyze 增量分析,返回一个函数,每次调用该函数都会分析文件中新增的内容 +func IncrementAnalyze(filePath string, handle func(analyzer *Analyzer, record R)) func() (*Report, error) { + var analyzer = new(Analyzer) + var offset int64 + return func() (*Report, error) { + var err error + offset, err = file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { + handle(analyzer, R(s)) + }, offset) + if err != nil { + return nil, err + } + return newReport(analyzer), nil + } +}