From 3a85b8da3830e027d8fd2d446a8675d0ad9a57b3 Mon Sep 17 00:00:00 2001 From: Alan Donovan Date: Wed, 30 Dec 2015 14:17:09 -0500 Subject: [PATCH] refactor/importgraph: reduce I/O concurrency to avoid EMFILE Use an concurrency-limiting semaphore to reduce I/O parallelism in Import. Also, start the producer in a new goroutine so that it runs in parallel with the consumer. Paradoxically, this reduces the peak number of goroutines. Also, in buildutil.ForEachPackage, make the concurrency limiting semaphore global, since I/O parallelism is a process-wide resource. Change-Id: I282b717c50603361826e5675077c9f464c874132 Reviewed-on: https://go-review.googlesource.com/18215 Reviewed-by: Michael Matloob --- go/buildutil/allpackages.go | 16 ++--- refactor/importgraph/graph.go | 118 +++++++++++++++++----------------- 2 files changed, 68 insertions(+), 66 deletions(-) diff --git a/go/buildutil/allpackages.go b/go/buildutil/allpackages.go index d28a5d0e..bee5fc17 100644 --- a/go/buildutil/allpackages.go +++ b/go/buildutil/allpackages.go @@ -50,10 +50,6 @@ func AllPackages(ctxt *build.Context) []string { // which must be concurrency-safe. // func ForEachPackage(ctxt *build.Context, found func(importPath string, err error)) { - // We use a counting semaphore to limit - // the number of parallel calls to ReadDir. - sema := make(chan bool, 20) - ch := make(chan item) var wg sync.WaitGroup @@ -61,7 +57,7 @@ func ForEachPackage(ctxt *build.Context, found func(importPath string, err error root := root wg.Add(1) go func() { - allPackages(ctxt, sema, root, ch) + allPackages(ctxt, root, ch) wg.Done() }() } @@ -81,7 +77,11 @@ type item struct { err error // (optional) } -func allPackages(ctxt *build.Context, sema chan bool, root string, ch chan<- item) { +// We use a process-wide counting semaphore to limit +// the number of parallel calls to ReadDir. +var ioLimit = make(chan bool, 20) + +func allPackages(ctxt *build.Context, root string, ch chan<- item) { root = filepath.Clean(root) + string(os.PathSeparator) var wg sync.WaitGroup @@ -102,9 +102,9 @@ func allPackages(ctxt *build.Context, sema chan bool, root string, ch chan<- ite return } - sema <- true + ioLimit <- true files, err := ReadDir(ctxt, dir) - <-sema + <-ioLimit if pkg != "" || err != nil { ch <- item{pkg, err} } diff --git a/refactor/importgraph/graph.go b/refactor/importgraph/graph.go index 8a833dd2..7afb1807 100644 --- a/refactor/importgraph/graph.go +++ b/refactor/importgraph/graph.go @@ -68,76 +68,78 @@ func Build(ctxt *build.Context) (forward, reverse Graph, errors map[string]error ch := make(chan interface{}) - sema := make(chan int, 20) // I/O concurrency limiting semaphore - var wg sync.WaitGroup - buildutil.ForEachPackage(ctxt, func(path string, err error) { - wg.Add(1) - go func() { - defer wg.Done() + go func() { + sema := make(chan int, 20) // I/O concurrency limiting semaphore + var wg sync.WaitGroup + buildutil.ForEachPackage(ctxt, func(path string, err error) { if err != nil { ch <- pathError{path, err} return } - sema <- 1 - bp, err := ctxt.Import(path, "", buildutil.AllowVendor) - <-sema + wg.Add(1) + go func() { + defer wg.Done() - if err != nil { - if _, ok := err.(*build.NoGoError); ok { - // empty directory is not an error - } else { - ch <- pathError{path, err} - } - // Even in error cases, Import usually returns a package. - } + sema <- 1 + bp, err := ctxt.Import(path, "", buildutil.AllowVendor) + <-sema - // absolutize resolves an import path relative - // to the current package bp. - // The absolute form may contain "vendor". - // - // The vendoring feature slows down Build by 3×. - // Here are timings from a 1400 package workspace: - // 1100ms: current code (with vendor check) - // 880ms: with a nonblocking cache around ctxt.IsDir - // 840ms: nonblocking cache with duplicate suppression - // 340ms: original code (no vendor check) - // TODO(adonovan): optimize, somehow. - absolutize := func(path string) string { return path } - if buildutil.AllowVendor != 0 { - memo := make(map[string]string) - absolutize = func(path string) string { - canon, ok := memo[path] - if !ok { - sema <- 1 - bp2, _ := ctxt.Import(path, bp.Dir, build.FindOnly|buildutil.AllowVendor) - <-sema - - if bp2 != nil { - canon = bp2.ImportPath - } else { - canon = path - } - memo[path] = canon + if err != nil { + if _, ok := err.(*build.NoGoError); ok { + // empty directory is not an error + } else { + ch <- pathError{path, err} } - return canon + // Even in error cases, Import usually returns a package. } - } - if bp != nil { - for _, imp := range bp.Imports { - ch <- importEdge{path, absolutize(imp)} + // absolutize resolves an import path relative + // to the current package bp. + // The absolute form may contain "vendor". + // + // The vendoring feature slows down Build by 3×. + // Here are timings from a 1400 package workspace: + // 1100ms: current code (with vendor check) + // 880ms: with a nonblocking cache around ctxt.IsDir + // 840ms: nonblocking cache with duplicate suppression + // 340ms: original code (no vendor check) + // TODO(adonovan): optimize, somehow. + absolutize := func(path string) string { return path } + if buildutil.AllowVendor != 0 { + memo := make(map[string]string) + absolutize = func(path string) string { + canon, ok := memo[path] + if !ok { + sema <- 1 + bp2, _ := ctxt.Import(path, bp.Dir, build.FindOnly|buildutil.AllowVendor) + <-sema + + if bp2 != nil { + canon = bp2.ImportPath + } else { + canon = path + } + memo[path] = canon + } + return canon + } } - for _, imp := range bp.TestImports { - ch <- importEdge{path, absolutize(imp)} + + if bp != nil { + for _, imp := range bp.Imports { + ch <- importEdge{path, absolutize(imp)} + } + for _, imp := range bp.TestImports { + ch <- importEdge{path, absolutize(imp)} + } + for _, imp := range bp.XTestImports { + ch <- importEdge{path, absolutize(imp)} + } } - for _, imp := range bp.XTestImports { - ch <- importEdge{path, absolutize(imp)} - } - } - }() - }) - go func() { + + }() + }) wg.Wait() close(ch) }()