1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19 "fmt"
20 "math/rand"
21 "testing"
22 "time"
23
24 "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
25 "github.com/googleapis/gax-go/v2"
26 )
27
28 func TestSimpleRouter(t *testing.T) {
29 ctx := context.Background()
30
31 pool := &connectionPool{
32 ctx: ctx,
33 open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
34 return &testAppendRowsClient{}, nil
35 },
36 }
37
38 router := newSimpleRouter("")
39 if err := pool.activateRouter(router); err != nil {
40 t.Errorf("activateRouter: %v", err)
41 }
42
43 ms := &ManagedStream{
44 ctx: ctx,
45 retry: newStatelessRetryer(),
46 }
47
48 pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "")
49
50
51 if _, err := pool.router.pickConnection(pw); err == nil {
52 t.Errorf("pickConnection: expected error, got success")
53 }
54 writer := &ManagedStream{
55 id: "writer",
56 }
57 if err := pool.addWriter(writer); err != nil {
58 t.Errorf("addWriter: %v", err)
59 }
60 if _, err := pool.router.pickConnection(pw); err != nil {
61 t.Errorf("pickConnection error: %v", err)
62 }
63 if err := pool.removeWriter(writer); err != nil {
64 t.Errorf("disconnectWriter: %v", err)
65 }
66 if _, err := pool.router.pickConnection(pw); err == nil {
67 t.Errorf("pickConnection: expected error, got success")
68 }
69 }
70
71 func TestSharedRouter_Basic(t *testing.T) {
72 ctx, cancel := context.WithCancel(context.Background())
73
74 pool := &connectionPool{
75 ctx: ctx,
76 cancel: cancel,
77 open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
78 return &testAppendRowsClient{}, nil
79 },
80 }
81
82 router := newSharedRouter(false, 0)
83 if err := pool.activateRouter(router); err != nil {
84 t.Errorf("activateRouter: %v", err)
85 }
86 if gotConns := len(router.exclusiveConns); gotConns != 0 {
87 t.Errorf("expected zero connections are start, got %d", gotConns)
88 }
89
90 ms := &ManagedStream{
91 ctx: ctx,
92 retry: newStatelessRetryer(),
93 }
94 pw := newPendingWrite(ctx, ms, &storagepb.AppendRowsRequest{}, nil, "", "")
95
96 if _, err := pool.router.pickConnection(pw); err == nil {
97 t.Errorf("pickConnection: expected error, got success")
98 }
99
100 if err := pool.addWriter(ms); err == nil {
101 t.Errorf("expected id-less addWriter to fail")
102 }
103 ms.id = "writer"
104 if err := pool.addWriter(ms); err != nil {
105 t.Errorf("addWriter: %v", err)
106 }
107
108 if _, err := pool.router.pickConnection(pw); err != nil {
109 t.Errorf("pickConnection error: %v", err)
110 }
111 if err := pool.removeWriter(ms); err != nil {
112 t.Errorf("disconnectWriter: %v", err)
113 }
114 if _, err := pool.router.pickConnection(pw); err == nil {
115 t.Errorf("pickConnection: expected error, got success")
116 }
117 }
118
119 func TestSharedRouter_Multiplex(t *testing.T) {
120 ctx, cancel := context.WithCancel(context.Background())
121
122 pool := &connectionPool{
123 id: newUUID(poolIDPrefix),
124 ctx: ctx,
125 cancel: cancel,
126 open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
127 return &testAppendRowsClient{}, nil
128 },
129 baseFlowController: newFlowController(2, 10),
130 }
131 defer pool.Close()
132
133 router := newSharedRouter(true, 3)
134 if err := pool.activateRouter(router); err != nil {
135 t.Errorf("activateRouter: %v", err)
136 }
137
138 wantConnCount := 0
139 if got := len(router.multiConns); wantConnCount != got {
140 t.Errorf("wanted %d conns, got %d", wantConnCount, got)
141 }
142
143 writerA := &ManagedStream{
144 id: newUUID(writerIDPrefix),
145 streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
146 ctx: ctx,
147 cancel: cancel,
148 }
149 if err := pool.router.writerAttach(writerA); err != nil {
150 t.Fatalf("writerA attach: %v", err)
151 }
152
153
154 wantConnCount = 1
155 if got := len(router.multiConns); wantConnCount != got {
156 t.Errorf("wanted %d conns, got %d", wantConnCount, got)
157 }
158
159 writerB := &ManagedStream{
160 id: newUUID(writerIDPrefix),
161 streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
162 ctx: ctx,
163 cancel: cancel,
164 }
165 if err := pool.router.writerAttach(writerB); err != nil {
166 t.Fatalf("writerA attach: %v", err)
167 }
168 writerC := &ManagedStream{
169 id: newUUID(writerIDPrefix),
170 streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
171 ctx: ctx,
172 cancel: cancel,
173 }
174 if err := pool.router.writerAttach(writerC); err != nil {
175 t.Fatalf("writerA attach: %v", err)
176 }
177
178 wantConnCount = 1
179 if got := len(router.multiConns); wantConnCount != got {
180 t.Fatalf("wanted %d conns, got %d", wantConnCount, got)
181 }
182
183 pw := newPendingWrite(ctx, writerA, &storagepb.AppendRowsRequest{}, nil, "", "")
184 conn, err := router.pickConnection(pw)
185 if err != nil {
186 t.Fatalf("pickConnection writerA: %v", err)
187 }
188
189 conn.fc.acquire(ctx, 1)
190 conn.fc.acquire(ctx, 1)
191
192 if !conn.isLoaded() {
193 t.Errorf("expected conn to be loaded, was not")
194 }
195
196 time.Sleep(watchDogInterval * 2)
197
198 wantConnCount = 2
199
200 router.multiMu.RLock()
201 defer router.multiMu.RUnlock()
202 if got := len(router.multiConns); wantConnCount != got {
203 t.Fatalf("wanted %d conns, got %d", wantConnCount, got)
204 }
205 gotLoad0 := router.multiConns[0].curLoad()
206 gotLoad1 := router.multiConns[1].curLoad()
207 if gotLoad0 > gotLoad1 {
208 t.Errorf("expected connections to be ordered by load, got %f, %f", gotLoad0, gotLoad1)
209 }
210
211 connsWithWriters := 0
212 for _, v := range router.invertedMultiMap {
213 if len(v) > 0 {
214 connsWithWriters++
215 }
216 }
217 if connsWithWriters < wantConnCount {
218 t.Errorf("wanted at least %d connections to have writers attached, got %d", wantConnCount, connsWithWriters)
219 }
220
221 }
222
223 func BenchmarkRoutingParallel(b *testing.B) {
224
225 for _, bm := range []struct {
226 desc string
227 router poolRouter
228 numWriters int
229 numDefaultWriters int
230 }{
231 {
232 desc: "SimpleRouter",
233 router: newSimpleRouter(""),
234 numWriters: 1,
235 numDefaultWriters: 1,
236 },
237 {
238 desc: "SimpleRouter",
239 router: newSimpleRouter(""),
240 numWriters: 10,
241 numDefaultWriters: 10,
242 },
243 {
244 desc: "SharedRouter_NoMultiplex",
245 router: newSharedRouter(false, 0),
246 numWriters: 1,
247 numDefaultWriters: 1,
248 },
249 {
250 desc: "SharedRouter_NoMultiplex",
251 router: newSharedRouter(false, 0),
252 numWriters: 10,
253 numDefaultWriters: 10,
254 },
255 {
256 desc: "SharedRouter_Multiplex1conn",
257 router: newSharedRouter(true, 1),
258 numWriters: 1,
259 numDefaultWriters: 1,
260 },
261 {
262 desc: "SharedRouterMultiplex1conn",
263 router: newSharedRouter(true, 1),
264 numWriters: 10,
265 numDefaultWriters: 10,
266 },
267 {
268 desc: "SharedRouterMultiplex1conn",
269 router: newSharedRouter(true, 1),
270 numWriters: 50,
271 numDefaultWriters: 50,
272 },
273 {
274 desc: "SharedRouterMultiplex10conn",
275 router: newSharedRouter(true, 10),
276 numWriters: 50,
277 numDefaultWriters: 50,
278 },
279 } {
280
281 ctx, cancel := context.WithCancel(context.Background())
282 pool := &connectionPool{
283 ctx: ctx,
284 cancel: cancel,
285 open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
286 return &testAppendRowsClient{}, nil
287 },
288 }
289 if err := pool.activateRouter(bm.router); err != nil {
290 b.Errorf("%q: activateRouter: %v", bm.desc, err)
291 }
292
293
294 var explicitWriters []*ManagedStream
295 var defaultWriters []*ManagedStream
296
297 for i := 0; i < bm.numWriters; i++ {
298 wCtx, wCancel := context.WithCancel(ctx)
299 writer := &ManagedStream{
300 id: newUUID(writerIDPrefix),
301 streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/abc123"},
302 ctx: wCtx,
303 cancel: wCancel,
304 retry: newStatelessRetryer(),
305 }
306 explicitWriters = append(explicitWriters, writer)
307 }
308 for i := 0; i < bm.numDefaultWriters; i++ {
309 wCtx, wCancel := context.WithCancel(ctx)
310 writer := &ManagedStream{
311 id: newUUID(writerIDPrefix),
312 streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
313
314 ctx: wCtx,
315 cancel: wCancel,
316 retry: newStatelessRetryer(),
317 }
318 defaultWriters = append(defaultWriters, writer)
319 }
320
321
322 for k, writer := range explicitWriters {
323 if err := pool.addWriter(writer); err != nil {
324 b.Errorf("addWriter %d: %v", k, err)
325 }
326 }
327 for k, writer := range defaultWriters {
328 if err := pool.addWriter(writer); err != nil {
329 b.Errorf("addWriter %d: %v", k, err)
330 }
331 }
332
333 baseBenchName := fmt.Sprintf("%s_%dexwriters_%dmpwriters", bm.desc, bm.numWriters, bm.numDefaultWriters)
334
335
336 if bm.numWriters > 0 {
337 benchName := fmt.Sprintf("%s_explicitwriters", baseBenchName)
338
339 b.Run(benchName, func(b *testing.B) {
340 r := rand.New(rand.NewSource(1))
341 b.ResetTimer()
342 for i := 0; i < b.N; i++ {
343
344 writer := explicitWriters[r.Intn(bm.numWriters)]
345 pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
346 if _, err := bm.router.pickConnection(pw); err != nil {
347 b.Errorf("pickConnection: %v", err)
348 }
349 }
350 })
351 }
352
353
354 if bm.numWriters > 0 {
355 benchName := fmt.Sprintf("%s_explicitwriters_concurrent", baseBenchName)
356 b.Run(benchName, func(b *testing.B) {
357 b.RunParallel(func(pb *testing.PB) {
358 r := rand.New(rand.NewSource(1))
359 for pb.Next() {
360 writer := explicitWriters[r.Intn(bm.numWriters)]
361 pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
362 if _, err := bm.router.pickConnection(pw); err != nil {
363 b.Errorf("pickConnection: %v", err)
364 }
365 }
366 })
367 })
368 }
369
370
371 if bm.numDefaultWriters > 0 {
372 benchName := fmt.Sprintf("%s_defaultwriters", baseBenchName)
373
374 b.Run(benchName, func(b *testing.B) {
375 r := rand.New(rand.NewSource(1))
376 b.ResetTimer()
377 for i := 0; i < b.N; i++ {
378
379 writer := defaultWriters[r.Intn(bm.numDefaultWriters)]
380 pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
381 if _, err := bm.router.pickConnection(pw); err != nil {
382 b.Errorf("pickConnection: %v", err)
383 }
384 }
385 })
386 }
387
388
389 if bm.numDefaultWriters > 0 {
390 benchName := fmt.Sprintf("%s_defaultwriters_concurrent", baseBenchName)
391
392 b.Run(benchName, func(b *testing.B) {
393 b.RunParallel(func(pb *testing.PB) {
394 r := rand.New(rand.NewSource(1))
395 for pb.Next() {
396 writer := defaultWriters[r.Intn(bm.numDefaultWriters)]
397 pw := newPendingWrite(context.Background(), writer, &storagepb.AppendRowsRequest{}, nil, "", "")
398 if _, err := bm.router.pickConnection(pw); err != nil {
399 b.Errorf("pickConnection: %v", err)
400 }
401 }
402 })
403 })
404 }
405
406 for _, writer := range explicitWriters {
407 writer.Close()
408 }
409 for _, writer := range defaultWriters {
410 writer.Close()
411 }
412
413 pool.Close()
414
415 }
416
417 }
418
419 func BenchmarkWatchdogPulse(b *testing.B) {
420 maxFlowInserts := 100
421 maxFlowBytes := 1024
422 for _, numWriters := range []int{1, 2, 5, 10, 50, 100, 250} {
423 for _, numConnections := range []int{1, 2, 4} {
424
425 ctx, cancel := context.WithCancel(context.Background())
426
427 router := newSharedRouter(false, numConnections)
428
429 pool := &connectionPool{
430 ctx: ctx,
431 cancel: cancel,
432 open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
433 return &testAppendRowsClient{}, nil
434 },
435 baseFlowController: newFlowController(maxFlowInserts, maxFlowBytes),
436 }
437 if err := pool.activateRouter(router); err != nil {
438 b.Fatalf("(@%d-@%d): activateRouter: %v", numWriters, numConnections, err)
439 }
440
441
442 router.multiplex = true
443
444 var writers []*ManagedStream
445
446 for i := 0; i < numWriters; i++ {
447 wCtx, wCancel := context.WithCancel(ctx)
448 writer := &ManagedStream{
449 id: newUUID(writerIDPrefix),
450 streamSettings: &streamSettings{streamID: "projects/foo/datasets/bar/tables/baz/streams/_default"},
451
452 ctx: wCtx,
453 cancel: wCancel,
454 retry: newStatelessRetryer(),
455 }
456 writers = append(writers, writer)
457 if err := pool.addWriter(writer); err != nil {
458 b.Fatalf("addWriter %d (@%d-@%d): %v", i, numWriters, numConnections, err)
459 }
460 }
461
462
463 r := rand.New(rand.NewSource(time.Now().UnixNano()))
464 countLoad := make([]int, numConnections)
465 byteLoad := make([]int, numConnections)
466 for i := 0; i < numConnections; i++ {
467 countLoad[i] = r.Intn(maxFlowInserts)
468 byteLoad[i] = r.Intn(maxFlowBytes)
469 }
470
471 benchName := fmt.Sprintf("%dwriters_%dconns", numWriters, numConnections)
472 b.Run(benchName, func(b *testing.B) {
473 if b.N > 9999 {
474 b.Skip("benchmark unstable, only run with -benchtime=NNNNx")
475 }
476 for i := 0; i < b.N; i++ {
477 b.StopTimer()
478
479
480 for c := 0; c < len(router.multiConns); c++ {
481 router.multiConns[c].fc.countTracked = int64(countLoad[c])
482 router.multiConns[c].fc.bytesTracked = int64(byteLoad[c])
483 }
484 for k := range router.multiMap {
485 router.multiMap[k] = router.multiConns[0]
486 }
487 router.invertedMultiMap = make(map[string][]*ManagedStream)
488 writerSlice := make([]*ManagedStream, len(writers))
489 copy(writerSlice, writers)
490 router.invertedMultiMap[router.multiConns[0].id] = writerSlice
491 b.StartTimer()
492 router.watchdogPulse()
493 }
494 })
495
496 for _, writer := range writers {
497 writer.Close()
498 }
499
500 pool.Close()
501
502 }
503 }
504
505 }
506
View as plain text