...
1
18
19
20 package clustermanager
21
22 import (
23 "encoding/json"
24 "fmt"
25 "time"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/grpclog"
29 "google.golang.org/grpc/internal/balancergroup"
30 internalgrpclog "google.golang.org/grpc/internal/grpclog"
31 "google.golang.org/grpc/internal/hierarchy"
32 "google.golang.org/grpc/internal/pretty"
33 "google.golang.org/grpc/resolver"
34 "google.golang.org/grpc/serviceconfig"
35 )
36
37 const balancerName = "xds_cluster_manager_experimental"
38
39 func init() {
40 balancer.Register(bb{})
41 }
42
43 type bb struct{}
44
45 func (bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
46 b := &bal{}
47 b.logger = prefixLogger(b)
48 b.stateAggregator = newBalancerStateAggregator(cc, b.logger)
49 b.stateAggregator.start()
50 b.bg = balancergroup.New(balancergroup.Options{
51 CC: cc,
52 BuildOpts: opts,
53 StateAggregator: b.stateAggregator,
54 Logger: b.logger,
55 SubBalancerCloseTimeout: time.Duration(0),
56 })
57 b.bg.Start()
58 b.logger.Infof("Created")
59 return b
60 }
61
62 func (bb) Name() string {
63 return balancerName
64 }
65
66 func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
67 return parseConfig(c)
68 }
69
70 type bal struct {
71 logger *internalgrpclog.PrefixLogger
72
73
74
75 bg *balancergroup.BalancerGroup
76 stateAggregator *balancerStateAggregator
77
78 children map[string]childConfig
79 }
80
81 func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) {
82 update := false
83 addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
84
85
86 for name := range b.children {
87 if _, ok := newConfig.Children[name]; !ok {
88 b.stateAggregator.remove(name)
89 b.bg.Remove(name)
90 update = true
91 }
92 }
93
94
95
96
97 for name, newT := range newConfig.Children {
98 if _, ok := b.children[name]; !ok {
99
100 b.stateAggregator.add(name)
101
102 b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
103 } else {
104
105 if newT.ChildPolicy.Name != b.children[name].ChildPolicy.Name {
106 b.bg.UpdateBuilder(name, balancer.Get(newT.ChildPolicy.Name))
107 }
108 }
109
110 _ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
111 ResolverState: resolver.State{
112 Addresses: addressesSplit[name],
113 ServiceConfig: s.ResolverState.ServiceConfig,
114 Attributes: s.ResolverState.Attributes,
115 },
116 BalancerConfig: newT.ChildPolicy.Config,
117 })
118 }
119
120 b.children = newConfig.Children
121 if update {
122 b.stateAggregator.buildAndUpdate()
123 }
124 }
125
126 func (b *bal) UpdateClientConnState(s balancer.ClientConnState) error {
127 newConfig, ok := s.BalancerConfig.(*lbConfig)
128 if !ok {
129 return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
130 }
131 b.logger.Infof("update with config %+v, resolver state %+v", pretty.ToJSON(s.BalancerConfig), s.ResolverState)
132
133 b.stateAggregator.pauseStateUpdates()
134 defer b.stateAggregator.resumeStateUpdates()
135 b.updateChildren(s, newConfig)
136 return nil
137 }
138
139 func (b *bal) ResolverError(err error) {
140 b.bg.ResolverError(err)
141 }
142
143 func (b *bal) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
144 b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
145 }
146
147 func (b *bal) Close() {
148 b.stateAggregator.close()
149 b.bg.Close()
150 b.logger.Infof("Shutdown")
151 }
152
153 func (b *bal) ExitIdle() {
154 b.bg.ExitIdle()
155 }
156
157 const prefix = "[xds-cluster-manager-lb %p] "
158
159 var logger = grpclog.Component("xds")
160
161 func prefixLogger(p *bal) *internalgrpclog.PrefixLogger {
162 return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
163 }
164
View as plain text