diff --git a/utils/file/file.go b/utils/file/file.go index cbe6a60..7f6ae98 100644 --- a/utils/file/file.go +++ b/utils/file/file.go @@ -155,14 +155,15 @@ func Paths(dir string) []string { // - 可通过 start 参数指定开始读取的位置,如果不指定则从文件开头开始读取。 func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string), start ...int64) (n int64, err error) { file, err := os.Open(filename) + offset := slice.GetValue(start, 0) if err != nil { - return 0, err + return offset, err } defer func() { _ = file.Close() }() - chunks := FindLineChunksByOffset(file, slice.GetValue(start, 0), chunkSize) + chunks := FindLineChunksByOffset(file, offset, chunkSize) var end int64 var endMutex sync.Mutex var wg sync.WaitGroup diff --git a/utils/log/survey/analyzer_test.go b/utils/log/survey/analyzer_test.go index 0eaa05d..689b9a3 100644 --- a/utils/log/survey/analyzer_test.go +++ b/utils/log/survey/analyzer_test.go @@ -6,10 +6,10 @@ import ( "testing" ) -func TestClose(t *testing.T) { +func TestIncrementAnalyze(t *testing.T) { path := `./test/day.2023-09-06.log` - report := survey.Analyze(path, func(analyzer *survey.Analyzer, record survey.R) { + reader := survey.IncrementAnalyze(path, func(analyzer *survey.Analyzer, record survey.R) { switch record.GetString("type") { case "open_conn": analyzer.IncreaseValueNonRepeat("开播人数", record, 1, "live_id") @@ -44,5 +44,11 @@ func TestClose(t *testing.T) { } }) - fmt.Println(report.FilterSub("warzone0009")) + for i := 0; i < 10; i++ { + report, err := reader() + if err != nil { + t.Fatal(err) + } + fmt.Println(report.FilterSub("warzone0009")) + } } diff --git a/utils/log/survey/survey.go b/utils/log/survey/survey.go index 181286f..8fff7c5 100644 --- a/utils/log/survey/survey.go +++ b/utils/log/survey/survey.go @@ -151,3 +151,19 @@ func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R)) return newReport(analyzer) } + +// IncrementAnalyze 增量分析,返回一个函数,每次调用该函数都会分析文件中新增的内容 +func IncrementAnalyze(filePath string, handle func(analyzer *Analyzer, record R)) func() (*Report, error) { + var analyzer = new(Analyzer) + var offset int64 + return func() (*Report, error) { + var err error + offset, err = file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) { + handle(analyzer, R(s)) + }, offset) + if err != nil { + return nil, err + } + return newReport(analyzer), nil + } +}