feat: 增加了增量读取功能并改善了错误处理

此提交在文件读取功能上进行了扩展,通过在utils/file/file.go中的ReadLineWithParallel函数和FindLineChunks函数添加“start”参数,实现了从指定位置进行增量读取。此外,当扫描器遇到错误时,utils
/ file /
file.go中的错误处理得到了改善,删除了panic表达式,而是直接返回,让函数继续处理。同时在utils/log/survey/survey.go中实现了来自utils/
file/file.go的功能,以使用新的增量读取功能替换旧功能。
This commit is contained in:
kercylan98 2023-11-28 11:59:55 +08:00
parent c10494d3c2
commit b11baa3653
3 changed files with 38 additions and 10 deletions

View File

@ -2,6 +2,7 @@ package file
import (
"bufio"
"github.com/kercylan98/minotaur/utils/slice"
"io"
"os"
"path/filepath"
@ -149,25 +150,34 @@ func Paths(dir string) []string {
return paths
}
// ReadLineWithParallel 并行的分行读取文件并行处理,处理过程中会将每一行的内容传入 handlerFunc 中进行处理,当过程中如果产生错误则会发生 panic过程前发生错误将会返回 error
// ReadLineWithParallelByChannel 并行的分行读取文件并行处理,处理过程中会将每一行的内容传入 handlerFunc 中进行处理
// - 由于是并行处理,所以处理过程中的顺序是不确定的。
func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string)) error {
// - 可通过 start 参数指定开始读取的位置,如果不指定则从文件开头开始读取。
func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(string), start ...int64) (n int64, err error) {
file, err := os.Open(filename)
if err != nil {
return err
return 0, err
}
defer func() {
_ = file.Close()
}()
chunks := FindLineChunks(file, chunkSize)
chunks := FindLineChunksByOffset(file, slice.GetValue(start, 0), chunkSize)
var end int64
var endMutex sync.Mutex
var wg sync.WaitGroup
for _, chunk := range chunks {
wg.Add(1)
go func(chunk [2]int64) {
defer wg.Done()
r := io.NewSectionReader(file, chunk[0], chunk[1]-chunk[0])
endMutex.Lock()
e := chunk[1] - chunk[0]
if e > end {
end = e + 1
}
endMutex.Unlock()
r := io.NewSectionReader(file, chunk[0], e)
scanner := bufio.NewScanner(r)
for scanner.Scan() {
@ -175,12 +185,12 @@ func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(str
}
if err := scanner.Err(); err != nil {
panic(err)
return
}
}(chunk)
}
wg.Wait()
return nil
return end, nil
}
// FindLineChunks 查找文件按照每行划分的分块,每个分块的大小将在 chunkSize 和分割后的分块距离行首及行尾的距离中范围内
@ -188,6 +198,11 @@ func ReadLineWithParallel(filename string, chunkSize int64, handlerFunc func(str
// - 当过程中发生错误将会发生 panic
// - 返回值的成员是一个长度为 2 的数组,第一个元素是分块的起始位置,第二个元素是分块的结束位置
func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 {
return FindLineChunksByOffset(file, 0, chunkSize)
}
// FindLineChunksByOffset 该函数与 FindLineChunks 类似,不同的是该函数可以指定 offset 从指定位置开始读取文件
func FindLineChunksByOffset(file *os.File, offset, chunkSize int64) [][2]int64 {
var chunks [][2]int64
fileSize, err := file.Seek(0, io.SeekEnd)
@ -199,7 +214,7 @@ func FindLineChunks(file *os.File, chunkSize int64) [][2]int64 {
panic(err)
}
currentPos := int64(0)
currentPos := offset
for currentPos < fileSize {
start := currentPos
if start != 0 { // 不是文件的开头

View File

@ -5,6 +5,7 @@ import (
"github.com/kercylan98/minotaur/utils/file"
"strings"
"testing"
"time"
)
func TestFilePaths(t *testing.T) {
@ -20,3 +21,15 @@ func TestFilePaths(t *testing.T) {
}
fmt.Println("total line:", line, "total file:", fileCount)
}
func TestNewIncrementReader(t *testing.T) {
n, _ := file.ReadLineWithParallel(`./test/t.log`, 1*1024*1024*1024, func(s string) {
t.Log(s)
})
time.Sleep(time.Second * 3)
n, _ = file.ReadLineWithParallel(`./test/t.log`, 1*1024*1024*1024, func(s string) {
t.Log(s)
}, n)
}

View File

@ -127,7 +127,7 @@ func Close(names ...string) {
// - 适用于外部进程对于日志文件的读取,但是需要注意的是,此时日志文件可能正在被写入,所以可能会读取到错误的数据
func Analyze(filePath string, handle func(analyzer *Analyzer, record R)) *Report {
analyzer := new(Analyzer)
err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
_, err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
handle(analyzer, R(s))
})
if err != nil {
@ -141,7 +141,7 @@ func Analyze(filePath string, handle func(analyzer *Analyzer, record R)) *Report
func AnalyzeMulti(filePaths []string, handle func(analyzer *Analyzer, record R)) *Report {
analyzer := new(Analyzer)
for _, filePath := range filePaths {
err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
_, err := file.ReadLineWithParallel(filePath, 1*1024*1024*1024, func(s string) {
handle(analyzer, R(s))
})
if err != nil {