...
1
18
19 package clustermanager
20
21 import (
22 "fmt"
23 "sync"
24
25 "google.golang.org/grpc/balancer"
26 "google.golang.org/grpc/balancer/base"
27 "google.golang.org/grpc/connectivity"
28 "google.golang.org/grpc/internal/grpclog"
29 )
30
31 type subBalancerState struct {
32 state balancer.State
33
34
35
36
37
38 stateToAggregate connectivity.State
39 }
40
41 func (s *subBalancerState) String() string {
42 return fmt.Sprintf("picker:%p,state:%v,stateToAggregate:%v", s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
43 }
44
45 type balancerStateAggregator struct {
46 cc balancer.ClientConn
47 logger *grpclog.PrefixLogger
48
49 mu sync.Mutex
50
51
52
53
54 started bool
55
56
57
58
59 idToPickerState map[string]*subBalancerState
60
61 pauseUpdateState bool
62
63
64 needUpdateStateOnResume bool
65 }
66
67 func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator {
68 return &balancerStateAggregator{
69 cc: cc,
70 logger: logger,
71 idToPickerState: make(map[string]*subBalancerState),
72 }
73 }
74
75
76
77 func (bsa *balancerStateAggregator) start() {
78 bsa.mu.Lock()
79 defer bsa.mu.Unlock()
80 bsa.started = true
81 }
82
83
84
85 func (bsa *balancerStateAggregator) close() {
86 bsa.mu.Lock()
87 defer bsa.mu.Unlock()
88 bsa.started = false
89 bsa.clearStates()
90 }
91
92
93
94
95
96 func (bsa *balancerStateAggregator) add(id string) {
97 bsa.mu.Lock()
98 defer bsa.mu.Unlock()
99 bsa.idToPickerState[id] = &subBalancerState{
100
101
102
103 state: balancer.State{
104 ConnectivityState: connectivity.Connecting,
105 Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
106 },
107 stateToAggregate: connectivity.Connecting,
108 }
109 }
110
111
112
113
114
115 func (bsa *balancerStateAggregator) remove(id string) {
116 bsa.mu.Lock()
117 defer bsa.mu.Unlock()
118 if _, ok := bsa.idToPickerState[id]; !ok {
119 return
120 }
121
122
123 delete(bsa.idToPickerState, id)
124 }
125
126
127
128
129 func (bsa *balancerStateAggregator) pauseStateUpdates() {
130 bsa.mu.Lock()
131 defer bsa.mu.Unlock()
132 bsa.pauseUpdateState = true
133 bsa.needUpdateStateOnResume = false
134 }
135
136
137
138 func (bsa *balancerStateAggregator) resumeStateUpdates() {
139 bsa.mu.Lock()
140 defer bsa.mu.Unlock()
141 bsa.pauseUpdateState = false
142 if bsa.needUpdateStateOnResume {
143 bsa.cc.UpdateState(bsa.build())
144 }
145 }
146
147
148
149
150
151 func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) {
152 bsa.mu.Lock()
153 defer bsa.mu.Unlock()
154 pickerSt, ok := bsa.idToPickerState[id]
155 if !ok {
156
157
158 return
159 }
160 if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
161
162
163
164
165 pickerSt.stateToAggregate = state.ConnectivityState
166 }
167 pickerSt.state = state
168
169 if !bsa.started {
170 return
171 }
172 if bsa.pauseUpdateState {
173
174
175 bsa.needUpdateStateOnResume = true
176 return
177 }
178 bsa.cc.UpdateState(bsa.build())
179 }
180
181
182
183
184
185 func (bsa *balancerStateAggregator) clearStates() {
186 for _, pState := range bsa.idToPickerState {
187 pState.state = balancer.State{
188 ConnectivityState: connectivity.Connecting,
189 Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
190 }
191 pState.stateToAggregate = connectivity.Connecting
192 }
193 }
194
195
196
197 func (bsa *balancerStateAggregator) buildAndUpdate() {
198 bsa.mu.Lock()
199 defer bsa.mu.Unlock()
200 if !bsa.started {
201 return
202 }
203 if bsa.pauseUpdateState {
204
205
206 bsa.needUpdateStateOnResume = true
207 return
208 }
209 bsa.cc.UpdateState(bsa.build())
210 }
211
212
213
214
215 func (bsa *balancerStateAggregator) build() balancer.State {
216
217
218
219
220
221
222
223
224
225
226
227 var readyN, connectingN, idleN int
228 for _, ps := range bsa.idToPickerState {
229 switch ps.stateToAggregate {
230 case connectivity.Ready:
231 readyN++
232 case connectivity.Connecting:
233 connectingN++
234 case connectivity.Idle:
235 idleN++
236 }
237 }
238 var aggregatedState connectivity.State
239 switch {
240 case readyN > 0:
241 aggregatedState = connectivity.Ready
242 case connectingN > 0:
243 aggregatedState = connectivity.Connecting
244 case idleN > 0:
245 aggregatedState = connectivity.Idle
246 default:
247 aggregatedState = connectivity.TransientFailure
248 }
249
250
251
252
253
254
255 bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState)
256 return balancer.State{
257 ConnectivityState: aggregatedState,
258 Picker: newPickerGroup(bsa.idToPickerState),
259 }
260 }
261
View as plain text