1
18
19 package test
20
21 import (
22 "context"
23 "fmt"
24 "net"
25 "sync"
26 "testing"
27 "time"
28
29 "golang.org/x/net/http2"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/backoff"
32 "google.golang.org/grpc/balancer"
33 "google.golang.org/grpc/connectivity"
34 "google.golang.org/grpc/credentials/insecure"
35 "google.golang.org/grpc/internal"
36 "google.golang.org/grpc/internal/balancer/stub"
37 "google.golang.org/grpc/internal/grpcsync"
38 "google.golang.org/grpc/internal/testutils"
39 "google.golang.org/grpc/resolver"
40 "google.golang.org/grpc/resolver/manual"
41 )
42
43 const stateRecordingBalancerName = "state_recording_balancer"
44
45 var testBalancerBuilder = newStateRecordingBalancerBuilder()
46
47 func init() {
48 balancer.Register(testBalancerBuilder)
49 }
50
51
52
53
54 func (s) TestStateTransitions_SingleAddress(t *testing.T) {
55 for _, test := range []struct {
56 desc string
57 want []connectivity.State
58 server func(net.Listener) net.Conn
59 }{
60 {
61 desc: "When the server returns server preface, the client enters READY.",
62 want: []connectivity.State{
63 connectivity.Connecting,
64 connectivity.Ready,
65 },
66 server: func(lis net.Listener) net.Conn {
67 conn, err := lis.Accept()
68 if err != nil {
69 t.Error(err)
70 return nil
71 }
72
73 go keepReading(conn)
74
75 framer := http2.NewFramer(conn, conn)
76 if err := framer.WriteSettings(http2.Setting{}); err != nil {
77 t.Errorf("Error while writing settings frame. %v", err)
78 return nil
79 }
80
81 return conn
82 },
83 },
84 {
85 desc: "When the connection is closed before the preface is sent, the client enters TRANSIENT FAILURE.",
86 want: []connectivity.State{
87 connectivity.Connecting,
88 connectivity.TransientFailure,
89 },
90 server: func(lis net.Listener) net.Conn {
91 conn, err := lis.Accept()
92 if err != nil {
93 t.Error(err)
94 return nil
95 }
96
97 conn.Close()
98 return nil
99 },
100 },
101 {
102 desc: `When the server sends its connection preface, but the connection dies before the client can write its
103 connection preface, the client enters TRANSIENT FAILURE.`,
104 want: []connectivity.State{
105 connectivity.Connecting,
106 connectivity.TransientFailure,
107 },
108 server: func(lis net.Listener) net.Conn {
109 conn, err := lis.Accept()
110 if err != nil {
111 t.Error(err)
112 return nil
113 }
114
115 framer := http2.NewFramer(conn, conn)
116 if err := framer.WriteSettings(http2.Setting{}); err != nil {
117 t.Errorf("Error while writing settings frame. %v", err)
118 return nil
119 }
120
121 conn.Close()
122 return nil
123 },
124 },
125 {
126 desc: `When the server reads the client connection preface but does not send its connection preface, the
127 client enters TRANSIENT FAILURE.`,
128 want: []connectivity.State{
129 connectivity.Connecting,
130 connectivity.TransientFailure,
131 },
132 server: func(lis net.Listener) net.Conn {
133 conn, err := lis.Accept()
134 if err != nil {
135 t.Error(err)
136 return nil
137 }
138
139 go keepReading(conn)
140
141 return conn
142 },
143 },
144 } {
145 t.Log(test.desc)
146 testStateTransitionSingleAddress(t, test.want, test.server)
147 }
148 }
149
150 func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
151 pl := testutils.NewPipeListener()
152 defer pl.Close()
153
154
155 var conn net.Conn
156 var connMu sync.Mutex
157 go func() {
158 connMu.Lock()
159 conn = server(pl)
160 connMu.Unlock()
161 }()
162
163 client, err := grpc.Dial("",
164 grpc.WithTransportCredentials(insecure.NewCredentials()),
165 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
166 grpc.WithDialer(pl.Dialer()),
167 grpc.WithConnectParams(grpc.ConnectParams{
168 Backoff: backoff.Config{},
169 MinConnectTimeout: 100 * time.Millisecond,
170 }))
171 if err != nil {
172 t.Fatal(err)
173 }
174 defer client.Close()
175
176 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
177 defer cancel()
178 go testutils.StayConnected(ctx, client)
179
180 stateNotifications := testBalancerBuilder.nextStateNotifier()
181 for i := 0; i < len(want); i++ {
182 select {
183 case <-time.After(defaultTestTimeout):
184 t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
185 case seen := <-stateNotifications:
186 if seen != want[i] {
187 t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
188 }
189 }
190 }
191
192 connMu.Lock()
193 defer connMu.Unlock()
194 if conn != nil {
195 err = conn.Close()
196 if err != nil {
197 t.Fatal(err)
198 }
199 }
200 }
201
202
203 func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
204 lis, err := net.Listen("tcp", "localhost:0")
205 if err != nil {
206 t.Fatalf("Error while listening. Err: %v", err)
207 }
208 defer lis.Close()
209
210 sawReady := make(chan struct{}, 1)
211 defer close(sawReady)
212
213
214 go func() {
215 conn, err := lis.Accept()
216 if err != nil {
217 t.Error(err)
218 return
219 }
220
221 go keepReading(conn)
222
223 framer := http2.NewFramer(conn, conn)
224 if err := framer.WriteSettings(http2.Setting{}); err != nil {
225 t.Errorf("Error while writing settings frame. %v", err)
226 return
227 }
228
229
230 <-sawReady
231
232 conn.Close()
233 }()
234
235 client, err := grpc.Dial(lis.Addr().String(),
236 grpc.WithTransportCredentials(insecure.NewCredentials()),
237 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)))
238 if err != nil {
239 t.Fatal(err)
240 }
241 defer client.Close()
242
243 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
244 defer cancel()
245 go testutils.StayConnected(ctx, client)
246
247 stateNotifications := testBalancerBuilder.nextStateNotifier()
248
249 want := []connectivity.State{
250 connectivity.Connecting,
251 connectivity.Ready,
252 connectivity.Idle,
253 connectivity.Connecting,
254 }
255 for i := 0; i < len(want); i++ {
256 select {
257 case <-time.After(defaultTestTimeout):
258 t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
259 case seen := <-stateNotifications:
260 if seen == connectivity.Ready {
261 sawReady <- struct{}{}
262 }
263 if seen != want[i] {
264 t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
265 }
266 }
267 }
268 }
269
270
271
272 func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
273 lis1, err := net.Listen("tcp", "localhost:0")
274 if err != nil {
275 t.Fatalf("Error while listening. Err: %v", err)
276 }
277 defer lis1.Close()
278
279 lis2, err := net.Listen("tcp", "localhost:0")
280 if err != nil {
281 t.Fatalf("Error while listening. Err: %v", err)
282 }
283 defer lis2.Close()
284
285 server1Done := make(chan struct{})
286 server2Done := make(chan struct{})
287
288
289 go func() {
290 conn, err := lis1.Accept()
291 if err != nil {
292 t.Error(err)
293 return
294 }
295
296 conn.Close()
297 close(server1Done)
298 }()
299
300 go func() {
301 conn, err := lis2.Accept()
302 if err != nil {
303 t.Error(err)
304 return
305 }
306
307 go keepReading(conn)
308
309 framer := http2.NewFramer(conn, conn)
310 if err := framer.WriteSettings(http2.Setting{}); err != nil {
311 t.Errorf("Error while writing settings frame. %v", err)
312 return
313 }
314
315 close(server2Done)
316 }()
317
318 rb := manual.NewBuilderWithScheme("whatever")
319 rb.InitialState(resolver.State{Addresses: []resolver.Address{
320 {Addr: lis1.Addr().String()},
321 {Addr: lis2.Addr().String()},
322 }})
323 client, err := grpc.Dial("whatever:///this-gets-overwritten",
324 grpc.WithTransportCredentials(insecure.NewCredentials()),
325 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
326 grpc.WithResolvers(rb))
327 if err != nil {
328 t.Fatal(err)
329 }
330 defer client.Close()
331
332 stateNotifications := testBalancerBuilder.nextStateNotifier()
333 want := []connectivity.State{
334 connectivity.Connecting,
335 connectivity.Ready,
336 }
337 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
338 defer cancel()
339 for i := 0; i < len(want); i++ {
340 select {
341 case <-ctx.Done():
342 t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
343 case seen := <-stateNotifications:
344 if seen != want[i] {
345 t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
346 }
347 }
348 }
349 select {
350 case <-ctx.Done():
351 t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
352 case <-server1Done:
353 }
354 select {
355 case <-ctx.Done():
356 t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
357 case <-server2Done:
358 }
359 }
360
361
362
363 func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
364 lis1, err := net.Listen("tcp", "localhost:0")
365 if err != nil {
366 t.Fatalf("Error while listening. Err: %v", err)
367 }
368 defer lis1.Close()
369
370
371 lis2, err := net.Listen("tcp", "localhost:0")
372 if err != nil {
373 t.Fatalf("Error while listening. Err: %v", err)
374 }
375 defer lis2.Close()
376
377 server1Done := make(chan struct{})
378 sawReady := make(chan struct{}, 1)
379 defer close(sawReady)
380
381
382 go func() {
383 conn, err := lis1.Accept()
384 if err != nil {
385 t.Error(err)
386 return
387 }
388
389 go keepReading(conn)
390
391 framer := http2.NewFramer(conn, conn)
392 if err := framer.WriteSettings(http2.Setting{}); err != nil {
393 t.Errorf("Error while writing settings frame. %v", err)
394 return
395 }
396
397 <-sawReady
398
399 conn.Close()
400
401 close(server1Done)
402 }()
403
404 rb := manual.NewBuilderWithScheme("whatever")
405 rb.InitialState(resolver.State{Addresses: []resolver.Address{
406 {Addr: lis1.Addr().String()},
407 {Addr: lis2.Addr().String()},
408 }})
409 client, err := grpc.Dial("whatever:///this-gets-overwritten",
410 grpc.WithTransportCredentials(insecure.NewCredentials()),
411 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
412 grpc.WithResolvers(rb))
413 if err != nil {
414 t.Fatal(err)
415 }
416 defer client.Close()
417
418 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
419 defer cancel()
420 go testutils.StayConnected(ctx, client)
421
422 stateNotifications := testBalancerBuilder.nextStateNotifier()
423 want := []connectivity.State{
424 connectivity.Connecting,
425 connectivity.Ready,
426 connectivity.Idle,
427 connectivity.Connecting,
428 }
429 for i := 0; i < len(want); i++ {
430 select {
431 case <-ctx.Done():
432 t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
433 case seen := <-stateNotifications:
434 if seen == connectivity.Ready {
435 sawReady <- struct{}{}
436 }
437 if seen != want[i] {
438 t.Fatalf("expected to see %v at position %d in flow %v, got %v", want[i], i, want, seen)
439 }
440 }
441 }
442 select {
443 case <-ctx.Done():
444 t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
445 case <-server1Done:
446 }
447 }
448
449 type stateRecordingBalancer struct {
450 balancer.Balancer
451 }
452
453 func (b *stateRecordingBalancer) Close() {
454 b.Balancer.Close()
455 }
456
457 type stateRecordingBalancerBuilder struct {
458 mu sync.Mutex
459 notifier chan connectivity.State
460 }
461
462 func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
463 return &stateRecordingBalancerBuilder{}
464 }
465
466 func (b *stateRecordingBalancerBuilder) Name() string {
467 return stateRecordingBalancerName
468 }
469
470 func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
471 stateNotifications := make(chan connectivity.State, 10)
472 b.mu.Lock()
473 b.notifier = stateNotifications
474 b.mu.Unlock()
475 return &stateRecordingBalancer{
476 Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
477 }
478 }
479
480 func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
481 b.mu.Lock()
482 defer b.mu.Unlock()
483 ret := b.notifier
484 b.notifier = nil
485 return ret
486 }
487
488 type stateRecordingCCWrapper struct {
489 balancer.ClientConn
490 notifier chan<- connectivity.State
491 }
492
493 func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
494 oldListener := opts.StateListener
495 opts.StateListener = func(s balancer.SubConnState) {
496 ccw.notifier <- s.ConnectivityState
497 oldListener(s)
498 }
499 return ccw.ClientConn.NewSubConn(addrs, opts)
500 }
501
502
503
504
505
// keepReading reads from conn into a 1 KiB scratch buffer and discards the
// data, returning as soon as a Read reports an error.
func keepReading(conn net.Conn) {
	scratch := make([]byte, 1024)
	for {
		if _, err := conn.Read(scratch); err != nil {
			return
		}
	}
}
511
512 type funcConnectivityStateSubscriber struct {
513 onMsg func(connectivity.State)
514 }
515
516 func (f *funcConnectivityStateSubscriber) OnMessage(msg any) {
517 f.onMsg(msg.(connectivity.State))
518 }
519
520
521
522 func (s) TestConnectivityStateSubscriber(t *testing.T) {
523 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
524 defer cancel()
525
526 sendStates := []connectivity.State{
527 connectivity.Connecting,
528 connectivity.Ready,
529 connectivity.Idle,
530 connectivity.Connecting,
531 connectivity.Idle,
532 connectivity.Connecting,
533 connectivity.Ready,
534 }
535 wantStates := append(sendStates, connectivity.Shutdown)
536
537 const testBalName = "any"
538 bf := stub.BalancerFuncs{
539 UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
540
541 for _, s := range sendStates {
542 t.Logf("Sending state update %s", s)
543 bd.ClientConn.UpdateState(balancer.State{ConnectivityState: s})
544 }
545 return nil
546 },
547 }
548 stub.Register(testBalName, bf)
549
550
551 const testResName = "any"
552 rb := manual.NewBuilderWithScheme(testResName)
553 cc, err := grpc.Dial(testResName+":///",
554 grpc.WithResolvers(rb),
555 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalName)),
556 grpc.WithTransportCredentials(insecure.NewCredentials()),
557 )
558 if err != nil {
559 t.Fatalf("Unexpected error from grpc.Dial: %v", err)
560 }
561
562
563
564 connCh := make(chan connectivity.State, 1)
565 s := &funcConnectivityStateSubscriber{
566 onMsg: func(s connectivity.State) {
567 select {
568 case connCh <- s:
569 case <-ctx.Done():
570 }
571 if s == connectivity.Shutdown {
572 close(connCh)
573 }
574 },
575 }
576
577 internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)
578
579
580 go rb.UpdateState(resolver.State{})
581
582
583 for i, want := range wantStates {
584 if i == len(sendStates) {
585
586
587 cc.Close()
588 }
589 select {
590 case got := <-connCh:
591 if got != want {
592 t.Errorf("Update %v was %s; want %s", i, got, want)
593 } else {
594 t.Logf("Update %v was %s as expected", i, got)
595 }
596 case <-ctx.Done():
597 t.Fatalf("Timed out waiting for state update %v: %s", i, want)
598 }
599 }
600 }
601
View as plain text