/*
 *
 * Copyright 2021 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package rls

import (
	"container/list"
	"time"

	"google.golang.org/grpc/internal/backoff"
	internalgrpclog "google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/grpcsync"
)

// cacheKey represents the key used to uniquely identify an entry in the data
// cache and in the pending requests map.
type cacheKey struct {
	// path is the full path of the incoming RPC request.
	path string
	// keys is a stringified version of the RLS request key map built using the
	// RLS keyBuilder. Since maps are not a type which is comparable in Go, it
	// cannot be part of the key for another map (entries in the data cache and
	// pending requests map are stored in maps).
	keys string
}

// cacheEntry wraps all the data to be stored in a data cache entry.
type cacheEntry struct {
	// childPolicyWrappers contains the list of child policy wrappers
	// corresponding to the targets returned by the RLS server for this entry.
	childPolicyWrappers []*childPolicyWrapper
	// headerData is received in the RLS response and is to be sent in the
	// X-Google-RLS-Data header for matching RPCs.
	headerData string
	// expiryTime is the absolute time at which this cache entry stops being
	// valid. When an RLS request succeeds, this is set to the current time
	// plus the max_age field from the LB policy config.
	expiryTime time.Time
	// staleTime is the absolute time after which this cache entry will be
	// proactively refreshed if an incoming RPC matches this entry. When an RLS
	// request succeeds, this is set to the current time plus the stale_age from
	// the LB policy config.
	staleTime time.Time
	// earliestEvictTime is the absolute time before which this entry should not
	// be evicted from the cache. When a cache entry is created, this is set to
	// the current time plus a default value of 5 seconds. This is required to
	// make sure that a new entry added to the cache is not evicted before the
	// RLS response arrives (usually when the cache is too small).
	earliestEvictTime time.Time

	// status stores the RPC status of the previous RLS request for this
	// entry. Picks for entries with a non-nil value for this field are failed
	// with the error stored here.
	status error
	// backoffState contains all backoff related state. When an RLS request
	// succeeds, backoffState is reset. This state moves between the data cache
	// and the pending requests map.
	backoffState *backoffState
	// backoffTime is the absolute time at which the backoff period for this
	// entry ends. When an RLS request fails, this is set to the current time
	// plus the backoff value returned by the backoffState. The backoff timer is
	// also setup with this value. No new RLS requests are sent out for this
	// entry until the backoff period ends.
	//
	// Set to zero time instant upon a successful RLS response.
	backoffTime time.Time
	// backoffExpiryTime is the absolute time at which an entry which has gone
	// through backoff stops being valid. When an RLS request fails, this is
	// set to the current time plus twice the backoff time. The cache expiry
	// timer will only delete entries for which both expiryTime and
	// backoffExpiryTime are in the past.
	//
	// Set to zero time instant upon a successful RLS response.
	backoffExpiryTime time.Time

	// size stores the size of this cache entry. Used to enforce the cache size
	// specified in the LB policy configuration.
	size int64
}

// backoffState wraps all backoff related state associated with a cache entry.
type backoffState struct {
	// retries keeps track of the number of RLS failures, to be able to
	// determine the amount of time to backoff before the next attempt.
	retries int
	// bs is the exponential backoff implementation which returns the amount of
	// time to backoff, given the number of retries.
	bs backoff.Strategy
	// timer fires when the backoff period ends and incoming requests after this
	// will trigger a new RLS request.
	timer *time.Timer
}

// lru is a cache implementation with a least recently used eviction policy.
// Internally it uses a doubly linked list, with the least recently used element
// at the front of the list and the most recently used element at the back of
// the list. The value stored in this cache will be of type `cacheKey`.
//
// It is not safe for concurrent access.
type lru struct {
	ll *list.List

	// A map from the value stored in the lru to its underlying list element is
	// maintained to have a clean API. Without this, a subset of the lru's API
	// would accept/return cacheKey while another subset would accept/return
	// list elements.
	m map[cacheKey]*list.Element
}

// newLRU creates a new cache with a least recently used eviction policy.
126 func newLRU() *lru { 127 return &lru{ 128 ll: list.New(), 129 m: make(map[cacheKey]*list.Element), 130 } 131 } 132 133 func (l *lru) addEntry(key cacheKey) { 134 e := l.ll.PushBack(key) 135 l.m[key] = e 136 } 137 138 func (l *lru) makeRecent(key cacheKey) { 139 e := l.m[key] 140 l.ll.MoveToBack(e) 141 } 142 143 func (l *lru) removeEntry(key cacheKey) { 144 e := l.m[key] 145 l.ll.Remove(e) 146 delete(l.m, key) 147 } 148 149 func (l *lru) getLeastRecentlyUsed() cacheKey { 150 e := l.ll.Front() 151 if e == nil { 152 return cacheKey{} 153 } 154 return e.Value.(cacheKey) 155 } 156 157 // dataCache contains a cache of RLS data used by the LB policy to make routing 158 // decisions. 159 // 160 // The dataCache will be keyed by the request's path and keys, represented by 161 // the `cacheKey` type. It will maintain the cache keys in an `lru` and the 162 // cache data, represented by the `cacheEntry` type, in a native map. 163 // 164 // It is not safe for concurrent access. 165 type dataCache struct { 166 maxSize int64 // Maximum allowed size. 167 currentSize int64 // Current size. 168 keys *lru // Cache keys maintained in lru order. 169 entries map[cacheKey]*cacheEntry 170 logger *internalgrpclog.PrefixLogger 171 shutdown *grpcsync.Event 172 } 173 174 func newDataCache(size int64, logger *internalgrpclog.PrefixLogger) *dataCache { 175 return &dataCache{ 176 maxSize: size, 177 keys: newLRU(), 178 entries: make(map[cacheKey]*cacheEntry), 179 logger: logger, 180 shutdown: grpcsync.NewEvent(), 181 } 182 } 183 184 // resize changes the maximum allowed size of the data cache. 185 // 186 // The return value indicates if an entry with a valid backoff timer was 187 // evicted. This is important to the RLS LB policy which would send a new picker 188 // on the channel to re-process any RPCs queued as a result of this backoff 189 // timer. 
190 func (dc *dataCache) resize(size int64) (backoffCancelled bool) { 191 if dc.shutdown.HasFired() { 192 return false 193 } 194 195 backoffCancelled = false 196 for dc.currentSize > size { 197 key := dc.keys.getLeastRecentlyUsed() 198 entry, ok := dc.entries[key] 199 if !ok { 200 // This should never happen. 201 dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to resize it", key) 202 break 203 } 204 205 // When we encounter a cache entry whose minimum expiration time is in 206 // the future, we abort the LRU pass, which may temporarily leave the 207 // cache being too large. This is necessary to ensure that in cases 208 // where the cache is too small, when we receive an RLS Response, we 209 // keep the resulting cache entry around long enough for the pending 210 // incoming requests to be re-processed through the new Picker. If we 211 // didn't do this, then we'd risk throwing away each RLS response as we 212 // receive it, in which case we would fail to actually route any of our 213 // incoming requests. 214 if entry.earliestEvictTime.After(time.Now()) { 215 dc.logger.Warningf("cachekey %+v is too recent to be evicted. Stopping cache resizing for now", key) 216 break 217 } 218 219 // Stop the backoff timer before evicting the entry. 220 if entry.backoffState != nil && entry.backoffState.timer != nil { 221 if entry.backoffState.timer.Stop() { 222 entry.backoffState.timer = nil 223 backoffCancelled = true 224 } 225 } 226 dc.deleteAndcleanup(key, entry) 227 } 228 dc.maxSize = size 229 return backoffCancelled 230 } 231 232 // evictExpiredEntries sweeps through the cache and deletes expired entries. An 233 // expired entry is one for which both the `expiryTime` and `backoffExpiryTime` 234 // fields are in the past. 235 // 236 // The return value indicates if any expired entries were evicted. 237 // 238 // The LB policy invokes this method periodically to purge expired entries. 
239 func (dc *dataCache) evictExpiredEntries() bool { 240 if dc.shutdown.HasFired() { 241 return false 242 } 243 244 evicted := false 245 for key, entry := range dc.entries { 246 // Only evict entries for which both the data expiration time and 247 // backoff expiration time fields are in the past. 248 now := time.Now() 249 if entry.expiryTime.After(now) || entry.backoffExpiryTime.After(now) { 250 continue 251 } 252 dc.deleteAndcleanup(key, entry) 253 evicted = true 254 } 255 return evicted 256 } 257 258 // resetBackoffState sweeps through the cache and for entries with a backoff 259 // state, the backoff timer is cancelled and the backoff state is reset. The 260 // return value indicates if any entries were mutated in this fashion. 261 // 262 // The LB policy invokes this method when the control channel moves from READY 263 // to TRANSIENT_FAILURE back to READY. See `monitorConnectivityState` method on 264 // the `controlChannel` type for more details. 265 func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) bool { 266 if dc.shutdown.HasFired() { 267 return false 268 } 269 270 backoffReset := false 271 for _, entry := range dc.entries { 272 if entry.backoffState == nil { 273 continue 274 } 275 if entry.backoffState.timer != nil { 276 entry.backoffState.timer.Stop() 277 entry.backoffState.timer = nil 278 } 279 entry.backoffState = &backoffState{bs: newBackoffState.bs} 280 entry.backoffTime = time.Time{} 281 entry.backoffExpiryTime = time.Time{} 282 backoffReset = true 283 } 284 return backoffReset 285 } 286 287 // addEntry adds a cache entry for the given key. 288 // 289 // Return value backoffCancelled indicates if a cache entry with a valid backoff 290 // timer was evicted to make space for the current entry. This is important to 291 // the RLS LB policy which would send a new picker on the channel to re-process 292 // any RPCs queued as a result of this backoff timer. 
293 // 294 // Return value ok indicates if entry was successfully added to the cache. 295 func (dc *dataCache) addEntry(key cacheKey, entry *cacheEntry) (backoffCancelled bool, ok bool) { 296 if dc.shutdown.HasFired() { 297 return false, false 298 } 299 300 // Handle the extremely unlikely case that a single entry is bigger than the 301 // size of the cache. 302 if entry.size > dc.maxSize { 303 return false, false 304 } 305 dc.entries[key] = entry 306 dc.currentSize += entry.size 307 dc.keys.addEntry(key) 308 // If the new entry makes the cache go over its configured size, remove some 309 // old entries. 310 if dc.currentSize > dc.maxSize { 311 backoffCancelled = dc.resize(dc.maxSize) 312 } 313 return backoffCancelled, true 314 } 315 316 // updateEntrySize updates the size of a cache entry and the current size of the 317 // data cache. An entry's size can change upon receipt of an RLS response. 318 func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) { 319 dc.currentSize -= entry.size 320 entry.size = newSize 321 dc.currentSize += entry.size 322 } 323 324 func (dc *dataCache) getEntry(key cacheKey) *cacheEntry { 325 if dc.shutdown.HasFired() { 326 return nil 327 } 328 329 entry, ok := dc.entries[key] 330 if !ok { 331 return nil 332 } 333 dc.keys.makeRecent(key) 334 return entry 335 } 336 337 func (dc *dataCache) removeEntryForTesting(key cacheKey) { 338 entry, ok := dc.entries[key] 339 if !ok { 340 return 341 } 342 dc.deleteAndcleanup(key, entry) 343 } 344 345 // deleteAndCleanup performs actions required at the time of deleting an entry 346 // from the data cache. 
347 // - the entry is removed from the map of entries 348 // - current size of the data cache is update 349 // - the key is removed from the LRU 350 func (dc *dataCache) deleteAndcleanup(key cacheKey, entry *cacheEntry) { 351 delete(dc.entries, key) 352 dc.currentSize -= entry.size 353 dc.keys.removeEntry(key) 354 } 355 356 func (dc *dataCache) stop() { 357 for key, entry := range dc.entries { 358 dc.deleteAndcleanup(key, entry) 359 } 360 dc.shutdown.Fire() 361 } 362