feat: 为 survey 包增加增量读取功能并改善错误处理

此提交扩展了文件读取功能,通过在utils/file/file.go中的ReadLineWithParallel和FindLineChunks函数包含一个'start'参数,实现从特定位置的增量读读取。另外,当扫描器遇到错误时,通过消除panic表达式并直接返回,改善了utils/file/file.go中的错误处理,使得函数能够继续处理。utils/log/survey/survey.go也应用了来自utils/file/file.go的这些改进,将旧的功能替换为新添加的增量读取功能。
This commit is contained in:
kercylan98 2023-11-28 12:09:03 +08:00
parent b11baa3653
commit 9f27da2dce
3 changed files with 28 additions and 5 deletions

View File

@ -155,14 +155,15 @@ func Paths(dir string) []string {
// - 可通过 start 参数指定开始读取的位置,如果不指定则从文件开头开始读取。 // - 可通过 start 参数指定开始读取的位置,如果不指定则从文件开头开始读取。
func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string), start ...int64) (n int64, err error) { func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string), start ...int64) (n int64, err error) {
file, err := os.Open(filename) file, err := os.Open(filename)
offset := slice.GetValue(start, 0)
if err != nil { if err != nil {
return 0, err return offset, err
} }
defer func() { defer func() {
_ = file.Close() _ = file.Close()
}() }()
chunks := FindLineChunksByOffset(file, slice.GetValue(start, 0), chunkSize) chunks := FindLineChunksByOffset(file, offset, chunkSize)
var end int64 var end int64
var endMutex sync.Mutex var endMutex sync.Mutex
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -6,10 +6,10 @@ import (
"testing" "testing"
) )
func TestClose(t *testing.T) { func TestIncrementAnalyze(t *testing.T) {
path := `./test/day.2023-09-06.log` path := `./test/day.2023-09-06.log`
report := survey.Analyze(path, func(analyzer *survey.Analyzer, record survey.R) { reader := survey.IncrementAnalyze(path, func(analyzer *survey.Analyzer, record survey.R) {
switch record.GetString("type") { switch record.GetString("type") {
case "open_conn": case "open_conn":
analyzer.IncreaseValueNonRepeat("开播人数", record, 1, "live_id") analyzer.IncreaseValueNonRepeat("开播人数", record, 1, "live_id")
@ -44,5 +44,11 @@ func TestClose(t *testing.T) {
} }
}) })
fmt.Println(report.FilterSub("warzone0009")) for i := 0; i < 10; i++ {
report, err := reader()
if err != nil {
t.Fatal(err)
}
fmt.Println(report.FilterSub("warzone0009"))
}
} }

View File

@ -151,3 +151,19 @@ func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R))
return newReport(analyzer) return newReport(analyzer)
} }
// IncrementAnalyze 增量分析,返回一个函数,每次调用该函数都会分析文件中新增的内容
func IncrementAnalyze(filePath string, handle func(analyzer *Analyzer, record R)) func() (*Report, error) {
var analyzer = new(Analyzer)
var offset int64
return func() (*Report, error) {
var err error
offset, err = file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
handle(analyzer, R(s))
}, offset)
if err != nil {
return nil, err
}
return newReport(analyzer), nil
}
}