...
1
18
19 package grpc
20
21 import (
22 "context"
23 "fmt"
24 "io"
25 "sync"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/internal/channelz"
30 istatus "google.golang.org/grpc/internal/status"
31 "google.golang.org/grpc/internal/transport"
32 "google.golang.org/grpc/stats"
33 "google.golang.org/grpc/status"
34 )
35
36
37
38 type pickerWrapper struct {
39 mu sync.Mutex
40 done bool
41 blockingCh chan struct{}
42 picker balancer.Picker
43 statsHandlers []stats.Handler
44 }
45
46 func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
47 return &pickerWrapper{
48 blockingCh: make(chan struct{}),
49 statsHandlers: statsHandlers,
50 }
51 }
52
53
54 func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
55 pw.mu.Lock()
56 if pw.done {
57 pw.mu.Unlock()
58 return
59 }
60 pw.picker = p
61
62 close(pw.blockingCh)
63 pw.blockingCh = make(chan struct{})
64 pw.mu.Unlock()
65 }
66
67
68
69
70
71
72 func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
73 ac := acbw.ac
74 ac.incrCallsStarted()
75 done := result.Done
76 result.Done = func(b balancer.DoneInfo) {
77 if b.Err != nil && b.Err != io.EOF {
78 ac.incrCallsFailed()
79 } else {
80 ac.incrCallsSucceeded()
81 }
82 if done != nil {
83 done(b)
84 }
85 }
86 }
87
88
89
90
91
92
93
94
95 func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
96 var ch chan struct{}
97
98 var lastPickErr error
99
100 for {
101 pw.mu.Lock()
102 if pw.done {
103 pw.mu.Unlock()
104 return nil, balancer.PickResult{}, ErrClientConnClosing
105 }
106
107 if pw.picker == nil {
108 ch = pw.blockingCh
109 }
110 if ch == pw.blockingCh {
111
112
113
114 pw.mu.Unlock()
115 select {
116 case <-ctx.Done():
117 var errStr string
118 if lastPickErr != nil {
119 errStr = "latest balancer error: " + lastPickErr.Error()
120 } else {
121 errStr = fmt.Sprintf("received context error while waiting for new LB policy update: %s", ctx.Err().Error())
122 }
123 switch ctx.Err() {
124 case context.DeadlineExceeded:
125 return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
126 case context.Canceled:
127 return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
128 }
129 case <-ch:
130 }
131 continue
132 }
133
134
135
136
137
138
139
140
141
142 if ch != nil {
143 for _, sh := range pw.statsHandlers {
144 sh.HandleRPC(ctx, &stats.PickerUpdated{})
145 }
146 }
147
148 ch = pw.blockingCh
149 p := pw.picker
150 pw.mu.Unlock()
151
152 pickResult, err := p.Pick(info)
153 if err != nil {
154 if err == balancer.ErrNoSubConnAvailable {
155 continue
156 }
157 if st, ok := status.FromError(err); ok {
158
159
160 if istatus.IsRestrictedControlPlaneCode(st) {
161 err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
162 }
163 return nil, balancer.PickResult{}, dropError{error: err}
164 }
165
166
167 if !failfast {
168 lastPickErr = err
169 continue
170 }
171 return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
172 }
173
174 acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
175 if !ok {
176 logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
177 continue
178 }
179 if t := acbw.ac.getReadyTransport(); t != nil {
180 if channelz.IsOn() {
181 doneChannelzWrapper(acbw, &pickResult)
182 return t, pickResult, nil
183 }
184 return t, pickResult, nil
185 }
186 if pickResult.Done != nil {
187
188
189 pickResult.Done(balancer.DoneInfo{})
190 }
191 logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
192
193
194
195
196 }
197 }
198
199 func (pw *pickerWrapper) close() {
200 pw.mu.Lock()
201 defer pw.mu.Unlock()
202 if pw.done {
203 return
204 }
205 pw.done = true
206 close(pw.blockingCh)
207 }
208
209
210
211 func (pw *pickerWrapper) reset() {
212 pw.mu.Lock()
213 defer pw.mu.Unlock()
214 if pw.done {
215 return
216 }
217 pw.blockingCh = make(chan struct{})
218 }
219
220
221
// dropError wraps an error by embedding it, so the wrapped error's Error
// method is promoted unchanged. pick returns it for status errors produced by
// the picker, which lets callers tell those apart by type.
// NOTE(review): how callers treat a dropError (presumably as "drop the RPC,
// do not retry") is not visible in this file — confirm at the call sites.
type dropError struct {
	error
}
225
View as plain text