...

Source file src/golang.org/x/sync/errgroup/errgroup_example_md5all_test.go

Documentation: golang.org/x/sync/errgroup

     1  // Copyright 2016 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package errgroup_test
     6  
     7  import (
     8  	"context"
     9  	"crypto/md5"
    10  	"fmt"
    11  	"io/ioutil"
    12  	"log"
    13  	"os"
    14  	"path/filepath"
    15  
    16  	"golang.org/x/sync/errgroup"
    17  )
    18  
    19  // Pipeline demonstrates the use of a Group to implement a multi-stage
    20  // pipeline: a version of the MD5All function with bounded parallelism from
    21  // https://blog.golang.org/pipelines.
    22  func ExampleGroup_pipeline() {
    23  	m, err := MD5All(context.Background(), ".")
    24  	if err != nil {
    25  		log.Fatal(err)
    26  	}
    27  
    28  	for k, sum := range m {
    29  		fmt.Printf("%s:\t%x\n", k, sum)
    30  	}
    31  }
    32  
    33  type result struct {
    34  	path string
    35  	sum  [md5.Size]byte
    36  }
    37  
    38  // MD5All reads all the files in the file tree rooted at root and returns a map
    39  // from file path to the MD5 sum of the file's contents. If the directory walk
    40  // fails or any read operation fails, MD5All returns an error.
    41  func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
    42  	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
    43  	// - even in case of error! - we know that all of the goroutines have finished
    44  	// and the memory they were using can be garbage-collected.
    45  	g, ctx := errgroup.WithContext(ctx)
    46  	paths := make(chan string)
    47  
    48  	g.Go(func() error {
    49  		defer close(paths)
    50  		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
    51  			if err != nil {
    52  				return err
    53  			}
    54  			if !info.Mode().IsRegular() {
    55  				return nil
    56  			}
    57  			select {
    58  			case paths <- path:
    59  			case <-ctx.Done():
    60  				return ctx.Err()
    61  			}
    62  			return nil
    63  		})
    64  	})
    65  
    66  	// Start a fixed number of goroutines to read and digest files.
    67  	c := make(chan result)
    68  	const numDigesters = 20
    69  	for i := 0; i < numDigesters; i++ {
    70  		g.Go(func() error {
    71  			for path := range paths {
    72  				data, err := ioutil.ReadFile(path)
    73  				if err != nil {
    74  					return err
    75  				}
    76  				select {
    77  				case c <- result{path, md5.Sum(data)}:
    78  				case <-ctx.Done():
    79  					return ctx.Err()
    80  				}
    81  			}
    82  			return nil
    83  		})
    84  	}
    85  	go func() {
    86  		g.Wait()
    87  		close(c)
    88  	}()
    89  
    90  	m := make(map[string][md5.Size]byte)
    91  	for r := range c {
    92  		m[r.path] = r.sum
    93  	}
    94  	// Check whether any of the goroutines failed. Since g is accumulating the
    95  	// errors, we don't need to send them (or check for them) in the individual
    96  	// results sent on the channel.
    97  	if err := g.Wait(); err != nil {
    98  		return nil, err
    99  	}
   100  	return m, nil
   101  }
   102  

View as plain text