1
18
19 package rls
20
21 import (
22 "errors"
23 "fmt"
24 "strings"
25 "sync/atomic"
26 "time"
27
28 "google.golang.org/grpc/balancer"
29 "google.golang.org/grpc/balancer/rls/internal/keys"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/connectivity"
32 internalgrpclog "google.golang.org/grpc/internal/grpclog"
33 rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
34 "google.golang.org/grpc/metadata"
35 "google.golang.org/grpc/status"
36 )
37
var (
	// errRLSThrottled is the error handed to useDefaultPickIfPossible when a
	// route lookup request was throttled; it is returned to the caller only
	// if no default policy is configured.
	errRLSThrottled = errors.New("RLS call throttled at client side")

	// computeDataCacheEntrySize is the function used to compute the size of
	// a data cache entry. It is a package-level variable that defaults to
	// dcEntrySize.
	// NOTE(review): presumably a variable so tests can override the size
	// computation — confirm against the test files.
	computeDataCacheEntrySize = dcEntrySize
)
44
45
// exitIdler is the single-method interface held in rlsPicker.bg.
// NOTE(review): presumably implemented by the balancer group so that the
// child identified by id can be asked to leave idle — confirm against the
// implementing type.
type exitIdler interface {
	ExitIdleOne(id string)
}
49
50
51
// rlsPicker implements Pick for the RLS LB policy. It does not pick a
// connection itself: it looks up the data cache entry for the RPC and
// delegates to the picker of one of the entry's child policies, or to the
// picker of the default policy when one is configured.
type rlsPicker struct {
	// kbm builds the RLS request keys for an RPC (see the RLSKey call in
	// Pick).
	kbm keys.BuilderMap

	// origEndpoint is passed to kbm.RLSKey together with the RPC's outgoing
	// metadata and full method name.
	origEndpoint string

	// lb is the owning balancer. The picker uses its cacheMu, dataCache and
	// pendingMap, asks it to send new pickers, and acquires/releases child
	// policy references through it.
	lb *rlsBalancer

	// defaultPolicy is the child policy whose picker is used by
	// useDefaultPickIfPossible. It may be nil, in which case the supplied
	// error is returned instead.
	defaultPolicy *childPolicyWrapper

	// ctrlCh is used to issue route lookup requests.
	ctrlCh *controlChannel

	// maxAge is added to the current time to compute a cache entry's
	// expiryTime when a successful response is handled.
	maxAge time.Duration

	// staleAge is added to the current time to compute a cache entry's
	// staleTime; a zero value leaves staleTime unset.
	staleAge time.Duration

	// bg is not referenced by any method visible in this part of the file.
	// NOTE(review): presumably the balancer group — confirm.
	bg exitIdler

	// logger is used to log route lookup responses and cache failures.
	logger *internalgrpclog.PrefixLogger
}
71
72
// isFullMethodNameValid reports whether name starts with a slash and contains
// exactly two slashes overall, i.e. has the shape "/service/method".
func isFullMethodNameValid(name string) bool {
	if !strings.HasPrefix(name, "/") {
		return false
	}
	// The leading slash is already accounted for, so exactly one more slash
	// must appear in the remainder.
	remainder := name[1:]
	return strings.Count(remainder, "/") == 1
}
76
77
78 func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
79 if name := info.FullMethodName; !isFullMethodNameValid(name) {
80 return balancer.PickResult{}, fmt.Errorf("rls: method name %q is not of the form '/service/method", name)
81 }
82
83
84 md, _ := metadata.FromOutgoingContext(info.Ctx)
85 reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)
86
87 p.lb.cacheMu.Lock()
88 defer p.lb.cacheMu.Unlock()
89
90
91 cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
92 dcEntry := p.lb.dataCache.getEntry(cacheKey)
93 pendingEntry := p.lb.pendingMap[cacheKey]
94 now := time.Now()
95
96 switch {
97
98 case dcEntry == nil && pendingEntry == nil:
99 throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
100 if throttled {
101 return p.useDefaultPickIfPossible(info, errRLSThrottled)
102 }
103 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
104
105
106 case dcEntry == nil && pendingEntry != nil:
107 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
108
109
110 case dcEntry != nil && pendingEntry == nil:
111 if dcEntry.expiryTime.After(now) {
112 if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) {
113 p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
114 }
115
116 res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
117 return res, err
118 }
119
120
121
122 if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
123
124
125
126
127
128 st := dcEntry.status
129 return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
130 }
131
132
133 throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
134 if throttled {
135 return p.useDefaultPickIfPossible(info, errRLSThrottled)
136 }
137 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
138
139
140 default:
141 if dcEntry.expiryTime.After(now) {
142 res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
143 return res, err
144 }
145
146 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
147 }
148 }
149
150
151
152
153
154 func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
155 const rlsDataHeaderName = "x-google-rls-data"
156 for i, cpw := range dcEntry.childPolicyWrappers {
157 state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
158
159
160
161 if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 {
162
163
164
165 res, err := state.Picker.Pick(info)
166 if err != nil {
167 return res, err
168 }
169 if res.Metadata == nil {
170 res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
171 } else {
172 res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
173 }
174 return res, nil
175 }
176 }
177
178
179 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
180 }
181
182
183
184 func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
185 if p.defaultPolicy != nil {
186 state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
187 return state.Picker.Pick(info)
188 }
189 return balancer.PickResult{}, errOnNoDefault
190 }
191
192
193
194
195
196 func (p *rlsPicker) sendRouteLookupRequestLocked(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string) bool {
197 if p.lb.pendingMap[cacheKey] != nil {
198 return false
199 }
200
201 p.lb.pendingMap[cacheKey] = bs
202 throttled := p.ctrlCh.lookup(reqKeys, reason, staleHeaders, func(targets []string, headerData string, err error) {
203 p.handleRouteLookupResponse(cacheKey, targets, headerData, err)
204 })
205 if throttled {
206 delete(p.lb.pendingMap, cacheKey)
207 }
208 return throttled
209 }
210
211
212
213
214
215
// handleRouteLookupResponse is the callback registered by
// sendRouteLookupRequestLocked for a route lookup request. It updates the
// data cache entry for cacheKey from the response (targets, headerData) or
// from err.
//
// It acquires lb.cacheMu itself. On every return path, the pending request
// entry for cacheKey is removed and a new picker is sent before the mutex is
// released.
func (p *rlsPicker) handleRouteLookupResponse(cacheKey cacheKey, targets []string, headerData string, err error) {
	p.logger.Infof("Received RLS response for key %+v with targets %+v, headerData %q, err: %v", cacheKey, targets, headerData, err)

	p.lb.cacheMu.Lock()
	defer func() {
		// The pending entry is removed here, after the body below has had a
		// chance to read it on the error path.
		p.logger.Infof("Removing pending request entry for key %+v", cacheKey)
		delete(p.lb.pendingMap, cacheKey)
		p.lb.sendNewPicker()
		p.lb.cacheMu.Unlock()
	}()

	// Reuse the existing data cache entry for this key, or create one.
	dcEntry := p.lb.dataCache.getEntry(cacheKey)
	if dcEntry == nil {
		dcEntry = &cacheEntry{}
		if _, ok := p.lb.dataCache.addEntry(cacheKey, dcEntry); !ok {
			// The cache refused the new entry; there is nothing to update.
			p.logger.Warningf("Failed to add data cache entry for %+v", cacheKey)
			return
		}
	}

	now := time.Now()

	// A response that carries no targets is treated the same way as a failed
	// request.
	if len(targets) == 0 && err == nil {
		err = fmt.Errorf("RLS response's target list does not contain any entries for key %+v", cacheKey)
	}
	if err != nil {
		// Failure: record the error and put the entry into backoff, using
		// the backoff state that was stored in the pending map when the
		// request was sent.
		dcEntry.status = err
		// NOTE(review): assumes pendingMap still holds a non-nil entry for
		// cacheKey at this point; a nil entry would panic on the next line —
		// confirm the invariant.
		pendingEntry := p.lb.pendingMap[cacheKey]
		pendingEntry.retries++
		backoffTime := pendingEntry.bs.Backoff(pendingEntry.retries)
		dcEntry.backoffState = pendingEntry
		dcEntry.backoffTime = now.Add(backoffTime)
		dcEntry.backoffExpiryTime = now.Add(2 * backoffTime)
		// Replace any previously armed timer with one that sends a new
		// picker once the backoff period has elapsed.
		if dcEntry.backoffState.timer != nil {
			dcEntry.backoffState.timer.Stop()
		}
		dcEntry.backoffState.timer = time.AfterFunc(backoffTime, p.lb.sendNewPicker)
		return
	}

	// Success: store the new targets and header data, refresh the entry's
	// timestamps, clear the error and reset the backoff state, and finally
	// let the cache account for the entry's new size.
	p.setChildPolicyWrappersInCacheEntry(dcEntry, targets)
	dcEntry.headerData = headerData
	dcEntry.expiryTime = now.Add(p.maxAge)
	if p.staleAge != 0 {
		dcEntry.staleTime = now.Add(p.staleAge)
	}
	dcEntry.earliestEvictTime = now.Add(minEvictDuration)
	dcEntry.status = nil
	dcEntry.backoffState = &backoffState{bs: defaultBackoffStrategy}
	dcEntry.backoffTime = time.Time{}
	dcEntry.backoffExpiryTime = time.Time{}
	p.lb.dataCache.updateEntrySize(dcEntry, computeDataCacheEntrySize(cacheKey, dcEntry))
}
300
301
302
303
304
305
306 func (p *rlsPicker) setChildPolicyWrappersInCacheEntry(dcEntry *cacheEntry, newTargets []string) {
307
308
309 targetsChanged := true
310 func() {
311 if cpws := dcEntry.childPolicyWrappers; cpws != nil {
312 if len(newTargets) != len(cpws) {
313 return
314 }
315 for i, target := range newTargets {
316 if cpws[i].target != target {
317 return
318 }
319 }
320 targetsChanged = false
321 }
322 }()
323 if !targetsChanged {
324 return
325 }
326
327
328
329
330 newChildPolicies := p.lb.acquireChildPolicyReferences(newTargets)
331 oldChildPolicyTargets := make([]string, len(dcEntry.childPolicyWrappers))
332 for i, cpw := range dcEntry.childPolicyWrappers {
333 oldChildPolicyTargets[i] = cpw.target
334 }
335 p.lb.releaseChildPolicyReferences(oldChildPolicyTargets)
336 dcEntry.childPolicyWrappers = newChildPolicies
337 }
338
339 func dcEntrySize(key cacheKey, entry *cacheEntry) int64 {
340 return int64(len(key.path) + len(key.keys) + len(entry.headerData))
341 }
342
View as plain text