1
2
3
4
5 package semaphore_test
6
7 import (
8 "context"
9 "math/rand"
10 "runtime"
11 "sync"
12 "testing"
13 "time"
14
15 "github.com/letsencrypt/boulder/semaphore"
16 "golang.org/x/sync/errgroup"
17 )
18
19 const maxSleep = 1 * time.Millisecond
20
21 func HammerWeighted(sem *semaphore.Weighted, n int64, loops int) {
22 for i := 0; i < loops; i++ {
23 _ = sem.Acquire(context.Background(), n)
24 time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond)
25 sem.Release(n)
26 }
27 }
28
29 func TestWeighted(t *testing.T) {
30 t.Parallel()
31
32 n := runtime.GOMAXPROCS(0)
33 loops := 10000 / n
34 sem := semaphore.NewWeighted(int64(n), 0)
35 var wg sync.WaitGroup
36 wg.Add(n)
37 for i := 0; i < n; i++ {
38 i := i
39 go func() {
40 defer wg.Done()
41 HammerWeighted(sem, int64(i), loops)
42 }()
43 }
44 wg.Wait()
45 }
46
47 func TestWeightedPanic(t *testing.T) {
48 t.Parallel()
49
50 defer func() {
51 if recover() == nil {
52 t.Fatal("release of an unacquired weighted semaphore did not panic")
53 }
54 }()
55 w := semaphore.NewWeighted(1, 0)
56 w.Release(1)
57 }
58
59 func TestWeightedTryAcquire(t *testing.T) {
60 t.Parallel()
61
62 ctx := context.Background()
63 sem := semaphore.NewWeighted(2, 0)
64 tries := []bool{}
65 _ = sem.Acquire(ctx, 1)
66 tries = append(tries, sem.TryAcquire(1))
67 tries = append(tries, sem.TryAcquire(1))
68
69 sem.Release(2)
70
71 tries = append(tries, sem.TryAcquire(1))
72 _ = sem.Acquire(ctx, 1)
73 tries = append(tries, sem.TryAcquire(1))
74
75 want := []bool{true, false, true, false}
76 for i := range tries {
77 if tries[i] != want[i] {
78 t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
79 }
80 }
81 }
82
83 func TestWeightedAcquire(t *testing.T) {
84 t.Parallel()
85
86 ctx := context.Background()
87 sem := semaphore.NewWeighted(2, 0)
88 tryAcquire := func(n int64) bool {
89 ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
90 defer cancel()
91 return sem.Acquire(ctx, n) == nil
92 }
93
94 tries := []bool{}
95 _ = sem.Acquire(ctx, 1)
96 tries = append(tries, tryAcquire(1))
97 tries = append(tries, tryAcquire(1))
98
99 sem.Release(2)
100
101 tries = append(tries, tryAcquire(1))
102 _ = sem.Acquire(ctx, 1)
103 tries = append(tries, tryAcquire(1))
104
105 want := []bool{true, false, true, false}
106 for i := range tries {
107 if tries[i] != want[i] {
108 t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
109 }
110 }
111 }
112
113 func TestWeightedDoesntBlockIfTooBig(t *testing.T) {
114 t.Parallel()
115
116 const n = 2
117 sem := semaphore.NewWeighted(n, 0)
118 {
119 ctx, cancel := context.WithCancel(context.Background())
120 defer cancel()
121 go func() {
122 _ = sem.Acquire(ctx, n+1)
123 }()
124 }
125
126 g, ctx := errgroup.WithContext(context.Background())
127 for i := n * 3; i > 0; i-- {
128 g.Go(func() error {
129 err := sem.Acquire(ctx, 1)
130 if err == nil {
131 time.Sleep(1 * time.Millisecond)
132 sem.Release(1)
133 }
134 return err
135 })
136 }
137 if err := g.Wait(); err != nil {
138 t.Errorf("semaphore.NewWeighted(%v, 0) failed to AcquireCtx(_, 1) with AcquireCtx(_, %v) pending", n, n+1)
139 }
140 }
141
142
143
144 func TestLargeAcquireDoesntStarve(t *testing.T) {
145 t.Parallel()
146
147 ctx := context.Background()
148 n := int64(runtime.GOMAXPROCS(0))
149 sem := semaphore.NewWeighted(n, 0)
150 running := true
151
152 var wg sync.WaitGroup
153 wg.Add(int(n))
154 for i := n; i > 0; i-- {
155 _ = sem.Acquire(ctx, 1)
156 go func() {
157 defer func() {
158 sem.Release(1)
159 wg.Done()
160 }()
161 for running {
162 time.Sleep(1 * time.Millisecond)
163 sem.Release(1)
164 _ = sem.Acquire(ctx, 1)
165 }
166 }()
167 }
168
169 _ = sem.Acquire(ctx, n)
170 running = false
171 sem.Release(n)
172 wg.Wait()
173 }
174
175
176 func TestAllocCancelDoesntStarve(t *testing.T) {
177 sem := semaphore.NewWeighted(10, 0)
178
179
180 _ = sem.Acquire(context.Background(), 1)
181
182
183 ctx, cancel := context.WithCancel(context.Background())
184 defer cancel()
185 go func() {
186 _ = sem.Acquire(ctx, 10)
187 }()
188
189
190 for sem.TryAcquire(1) {
191 sem.Release(1)
192 runtime.Gosched()
193 }
194
195
196
197 go cancel()
198
199 err := sem.Acquire(context.Background(), 1)
200 if err != nil {
201 t.Fatalf("Acquire(_, 1) failed unexpectedly: %v", err)
202 }
203 sem.Release(1)
204 }
205
206 func TestMaxWaiters(t *testing.T) {
207 ctx, cancel := context.WithCancel(context.Background())
208 defer cancel()
209 sem := semaphore.NewWeighted(1, 10)
210 _ = sem.Acquire(ctx, 1)
211
212 for i := 0; i < 10; i++ {
213 go func() {
214 _ = sem.Acquire(ctx, 1)
215 <-ctx.Done()
216 }()
217 }
218
219
220
221
222 for sem.NumWaiters() < 10 {
223 time.Sleep(10 * time.Millisecond)
224 }
225 err := sem.Acquire(ctx, 1)
226 if err != semaphore.ErrMaxWaiters {
227 t.Errorf("expected error when maxWaiters was reached, but got %#v", err)
228 }
229 }
230
View as plain text