1
18
19 package grpc
20
21 import (
22 "encoding/json"
23 "errors"
24 "fmt"
25
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/connectivity"
28 internalgrpclog "google.golang.org/grpc/internal/grpclog"
29 "google.golang.org/grpc/internal/grpcrand"
30 "google.golang.org/grpc/internal/pretty"
31 "google.golang.org/grpc/resolver"
32 "google.golang.org/grpc/serviceconfig"
33 )
34
35 const (
36
37 PickFirstBalancerName = "pick_first"
38 logPrefix = "[pick-first-lb %p] "
39 )
40
41 type pickfirstBuilder struct{}
42
43 func (pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
44 b := &pickfirstBalancer{cc: cc}
45 b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
46 return b
47 }
48
49 func (pickfirstBuilder) Name() string {
50 return PickFirstBalancerName
51 }
52
53 type pfConfig struct {
54 serviceconfig.LoadBalancingConfig `json:"-"`
55
56
57
58
59 ShuffleAddressList bool `json:"shuffleAddressList"`
60 }
61
62 func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
63 var cfg pfConfig
64 if err := json.Unmarshal(js, &cfg); err != nil {
65 return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
66 }
67 return cfg, nil
68 }
69
70 type pickfirstBalancer struct {
71 logger *internalgrpclog.PrefixLogger
72 state connectivity.State
73 cc balancer.ClientConn
74 subConn balancer.SubConn
75 }
76
77 func (b *pickfirstBalancer) ResolverError(err error) {
78 if b.logger.V(2) {
79 b.logger.Infof("Received error from the name resolver: %v", err)
80 }
81 if b.subConn == nil {
82 b.state = connectivity.TransientFailure
83 }
84
85 if b.state != connectivity.TransientFailure {
86
87
88 return
89 }
90 b.cc.UpdateState(balancer.State{
91 ConnectivityState: connectivity.TransientFailure,
92 Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
93 })
94 }
95
96 func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
97 if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
98
99
100 if b.subConn != nil {
101
102
103 b.subConn.Shutdown()
104 b.subConn = nil
105 }
106 b.ResolverError(errors.New("produced zero addresses"))
107 return balancer.ErrBadResolverState
108 }
109
110
111 cfg, ok := state.BalancerConfig.(pfConfig)
112 if state.BalancerConfig != nil && !ok {
113 return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v", state.BalancerConfig, state.BalancerConfig)
114 }
115
116 if b.logger.V(2) {
117 b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
118 }
119
120 var addrs []resolver.Address
121 if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
122
123
124
125 if cfg.ShuffleAddressList {
126 endpoints = append([]resolver.Endpoint{}, endpoints...)
127 grpcrand.Shuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
128 }
129
130
131
132 for _, endpoint := range endpoints {
133
134
135
136 addrs = append(addrs, endpoint.Addresses...)
137 }
138 } else {
139
140
141
142
143
144
145 addrs = state.ResolverState.Addresses
146 if cfg.ShuffleAddressList {
147 addrs = append([]resolver.Address{}, addrs...)
148 grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
149 }
150 }
151
152 if b.subConn != nil {
153 b.cc.UpdateAddresses(b.subConn, addrs)
154 return nil
155 }
156
157 var subConn balancer.SubConn
158 subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{
159 StateListener: func(state balancer.SubConnState) {
160 b.updateSubConnState(subConn, state)
161 },
162 })
163 if err != nil {
164 if b.logger.V(2) {
165 b.logger.Infof("Failed to create new SubConn: %v", err)
166 }
167 b.state = connectivity.TransientFailure
168 b.cc.UpdateState(balancer.State{
169 ConnectivityState: connectivity.TransientFailure,
170 Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
171 })
172 return balancer.ErrBadResolverState
173 }
174 b.subConn = subConn
175 b.state = connectivity.Idle
176 b.cc.UpdateState(balancer.State{
177 ConnectivityState: connectivity.Connecting,
178 Picker: &picker{err: balancer.ErrNoSubConnAvailable},
179 })
180 b.subConn.Connect()
181 return nil
182 }
183
184
185
186 func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
187 b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
188 }
189
190 func (b *pickfirstBalancer) updateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
191 if b.logger.V(2) {
192 b.logger.Infof("Received SubConn state update: %p, %+v", subConn, state)
193 }
194 if b.subConn != subConn {
195 if b.logger.V(2) {
196 b.logger.Infof("Ignored state change because subConn is not recognized")
197 }
198 return
199 }
200 if state.ConnectivityState == connectivity.Shutdown {
201 b.subConn = nil
202 return
203 }
204
205 switch state.ConnectivityState {
206 case connectivity.Ready:
207 b.cc.UpdateState(balancer.State{
208 ConnectivityState: state.ConnectivityState,
209 Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
210 })
211 case connectivity.Connecting:
212 if b.state == connectivity.TransientFailure {
213
214 return
215 }
216 b.cc.UpdateState(balancer.State{
217 ConnectivityState: state.ConnectivityState,
218 Picker: &picker{err: balancer.ErrNoSubConnAvailable},
219 })
220 case connectivity.Idle:
221 if b.state == connectivity.TransientFailure {
222
223
224 b.subConn.Connect()
225 return
226 }
227 b.cc.UpdateState(balancer.State{
228 ConnectivityState: state.ConnectivityState,
229 Picker: &idlePicker{subConn: subConn},
230 })
231 case connectivity.TransientFailure:
232 b.cc.UpdateState(balancer.State{
233 ConnectivityState: state.ConnectivityState,
234 Picker: &picker{err: state.ConnectionError},
235 })
236 }
237 b.state = state.ConnectivityState
238 }
239
240 func (b *pickfirstBalancer) Close() {
241 }
242
243 func (b *pickfirstBalancer) ExitIdle() {
244 if b.subConn != nil && b.state == connectivity.Idle {
245 b.subConn.Connect()
246 }
247 }
248
249 type picker struct {
250 result balancer.PickResult
251 err error
252 }
253
254 func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
255 return p.result, p.err
256 }
257
258
259
260 type idlePicker struct {
261 subConn balancer.SubConn
262 }
263
264 func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
265 i.subConn.Connect()
266 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
267 }
268
View as plain text