...
1
2
3
4
5
6
7 package singleflight
8
9 import (
10 "bytes"
11 "errors"
12 "fmt"
13 "runtime"
14 "runtime/debug"
15 "sync"
16 )
17
18
19
20 var errGoexit = errors.New("runtime.Goexit was called")
21
22
23
24 type panicError struct {
25 value interface{}
26 stack []byte
27 }
28
29
30 func (p *panicError) Error() string {
31 return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
32 }
33
34 func (p *panicError) Unwrap() error {
35 err, ok := p.value.(error)
36 if !ok {
37 return nil
38 }
39
40 return err
41 }
42
43 func newPanicError(v interface{}) error {
44 stack := debug.Stack()
45
46
47
48
49 if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
50 stack = stack[line+1:]
51 }
52 return &panicError{value: v, stack: stack}
53 }
54
55
56 type call struct {
57 wg sync.WaitGroup
58
59
60
61 val interface{}
62 err error
63
64
65
66
67 dups int
68 chans []chan<- Result
69 }
70
71
72
73 type Group struct {
74 mu sync.Mutex
75 m map[string]*call
76 }
77
78
79
80 type Result struct {
81 Val interface{}
82 Err error
83 Shared bool
84 }
85
86
87
88
89
90
91 func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
92 g.mu.Lock()
93 if g.m == nil {
94 g.m = make(map[string]*call)
95 }
96 if c, ok := g.m[key]; ok {
97 c.dups++
98 g.mu.Unlock()
99 c.wg.Wait()
100
101 if e, ok := c.err.(*panicError); ok {
102 panic(e)
103 } else if c.err == errGoexit {
104 runtime.Goexit()
105 }
106 return c.val, c.err, true
107 }
108 c := new(call)
109 c.wg.Add(1)
110 g.m[key] = c
111 g.mu.Unlock()
112
113 g.doCall(c, key, fn)
114 return c.val, c.err, c.dups > 0
115 }
116
117
118
119
120
121 func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
122 ch := make(chan Result, 1)
123 g.mu.Lock()
124 if g.m == nil {
125 g.m = make(map[string]*call)
126 }
127 if c, ok := g.m[key]; ok {
128 c.dups++
129 c.chans = append(c.chans, ch)
130 g.mu.Unlock()
131 return ch
132 }
133 c := &call{chans: []chan<- Result{ch}}
134 c.wg.Add(1)
135 g.m[key] = c
136 g.mu.Unlock()
137
138 go g.doCall(c, key, fn)
139
140 return ch
141 }
142
143
144 func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
145 normalReturn := false
146 recovered := false
147
148
149
150 defer func() {
151
152 if !normalReturn && !recovered {
153 c.err = errGoexit
154 }
155
156 g.mu.Lock()
157 defer g.mu.Unlock()
158 c.wg.Done()
159 if g.m[key] == c {
160 delete(g.m, key)
161 }
162
163 if e, ok := c.err.(*panicError); ok {
164
165
166 if len(c.chans) > 0 {
167 go panic(e)
168 select {}
169 } else {
170 panic(e)
171 }
172 } else if c.err == errGoexit {
173
174 } else {
175
176 for _, ch := range c.chans {
177 ch <- Result{c.val, c.err, c.dups > 0}
178 }
179 }
180 }()
181
182 func() {
183 defer func() {
184 if !normalReturn {
185
186
187
188
189
190
191
192 if r := recover(); r != nil {
193 c.err = newPanicError(r)
194 }
195 }
196 }()
197
198 c.val, c.err = fn()
199 normalReturn = true
200 }()
201
202 if !normalReturn {
203 recovered = true
204 }
205 }
206
207
208
209
210 func (g *Group) Forget(key string) {
211 g.mu.Lock()
212 delete(g.m, key)
213 g.mu.Unlock()
214 }
215
View as plain text