1
18
19 package xdsclient
20
21 import (
22 "bytes"
23 "context"
24 "encoding/json"
25 "fmt"
26 "sync"
27 "sync/atomic"
28 "time"
29
30 "google.golang.org/grpc/internal"
31 "google.golang.org/grpc/internal/cache"
32 "google.golang.org/grpc/internal/grpcsync"
33 "google.golang.org/grpc/internal/xds/bootstrap"
34 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
35 )
36
37
38
39
40
41
42
43
44
45
46
47
48 func New() (XDSClient, func(), error) {
49 return newRefCountedWithConfig(nil)
50 }
51
52
53
54
55
56
57
58
59
60
61
62
63
64 func NewWithConfig(config *bootstrap.Config) (XDSClient, func(), error) {
65 return newRefCountedWithConfig(config)
66 }
67
68
69 func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (*clientImpl, error) {
70 ctx, cancel := context.WithCancel(context.Background())
71 c := &clientImpl{
72 done: grpcsync.NewEvent(),
73 config: config,
74 watchExpiryTimeout: watchExpiryTimeout,
75 serializer: grpcsync.NewCallbackSerializer(ctx),
76 serializerClose: cancel,
77 resourceTypes: newResourceTypeRegistry(),
78 authorities: make(map[string]*authority),
79 idleAuthorities: cache.NewTimeoutCache(idleAuthorityDeleteTimeout),
80 }
81
82 c.logger = prefixLogger(c)
83 c.logger.Infof("Created client to xDS management server: %s", config.XDSServer)
84 return c, nil
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98 func NewWithConfigForTesting(config *bootstrap.Config, watchExpiryTimeout, authorityIdleTimeout time.Duration) (XDSClient, func(), error) {
99 cl, err := newWithConfig(config, watchExpiryTimeout, authorityIdleTimeout)
100 if err != nil {
101 return nil, nil, err
102 }
103 return cl, grpcsync.OnceFunc(cl.close), nil
104 }
105
106 func init() {
107 internal.TriggerXDSResourceNameNotFoundClient = triggerXDSResourceNameNotFoundClient
108 }
109
110 var singletonClientForTesting = atomic.Pointer[clientRefCounted]{}
111
112 func triggerXDSResourceNameNotFoundClient(resourceType, resourceName string) error {
113 c := singletonClientForTesting.Load()
114 return internal.TriggerXDSResourceNameNotFoundForTesting.(func(func(xdsresource.Type, string) error, string, string) error)(c.clientImpl.triggerResourceNotFoundForTesting, resourceType, resourceName)
115 }
116
117
118
119
120
121
122
123
124
125
126
127 func NewWithBootstrapContentsForTesting(contents []byte) (XDSClient, func(), error) {
128
129 buf := bytes.Buffer{}
130 err := json.Indent(&buf, contents, "", "")
131 if err != nil {
132 return nil, nil, fmt.Errorf("xds: error normalizing JSON: %v", err)
133 }
134 contents = bytes.TrimSpace(buf.Bytes())
135
136 c, err := getOrMakeClientForTesting(contents)
137 if err != nil {
138 return nil, nil, err
139 }
140 singletonClientForTesting.Store(c)
141 return c, grpcsync.OnceFunc(func() {
142 clientsMu.Lock()
143 defer clientsMu.Unlock()
144 if c.decrRef() == 0 {
145 c.close()
146 delete(clients, string(contents))
147 singletonClientForTesting.Store(nil)
148 }
149 }), nil
150 }
151
152
153
154
155
156
157 func getOrMakeClientForTesting(config []byte) (*clientRefCounted, error) {
158 clientsMu.Lock()
159 defer clientsMu.Unlock()
160
161 if c := clients[string(config)]; c != nil {
162 c.incrRef()
163 return c, nil
164 }
165
166 bcfg, err := bootstrap.NewConfigFromContents(config)
167 if err != nil {
168 return nil, fmt.Errorf("bootstrap config %s: %v", string(config), err)
169 }
170 cImpl, err := newWithConfig(bcfg, defaultWatchExpiryTimeout, defaultIdleAuthorityDeleteTimeout)
171 if err != nil {
172 return nil, fmt.Errorf("creating xDS client: %v", err)
173 }
174 c := &clientRefCounted{clientImpl: cImpl, refCount: 1}
175 clients[string(config)] = c
176 return c, nil
177 }
178
179 var (
180 clients = map[string]*clientRefCounted{}
181 clientsMu sync.Mutex
182 )
183
View as plain text