...
1
18
19
20
21
22
23
24 package priority
25
26 import (
27 "encoding/json"
28 "fmt"
29 "sync"
30 "time"
31
32 "google.golang.org/grpc/balancer"
33 "google.golang.org/grpc/balancer/base"
34 "google.golang.org/grpc/connectivity"
35 "google.golang.org/grpc/internal/balancergroup"
36 "google.golang.org/grpc/internal/buffer"
37 "google.golang.org/grpc/internal/grpclog"
38 "google.golang.org/grpc/internal/grpcsync"
39 "google.golang.org/grpc/internal/hierarchy"
40 "google.golang.org/grpc/internal/pretty"
41 "google.golang.org/grpc/resolver"
42 "google.golang.org/grpc/serviceconfig"
43 )
44
45
46 const Name = "priority_experimental"
47
48
49
50 var DefaultSubBalancerCloseTimeout = 15 * time.Minute
51
52 func init() {
53 balancer.Register(bb{})
54 }
55
56 type bb struct{}
57
58 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
59 b := &priorityBalancer{
60 cc: cc,
61 done: grpcsync.NewEvent(),
62 children: make(map[string]*childBalancer),
63 childBalancerStateUpdate: buffer.NewUnbounded(),
64 }
65
66 b.logger = prefixLogger(b)
67 b.bg = balancergroup.New(balancergroup.Options{
68 CC: cc,
69 BuildOpts: bOpts,
70 StateAggregator: b,
71 Logger: b.logger,
72 SubBalancerCloseTimeout: DefaultSubBalancerCloseTimeout,
73 })
74 b.bg.Start()
75 go b.run()
76 b.logger.Infof("Created")
77 return b
78 }
79
80 func (b bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
81 return parseConfig(s)
82 }
83
84 func (bb) Name() string {
85 return Name
86 }
87
88
89
90 type timerWrapper struct {
91 stopped bool
92 timer *time.Timer
93 }
94
95 type priorityBalancer struct {
96 logger *grpclog.PrefixLogger
97 cc balancer.ClientConn
98 bg *balancergroup.BalancerGroup
99 done *grpcsync.Event
100 childBalancerStateUpdate *buffer.Unbounded
101
102 mu sync.Mutex
103 childInUse string
104
105 priorities []string
106
107 children map[string]*childBalancer
108
109
110
111
112
113
114 inhibitPickerUpdates bool
115 }
116
117 func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
118 if b.logger.V(2) {
119 b.logger.Infof("Received an update with balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
120 }
121 newConfig, ok := s.BalancerConfig.(*LBConfig)
122 if !ok {
123 return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
124 }
125 addressesSplit := hierarchy.Group(s.ResolverState.Addresses)
126
127 b.mu.Lock()
128
129
130 for name, newSubConfig := range newConfig.Children {
131 bb := balancer.Get(newSubConfig.Config.Name)
132 if bb == nil {
133 b.logger.Errorf("balancer name %v from config is not registered", newSubConfig.Config.Name)
134 continue
135 }
136
137 currentChild, ok := b.children[name]
138 if !ok {
139
140
141
142 cb := newChildBalancer(name, b, bb.Name(), b.cc)
143 cb.updateConfig(newSubConfig, resolver.State{
144 Addresses: addressesSplit[name],
145 ServiceConfig: s.ResolverState.ServiceConfig,
146 Attributes: s.ResolverState.Attributes,
147 })
148 b.children[name] = cb
149 continue
150 }
151
152
153
154
155
156 if currentChild.balancerName != bb.Name() {
157 currentChild.stop()
158 currentChild.updateBalancerName(bb.Name())
159 }
160
161
162
163
164 currentChild.updateConfig(newSubConfig, resolver.State{
165 Addresses: addressesSplit[name],
166 ServiceConfig: s.ResolverState.ServiceConfig,
167 Attributes: s.ResolverState.Attributes,
168 })
169 }
170
171 for name, oldChild := range b.children {
172 if _, ok := newConfig.Children[name]; !ok {
173 oldChild.stop()
174 delete(b.children, name)
175 }
176 }
177
178
179 b.priorities = newConfig.Priorities
180
181
182 if len(b.priorities) == 0 {
183 b.childInUse = ""
184 b.cc.UpdateState(balancer.State{
185 ConnectivityState: connectivity.TransientFailure,
186 Picker: base.NewErrPicker(ErrAllPrioritiesRemoved),
187 })
188 b.mu.Unlock()
189 return nil
190 }
191
192
193
194
195
196
197
198
199 b.inhibitPickerUpdates = true
200
201
202 done := make(chan struct{})
203 b.childBalancerStateUpdate.Put(resumePickerUpdates{done: done})
204 b.mu.Unlock()
205 <-done
206
207 return nil
208 }
209
210 func (b *priorityBalancer) ResolverError(err error) {
211 b.bg.ResolverError(err)
212 }
213
214 func (b *priorityBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
215 b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
216 }
217
218 func (b *priorityBalancer) Close() {
219 b.bg.Close()
220 b.childBalancerStateUpdate.Close()
221
222 b.mu.Lock()
223 defer b.mu.Unlock()
224 b.done.Fire()
225
226
227 b.childInUse = ""
228
229
230 for _, child := range b.children {
231 child.stop()
232 }
233 }
234
235 func (b *priorityBalancer) ExitIdle() {
236 b.bg.ExitIdle()
237 }
238
239
240
241 func (b *priorityBalancer) UpdateState(childName string, state balancer.State) {
242 b.childBalancerStateUpdate.Put(childBalancerState{
243 name: childName,
244 s: state,
245 })
246 }
247
248 type childBalancerState struct {
249 name string
250 s balancer.State
251 }
252
253 type resumePickerUpdates struct {
254 done chan struct{}
255 }
256
257
258
259
260 func (b *priorityBalancer) run() {
261 for {
262 select {
263 case u, ok := <-b.childBalancerStateUpdate.Get():
264 if !ok {
265 return
266 }
267 b.childBalancerStateUpdate.Load()
268
269
270
271 b.mu.Lock()
272 if b.done.HasFired() {
273 return
274 }
275 switch s := u.(type) {
276 case childBalancerState:
277 b.handleChildStateUpdate(s.name, s.s)
278 case resumePickerUpdates:
279 b.inhibitPickerUpdates = false
280 b.syncPriority(b.childInUse)
281 close(s.done)
282 }
283 b.mu.Unlock()
284 case <-b.done.Done():
285 return
286 }
287 }
288 }
289
View as plain text