...
1
2
3
4
5 package errgroup_test
6
7 import (
8 "context"
9 "crypto/md5"
10 "fmt"
11 "io/ioutil"
12 "log"
13 "os"
14 "path/filepath"
15
16 "golang.org/x/sync/errgroup"
17 )
18
19
20
21
22 func ExampleGroup_pipeline() {
23 m, err := MD5All(context.Background(), ".")
24 if err != nil {
25 log.Fatal(err)
26 }
27
28 for k, sum := range m {
29 fmt.Printf("%s:\t%x\n", k, sum)
30 }
31 }
32
33 type result struct {
34 path string
35 sum [md5.Size]byte
36 }
37
38
39
40
41 func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
42
43
44
45 g, ctx := errgroup.WithContext(ctx)
46 paths := make(chan string)
47
48 g.Go(func() error {
49 defer close(paths)
50 return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
51 if err != nil {
52 return err
53 }
54 if !info.Mode().IsRegular() {
55 return nil
56 }
57 select {
58 case paths <- path:
59 case <-ctx.Done():
60 return ctx.Err()
61 }
62 return nil
63 })
64 })
65
66
67 c := make(chan result)
68 const numDigesters = 20
69 for i := 0; i < numDigesters; i++ {
70 g.Go(func() error {
71 for path := range paths {
72 data, err := ioutil.ReadFile(path)
73 if err != nil {
74 return err
75 }
76 select {
77 case c <- result{path, md5.Sum(data)}:
78 case <-ctx.Done():
79 return ctx.Err()
80 }
81 }
82 return nil
83 })
84 }
85 go func() {
86 g.Wait()
87 close(c)
88 }()
89
90 m := make(map[string][md5.Size]byte)
91 for r := range c {
92 m[r.path] = r.sum
93 }
94
95
96
97 if err := g.Wait(); err != nil {
98 return nil, err
99 }
100 return m, nil
101 }
102
View as plain text