...
1
17
18 package xdsclient
19
20 import (
21 "context"
22 "fmt"
23 "sync"
24
25 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
26 )
27
28
29
30
31
32
33 func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) {
34
35
36
37
38
39
40 if c == nil || c.done.HasFired() {
41 logger.Warningf("Watch registered for name %q of type %q, but client is closed", rType.TypeName(), resourceName)
42 return func() {}
43 }
44
45 if err := c.resourceTypes.maybeRegister(rType); err != nil {
46 logger.Warningf("Watch registered for name %q of type %q which is already registered", rType.TypeName(), resourceName)
47 c.serializer.Schedule(func(context.Context) { watcher.OnError(err) })
48 return func() {}
49 }
50
51
52
53 n := xdsresource.ParseName(resourceName)
54 a, unref, err := c.findAuthority(n)
55 if err != nil {
56 logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName(), resourceName, n.Authority)
57 c.serializer.Schedule(func(context.Context) { watcher.OnError(err) })
58 return func() {}
59 }
60 cancelF := a.watchResource(rType, n.String(), watcher)
61 return func() {
62 cancelF()
63 unref()
64 }
65 }
66
67
68
69
70 type resourceTypeRegistry struct {
71 mu sync.Mutex
72 types map[string]xdsresource.Type
73 }
74
75 func newResourceTypeRegistry() *resourceTypeRegistry {
76 return &resourceTypeRegistry{types: make(map[string]xdsresource.Type)}
77 }
78
79 func (r *resourceTypeRegistry) get(url string) xdsresource.Type {
80 r.mu.Lock()
81 defer r.mu.Unlock()
82 return r.types[url]
83 }
84
85 func (r *resourceTypeRegistry) maybeRegister(rType xdsresource.Type) error {
86 r.mu.Lock()
87 defer r.mu.Unlock()
88
89 url := rType.TypeURL()
90 typ, ok := r.types[url]
91 if ok && typ != rType {
92 return fmt.Errorf("attempt to re-register a resource type implementation for %v", rType.TypeName())
93 }
94 r.types[url] = rType
95 return nil
96 }
97
98 func (c *clientImpl) triggerResourceNotFoundForTesting(rType xdsresource.Type, resourceName string) error {
99
100 if c == nil || c.done.HasFired() {
101 return fmt.Errorf("attempt to trigger resource-not-found-error for resource %q of type %q, but client is closed", rType.TypeName(), resourceName)
102 }
103
104 n := xdsresource.ParseName(resourceName)
105 a, unref, err := c.findAuthority(n)
106 if err != nil {
107 return fmt.Errorf("attempt to trigger resource-not-found-error for resource %q of type %q, but authority %q is not found", rType.TypeName(), resourceName, n.Authority)
108 }
109 defer unref()
110 a.triggerResourceNotFoundForTesting(rType, n.String())
111 return nil
112 }
113
View as plain text