...
1
18
19 package ringhash_test
20
21 import (
22 "context"
23 "testing"
24 "time"
25
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/connectivity"
28 "google.golang.org/grpc/credentials/insecure"
29 "google.golang.org/grpc/internal/grpctest"
30 "google.golang.org/grpc/internal/testutils"
31 "google.golang.org/grpc/resolver"
32 "google.golang.org/grpc/resolver/manual"
33
34 testgrpc "google.golang.org/grpc/interop/grpc_testing"
35 testpb "google.golang.org/grpc/interop/grpc_testing"
36
37 _ "google.golang.org/grpc/xds/internal/balancer/ringhash"
38 )
39
40 type s struct {
41 grpctest.Tester
42 }
43
44 func Test(t *testing.T) {
45 grpctest.RunSubTests(t, s{})
46 }
47
48 const (
49 defaultTestTimeout = 10 * time.Second
50 defaultTestShortTimeout = 10 * time.Millisecond
51 )
52
53 type testService struct {
54 testgrpc.TestServiceServer
55 }
56
57 func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
58 return &testpb.Empty{}, nil
59 }
60
61
62
63
64
65 func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) {
66
67 l, err := testutils.LocalTCPListener()
68 if err != nil {
69 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
70 }
71 lis := testutils.NewRestartableListener(l)
72
73
74 server := grpc.NewServer()
75 defer server.Stop()
76 testgrpc.RegisterTestServiceServer(server, &testService{})
77 go func() {
78 if err := server.Serve(lis); err != nil {
79 t.Errorf("Serve() failed: %v", err)
80 }
81 }()
82
83
84
85
86 const ringHashServiceConfig = `{"loadBalancingConfig": [{"ring_hash_experimental":{}}]}`
87 r := manual.NewBuilderWithScheme("whatever")
88 dopts := []grpc.DialOption{
89 grpc.WithTransportCredentials(insecure.NewCredentials()),
90 grpc.WithResolvers(r),
91 grpc.WithDefaultServiceConfig(ringHashServiceConfig),
92 }
93 cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
94 if err != nil {
95 t.Fatalf("failed to dial local test server: %v", err)
96 }
97 defer cc.Close()
98
99
100 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
101
102 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
103 defer cancel()
104 client := testgrpc.NewTestServiceClient(cc)
105 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
106 t.Fatalf("rpc EmptyCall() failed: %v", err)
107 }
108
109
110
111
112 lis.Stop()
113
114 testutils.AwaitState(ctx, t, cc, connectivity.Idle)
115
116
117
118 client.EmptyCall(ctx, &testpb.Empty{})
119
120 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
121
122
123 if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
124 t.Fatal("EmptyCall RPC succeeded when the channel is in TRANSIENT_FAILURE")
125 }
126
127
128
129
130 lis.Restart()
131 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
132 if cc.GetState() == connectivity.Ready {
133 break
134 }
135 }
136 if err := ctx.Err(); err != nil {
137 t.Fatalf("Timeout waiting for channel to reach READT after server restart: %v", err)
138 }
139
140
141 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
142 t.Fatalf("rpc EmptyCall() failed: %v", err)
143 }
144 }
145
View as plain text