1
18
19 package resolver_test
20
21 import (
22 "context"
23 "fmt"
24 "net/url"
25 "strings"
26 "testing"
27 "time"
28
29 "github.com/google/go-cmp/cmp"
30 "github.com/google/go-cmp/cmp/cmpopts"
31 "google.golang.org/grpc/internal"
32 "google.golang.org/grpc/internal/grpctest"
33 iresolver "google.golang.org/grpc/internal/resolver"
34 "google.golang.org/grpc/internal/testutils"
35 xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap"
36 "google.golang.org/grpc/internal/testutils/xds/e2e"
37 "google.golang.org/grpc/resolver"
38 "google.golang.org/grpc/serviceconfig"
39 xdsresolver "google.golang.org/grpc/xds/internal/resolver"
40 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
41
42 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
43 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
44 v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
45 )
46
47 type s struct {
48 grpctest.Tester
49 }
50
51 func Test(t *testing.T) {
52 grpctest.RunSubTests(t, s{})
53 }
54
// Timeouts shared by the test helpers.
const (
	// defaultTestTimeout is the long timeout; presumably used by tests to
	// bound operations that are expected to complete (not used in this part
	// of the file).
	defaultTestTimeout = 10 * time.Second
	// defaultTestShortTimeout is the short wait used when verifying that
	// nothing happens, and as the pause between polls in waitForResourceNames.
	defaultTestShortTimeout = 100 * time.Microsecond
)

// Default resource names used by the tests.
const (
	defaultTestServiceName     = "service-name"
	defaultTestRouteConfigName = "route-config-name"
	defaultTestClusterName     = "cluster-name"
)
63
64
65
// wantDefaultServiceConfig is a JSON service config whose load balancing
// config selects the "xds_cluster_manager_experimental" policy with a single
// child, "cluster:<defaultTestClusterName>", whose child policy is
// "cds_experimental" for that same cluster.
//
// NOTE(review): presumably passed as wantSC to verifyUpdateFromResolver by
// tests that use the default cluster — callers are not visible here.
var wantDefaultServiceConfig = fmt.Sprintf(`{
"loadBalancingConfig": [{
"xds_cluster_manager_experimental": {
"children": {
"cluster:%s": {
"childPolicy": [{
"cds_experimental": {
"cluster": "%s"
}
}]
}
}
}
}]
}`, defaultTestClusterName, defaultTestClusterName)
81
82
83
84
85
86
87 func buildResolverForTarget(t *testing.T, target resolver.Target) (chan resolver.State, chan error, resolver.Resolver) {
88 t.Helper()
89
90 builder := resolver.Get(xdsresolver.Scheme)
91 if builder == nil {
92 t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
93 }
94
95 stateCh := make(chan resolver.State, 1)
96 updateStateF := func(s resolver.State) error {
97 stateCh <- s
98 return nil
99 }
100 errCh := make(chan error, 1)
101 reportErrorF := func(err error) {
102 select {
103 case errCh <- err:
104 default:
105 }
106 }
107 tcc := &testutils.ResolverClientConn{Logger: t, UpdateStateF: updateStateF, ReportErrorF: reportErrorF}
108 r, err := builder.Build(target, tcc, resolver.BuildOptions{
109 Authority: url.PathEscape(target.Endpoint()),
110 })
111 if err != nil {
112 t.Fatalf("Failed to build xDS resolver for target %q: %v", target, err)
113 }
114 t.Cleanup(r.Close)
115 return stateCh, errCh, r
116 }
117
118
119
120
121
122
123
124
125
126
127 func verifyUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan resolver.State, wantSC string) iresolver.ConfigSelector {
128 t.Helper()
129
130 var state resolver.State
131 select {
132 case <-ctx.Done():
133 t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
134 case state = <-stateCh:
135 if err := state.ServiceConfig.Err; err != nil {
136 t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
137 }
138 if wantSC == "" {
139 break
140 }
141 wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(wantSC)
142 if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
143 t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
144 }
145 }
146 cs := iresolver.GetConfigSelector(state)
147 if cs == nil {
148 t.Fatal("Received nil config selector in update from resolver")
149 }
150 return cs
151 }
152
153
154
155
156 func verifyNoUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan resolver.State) {
157 t.Helper()
158
159 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
160 defer sCancel()
161 select {
162 case <-sCtx.Done():
163 case u := <-stateCh:
164 t.Fatalf("Received update from resolver %v when none expected", u)
165 }
166 }
167
168
169
// verifyErrorFromResolver waits for an error on errCh and fails the test if
// ctx is done first, or if the received error is nil or its message does not
// contain wantErr.
func verifyErrorFromResolver(ctx context.Context, t *testing.T, errCh chan error, wantErr string) {
	t.Helper()

	var gotErr error
	select {
	case gotErr = <-errCh:
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for error to be propagated to the ClientConn")
	}

	if gotErr != nil && strings.Contains(gotErr.Error(), wantErr) {
		return
	}
	t.Fatalf("Received error from resolver %q, want %q", gotErr, wantErr)
}
182
183
184
185
186
187
188
189
190 func setupManagementServerForTest(ctx context.Context, t *testing.T, nodeID string) (*e2e.ManagementServer, chan []string, chan []string) {
191 t.Helper()
192
193 listenerResourceNamesCh := make(chan []string, 1)
194 routeConfigResourceNamesCh := make(chan []string, 1)
195
196
197
198
199 mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{
200 OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
201 switch req.GetTypeUrl() {
202 case version.V3ListenerURL:
203 select {
204 case <-listenerResourceNamesCh:
205 default:
206 }
207 select {
208 case listenerResourceNamesCh <- req.GetResourceNames():
209 default:
210 }
211 case version.V3RouteConfigURL:
212 select {
213 case <-routeConfigResourceNamesCh:
214 default:
215 }
216 select {
217 case routeConfigResourceNamesCh <- req.GetResourceNames():
218 default:
219 }
220 }
221 return nil
222 },
223 AllowResourceSubset: true,
224 })
225 if err != nil {
226 t.Fatalf("Failed to start xDS management server: %v", err)
227 }
228 t.Cleanup(mgmtServer.Stop)
229
230
231 cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
232 NodeID: nodeID,
233 ServerURI: mgmtServer.Address,
234 })
235 if err != nil {
236 t.Fatal(err)
237 }
238 t.Cleanup(cleanup)
239 return mgmtServer, listenerResourceNamesCh, routeConfigResourceNamesCh
240 }
241
242
243
244
245 func configureResourcesOnManagementServer(ctx context.Context, t *testing.T, mgmtServer *e2e.ManagementServer, nodeID string, listeners []*v3listenerpb.Listener, routes []*v3routepb.RouteConfiguration) {
246 resources := e2e.UpdateOptions{
247 NodeID: nodeID,
248 Listeners: listeners,
249 Routes: routes,
250 SkipValidation: true,
251 }
252 if err := mgmtServer.Update(ctx, resources); err != nil {
253 t.Fatal(err)
254 }
255 }
256
257
258
259 func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) {
260 t.Helper()
261
262 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
263 select {
264 case <-ctx.Done():
265 case gotNames := <-namesCh:
266 if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty()) {
267 return
268 }
269 t.Logf("Received resource names %v, want %v", gotNames, wantNames)
270 }
271 }
272 t.Fatalf("Timeout waiting for resource to be requested from the management server")
273 }
274
View as plain text