1
18
19 package test
20
21 import (
22 "context"
23 "encoding/json"
24 "errors"
25 "fmt"
26 "strings"
27 "testing"
28
29 "github.com/google/go-cmp/cmp"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/internal"
35 "google.golang.org/grpc/internal/balancer/stub"
36 "google.golang.org/grpc/internal/stubserver"
37 "google.golang.org/grpc/internal/testutils"
38 "google.golang.org/grpc/resolver"
39 "google.golang.org/grpc/resolver/manual"
40 "google.golang.org/grpc/serviceconfig"
41 "google.golang.org/grpc/status"
42
43 testgrpc "google.golang.org/grpc/interop/grpc_testing"
44 testpb "google.golang.org/grpc/interop/grpc_testing"
45 )
46
47
48
49
50
51
52
53 func (s) TestResolverUpdateDuringBuild_ServiceConfigParseError(t *testing.T) {
54
55
56 r := manual.NewBuilderWithScheme("whatever")
57 r.InitialState(resolver.State{ServiceConfig: &serviceconfig.ParseResult{Err: errors.New("resolver build err")}})
58
59 cc, err := grpc.NewClient(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
60 if err != nil {
61 t.Fatalf("NewClient(_, _) = _, %v; want _, nil", err)
62 }
63 defer cc.Close()
64
65 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
66 defer cancel()
67 client := testgrpc.NewTestServiceClient(cc)
68 const wantMsg = "error parsing service config"
69 const wantCode = codes.Unavailable
70 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) {
71 t.Fatalf("EmptyCall RPC failed: %v; want code: %v, want message: %q", err, wantCode, wantMsg)
72 }
73 }
74
75 type fakeConfig struct {
76 serviceconfig.Config
77 }
78
79
80
81
82
83
84
85 func (s) TestResolverUpdateDuringBuild_ServiceConfigInvalidTypeError(t *testing.T) {
86
87
88 r := manual.NewBuilderWithScheme("whatever")
89 r.InitialState(resolver.State{ServiceConfig: &serviceconfig.ParseResult{Config: fakeConfig{}}})
90
91 cc, err := grpc.NewClient(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
92 if err != nil {
93 t.Fatalf("NewClient(_, _) = _, %v; want _, nil", err)
94 }
95 defer cc.Close()
96
97 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
98 defer cancel()
99 client := testgrpc.NewTestServiceClient(cc)
100 const wantMsg = "illegal service config type"
101 const wantCode = codes.Unavailable
102 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) {
103 t.Fatalf("EmptyCall RPC failed: %v; want code: %v, want message: %q", err, wantCode, wantMsg)
104 }
105 }
106
107
108
109
110
111 func (s) TestResolverUpdate_InvalidServiceConfigAsFirstUpdate(t *testing.T) {
112 r := manual.NewBuilderWithScheme("whatever")
113
114 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
115 if err != nil {
116 t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
117 }
118 defer cc.Close()
119
120 scpr := r.CC.ParseServiceConfig("bad json service config")
121 r.UpdateState(resolver.State{ServiceConfig: scpr})
122
123 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
124 defer cancel()
125 client := testgrpc.NewTestServiceClient(cc)
126 const wantMsg = "error parsing service config"
127 const wantCode = codes.Unavailable
128 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) {
129 t.Fatalf("EmptyCall RPC failed: %v; want code: %v, want message: %q", err, wantCode, wantMsg)
130 }
131 }
132
133 func verifyClientConnStateUpdate(got, want balancer.ClientConnState) error {
134 if got, want := got.ResolverState.Addresses, want.ResolverState.Addresses; !cmp.Equal(got, want) {
135 return fmt.Errorf("update got unexpected addresses: %v, want %v", got, want)
136 }
137 if got, want := got.ResolverState.ServiceConfig.Config, want.ResolverState.ServiceConfig.Config; !internal.EqualServiceConfigForTesting(got, want) {
138 return fmt.Errorf("received unexpected service config: \ngot: %v \nwant: %v", got, want)
139 }
140 if got, want := got.BalancerConfig, want.BalancerConfig; !cmp.Equal(got, want) {
141 return fmt.Errorf("received unexpected balancer config: \ngot: %v \nwant: %v", cmp.Diff(nil, got), cmp.Diff(nil, want))
142 }
143 return nil
144 }
145
146
147
148
149
150 func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) {
151 type wrappingBalancerConfig struct {
152 serviceconfig.LoadBalancingConfig
153 Config string `json:"config,omitempty"`
154 }
155
156
157
158 ccUpdateCh := testutils.NewChannel()
159 stub.Register(t.Name(), stub.BalancerFuncs{
160 Init: func(bd *stub.BalancerData) {
161 pf := balancer.Get(grpc.PickFirstBalancerName)
162 bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions)
163 },
164 ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
165 cfg := &wrappingBalancerConfig{}
166 if err := json.Unmarshal(lbCfg, cfg); err != nil {
167 return nil, err
168 }
169 return cfg, nil
170 },
171 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
172 if _, ok := ccs.BalancerConfig.(*wrappingBalancerConfig); !ok {
173 return fmt.Errorf("received balancer config of unsupported type %T", ccs.BalancerConfig)
174 }
175 bal := bd.Data.(balancer.Balancer)
176 ccUpdateCh.Send(ccs)
177 ccs.BalancerConfig = nil
178 return bal.UpdateClientConnState(ccs)
179 },
180 })
181
182
183 backend := &stubserver.StubServer{
184 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
185 }
186 if err := backend.StartServer(); err != nil {
187 t.Fatalf("Failed to start backend: %v", err)
188 }
189 t.Logf("Started TestService backend at: %q", backend.Address)
190 defer backend.Stop()
191
192 r := manual.NewBuilderWithScheme("whatever")
193
194 cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
195 if err != nil {
196 t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
197 }
198 defer cc.Close()
199
200
201 addrs := []resolver.Address{{Addr: backend.Address}}
202 const lbCfg = "wrapping balancer LB policy config"
203 goodSC := r.CC.ParseServiceConfig(fmt.Sprintf(`
204 {
205 "loadBalancingConfig": [
206 {
207 "%v": {
208 "config": "%s"
209 }
210 }
211 ]
212 }`, t.Name(), lbCfg))
213 r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: goodSC})
214
215 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
216 defer cancel()
217 wantCCS := balancer.ClientConnState{
218 ResolverState: resolver.State{
219 Addresses: addrs,
220 ServiceConfig: goodSC,
221 },
222 BalancerConfig: &wrappingBalancerConfig{Config: lbCfg},
223 }
224 ccs, err := ccUpdateCh.Receive(ctx)
225 if err != nil {
226 t.Fatalf("Timeout when waiting for ClientConnState update from grpc")
227 }
228 gotCCS := ccs.(balancer.ClientConnState)
229 if err := verifyClientConnStateUpdate(gotCCS, wantCCS); err != nil {
230 t.Fatal(err)
231 }
232
233
234 client := testgrpc.NewTestServiceClient(cc)
235 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
236 t.Fatalf("EmptyCall RPC failed: %v", err)
237 }
238
239
240
241
242 badSC := r.CC.ParseServiceConfig("bad json service config")
243 wantCCS.ResolverState.ServiceConfig = badSC
244 r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: badSC})
245 ccs, err = ccUpdateCh.Receive(ctx)
246 if err != nil {
247 t.Fatalf("Timeout when waiting for ClientConnState update from grpc")
248 }
249 gotCCS = ccs.(balancer.ClientConnState)
250 if err := verifyClientConnStateUpdate(gotCCS, wantCCS); err != nil {
251 t.Fatal(err)
252 }
253
254
255
256 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
257 t.Fatalf("EmptyCall RPC failed: %v", err)
258 }
259 }
260
View as plain text