1
17
18
19
20 package resolver
21
22 import (
23 "context"
24 "fmt"
25 "sync/atomic"
26
27 "google.golang.org/grpc/internal"
28 "google.golang.org/grpc/internal/grpclog"
29 "google.golang.org/grpc/internal/grpcrand"
30 "google.golang.org/grpc/internal/grpcsync"
31 "google.golang.org/grpc/internal/pretty"
32 iresolver "google.golang.org/grpc/internal/resolver"
33 "google.golang.org/grpc/internal/wrr"
34 "google.golang.org/grpc/internal/xds/bootstrap"
35 "google.golang.org/grpc/resolver"
36 rinternal "google.golang.org/grpc/xds/internal/resolver/internal"
37 "google.golang.org/grpc/xds/internal/xdsclient"
38 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
39 )
40
41
42
43
44
// Scheme is the URI scheme of the xDS resolver. It is the value returned by
// (*xdsResolverBuilder).Scheme for the builder registered in init.
const Scheme = "xds"
46
47
48
49
50 func newBuilderForTesting(config []byte) (resolver.Builder, error) {
51 return &xdsResolverBuilder{
52 newXDSClient: func() (xdsclient.XDSClient, func(), error) {
53 return xdsclient.NewWithBootstrapContentsForTesting(config)
54 },
55 }, nil
56 }
57
58 func init() {
59 resolver.Register(&xdsResolverBuilder{})
60 internal.NewXDSResolverWithConfigForTesting = newBuilderForTesting
61
62 rinternal.NewWRR = wrr.NewRandom
63 rinternal.NewXDSClient = xdsclient.New
64 }
65
// xdsResolverBuilder is the resolver.Builder for the xDS scheme.
type xdsResolverBuilder struct {
	// newXDSClient, when non-nil, is used by Build to create the xDS client
	// instead of the constructor stored in rinternal.NewXDSClient. It returns
	// the client, a function that releases it, and an error.
	newXDSClient func() (xdsclient.XDSClient, func(), error)
}
69
70
71
72
73
74 func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (_ resolver.Resolver, retErr error) {
75 r := &xdsResolver{
76 cc: cc,
77 activeClusters: make(map[string]*clusterInfo),
78 channelID: grpcrand.Uint64(),
79 }
80 defer func() {
81 if retErr != nil {
82 r.Close()
83 }
84 }()
85 r.logger = prefixLogger(r)
86 r.logger.Infof("Creating resolver for target: %+v", target)
87
88
89
90
91
92
93
94
95 ctx, cancel := context.WithCancel(context.Background())
96 r.serializer = grpcsync.NewCallbackSerializer(ctx)
97 r.serializerCancel = cancel
98
99
100 newXDSClient := rinternal.NewXDSClient.(func() (xdsclient.XDSClient, func(), error))
101 if b.newXDSClient != nil {
102 newXDSClient = b.newXDSClient
103 }
104 client, close, err := newXDSClient()
105 if err != nil {
106 return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
107 }
108 r.xdsClient = client
109 r.xdsClientClose = close
110
111
112 template, err := r.sanityChecksOnBootstrapConfig(target, opts, r.xdsClient)
113 if err != nil {
114 return nil, err
115 }
116 r.dataplaneAuthority = opts.Authority
117 r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, target.Endpoint())
118 r.listenerWatcher = newListenerWatcher(r.ldsResourceName, r)
119 return r, nil
120 }
121
122
123
124
125
126
127
128
129
130
131 func (r *xdsResolver) sanityChecksOnBootstrapConfig(target resolver.Target, opts resolver.BuildOptions, client xdsclient.XDSClient) (string, error) {
132 bootstrapConfig := client.BootstrapConfig()
133 if bootstrapConfig == nil {
134
135
136 return "", fmt.Errorf("xds: bootstrap configuration is empty")
137 }
138
139
140
141
142 template := bootstrapConfig.ClientDefaultListenerResourceNameTemplate
143 if authority := target.URL.Host; authority != "" {
144 a := bootstrapConfig.Authorities[authority]
145 if a == nil {
146 return "", fmt.Errorf("xds: authority %q specified in dial target %q is not found in the bootstrap file", authority, target)
147 }
148 if a.ClientListenerResourceNameTemplate != "" {
149
150
151
152 template = a.ClientListenerResourceNameTemplate
153 }
154 }
155 return template, nil
156 }
157
158
// Scheme returns the URI scheme handled by this builder, i.e. the package-level
// Scheme constant.
func (*xdsResolverBuilder) Scheme() string {
	return Scheme
}
162
163
164
165
166
167
// xdsResolver is the resolver.Resolver returned by xdsResolverBuilder.Build.
// It records the most recent Listener and RouteConfiguration updates and
// pushes service configs derived from them to the ClientConn.
//
// NOTE(review): the update handlers read and write these fields without
// locking; presumably they are all invoked on the serializer — confirm against
// the watcher implementations.
type xdsResolver struct {
	// cc is the ClientConn passed to Build; state updates and errors are
	// reported to it.
	cc resolver.ClientConn

	// logger is the prefix logger created in Build.
	logger *grpclog.PrefixLogger

	// xdsClient is the client created in Build. It is attached to every
	// non-empty state pushed to cc.
	xdsClient xdsclient.XDSClient

	// xdsClientClose releases xdsClient. It is invoked from Close and may be
	// nil if Build failed before the client was created.
	xdsClientClose func()

	// channelID is a random value generated once in Build.
	// NOTE(review): its consumers are not visible in this part of the file.
	channelID uint64

	// serializer is the callback serializer created in Build. Close cancels
	// its context via serializerCancel and waits on Done before releasing any
	// other resource.
	serializer *grpcsync.CallbackSerializer

	// serializerCancel cancels the context that serializer was created with.
	serializerCancel context.CancelFunc

	// dataplaneAuthority is opts.Authority as given to Build. It is the value
	// matched against virtual hosts in applyRouteConfigUpdate.
	dataplaneAuthority string

	// ldsResourceName is the Listener resource name computed in Build.
	ldsResourceName string

	// listenerWatcher is the watcher created in Build for ldsResourceName.
	listenerWatcher *listenerWatcher

	// listenerUpdateRecvd is set when a Listener update arrives and cleared
	// when the Listener resource is reported as not found.
	listenerUpdateRecvd bool

	// currentListener is the most recently received Listener update.
	currentListener xdsresource.ListenerUpdate

	// rdsResourceName is the RouteConfiguration name currently being watched.
	// It is empty when the Listener carries an inline route configuration or
	// when the Listener resource was not found.
	rdsResourceName string

	// routeConfigWatcher is the watcher for rdsResourceName, or nil when no
	// RouteConfiguration is being watched.
	routeConfigWatcher *routeConfigWatcher

	// routeConfigUpdateRecvd is set once a route configuration with a
	// matching virtual host has been applied.
	routeConfigUpdateRecvd bool

	// currentRouteConfig is the most recently applied route configuration.
	currentRouteConfig xdsresource.RouteConfigUpdate

	// currentVirtualHost is the virtual host in currentRouteConfig that best
	// matches dataplaneAuthority.
	currentVirtualHost *xdsresource.VirtualHost

	// activeClusters maps prefixed cluster names to their reference-counted
	// info. Entries whose refCount is zero are removed by pruneActiveClusters
	// before a service config is generated.
	activeClusters map[string]*clusterInfo

	// curConfigSelector is the config selector most recently sent to cc by
	// onResolutionComplete. It is reset to nil by onResourceNotFound.
	curConfigSelector *configSelector
}
211
212
// ResolveNow is a no-op for the xDS resolver; the options are ignored.
func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}
214
215 func (r *xdsResolver) Close() {
216
217
218
219 r.serializerCancel()
220 <-r.serializer.Done()
221
222
223
224
225
226 if r.listenerWatcher != nil {
227 r.listenerWatcher.stop()
228 }
229 if r.routeConfigWatcher != nil {
230 r.routeConfigWatcher.stop()
231 }
232 if r.xdsClientClose != nil {
233 r.xdsClientClose()
234 }
235 r.logger.Infof("Shutdown")
236 }
237
238
239
240
241
242
243
244
245 func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
246
247
248
249 r.pruneActiveClusters()
250
251 if cs == nil && len(r.activeClusters) == 0 {
252
253
254
255 r.cc.UpdateState(resolver.State{ServiceConfig: r.cc.ParseServiceConfig("{}")})
256 return true
257 }
258
259 sc, err := serviceConfigJSON(r.activeClusters)
260 if err != nil {
261
262 r.logger.Errorf("For Listener resource %q and RouteConfiguration resource %q, failed to marshal newly built service config: %v", r.ldsResourceName, r.rdsResourceName, err)
263 r.cc.ReportError(err)
264 return false
265 }
266 r.logger.Infof("For Listener resource %q and RouteConfiguration resource %q, generated service config: %v", r.ldsResourceName, r.rdsResourceName, pretty.FormatJSON(sc))
267
268
269 state := iresolver.SetConfigSelector(resolver.State{
270 ServiceConfig: r.cc.ParseServiceConfig(string(sc)),
271 }, cs)
272 r.cc.UpdateState(xdsclient.SetClient(state, r.xdsClient))
273 return true
274 }
275
276
277
278
279
280
281 func (r *xdsResolver) newConfigSelector() (*configSelector, error) {
282 cs := &configSelector{
283 r: r,
284 virtualHost: virtualHost{
285 httpFilterConfigOverride: r.currentVirtualHost.HTTPFilterConfigOverride,
286 retryConfig: r.currentVirtualHost.RetryConfig,
287 },
288 routes: make([]route, len(r.currentVirtualHost.Routes)),
289 clusters: make(map[string]*clusterInfo),
290 httpFilterConfig: r.currentListener.HTTPFilters,
291 }
292
293 for i, rt := range r.currentVirtualHost.Routes {
294 clusters := rinternal.NewWRR.(func() wrr.WRR)()
295 if rt.ClusterSpecifierPlugin != "" {
296 clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
297 clusters.Add(&routeCluster{
298 name: clusterName,
299 }, 1)
300 ci := r.addOrGetActiveClusterInfo(clusterName)
301 ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.currentRouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])}
302 cs.clusters[clusterName] = ci
303 } else {
304 for cluster, wc := range rt.WeightedClusters {
305 clusterName := clusterPrefix + cluster
306 clusters.Add(&routeCluster{
307 name: clusterName,
308 httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
309 }, int64(wc.Weight))
310 ci := r.addOrGetActiveClusterInfo(clusterName)
311 ci.cfg = xdsChildConfig{ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster})}
312 cs.clusters[clusterName] = ci
313 }
314 }
315 cs.routes[i].clusters = clusters
316
317 var err error
318 cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
319 if err != nil {
320 return nil, err
321 }
322 cs.routes[i].actionType = rt.ActionType
323 if rt.MaxStreamDuration == nil {
324 cs.routes[i].maxStreamDuration = r.currentListener.MaxStreamDuration
325 } else {
326 cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
327 }
328
329 cs.routes[i].httpFilterConfigOverride = rt.HTTPFilterConfigOverride
330 cs.routes[i].retryConfig = rt.RetryConfig
331 cs.routes[i].hashPolicies = rt.HashPolicies
332 }
333
334
335
336
337 for _, ci := range cs.clusters {
338 atomic.AddInt32(&ci.refCount, 1)
339 }
340
341 return cs, nil
342 }
343
344
345
346 func (r *xdsResolver) pruneActiveClusters() {
347 for cluster, ci := range r.activeClusters {
348 if atomic.LoadInt32(&ci.refCount) == 0 {
349 delete(r.activeClusters, cluster)
350 }
351 }
352 }
353
354 func (r *xdsResolver) addOrGetActiveClusterInfo(name string) *clusterInfo {
355 ci := r.activeClusters[name]
356 if ci != nil {
357 return ci
358 }
359
360 ci = &clusterInfo{refCount: 0}
361 r.activeClusters[name] = ci
362 return ci
363 }
364
// clusterInfo holds the per-cluster data tracked in xdsResolver.activeClusters.
type clusterInfo struct {
	// refCount is the number of references held on this cluster. It is read
	// and modified with the sync/atomic functions; newConfigSelector adds one
	// for every cluster a new config selector uses.
	refCount int32

	// cfg is the child configuration for this cluster. It is overwritten each
	// time newConfigSelector encounters the cluster.
	cfg xdsChildConfig
}
372
373
374
375
376 func (r *xdsResolver) resolutionComplete() bool {
377 return r.listenerUpdateRecvd && r.routeConfigUpdateRecvd && r.currentVirtualHost != nil
378 }
379
380
381
382
383
384
385
386
387
388
389
390
391
392 func (r *xdsResolver) onResolutionComplete() {
393 if !r.resolutionComplete() {
394 return
395 }
396
397 cs, err := r.newConfigSelector()
398 if err != nil {
399 r.logger.Warningf("Failed to build a config selector for resource %q: %v", r.ldsResourceName, err)
400 r.cc.ReportError(err)
401 return
402 }
403
404 if !r.sendNewServiceConfig(cs) {
405
406
407
408 cs.stop()
409 return
410 }
411
412 r.curConfigSelector.stop()
413 r.curConfigSelector = cs
414 }
415
416 func (r *xdsResolver) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
417 matchVh := xdsresource.FindBestMatchingVirtualHost(r.dataplaneAuthority, update.VirtualHosts)
418 if matchVh == nil {
419 r.onError(fmt.Errorf("no matching virtual host found for %q", r.dataplaneAuthority))
420 return
421 }
422 r.currentRouteConfig = update
423 r.currentVirtualHost = matchVh
424 r.routeConfigUpdateRecvd = true
425
426 r.onResolutionComplete()
427 }
428
429
430
431
432
433
// onError forwards err to the ClientConn via ReportError. It does not modify
// any resolver state.
func (r *xdsResolver) onError(err error) {
	r.cc.ReportError(err)
}
437
438
439
440
441
442 func (r *xdsResolver) onResourceNotFound() {
443
444
445
446
447
448
449
450
451 r.sendNewServiceConfig(nil)
452
453
454 r.curConfigSelector.stop()
455 r.curConfigSelector = nil
456 }
457
458
459 func (r *xdsResolver) onListenerResourceUpdate(update xdsresource.ListenerUpdate) {
460 if r.logger.V(2) {
461 r.logger.Infof("Received update for Listener resource %q: %v", r.ldsResourceName, pretty.ToJSON(update))
462 }
463
464 r.currentListener = update
465 r.listenerUpdateRecvd = true
466
467 if update.InlineRouteConfig != nil {
468
469
470 r.rdsResourceName = ""
471 if r.routeConfigWatcher != nil {
472 r.routeConfigWatcher.stop()
473 r.routeConfigWatcher = nil
474 }
475
476 r.applyRouteConfigUpdate(*update.InlineRouteConfig)
477 return
478 }
479
480
481
482
483
484 if r.rdsResourceName == update.RouteConfigName {
485 r.onResolutionComplete()
486 return
487 }
488
489
490
491
492
493
494 r.rdsResourceName = update.RouteConfigName
495 if r.routeConfigWatcher != nil {
496 r.routeConfigWatcher.stop()
497 r.currentVirtualHost = nil
498 r.routeConfigUpdateRecvd = false
499 }
500 r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r)
501 }
502
// onListenerResourceError handles an error for the Listener resource. The
// error is logged at verbosity level 2 and then passed to onError; no
// resolver state is changed.
func (r *xdsResolver) onListenerResourceError(err error) {
	if r.logger.V(2) {
		r.logger.Infof("Received error for Listener resource %q: %v", r.ldsResourceName, err)
	}
	r.onError(err)
}
509
510
511 func (r *xdsResolver) onListenerResourceNotFound() {
512 if r.logger.V(2) {
513 r.logger.Infof("Received resource-not-found-error for Listener resource %q", r.ldsResourceName)
514 }
515
516 r.listenerUpdateRecvd = false
517
518 if r.routeConfigWatcher != nil {
519 r.routeConfigWatcher.stop()
520 }
521 r.rdsResourceName = ""
522 r.currentVirtualHost = nil
523 r.routeConfigUpdateRecvd = false
524 r.routeConfigWatcher = nil
525
526 r.onResourceNotFound()
527 }
528
529
530 func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update xdsresource.RouteConfigUpdate) {
531 if r.logger.V(2) {
532 r.logger.Infof("Received update for RouteConfiguration resource %q: %v", name, pretty.ToJSON(update))
533 }
534
535 if r.rdsResourceName != name {
536
537 return
538 }
539
540 r.applyRouteConfigUpdate(update)
541 }
542
543
// onRouteConfigResourceError handles an error for the RouteConfiguration
// resource called name. The error is logged at verbosity level 2 and then
// passed to onError. Unlike the update and not-found handlers, name is not
// compared against the resource currently being watched.
func (r *xdsResolver) onRouteConfigResourceError(name string, err error) {
	if r.logger.V(2) {
		r.logger.Infof("Received error for RouteConfiguration resource %q: %v", name, err)
	}
	r.onError(err)
}
550
551
552 func (r *xdsResolver) onRouteConfigResourceNotFound(name string) {
553 if r.logger.V(2) {
554 r.logger.Infof("Received resource-not-found-error for RouteConfiguration resource %q", name)
555 }
556
557 if r.rdsResourceName != name {
558 return
559 }
560 r.onResourceNotFound()
561 }
562
563
// onClusterRefDownToZero re-sends the service config using the current config
// selector. sendNewServiceConfig prunes clusters whose reference count is
// zero before generating the config, so such clusters are dropped from it.
// The result of sendNewServiceConfig is ignored.
func (r *xdsResolver) onClusterRefDownToZero() {
	r.sendNewServiceConfig(r.curConfigSelector)
}
567
View as plain text