1
17
18 package xdsclient
19
20 import (
21 "context"
22 "errors"
23 "fmt"
24 "strings"
25 "sync"
26 "time"
27
28 "google.golang.org/grpc/internal/grpclog"
29 "google.golang.org/grpc/internal/grpcsync"
30 "google.golang.org/grpc/internal/xds/bootstrap"
31 "google.golang.org/grpc/xds/internal/xdsclient/load"
32 "google.golang.org/grpc/xds/internal/xdsclient/transport"
33 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
34 "google.golang.org/protobuf/types/known/anypb"
35 )
36
37 type watchState int
38
39 const (
40 watchStateStarted watchState = iota
41 watchStateRequested
42 watchStateReceived
43 watchStateTimeout
44 watchStateCanceled
45 )
46
47 type resourceState struct {
48 watchers map[xdsresource.ResourceWatcher]bool
49 cache xdsresource.ResourceData
50 md xdsresource.UpdateMetadata
51 deletionIgnored bool
52
53
54 wTimer *time.Timer
55 wState watchState
56 }
57
58
59
60
61
62
63
64
65
66 type authority struct {
67 serverCfg *bootstrap.ServerConfig
68 bootstrapCfg *bootstrap.Config
69 refCount int
70 serializer *grpcsync.CallbackSerializer
71 resourceTypeGetter func(string) xdsresource.Type
72 transport *transport.Transport
73 watchExpiryTimeout time.Duration
74 logger *grpclog.PrefixLogger
75
76
77
78
79
80
81
82
83
84 resourcesMu sync.Mutex
85 resources map[xdsresource.Type]map[string]*resourceState
86 closed bool
87 }
88
89
90
91
92 type authorityArgs struct {
93
94
95
96
97 serverCfg *bootstrap.ServerConfig
98 bootstrapCfg *bootstrap.Config
99 serializer *grpcsync.CallbackSerializer
100 resourceTypeGetter func(string) xdsresource.Type
101 watchExpiryTimeout time.Duration
102 logger *grpclog.PrefixLogger
103 }
104
105 func newAuthority(args authorityArgs) (*authority, error) {
106 ret := &authority{
107 serverCfg: args.serverCfg,
108 bootstrapCfg: args.bootstrapCfg,
109 serializer: args.serializer,
110 resourceTypeGetter: args.resourceTypeGetter,
111 watchExpiryTimeout: args.watchExpiryTimeout,
112 logger: args.logger,
113 resources: make(map[xdsresource.Type]map[string]*resourceState),
114 }
115
116 tr, err := transport.New(transport.Options{
117 ServerCfg: *args.serverCfg,
118 OnRecvHandler: ret.handleResourceUpdate,
119 OnErrorHandler: ret.newConnectionError,
120 OnSendHandler: ret.transportOnSendHandler,
121 Logger: args.logger,
122 NodeProto: args.bootstrapCfg.NodeProto,
123 })
124 if err != nil {
125 return nil, fmt.Errorf("creating new transport to %q: %v", args.serverCfg, err)
126 }
127 ret.transport = tr
128 return ret, nil
129 }
130
131
132
133
134 func (a *authority) transportOnSendHandler(u *transport.ResourceSendInfo) {
135 rType := a.resourceTypeGetter(u.URL)
136
137
138 if rType == nil {
139 a.logger.Warningf("Unknown resource type url: %s.", u.URL)
140 return
141 }
142 a.resourcesMu.Lock()
143 defer a.resourcesMu.Unlock()
144 a.startWatchTimersLocked(rType, u.ResourceNames)
145 }
146
147 func (a *authority) handleResourceUpdate(resourceUpdate transport.ResourceUpdate) error {
148 rType := a.resourceTypeGetter(resourceUpdate.URL)
149 if rType == nil {
150 return xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource URL %v unknown in response from server", resourceUpdate.URL)
151 }
152
153 opts := &xdsresource.DecodeOptions{
154 BootstrapConfig: a.bootstrapCfg,
155 ServerConfig: a.serverCfg,
156 }
157 updates, md, err := decodeAllResources(opts, rType, resourceUpdate)
158 a.updateResourceStateAndScheduleCallbacks(rType, updates, md)
159 return err
160 }
161
162 func (a *authority) updateResourceStateAndScheduleCallbacks(rType xdsresource.Type, updates map[string]resourceDataErrTuple, md xdsresource.UpdateMetadata) {
163 a.resourcesMu.Lock()
164 defer a.resourcesMu.Unlock()
165
166 resourceStates := a.resources[rType]
167 for name, uErr := range updates {
168 if state, ok := resourceStates[name]; ok {
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189 if state.wState == watchStateStarted || state.wState == watchStateRequested {
190
191
192
193
194
195 if state.wTimer != nil {
196 state.wTimer.Stop()
197 }
198 state.wState = watchStateReceived
199 }
200
201 if uErr.err != nil {
202
203
204 state.md.ErrState = md.ErrState
205 state.md.Status = md.Status
206 for watcher := range state.watchers {
207 watcher := watcher
208 err := uErr.err
209 a.serializer.Schedule(func(context.Context) { watcher.OnError(err) })
210 }
211 continue
212 }
213
214 if state.deletionIgnored {
215 state.deletionIgnored = false
216 a.logger.Infof("A valid update was received for resource %q of type %q after previously ignoring a deletion", name, rType.TypeName())
217 }
218
219
220 if state.cache == nil || !state.cache.Equal(uErr.resource) {
221 for watcher := range state.watchers {
222 watcher := watcher
223 resource := uErr.resource
224 a.serializer.Schedule(func(context.Context) { watcher.OnUpdate(resource) })
225 }
226 }
227
228 a.logger.Debugf("Resource type %q with name %q added to cache", rType.TypeName(), name)
229 state.cache = uErr.resource
230
231
232
233 state.md = md
234 state.md.ErrState = nil
235 state.md.Status = xdsresource.ServiceStatusACKed
236 if md.ErrState != nil {
237 state.md.Version = md.ErrState.Version
238 }
239 }
240 }
241
242
243
244
245
246
247 if !rType.AllResourcesRequiredInSotW() {
248 return
249 }
250 for name, state := range resourceStates {
251 if state.cache == nil {
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267 continue
268 }
269 if _, ok := updates[name]; !ok {
270
271
272
273
274 if state.md.Status == xdsresource.ServiceStatusNotExist {
275 continue
276 }
277
278
279
280
281
282 if a.serverCfg.IgnoreResourceDeletion {
283 if !state.deletionIgnored {
284 state.deletionIgnored = true
285 a.logger.Warningf("Ignoring resource deletion for resource %q of type %q", name, rType.TypeName())
286 }
287 continue
288 }
289
290
291
292
293 state.cache = nil
294 state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
295 for watcher := range state.watchers {
296 watcher := watcher
297 a.serializer.Schedule(func(context.Context) { watcher.OnResourceDoesNotExist() })
298 }
299 }
300 }
301 }
302
303 type resourceDataErrTuple struct {
304 resource xdsresource.ResourceData
305 err error
306 }
307
308 func decodeAllResources(opts *xdsresource.DecodeOptions, rType xdsresource.Type, update transport.ResourceUpdate) (map[string]resourceDataErrTuple, xdsresource.UpdateMetadata, error) {
309 timestamp := time.Now()
310 md := xdsresource.UpdateMetadata{
311 Version: update.Version,
312 Timestamp: timestamp,
313 }
314
315 topLevelErrors := make([]error, 0)
316 perResourceErrors := make(map[string]error)
317 ret := make(map[string]resourceDataErrTuple)
318 for _, r := range update.Resources {
319 result, err := rType.Decode(opts, r)
320
321
322
323 name := ""
324 if result != nil {
325 name = xdsresource.ParseName(result.Name).String()
326 }
327 if err == nil {
328 ret[name] = resourceDataErrTuple{resource: result.Resource}
329 continue
330 }
331 if name == "" {
332 topLevelErrors = append(topLevelErrors, err)
333 continue
334 }
335 perResourceErrors[name] = err
336
337
338 ret[name] = resourceDataErrTuple{err: err}
339 }
340
341 if len(topLevelErrors) == 0 && len(perResourceErrors) == 0 {
342 md.Status = xdsresource.ServiceStatusACKed
343 return ret, md, nil
344 }
345
346 md.Status = xdsresource.ServiceStatusNACKed
347 errRet := combineErrors(rType.TypeName(), topLevelErrors, perResourceErrors)
348 md.ErrState = &xdsresource.UpdateErrorMetadata{
349 Version: update.Version,
350 Err: errRet,
351 Timestamp: timestamp,
352 }
353 return ret, md, errRet
354 }
355
356
357
358
359
360
361 func (a *authority) startWatchTimersLocked(rType xdsresource.Type, resourceNames []string) {
362 resourceStates := a.resources[rType]
363 for _, resourceName := range resourceNames {
364 if state, ok := resourceStates[resourceName]; ok {
365 if state.wState != watchStateStarted {
366 continue
367 }
368 state.wTimer = time.AfterFunc(a.watchExpiryTimeout, func() {
369 a.handleWatchTimerExpiry(rType, resourceName, state)
370 })
371 state.wState = watchStateRequested
372 }
373 }
374 }
375
376
377
378
379
380
381 func (a *authority) stopWatchTimersLocked() {
382 for _, rType := range a.resources {
383 for resourceName, state := range rType {
384 if state.wState != watchStateRequested {
385 continue
386 }
387 if !state.wTimer.Stop() {
388
389
390
391 a.logger.Warningf("Watch timer for resource %v already fired. Ignoring here.", resourceName)
392 continue
393 }
394 state.wTimer = nil
395 state.wState = watchStateStarted
396 }
397 }
398 }
399
400
401
402 func (a *authority) newConnectionError(err error) {
403 a.resourcesMu.Lock()
404 defer a.resourcesMu.Unlock()
405
406 a.stopWatchTimersLocked()
407
408
409
410
411
412
413 if xdsresource.ErrType(err) == xdsresource.ErrTypeStreamFailedAfterRecv {
414 a.logger.Warningf("Watchers not notified since ADS stream failed after having received at least one response: %v", err)
415 return
416 }
417
418 for _, rType := range a.resources {
419 for _, state := range rType {
420
421 for watcher := range state.watchers {
422 watcher := watcher
423 a.serializer.Schedule(func(context.Context) {
424 watcher.OnError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err))
425 })
426 }
427 }
428 }
429 }
430
431
432 func (a *authority) refLocked() {
433 a.refCount++
434 }
435
436
437 func (a *authority) unrefLocked() int {
438 a.refCount--
439 return a.refCount
440 }
441
442 func (a *authority) close() {
443 a.transport.Close()
444
445 a.resourcesMu.Lock()
446 a.closed = true
447 a.resourcesMu.Unlock()
448 }
449
450 func (a *authority) watchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) func() {
451 a.logger.Debugf("New watch for type %q, resource name %q", rType.TypeName(), resourceName)
452 a.resourcesMu.Lock()
453 defer a.resourcesMu.Unlock()
454
455
456
457 resources := a.resources[rType]
458 if resources == nil {
459 resources = make(map[string]*resourceState)
460 a.resources[rType] = resources
461 }
462
463
464
465
466 state := resources[resourceName]
467 if state == nil {
468 a.logger.Debugf("First watch for type %q, resource name %q", rType.TypeName(), resourceName)
469 state = &resourceState{
470 watchers: make(map[xdsresource.ResourceWatcher]bool),
471 md: xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusRequested},
472 wState: watchStateStarted,
473 }
474 resources[resourceName] = state
475 a.sendDiscoveryRequestLocked(rType, resources)
476 }
477
478 state.watchers[watcher] = true
479
480
481 if state.cache != nil {
482 if a.logger.V(2) {
483 a.logger.Infof("Resource type %q with resource name %q found in cache: %s", rType.TypeName(), resourceName, state.cache.ToJSON())
484 }
485 resource := state.cache
486 a.serializer.Schedule(func(context.Context) { watcher.OnUpdate(resource) })
487 }
488
489 return func() {
490 a.resourcesMu.Lock()
491 defer a.resourcesMu.Unlock()
492
493
494
495
496
497
498 state.wState = watchStateCanceled
499 delete(state.watchers, watcher)
500 if len(state.watchers) > 0 {
501 return
502 }
503
504
505
506
507 a.logger.Debugf("Removing last watch for type %q, resource name %q", rType.TypeName(), resourceName)
508 delete(resources, resourceName)
509 a.sendDiscoveryRequestLocked(rType, resources)
510 }
511 }
512
513 func (a *authority) handleWatchTimerExpiry(rType xdsresource.Type, resourceName string, state *resourceState) {
514 a.resourcesMu.Lock()
515 defer a.resourcesMu.Unlock()
516
517 if a.closed {
518 return
519 }
520 a.logger.Warningf("Watch for resource %q of type %s timed out", resourceName, rType.TypeName())
521
522 switch state.wState {
523 case watchStateRequested:
524
525
526 case watchStateCanceled:
527 return
528 default:
529 a.logger.Warningf("Unexpected watch state %q for resource %q.", state.wState, resourceName)
530 return
531 }
532
533 state.wState = watchStateTimeout
534
535
536 state.cache = nil
537 state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
538 for watcher := range state.watchers {
539 watcher := watcher
540 a.serializer.Schedule(func(context.Context) { watcher.OnResourceDoesNotExist() })
541 }
542 }
543
544 func (a *authority) triggerResourceNotFoundForTesting(rType xdsresource.Type, resourceName string) {
545 a.resourcesMu.Lock()
546 defer a.resourcesMu.Unlock()
547
548 if a.closed {
549 return
550 }
551 resourceStates := a.resources[rType]
552 state, ok := resourceStates[resourceName]
553 if !ok {
554 return
555 }
556
557
558 if state.wState == watchStateCanceled || state.wState == watchStateTimeout {
559 return
560 }
561 state.wState = watchStateTimeout
562 state.cache = nil
563 state.md = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
564 for watcher := range state.watchers {
565 watcher := watcher
566 a.serializer.Schedule(func(context.Context) { watcher.OnResourceDoesNotExist() })
567 }
568 }
569
570
571
572
573
574
575 func (a *authority) sendDiscoveryRequestLocked(rType xdsresource.Type, resources map[string]*resourceState) {
576 resourcesToRequest := make([]string, len(resources))
577 i := 0
578 for name := range resources {
579 resourcesToRequest[i] = name
580 i++
581 }
582 a.transport.SendRequest(rType.TypeURL(), resourcesToRequest)
583 }
584
585 func (a *authority) reportLoad() (*load.Store, func()) {
586 return a.transport.ReportLoad()
587 }
588
589 func (a *authority) dumpResources() map[string]map[string]xdsresource.UpdateWithMD {
590 a.resourcesMu.Lock()
591 defer a.resourcesMu.Unlock()
592
593 dump := make(map[string]map[string]xdsresource.UpdateWithMD)
594 for rType, resourceStates := range a.resources {
595 states := make(map[string]xdsresource.UpdateWithMD)
596 for name, state := range resourceStates {
597 var raw *anypb.Any
598 if state.cache != nil {
599 raw = state.cache.Raw()
600 }
601 states[name] = xdsresource.UpdateWithMD{
602 MD: state.md,
603 Raw: raw,
604 }
605 }
606 dump[rType.TypeURL()] = states
607 }
608 return dump
609 }
610
611 func combineErrors(rType string, topLevelErrors []error, perResourceErrors map[string]error) error {
612 var errStrB strings.Builder
613 errStrB.WriteString(fmt.Sprintf("error parsing %q response: ", rType))
614 if len(topLevelErrors) > 0 {
615 errStrB.WriteString("top level errors: ")
616 for i, err := range topLevelErrors {
617 if i != 0 {
618 errStrB.WriteString(";\n")
619 }
620 errStrB.WriteString(err.Error())
621 }
622 }
623 if len(perResourceErrors) > 0 {
624 var i int
625 for name, err := range perResourceErrors {
626 if i != 0 {
627 errStrB.WriteString(";\n")
628 }
629 i++
630 errStrB.WriteString(fmt.Sprintf("resource %q: %v", name, err.Error()))
631 }
632 }
633 return errors.New(errStrB.String())
634 }
635
View as plain text