1
2
3
4
5 package semaphore_test
6
7 import (
8 "context"
9 "math/rand"
10 "runtime"
11 "sync"
12 "testing"
13 "time"
14
15 "golang.org/x/sync/errgroup"
16 "golang.org/x/sync/semaphore"
17 )
18
19 const maxSleep = 1 * time.Millisecond
20
21 func HammerWeighted(sem *semaphore.Weighted, n int64, loops int) {
22 for i := 0; i < loops; i++ {
23 sem.Acquire(context.Background(), n)
24 time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond)
25 sem.Release(n)
26 }
27 }
28
29 func TestWeighted(t *testing.T) {
30 t.Parallel()
31
32 n := runtime.GOMAXPROCS(0)
33 loops := 10000 / n
34 sem := semaphore.NewWeighted(int64(n))
35 var wg sync.WaitGroup
36 wg.Add(n)
37 for i := 0; i < n; i++ {
38 i := i
39 go func() {
40 defer wg.Done()
41 HammerWeighted(sem, int64(i), loops)
42 }()
43 }
44 wg.Wait()
45 }
46
47 func TestWeightedPanic(t *testing.T) {
48 t.Parallel()
49
50 defer func() {
51 if recover() == nil {
52 t.Fatal("release of an unacquired weighted semaphore did not panic")
53 }
54 }()
55 w := semaphore.NewWeighted(1)
56 w.Release(1)
57 }
58
59 func TestWeightedTryAcquire(t *testing.T) {
60 t.Parallel()
61
62 ctx := context.Background()
63 sem := semaphore.NewWeighted(2)
64 tries := []bool{}
65 sem.Acquire(ctx, 1)
66 tries = append(tries, sem.TryAcquire(1))
67 tries = append(tries, sem.TryAcquire(1))
68
69 sem.Release(2)
70
71 tries = append(tries, sem.TryAcquire(1))
72 sem.Acquire(ctx, 1)
73 tries = append(tries, sem.TryAcquire(1))
74
75 want := []bool{true, false, true, false}
76 for i := range tries {
77 if tries[i] != want[i] {
78 t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
79 }
80 }
81 }
82
83 func TestWeightedAcquire(t *testing.T) {
84 t.Parallel()
85
86 ctx := context.Background()
87 sem := semaphore.NewWeighted(2)
88 tryAcquire := func(n int64) bool {
89 ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
90 defer cancel()
91 return sem.Acquire(ctx, n) == nil
92 }
93
94 tries := []bool{}
95 sem.Acquire(ctx, 1)
96 tries = append(tries, tryAcquire(1))
97 tries = append(tries, tryAcquire(1))
98
99 sem.Release(2)
100
101 tries = append(tries, tryAcquire(1))
102 sem.Acquire(ctx, 1)
103 tries = append(tries, tryAcquire(1))
104
105 want := []bool{true, false, true, false}
106 for i := range tries {
107 if tries[i] != want[i] {
108 t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
109 }
110 }
111 }
112
113 func TestWeightedDoesntBlockIfTooBig(t *testing.T) {
114 t.Parallel()
115
116 const n = 2
117 sem := semaphore.NewWeighted(n)
118 {
119 ctx, cancel := context.WithCancel(context.Background())
120 defer cancel()
121 go sem.Acquire(ctx, n+1)
122 }
123
124 g, ctx := errgroup.WithContext(context.Background())
125 for i := n * 3; i > 0; i-- {
126 g.Go(func() error {
127 err := sem.Acquire(ctx, 1)
128 if err == nil {
129 time.Sleep(1 * time.Millisecond)
130 sem.Release(1)
131 }
132 return err
133 })
134 }
135 if err := g.Wait(); err != nil {
136 t.Errorf("semaphore.NewWeighted(%v) failed to AcquireCtx(_, 1) with AcquireCtx(_, %v) pending", n, n+1)
137 }
138 }
139
140
141
142 func TestLargeAcquireDoesntStarve(t *testing.T) {
143 t.Parallel()
144
145 ctx := context.Background()
146 n := int64(runtime.GOMAXPROCS(0))
147 sem := semaphore.NewWeighted(n)
148 running := true
149
150 var wg sync.WaitGroup
151 wg.Add(int(n))
152 for i := n; i > 0; i-- {
153 sem.Acquire(ctx, 1)
154 go func() {
155 defer func() {
156 sem.Release(1)
157 wg.Done()
158 }()
159 for running {
160 time.Sleep(1 * time.Millisecond)
161 sem.Release(1)
162 sem.Acquire(ctx, 1)
163 }
164 }()
165 }
166
167 sem.Acquire(ctx, n)
168 running = false
169 sem.Release(n)
170 wg.Wait()
171 }
172
173
174 func TestAllocCancelDoesntStarve(t *testing.T) {
175 sem := semaphore.NewWeighted(10)
176
177
178 sem.Acquire(context.Background(), 1)
179
180
181 ctx, cancel := context.WithCancel(context.Background())
182 defer cancel()
183 go func() {
184 sem.Acquire(ctx, 10)
185 }()
186
187
188 for sem.TryAcquire(1) {
189 sem.Release(1)
190 runtime.Gosched()
191 }
192
193
194
195 go cancel()
196
197 err := sem.Acquire(context.Background(), 1)
198 if err != nil {
199 t.Fatalf("Acquire(_, 1) failed unexpectedly: %v", err)
200 }
201 sem.Release(1)
202 }
203
204 func TestWeightedAcquireCanceled(t *testing.T) {
205
206 sem := semaphore.NewWeighted(2)
207 ctx, cancel := context.WithCancel(context.Background())
208 sem.Acquire(context.Background(), 1)
209 ch := make(chan struct{})
210 go func() {
211
212 for sem.TryAcquire(1) {
213 sem.Release(1)
214 }
215
216 cancel()
217 sem.Release(1)
218 close(ch)
219 }()
220
221
222 if err := sem.Acquire(ctx, 2); err != context.Canceled {
223 t.Errorf("Acquire with canceled context returned wrong error: want context.Canceled, got %v", err)
224 }
225
226
227 <-ch
228 if !sem.TryAcquire(2) {
229 t.Fatal("TryAcquire after canceled Acquire failed")
230 }
231
232
233 sem.Release(2)
234 if err := sem.Acquire(ctx, 1); err != context.Canceled {
235 t.Errorf("Acquire with canceled context returned wrong error: want context.Canceled, got %v", err)
236 }
237 }
238
View as plain text