refactor/importgraph: reduce I/O concurrency to avoid EMFILE

Use a concurrency-limiting semaphore to reduce I/O parallelism in Import.

Also, start the producer in a new goroutine so that it runs in parallel
with the consumer.  Paradoxically, this reduces the peak number of
goroutines.

Also, in buildutil.ForEachPackage, make the concurrency-limiting
semaphore global, since I/O parallelism is a process-wide resource.

Change-Id: I282b717c50603361826e5675077c9f464c874132
Reviewed-on: https://go-review.googlesource.com/18215
Reviewed-by: Michael Matloob <matloob@golang.org>
This commit is contained in:
Alan Donovan 2015-12-30 14:17:09 -05:00
parent 6e2f52e572
commit 3a85b8da38
2 changed files with 68 additions and 66 deletions

View File

@ -50,10 +50,6 @@ func AllPackages(ctxt *build.Context) []string {
// which must be concurrency-safe. // which must be concurrency-safe.
// //
func ForEachPackage(ctxt *build.Context, found func(importPath string, err error)) { func ForEachPackage(ctxt *build.Context, found func(importPath string, err error)) {
// We use a counting semaphore to limit
// the number of parallel calls to ReadDir.
sema := make(chan bool, 20)
ch := make(chan item) ch := make(chan item)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -61,7 +57,7 @@ func ForEachPackage(ctxt *build.Context, found func(importPath string, err error
root := root root := root
wg.Add(1) wg.Add(1)
go func() { go func() {
allPackages(ctxt, sema, root, ch) allPackages(ctxt, root, ch)
wg.Done() wg.Done()
}() }()
} }
@ -81,7 +77,11 @@ type item struct {
err error // (optional) err error // (optional)
} }
func allPackages(ctxt *build.Context, sema chan bool, root string, ch chan<- item) { // We use a process-wide counting semaphore to limit
// the number of parallel calls to ReadDir.
var ioLimit = make(chan bool, 20)
func allPackages(ctxt *build.Context, root string, ch chan<- item) {
root = filepath.Clean(root) + string(os.PathSeparator) root = filepath.Clean(root) + string(os.PathSeparator)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -102,9 +102,9 @@ func allPackages(ctxt *build.Context, sema chan bool, root string, ch chan<- ite
return return
} }
sema <- true ioLimit <- true
files, err := ReadDir(ctxt, dir) files, err := ReadDir(ctxt, dir)
<-sema <-ioLimit
if pkg != "" || err != nil { if pkg != "" || err != nil {
ch <- item{pkg, err} ch <- item{pkg, err}
} }

View File

@ -68,76 +68,78 @@ func Build(ctxt *build.Context) (forward, reverse Graph, errors map[string]error
ch := make(chan interface{}) ch := make(chan interface{})
sema := make(chan int, 20) // I/O concurrency limiting semaphore go func() {
var wg sync.WaitGroup sema := make(chan int, 20) // I/O concurrency limiting semaphore
buildutil.ForEachPackage(ctxt, func(path string, err error) { var wg sync.WaitGroup
wg.Add(1) buildutil.ForEachPackage(ctxt, func(path string, err error) {
go func() {
defer wg.Done()
if err != nil { if err != nil {
ch <- pathError{path, err} ch <- pathError{path, err}
return return
} }
sema <- 1 wg.Add(1)
bp, err := ctxt.Import(path, "", buildutil.AllowVendor) go func() {
<-sema defer wg.Done()
if err != nil { sema <- 1
if _, ok := err.(*build.NoGoError); ok { bp, err := ctxt.Import(path, "", buildutil.AllowVendor)
// empty directory is not an error <-sema
} else {
ch <- pathError{path, err}
}
// Even in error cases, Import usually returns a package.
}
// absolutize resolves an import path relative if err != nil {
// to the current package bp. if _, ok := err.(*build.NoGoError); ok {
// The absolute form may contain "vendor". // empty directory is not an error
// } else {
// The vendoring feature slows down Build by 3×. ch <- pathError{path, err}
// Here are timings from a 1400 package workspace:
// 1100ms: current code (with vendor check)
// 880ms: with a nonblocking cache around ctxt.IsDir
// 840ms: nonblocking cache with duplicate suppression
// 340ms: original code (no vendor check)
// TODO(adonovan): optimize, somehow.
absolutize := func(path string) string { return path }
if buildutil.AllowVendor != 0 {
memo := make(map[string]string)
absolutize = func(path string) string {
canon, ok := memo[path]
if !ok {
sema <- 1
bp2, _ := ctxt.Import(path, bp.Dir, build.FindOnly|buildutil.AllowVendor)
<-sema
if bp2 != nil {
canon = bp2.ImportPath
} else {
canon = path
}
memo[path] = canon
} }
return canon // Even in error cases, Import usually returns a package.
} }
}
if bp != nil { // absolutize resolves an import path relative
for _, imp := range bp.Imports { // to the current package bp.
ch <- importEdge{path, absolutize(imp)} // The absolute form may contain "vendor".
//
// The vendoring feature slows down Build by 3×.
// Here are timings from a 1400 package workspace:
// 1100ms: current code (with vendor check)
// 880ms: with a nonblocking cache around ctxt.IsDir
// 840ms: nonblocking cache with duplicate suppression
// 340ms: original code (no vendor check)
// TODO(adonovan): optimize, somehow.
absolutize := func(path string) string { return path }
if buildutil.AllowVendor != 0 {
memo := make(map[string]string)
absolutize = func(path string) string {
canon, ok := memo[path]
if !ok {
sema <- 1
bp2, _ := ctxt.Import(path, bp.Dir, build.FindOnly|buildutil.AllowVendor)
<-sema
if bp2 != nil {
canon = bp2.ImportPath
} else {
canon = path
}
memo[path] = canon
}
return canon
}
} }
for _, imp := range bp.TestImports {
ch <- importEdge{path, absolutize(imp)} if bp != nil {
for _, imp := range bp.Imports {
ch <- importEdge{path, absolutize(imp)}
}
for _, imp := range bp.TestImports {
ch <- importEdge{path, absolutize(imp)}
}
for _, imp := range bp.XTestImports {
ch <- importEdge{path, absolutize(imp)}
}
} }
for _, imp := range bp.XTestImports {
ch <- importEdge{path, absolutize(imp)} }()
} })
}
}()
})
go func() {
wg.Wait() wg.Wait()
close(ch) close(ch)
}() }()