1
18
19
20
21
22 package clusterresolver
23
24 import (
25 "encoding/json"
26 "errors"
27 "fmt"
28
29 "google.golang.org/grpc/attributes"
30 "google.golang.org/grpc/balancer"
31 "google.golang.org/grpc/balancer/base"
32 "google.golang.org/grpc/connectivity"
33 "google.golang.org/grpc/internal/balancer/nop"
34 "google.golang.org/grpc/internal/buffer"
35 "google.golang.org/grpc/internal/grpclog"
36 "google.golang.org/grpc/internal/grpcsync"
37 "google.golang.org/grpc/internal/pretty"
38 "google.golang.org/grpc/resolver"
39 "google.golang.org/grpc/serviceconfig"
40 "google.golang.org/grpc/xds/internal/balancer/outlierdetection"
41 "google.golang.org/grpc/xds/internal/balancer/priority"
42 "google.golang.org/grpc/xds/internal/xdsclient"
43 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
44 )
45
46
47 const Name = "cluster_resolver_experimental"
48
49 var (
50 errBalancerClosed = errors.New("cdsBalancer is closed")
51 newChildBalancer = func(bb balancer.Builder, cc balancer.ClientConn, o balancer.BuildOptions) balancer.Balancer {
52 return bb.Build(cc, o)
53 }
54 )
55
56 func init() {
57 balancer.Register(bb{})
58 }
59
60 type bb struct{}
61
62
63 func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
64 priorityBuilder := balancer.Get(priority.Name)
65 if priorityBuilder == nil {
66 logger.Errorf("%q LB policy is needed but not registered", priority.Name)
67 return nop.NewBalancer(cc, fmt.Errorf("%q LB policy is needed but not registered", priority.Name))
68 }
69 priorityConfigParser, ok := priorityBuilder.(balancer.ConfigParser)
70 if !ok {
71 logger.Errorf("%q LB policy does not implement a config parser", priority.Name)
72 return nop.NewBalancer(cc, fmt.Errorf("%q LB policy does not implement a config parser", priority.Name))
73 }
74
75 b := &clusterResolverBalancer{
76 bOpts: opts,
77 updateCh: buffer.NewUnbounded(),
78 closed: grpcsync.NewEvent(),
79 done: grpcsync.NewEvent(),
80
81 priorityBuilder: priorityBuilder,
82 priorityConfigParser: priorityConfigParser,
83 }
84 b.logger = prefixLogger(b)
85 b.logger.Infof("Created")
86
87 b.resourceWatcher = newResourceResolver(b, b.logger)
88 b.cc = &ccWrapper{
89 ClientConn: cc,
90 b: b,
91 resourceWatcher: b.resourceWatcher,
92 }
93
94 go b.run()
95 return b
96 }
97
98 func (bb) Name() string {
99 return Name
100 }
101
102 func (bb) ParseConfig(j json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
103 odBuilder := balancer.Get(outlierdetection.Name)
104 if odBuilder == nil {
105
106
107 return nil, fmt.Errorf("%q LB policy is needed but not registered", outlierdetection.Name)
108 }
109 odParser, ok := odBuilder.(balancer.ConfigParser)
110 if !ok {
111
112 return nil, fmt.Errorf("%q LB policy does not implement a config parser", outlierdetection.Name)
113 }
114
115 var cfg *LBConfig
116 if err := json.Unmarshal(j, &cfg); err != nil {
117 return nil, fmt.Errorf("unable to unmarshal balancer config %s into cluster-resolver config, error: %v", string(j), err)
118 }
119
120 for i, dm := range cfg.DiscoveryMechanisms {
121 lbCfg, err := odParser.ParseConfig(dm.OutlierDetection)
122 if err != nil {
123 return nil, fmt.Errorf("error parsing Outlier Detection config %v: %v", dm.OutlierDetection, err)
124 }
125 odCfg, ok := lbCfg.(*outlierdetection.LBConfig)
126 if !ok {
127
128
129 return nil, fmt.Errorf("odParser returned config with unexpected type %T: %v", lbCfg, lbCfg)
130 }
131 cfg.DiscoveryMechanisms[i].outlierDetection = *odCfg
132 }
133 if err := json.Unmarshal(cfg.XDSLBPolicy, &cfg.xdsLBPolicy); err != nil {
134
135
136
137
138
139 return nil, fmt.Errorf("error unmarshalling xDS LB Policy: %v", err)
140 }
141 return cfg, nil
142 }
143
144
145 type ccUpdate struct {
146 state balancer.ClientConnState
147 err error
148 }
149
150 type exitIdle struct{}
151
152
153
154 type clusterResolverBalancer struct {
155 cc balancer.ClientConn
156 bOpts balancer.BuildOptions
157 updateCh *buffer.Unbounded
158 resourceWatcher *resourceResolver
159 logger *grpclog.PrefixLogger
160 closed *grpcsync.Event
161 done *grpcsync.Event
162
163 priorityBuilder balancer.Builder
164 priorityConfigParser balancer.ConfigParser
165
166 config *LBConfig
167 configRaw *serviceconfig.ParseResult
168 xdsClient xdsclient.XDSClient
169 attrsWithClient *attributes.Attributes
170
171 child balancer.Balancer
172 priorities []priorityConfig
173 watchUpdateReceived bool
174 }
175
176
177
178
179
180
181 func (b *clusterResolverBalancer) handleClientConnUpdate(update *ccUpdate) {
182 if err := update.err; err != nil {
183 b.handleErrorFromUpdate(err, true)
184 return
185 }
186
187 b.logger.Infof("Received new balancer config: %v", pretty.ToJSON(update.state.BalancerConfig))
188 cfg, _ := update.state.BalancerConfig.(*LBConfig)
189 if cfg == nil {
190 b.logger.Warningf("Ignoring unsupported balancer configuration of type: %T", update.state.BalancerConfig)
191 return
192 }
193
194 b.config = cfg
195 b.configRaw = update.state.ResolverState.ServiceConfig
196 b.resourceWatcher.updateMechanisms(cfg.DiscoveryMechanisms)
197
198
199
200
201 if !b.watchUpdateReceived {
202 return
203 }
204 b.updateChildConfig()
205 }
206
207
208
209 func (b *clusterResolverBalancer) handleResourceUpdate(update *resourceUpdate) {
210 if err := update.err; err != nil {
211 b.handleErrorFromUpdate(err, false)
212 return
213 }
214
215 b.watchUpdateReceived = true
216 b.priorities = update.priorities
217
218
219
220
221 b.updateChildConfig()
222 }
223
224
225
226
227
228
229
230 func (b *clusterResolverBalancer) updateChildConfig() {
231 if b.child == nil {
232 b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts)
233 }
234
235 childCfgBytes, addrs, err := buildPriorityConfigJSON(b.priorities, &b.config.xdsLBPolicy)
236 if err != nil {
237 b.logger.Warningf("Failed to build child policy config: %v", err)
238 return
239 }
240 childCfg, err := b.priorityConfigParser.ParseConfig(childCfgBytes)
241 if err != nil {
242 b.logger.Warningf("Failed to parse child policy config. This should never happen because the config was generated: %v", err)
243 return
244 }
245 if b.logger.V(2) {
246 b.logger.Infof("Built child policy config: %s", pretty.ToJSON(childCfg))
247 }
248
249 endpoints := make([]resolver.Endpoint, len(addrs))
250 for i, a := range addrs {
251 endpoints[i].Attributes = a.BalancerAttributes
252 endpoints[i].Addresses = []resolver.Address{a}
253 endpoints[i].Addresses[0].BalancerAttributes = nil
254 }
255 if err := b.child.UpdateClientConnState(balancer.ClientConnState{
256 ResolverState: resolver.State{
257 Endpoints: endpoints,
258 Addresses: addrs,
259 ServiceConfig: b.configRaw,
260 Attributes: b.attrsWithClient,
261 },
262 BalancerConfig: childCfg,
263 }); err != nil {
264 b.logger.Warningf("Failed to push config to child policy: %v", err)
265 }
266 }
267
268
269
270
271 func (b *clusterResolverBalancer) handleErrorFromUpdate(err error, fromParent bool) {
272 b.logger.Warningf("Received error: %v", err)
273
274
275
276
277
278
279
280
281 if fromParent && xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound {
282 b.resourceWatcher.stop(false)
283 }
284
285 if b.child != nil {
286 b.child.ResolverError(err)
287 return
288 }
289 b.cc.UpdateState(balancer.State{
290 ConnectivityState: connectivity.TransientFailure,
291 Picker: base.NewErrPicker(err),
292 })
293 }
294
295
296
297
298 func (b *clusterResolverBalancer) run() {
299 for {
300 select {
301 case u, ok := <-b.updateCh.Get():
302 if !ok {
303 return
304 }
305 b.updateCh.Load()
306 switch update := u.(type) {
307 case *ccUpdate:
308 b.handleClientConnUpdate(update)
309 case exitIdle:
310 if b.child == nil {
311 b.logger.Errorf("xds: received ExitIdle with no child balancer")
312 break
313 }
314
315
316
317
318 if ei, ok := b.child.(balancer.ExitIdler); ok {
319 ei.ExitIdle()
320 }
321 }
322 case u := <-b.resourceWatcher.updateChannel:
323 b.handleResourceUpdate(u)
324
325
326
327 case <-b.closed.Done():
328 b.resourceWatcher.stop(true)
329
330 if b.child != nil {
331 b.child.Close()
332 b.child = nil
333 }
334 b.updateCh.Close()
335
336 b.logger.Infof("Shutdown")
337 b.done.Fire()
338 return
339 }
340 }
341 }
342
343
344
345 func (b *clusterResolverBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
346 if b.closed.HasFired() {
347 b.logger.Warningf("Received update from gRPC {%+v} after close", state)
348 return errBalancerClosed
349 }
350
351 if b.xdsClient == nil {
352 c := xdsclient.FromResolverState(state.ResolverState)
353 if c == nil {
354 return balancer.ErrBadResolverState
355 }
356 b.xdsClient = c
357 b.attrsWithClient = state.ResolverState.Attributes
358 }
359
360 b.updateCh.Put(&ccUpdate{state: state})
361 return nil
362 }
363
364
365 func (b *clusterResolverBalancer) ResolverError(err error) {
366 if b.closed.HasFired() {
367 b.logger.Warningf("Received resolver error {%v} after close", err)
368 return
369 }
370 b.updateCh.Put(&ccUpdate{err: err})
371 }
372
373
374 func (b *clusterResolverBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
375 b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
376 }
377
378
379 func (b *clusterResolverBalancer) Close() {
380 b.closed.Fire()
381 <-b.done.Done()
382 }
383
384 func (b *clusterResolverBalancer) ExitIdle() {
385 b.updateCh.Put(exitIdle{})
386 }
387
388
389
390
391
392
393 type ccWrapper struct {
394 balancer.ClientConn
395 b *clusterResolverBalancer
396 resourceWatcher *resourceResolver
397 }
398
399 func (c *ccWrapper) ResolveNow(resolver.ResolveNowOptions) {
400 c.resourceWatcher.resolveNow()
401 }
402
View as plain text