1
18
19
20
21
22
23
24 package clusterimpl
25
26 import (
27 "encoding/json"
28 "fmt"
29 "sync"
30 "sync/atomic"
31
32 "google.golang.org/grpc/balancer"
33 "google.golang.org/grpc/connectivity"
34 "google.golang.org/grpc/internal/balancer/gracefulswitch"
35 "google.golang.org/grpc/internal/buffer"
36 "google.golang.org/grpc/internal/grpclog"
37 "google.golang.org/grpc/internal/grpcsync"
38 "google.golang.org/grpc/internal/pretty"
39 "google.golang.org/grpc/internal/xds"
40 "google.golang.org/grpc/internal/xds/bootstrap"
41 "google.golang.org/grpc/resolver"
42 "google.golang.org/grpc/serviceconfig"
43 xdsinternal "google.golang.org/grpc/xds/internal"
44 "google.golang.org/grpc/xds/internal/balancer/loadstore"
45 "google.golang.org/grpc/xds/internal/xdsclient"
46 "google.golang.org/grpc/xds/internal/xdsclient/load"
47 )
48
// Name is the name under which this balancer's builder is registered (see
// init and bb.Name).
const Name = "xds_cluster_impl_experimental"

// defaultRequestCountMax is the value Build uses to initialise
// clusterImplBalancer.requestCountMax.
const defaultRequestCountMax = 1024
54
55 func init() {
56 balancer.Register(bb{})
57 }
58
// bb is the stateless builder for clusterImplBalancer; it is the value
// registered with the balancer package in init.
type bb struct{}
60
61 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
62 b := &clusterImplBalancer{
63 ClientConn: cc,
64 bOpts: bOpts,
65 closed: grpcsync.NewEvent(),
66 done: grpcsync.NewEvent(),
67 loadWrapper: loadstore.NewWrapper(),
68 pickerUpdateCh: buffer.NewUnbounded(),
69 requestCountMax: defaultRequestCountMax,
70 }
71 b.logger = prefixLogger(b)
72 b.child = gracefulswitch.NewBalancer(b, bOpts)
73 go b.run()
74 b.logger.Infof("Created")
75 return b
76 }
77
78 func (bb) Name() string {
79 return Name
80 }
81
82 func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
83 return parseConfig(c)
84 }
85
86 type clusterImplBalancer struct {
87 balancer.ClientConn
88
89
90
91
92
93
94
95
96
97 mu sync.Mutex
98 closed *grpcsync.Event
99 done *grpcsync.Event
100
101 bOpts balancer.BuildOptions
102 logger *grpclog.PrefixLogger
103 xdsClient xdsclient.XDSClient
104
105 config *LBConfig
106 child *gracefulswitch.Balancer
107 cancelLoadReport func()
108 edsServiceName string
109 lrsServer *bootstrap.ServerConfig
110 loadWrapper *loadstore.Wrapper
111
112 clusterNameMu sync.Mutex
113 clusterName string
114
115
116
117
118
119 childState balancer.State
120 dropCategories []DropConfig
121 drops []*dropper
122 requestCounterCluster string
123 requestCounterService string
124 requestCounter *xdsclient.ClusterRequestsCounter
125 requestCountMax uint32
126 telemetryLabels map[string]string
127 pickerUpdateCh *buffer.Unbounded
128 }
129
130
131
132 func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
133 var updateLoadClusterAndService bool
134
135
136
137 clusterName := b.getClusterName()
138 if clusterName != newConfig.Cluster {
139 updateLoadClusterAndService = true
140 b.setClusterName(newConfig.Cluster)
141 clusterName = newConfig.Cluster
142 }
143 if b.edsServiceName != newConfig.EDSServiceName {
144 updateLoadClusterAndService = true
145 b.edsServiceName = newConfig.EDSServiceName
146 }
147 if updateLoadClusterAndService {
148
149
150
151
152
153
154
155
156 b.loadWrapper.UpdateClusterAndService(clusterName, b.edsServiceName)
157 }
158
159 var (
160 stopOldLoadReport bool
161 startNewLoadReport bool
162 )
163
164
165 if b.lrsServer == nil {
166 if newConfig.LoadReportingServer != nil {
167
168 b.lrsServer = newConfig.LoadReportingServer
169 startNewLoadReport = true
170 }
171
172 } else if newConfig.LoadReportingServer == nil {
173
174 b.lrsServer = newConfig.LoadReportingServer
175 stopOldLoadReport = true
176 } else {
177
178
179 if !b.lrsServer.Equal(newConfig.LoadReportingServer) {
180 b.lrsServer = newConfig.LoadReportingServer
181 stopOldLoadReport = true
182 startNewLoadReport = true
183 }
184 }
185
186 if stopOldLoadReport {
187 if b.cancelLoadReport != nil {
188 b.cancelLoadReport()
189 b.cancelLoadReport = nil
190 if !startNewLoadReport {
191
192
193 b.loadWrapper.UpdateLoadStore(nil)
194 }
195 }
196 }
197 if startNewLoadReport {
198 var loadStore *load.Store
199 if b.xdsClient != nil {
200 loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServer)
201 }
202 b.loadWrapper.UpdateLoadStore(loadStore)
203 }
204
205 return nil
206 }
207
208 func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
209 if b.closed.HasFired() {
210 b.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
211 return nil
212 }
213
214 if b.logger.V(2) {
215 b.logger.Infof("Received update from resolver, balancer config: %s", pretty.ToJSON(s.BalancerConfig))
216 }
217 newConfig, ok := s.BalancerConfig.(*LBConfig)
218 if !ok {
219 return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
220 }
221
222
223
224
225 bb := balancer.Get(newConfig.ChildPolicy.Name)
226 if bb == nil {
227 return fmt.Errorf("balancer %q not registered", newConfig.ChildPolicy.Name)
228 }
229
230 if b.xdsClient == nil {
231 c := xdsclient.FromResolverState(s.ResolverState)
232 if c == nil {
233 return balancer.ErrBadResolverState
234 }
235 b.xdsClient = c
236 }
237
238
239
240
241
242 if err := b.updateLoadStore(newConfig); err != nil {
243 return err
244 }
245
246 if b.config == nil || b.config.ChildPolicy.Name != newConfig.ChildPolicy.Name {
247 if err := b.child.SwitchTo(bb); err != nil {
248 return fmt.Errorf("error switching to child of type %q: %v", newConfig.ChildPolicy.Name, err)
249 }
250 }
251 b.config = newConfig
252
253
254
255 b.pickerUpdateCh.Put(newConfig)
256
257
258 return b.child.UpdateClientConnState(balancer.ClientConnState{
259 ResolverState: s.ResolverState,
260 BalancerConfig: b.config.ChildPolicy.Config,
261 })
262 }
263
264 func (b *clusterImplBalancer) ResolverError(err error) {
265 if b.closed.HasFired() {
266 b.logger.Warningf("xds: received resolver error {%+v} after clusterImplBalancer was closed", err)
267 return
268 }
269 b.child.ResolverError(err)
270 }
271
272 func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer.SubConnState, cb func(balancer.SubConnState)) {
273 if b.closed.HasFired() {
274 b.logger.Warningf("xds: received subconn state change {%+v, %+v} after clusterImplBalancer was closed", sc, s)
275 return
276 }
277
278
279
280
281
282
283
284
285 if s.ConnectivityState == connectivity.TransientFailure {
286 b.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
287 }
288
289 if cb != nil {
290 cb(s)
291 }
292 }
293
294 func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
295 b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, s)
296 }
297
298 func (b *clusterImplBalancer) Close() {
299 b.mu.Lock()
300 b.closed.Fire()
301 b.mu.Unlock()
302
303 b.child.Close()
304 b.childState = balancer.State{}
305 b.pickerUpdateCh.Close()
306 <-b.done.Done()
307 b.logger.Infof("Shutdown")
308 }
309
310 func (b *clusterImplBalancer) ExitIdle() {
311 b.child.ExitIdle()
312 }
313
314
315
316 func (b *clusterImplBalancer) UpdateState(state balancer.State) {
317
318 b.pickerUpdateCh.Put(state)
319 }
320
321 func (b *clusterImplBalancer) setClusterName(n string) {
322 b.clusterNameMu.Lock()
323 defer b.clusterNameMu.Unlock()
324 b.clusterName = n
325 }
326
327 func (b *clusterImplBalancer) getClusterName() string {
328 b.clusterNameMu.Lock()
329 defer b.clusterNameMu.Unlock()
330 return b.clusterName
331 }
332
333
334
335
336
337
338
339
340
341
342
343
344 type scWrapper struct {
345 balancer.SubConn
346
347
348 locality atomic.Value
349 }
350
351 func (scw *scWrapper) updateLocalityID(lID xdsinternal.LocalityID) {
352 scw.locality.Store(lID)
353 }
354
355 func (scw *scWrapper) localityID() xdsinternal.LocalityID {
356 lID, _ := scw.locality.Load().(xdsinternal.LocalityID)
357 return lID
358 }
359
360 func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
361 clusterName := b.getClusterName()
362 newAddrs := make([]resolver.Address, len(addrs))
363 var lID xdsinternal.LocalityID
364 for i, addr := range addrs {
365 newAddrs[i] = xds.SetXDSHandshakeClusterName(addr, clusterName)
366 lID = xdsinternal.GetLocalityID(newAddrs[i])
367 }
368 var sc balancer.SubConn
369 oldListener := opts.StateListener
370 opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state, oldListener) }
371 sc, err := b.ClientConn.NewSubConn(newAddrs, opts)
372 if err != nil {
373 return nil, err
374 }
375
376 ret := &scWrapper{SubConn: sc}
377 ret.updateLocalityID(lID)
378 return ret, nil
379 }
380
381 func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) {
382 b.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
383 }
384
385 func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
386 clusterName := b.getClusterName()
387 newAddrs := make([]resolver.Address, len(addrs))
388 var lID xdsinternal.LocalityID
389 for i, addr := range addrs {
390 newAddrs[i] = xds.SetXDSHandshakeClusterName(addr, clusterName)
391 lID = xdsinternal.GetLocalityID(newAddrs[i])
392 }
393 if scw, ok := sc.(*scWrapper); ok {
394 scw.updateLocalityID(lID)
395
396
397 sc = scw.SubConn
398 }
399 b.ClientConn.UpdateAddresses(sc, newAddrs)
400 }
401
402 type dropConfigs struct {
403 drops []*dropper
404 requestCounter *xdsclient.ClusterRequestsCounter
405 requestCountMax uint32
406 }
407
408
409
410
411 func (b *clusterImplBalancer) handleDropAndRequestCount(newConfig *LBConfig) *dropConfigs {
412
413 var updatePicker bool
414 if !equalDropCategories(b.dropCategories, newConfig.DropCategories) {
415 b.dropCategories = newConfig.DropCategories
416 b.drops = make([]*dropper, 0, len(newConfig.DropCategories))
417 for _, c := range newConfig.DropCategories {
418 b.drops = append(b.drops, newDropper(c))
419 }
420 updatePicker = true
421 }
422
423
424
425 if b.requestCounterCluster != newConfig.Cluster || b.requestCounterService != newConfig.EDSServiceName {
426 b.requestCounterCluster = newConfig.Cluster
427 b.requestCounterService = newConfig.EDSServiceName
428 b.requestCounter = xdsclient.GetClusterRequestsCounter(newConfig.Cluster, newConfig.EDSServiceName)
429 updatePicker = true
430 }
431
432
433 var newRequestCountMax uint32 = 1024
434 if newConfig.MaxConcurrentRequests != nil {
435 newRequestCountMax = *newConfig.MaxConcurrentRequests
436 }
437 if b.requestCountMax != newRequestCountMax {
438 b.requestCountMax = newRequestCountMax
439 updatePicker = true
440 }
441
442 if !updatePicker {
443 return nil
444 }
445 return &dropConfigs{
446 drops: b.drops,
447 requestCounter: b.requestCounter,
448 requestCountMax: b.requestCountMax,
449 }
450 }
451
452 func (b *clusterImplBalancer) run() {
453 defer b.done.Fire()
454 for {
455 select {
456 case update, ok := <-b.pickerUpdateCh.Get():
457 if !ok {
458 return
459 }
460 b.pickerUpdateCh.Load()
461 b.mu.Lock()
462 if b.closed.HasFired() {
463 b.mu.Unlock()
464 return
465 }
466 switch u := update.(type) {
467 case balancer.State:
468 b.childState = u
469 b.ClientConn.UpdateState(balancer.State{
470 ConnectivityState: b.childState.ConnectivityState,
471 Picker: b.newPicker(&dropConfigs{
472 drops: b.drops,
473 requestCounter: b.requestCounter,
474 requestCountMax: b.requestCountMax,
475 }),
476 })
477 case *LBConfig:
478 b.telemetryLabels = u.TelemetryLabels
479 dc := b.handleDropAndRequestCount(u)
480 if dc != nil && b.childState.Picker != nil {
481 b.ClientConn.UpdateState(balancer.State{
482 ConnectivityState: b.childState.ConnectivityState,
483 Picker: b.newPicker(dc),
484 })
485 }
486 }
487 b.mu.Unlock()
488 case <-b.closed.Done():
489 if b.cancelLoadReport != nil {
490 b.cancelLoadReport()
491 b.cancelLoadReport = nil
492 }
493 return
494 }
495 }
496 }
497
View as plain text