1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "sync/atomic"
22 "testing"
23 "time"
24
25 "golang.org/x/sync/errgroup"
26 )
27
28 func fcSettings(c int, s int, l LimitExceededBehavior) FlowControlSettings {
29 return FlowControlSettings{
30 MaxOutstandingMessages: c,
31 MaxOutstandingBytes: s,
32 LimitExceededBehavior: l,
33 }
34 }
35
36 func TestFlowControllerCancel(t *testing.T) {
37
38 t.Parallel()
39 fc := newFlowController(fcSettings(3, 10, FlowControlBlock))
40 if err := fc.acquire(context.Background(), 5); err != nil {
41 t.Fatal(err)
42 }
43
44 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
45 defer cancel()
46 if err := fc.acquire(ctx, 6); err != context.DeadlineExceeded {
47 t.Fatalf("got %v, expected DeadlineExceeded", err)
48 }
49
50 go func() {
51 time.Sleep(5 * time.Millisecond)
52 fc.release(ctx, 5)
53 }()
54 if err := fc.acquire(context.Background(), 6); err != nil {
55 t.Errorf("got %v, expected nil", err)
56 }
57 }
58
59 func TestFlowControllerLargeRequest(t *testing.T) {
60
61 t.Parallel()
62 fc := newFlowController(fcSettings(3, 10, FlowControlBlock))
63 err := fc.acquire(context.Background(), 11)
64 if err != nil {
65 t.Fatal(err)
66 }
67 }
68
69 func TestFlowControllerNoStarve(t *testing.T) {
70
71
72 t.Parallel()
73 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
74 defer cancel()
75 fc := newFlowController(fcSettings(10, 10, FlowControlBlock))
76 first := make(chan int)
77 for i := 0; i < 20; i++ {
78 go func() {
79 for {
80 if err := fc.acquire(ctx, 1); err != nil {
81 if err != context.Canceled {
82 t.Error(err)
83 }
84 return
85 }
86 select {
87 case first <- 1:
88 default:
89 }
90 fc.release(ctx, 1)
91 }
92 }()
93 }
94 <-first
95 if err := fc.acquire(ctx, 11); err != nil {
96 t.Errorf("got %v, want nil", err)
97 }
98 }
99
100 func TestFlowControllerSaturation(t *testing.T) {
101 t.Parallel()
102 const (
103 maxCount = 6
104 maxSize = 10
105 )
106 for _, test := range []struct {
107 acquireSize int
108 wantCount, wantSize int64
109 }{
110 {
111
112 acquireSize: 1,
113 wantCount: 6,
114 wantSize: 6,
115 },
116 {
117
118
119 acquireSize: 2,
120 wantCount: 5,
121 wantSize: 10,
122 },
123 {
124
125
126 acquireSize: 3,
127 wantCount: 3,
128 wantSize: 9,
129 },
130 } {
131 fc := newFlowController(fcSettings(maxCount, maxSize, FlowControlBlock))
132
133
134 var curSize int64
135 success := errors.New("")
136
137 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
138 defer cancel()
139 g, ctx := errgroup.WithContext(ctx)
140 for i := 0; i < 10; i++ {
141 g.Go(func() error {
142 var hitCount, hitSize bool
143
144
145
146 for i := 0; i < 100 || !hitCount || !hitSize; i++ {
147 select {
148 case <-ctx.Done():
149 return ctx.Err()
150 default:
151 }
152 if err := fc.acquire(ctx, test.acquireSize); err != nil {
153 return err
154 }
155 c := int64(fc.count())
156 if c > test.wantCount {
157 return fmt.Errorf("count %d exceeds want %d", c, test.wantCount)
158 }
159 if c == test.wantCount {
160 hitCount = true
161 }
162 s := atomic.AddInt64(&curSize, int64(test.acquireSize))
163 if s > test.wantSize {
164 return fmt.Errorf("size %d exceeds want %d", s, test.wantSize)
165 }
166 if s == test.wantSize {
167 hitSize = true
168 }
169 time.Sleep(5 * time.Millisecond)
170 if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
171 return errors.New("negative size")
172 }
173 fc.release(ctx, test.acquireSize)
174 }
175 return success
176 })
177 }
178 if err := g.Wait(); err != success {
179 t.Errorf("%+v: %v", test, err)
180 continue
181 }
182 }
183 }
184
185 func TestFlowControllerUnboundedCount(t *testing.T) {
186 t.Parallel()
187 ctx := context.Background()
188 fc := newFlowController(fcSettings(0, 10, FlowControlSignalError))
189
190
191 if err := fc.acquire(ctx, 4); err != nil {
192 t.Errorf("got %v, wanted no error", err)
193 }
194
195
196 if err := fc.acquire(ctx, 4); err != nil {
197 t.Errorf("got %v, wanted no error", err)
198 }
199
200
201 if err := fc.acquire(ctx, 3); err == nil {
202 t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingBytes)
203 }
204 }
205
206 func TestFlowControllerUnboundedCount2(t *testing.T) {
207 t.Parallel()
208 ctx := context.Background()
209 fc := newFlowController(fcSettings(0, 0, FlowControlSignalError))
210
211 if err := fc.acquire(ctx, 4); err != nil {
212 t.Errorf("got %v, wanted no error", err)
213 }
214 fc.release(ctx, 1)
215 fc.release(ctx, 1)
216 fc.release(ctx, 1)
217 wantCount := int64(0)
218 c := int64(fc.count())
219 if c != wantCount {
220 t.Fatalf("got count %d, want %d", c, wantCount)
221 }
222 }
223
224 func TestFlowControllerUnboundedBytes(t *testing.T) {
225 t.Parallel()
226 ctx := context.Background()
227 fc := newFlowController(fcSettings(2, 0, FlowControlSignalError))
228
229
230 if err := fc.acquire(ctx, 4e9); err != nil {
231 t.Errorf("got %v, wanted no error", err)
232 }
233
234
235 if err := fc.acquire(ctx, 4e9); err != nil {
236 t.Errorf("got %v, wanted no error", err)
237 }
238
239
240 if err := fc.acquire(ctx, 3); err == nil {
241 t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingMessages)
242 }
243 }
244
View as plain text