Source file src/google.golang.org/grpc/balancer/rls/cache.go

Documentation: google.golang.org/grpc/balancer/rls

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package rls
    20  
    21  import (
    22  	"container/list"
    23  	"time"
    24  
    25  	"google.golang.org/grpc/internal/backoff"
    26  	internalgrpclog "google.golang.org/grpc/internal/grpclog"
    27  	"google.golang.org/grpc/internal/grpcsync"
    28  )
    29  
    30  // cacheKey represents the key used to uniquely identify an entry in the data
    31  // cache and in the pending requests map.
    32  type cacheKey struct {
    33  	// path is the full path of the incoming RPC request.
    34  	path string
    35  	// keys is a stringified version of the RLS request key map built using the
    36  	// RLS keyBuilder. Since maps are not comparable in Go, the key map
    37  	// cannot itself be used as the key of another map (entries in the data
    38  	// cache and pending requests map are stored in maps).
    39  	keys string
    40  }
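
Since the stringified form is what makes the key comparable, any builder must produce a deterministic string for equal maps. Below is a minimal standalone sketch of that idea; the real stringification happens in the RLS keyBuilder, which is not part of this file, so buildCacheKey and its sorting scheme are purely illustrative.

    package main

    import (
        "fmt"
        "sort"
        "strings"
    )

    type cacheKey struct {
        path string
        keys string
    }

    // buildCacheKey is hypothetical: it sorts the key-value pairs so that
    // equal maps always stringify identically, keeping cacheKey usable as
    // a map key.
    func buildCacheKey(path string, keyMap map[string]string) cacheKey {
        kvs := make([]string, 0, len(keyMap))
        for k, v := range keyMap {
            kvs = append(kvs, k+"="+v)
        }
        sort.Strings(kvs)
        return cacheKey{path: path, keys: strings.Join(kvs, ",")}
    }

    func main() {
        k := buildCacheKey("/pkg.Service/Method", map[string]string{"user": "alice", "region": "us"})
        fmt.Printf("%+v\n", k) // {path:/pkg.Service/Method keys:region=us,user=alice}
    }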
    41  
    42  // cacheEntry wraps all the data to be stored in a data cache entry.
    43  type cacheEntry struct {
    44  	// childPolicyWrappers contains the list of child policy wrappers
    45  	// corresponding to the targets returned by the RLS server for this entry.
    46  	childPolicyWrappers []*childPolicyWrapper
    47  	// headerData is received in the RLS response and is to be sent in the
    48  	// X-Google-RLS-Data header for matching RPCs.
    49  	headerData string
    50  	// expiryTime is the absolute time at which this cache entry stops
    51  	// being valid. When an RLS request succeeds, this is set to the current
    52  	// time plus the max_age field from the LB policy config.
    53  	expiryTime time.Time
    54  	// staleTime is the absolute time after which this cache entry will be
    55  	// proactively refreshed if an incoming RPC matches this entry. When an RLS
    56  	// request succeeds, this is set to the current time plus the stale_age from
    57  	// the LB policy config.
    58  	staleTime time.Time
    59  	// earliestEvictTime is the absolute time before which this entry should not
    60  	// be evicted from the cache. When a cache entry is created, this is set to
    61  	// the current time plus a default value of 5 seconds. This is required to
    62  	// make sure that a new entry added to the cache is not evicted before the
    63  	// RLS response arrives (usually when the cache is too small).
    64  	earliestEvictTime time.Time
    65  
    66  	// status stores the RPC status of the previous RLS request for this
    67  	// entry. Picks for entries with a non-nil value for this field are failed
    68  	// with the error stored here.
    69  	status error
    70  	// backoffState contains all backoff related state. When an RLS request
    71  	// succeeds, backoffState is reset. This state moves between the data cache
    72  	// and the pending requests map.
    73  	backoffState *backoffState
    74  	// backoffTime is the absolute time at which the backoff period for this
    75  	// entry ends. When an RLS request fails, this is set to the current time
    76  	// plus the backoff value returned by the backoffState. The backoff timer is
    77  	// also set up with this value. No new RLS requests are sent out for this
    78  	// entry until the backoff period ends.
    79  	//
    80  	// Set to zero time instant upon a successful RLS response.
    81  	backoffTime time.Time
    82  	// backoffExpiryTime is the absolute time at which an entry which has gone
    83  	// through backoff stops being valid.  When an RLS request fails, this is
    84  	// set to the current time plus twice the backoff time. The cache expiry
    85  	// timer will only delete entries for which both expiryTime and
    86  	// backoffExpiryTime are in the past.
    87  	//
    88  	// Set to zero time instant upon a successful RLS response.
    89  	backoffExpiryTime time.Time
    90  
    91  	// size stores the size of this cache entry. Used to enforce the cache size
    92  	// specified in the LB policy configuration.
    93  	size int64
    94  }
    95  
    96  // backoffState wraps all backoff related state associated with a cache entry.
    97  type backoffState struct {
    98  	// retries keeps track of the number of RLS failures, to be able to
    99  	// determine the amount of time to back off before the next attempt.
   100  	retries int
   101  	// bs is the exponential backoff implementation which returns the amount of
   102  	// time to back off, given the number of retries.
   103  	bs backoff.Strategy
   104  	// timer fires when the backoff period ends; incoming requests arriving
   105  	// after that will trigger a new RLS request.
   106  	timer *time.Timer
   107  }
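
The field comments above describe how an RLS failure derives backoffTime (now plus the backoff) and backoffExpiryTime (now plus twice the backoff) from the strategy. A standalone sketch of that arithmetic follows; since the internal backoff package is not importable outside the grpc module, a hypothetical doubling strategy stands in for backoff.Strategy.

    package main

    import (
        "fmt"
        "time"
    )

    // strategy mirrors backoff.Strategy's single Backoff method. The
    // doubling implementation below is a stand-in, not the real
    // exponential strategy from the internal package.
    type strategy interface {
        Backoff(retries int) time.Duration
    }

    type doubling struct{ base time.Duration }

    func (d doubling) Backoff(retries int) time.Duration {
        return d.base << uint(retries)
    }

    func main() {
        var bs strategy = doubling{base: 100 * time.Millisecond}
        retries := 2

        delay := bs.Backoff(retries)                   // 400ms for the third attempt
        backoffTime := time.Now().Add(delay)           // no new RLS request before this
        backoffExpiryTime := time.Now().Add(2 * delay) // entry evictable only after this
        retries++

        fmt.Println(delay, backoffTime, backoffExpiryTime, retries)
    }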
   108  
   109  // lru is a cache implementation with a least recently used eviction policy.
   110  // Internally it uses a doubly linked list, with the least recently used element
   111  // at the front of the list and the most recently used element at the back of
   112  // the list. The value stored in this cache will be of type `cacheKey`.
   113  //
   114  // It is not safe for concurrent access.
   115  type lru struct {
   116  	ll *list.List
   117  
   118  	// A map from the value stored in the lru to its underlying list element is
   119  	// maintained to have a clean API. Without this, a subset of the lru's API
   120  	// would accept/return cacheKey while another subset would accept/return
   121  	// list elements.
   122  	m map[cacheKey]*list.Element
   123  }
   124  
   125  // newLRU creates a new cache with a least recently used eviction policy.
   126  func newLRU() *lru {
   127  	return &lru{
   128  		ll: list.New(),
   129  		m:  make(map[cacheKey]*list.Element),
   130  	}
   131  }
   132  
   133  func (l *lru) addEntry(key cacheKey) {
   134  	e := l.ll.PushBack(key)
   135  	l.m[key] = e
   136  }
   137  
   138  func (l *lru) makeRecent(key cacheKey) {
   139  	e := l.m[key]
   140  	l.ll.MoveToBack(e)
   141  }
   142  
   143  func (l *lru) removeEntry(key cacheKey) {
   144  	e := l.m[key]
   145  	l.ll.Remove(e)
   146  	delete(l.m, key)
   147  }
   148  
   149  func (l *lru) getLeastRecentlyUsed() cacheKey {
   150  	e := l.ll.Front()
   151  	if e == nil {
   152  		return cacheKey{}
   153  	}
   154  	return e.Value.(cacheKey)
   155  }
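
The eviction order falls out of PushBack/MoveToBack: the front of the list is always the least recently used key. Here is a test-style sketch of that behavior, written as a hypothetical _test.go file in this package (the unexported types require in-package access; the later sketches in this listing extend the same file).

    package rls

    import (
        "testing"
        "time" // used by the sketches further below
    )

    func TestLRUEvictionOrder(t *testing.T) {
        l := newLRU()
        a := cacheKey{path: "/a"}
        b := cacheKey{path: "/b"}
        l.addEntry(a)   // list: a
        l.addEntry(b)   // list: a, b
        l.makeRecent(a) // list: b, a -- b is now the LRU entry
        if got := l.getLeastRecentlyUsed(); got != b {
            t.Fatalf("getLeastRecentlyUsed() = %+v, want %+v", got, b)
        }
        l.removeEntry(b) // list: a
        if got := l.getLeastRecentlyUsed(); got != a {
            t.Fatalf("getLeastRecentlyUsed() = %+v, want %+v", got, a)
        }
    }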
   156  
   157  // dataCache contains a cache of RLS data used by the LB policy to make routing
   158  // decisions.
   159  //
   160  // The dataCache will be keyed by the request's path and keys, represented by
   161  // the `cacheKey` type. It will maintain the cache keys in an `lru` and the
   162  // cache data, represented by the `cacheEntry` type, in a native map.
   163  //
   164  // It is not safe for concurrent access.
   165  type dataCache struct {
   166  	maxSize     int64 // Maximum allowed size.
   167  	currentSize int64 // Current size.
   168  	keys        *lru  // Cache keys maintained in lru order.
   169  	entries     map[cacheKey]*cacheEntry
   170  	logger      *internalgrpclog.PrefixLogger
   171  	shutdown    *grpcsync.Event
   172  }
   173  
   174  func newDataCache(size int64, logger *internalgrpclog.PrefixLogger) *dataCache {
   175  	return &dataCache{
   176  		maxSize:  size,
   177  		keys:     newLRU(),
   178  		entries:  make(map[cacheKey]*cacheEntry),
   179  		logger:   logger,
   180  		shutdown: grpcsync.NewEvent(),
   181  	}
   182  }
   183  
   184  // resize changes the maximum allowed size of the data cache.
   185  //
   186  // The return value indicates if an entry with a valid backoff timer was
   187  // evicted. This is important to the RLS LB policy which would send a new picker
   188  // on the channel to re-process any RPCs queued as a result of this backoff
   189  // timer.
   190  func (dc *dataCache) resize(size int64) (backoffCancelled bool) {
   191  	if dc.shutdown.HasFired() {
   192  		return false
   193  	}
   194  
   195  	backoffCancelled = false
   196  	for dc.currentSize > size {
   197  		key := dc.keys.getLeastRecentlyUsed()
   198  		entry, ok := dc.entries[key]
   199  		if !ok {
   200  			// This should never happen.
   201  			dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to resize it", key)
   202  			break
   203  		}
   204  
   205  		// When we encounter a cache entry whose minimum expiration time is in
   206  		// the future, we abort the LRU pass, which may temporarily leave the
   207  		// cache larger than its configured size. This is necessary to ensure
   208  		// that when the cache is too small and we receive an RLS response, we
   209  		// keep the resulting cache entry around long enough for the pending
   210  		// incoming requests to be re-processed through the new Picker. If we
   211  		// didn't do this, then we'd risk throwing away each RLS response as we
   212  		// receive it, in which case we would fail to actually route any of our
   213  		// incoming requests.
   214  		if entry.earliestEvictTime.After(time.Now()) {
   215  			dc.logger.Warningf("cacheKey %+v is too recent to be evicted. Stopping cache resizing for now", key)
   216  			break
   217  		}
   218  
   219  		// Stop the backoff timer before evicting the entry.
   220  		if entry.backoffState != nil && entry.backoffState.timer != nil {
   221  			if entry.backoffState.timer.Stop() {
   222  				entry.backoffState.timer = nil
   223  				backoffCancelled = true
   224  			}
   225  		}
   226  		dc.deleteAndCleanup(key, entry)
   227  	}
   228  	dc.maxSize = size
   229  	return backoffCancelled
   230  }
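
Continuing the hypothetical test file from above, a sketch of the backoffCancelled contract: shrinking the cache below the size of an entry whose backoff timer is still pending should stop that timer and report the cancellation.

    func TestResizeCancelsBackoff(t *testing.T) {
        dc := newDataCache(10, nil) // nil logger: no log calls expected on this path
        key := cacheKey{path: "/boff"}
        dc.addEntry(key, &cacheEntry{
            size:         6,
            backoffState: &backoffState{timer: time.AfterFunc(time.Hour, func() {})},
        })
        // Shrinking to 0 forces the LRU pass to evict the entry; stopping
        // its live timer must be reported via the return value.
        if backoffCancelled := dc.resize(0); !backoffCancelled {
            t.Fatal("expected resize to cancel the pending backoff timer")
        }
    }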
   231  
   232  // evictExpiredEntries sweeps through the cache and deletes expired entries. An
   233  // expired entry is one for which both the `expiryTime` and `backoffExpiryTime`
   234  // fields are in the past.
   235  //
   236  // The return value indicates if any expired entries were evicted.
   237  //
   238  // The LB policy invokes this method periodically to purge expired entries.
   239  func (dc *dataCache) evictExpiredEntries() bool {
   240  	if dc.shutdown.HasFired() {
   241  		return false
   242  	}
   243  
   244  	evicted := false
   245  	for key, entry := range dc.entries {
   246  		// Only evict entries for which both the data expiration time and
   247  		// backoff expiration time fields are in the past.
   248  		now := time.Now()
   249  		if entry.expiryTime.After(now) || entry.backoffExpiryTime.After(now) {
   250  			continue
   251  		}
   252  		dc.deleteAndcleanup(key, entry)
   253  		evicted = true
   254  	}
   255  	return evicted
   256  }
   257  
   258  // resetBackoffState sweeps through the cache and, for entries with a backoff
   259  // state, cancels the backoff timer and resets the backoff state. The return
   260  // value indicates if any entries were mutated in this fashion.
   261  //
   262  // The LB policy invokes this method when the control channel moves from READY
   263  // to TRANSIENT_FAILURE back to READY. See `monitorConnectivityState` method on
   264  // the `controlChannel` type for more details.
   265  func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) bool {
   266  	if dc.shutdown.HasFired() {
   267  		return false
   268  	}
   269  
   270  	backoffReset := false
   271  	for _, entry := range dc.entries {
   272  		if entry.backoffState == nil {
   273  			continue
   274  		}
   275  		if entry.backoffState.timer != nil {
   276  			entry.backoffState.timer.Stop()
   277  			entry.backoffState.timer = nil
   278  		}
   279  		entry.backoffState = &backoffState{bs: newBackoffState.bs}
   280  		entry.backoffTime = time.Time{}
   281  		entry.backoffExpiryTime = time.Time{}
   282  		backoffReset = true
   283  	}
   284  	return backoffReset
   285  }
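
Continuing the test file, a sketch of resetBackoffState: the timer is stopped and the backoff fields are zeroed, while retries restart from zero. A zero-valued backoffState is passed as the replacement purely for illustration; the real caller supplies one carrying a live backoff.Strategy.

    func TestResetBackoffState(t *testing.T) {
        dc := newDataCache(10, nil)
        key := cacheKey{path: "/reset"}
        dc.addEntry(key, &cacheEntry{
            size:         1,
            backoffState: &backoffState{retries: 3, timer: time.AfterFunc(time.Hour, func() {})},
            backoffTime:  time.Now().Add(time.Minute),
        })
        if !dc.resetBackoffState(&backoffState{}) { // nil strategy: illustrative only
            t.Fatal("expected at least one entry to be reset")
        }
        if got := dc.getEntry(key); got.backoffState.retries != 0 || !got.backoffTime.IsZero() {
            t.Fatal("expected backoff state to be reset")
        }
    }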
   286  
   287  // addEntry adds a cache entry for the given key.
   288  //
   289  // Return value backoffCancelled indicates if a cache entry with a valid backoff
   290  // timer was evicted to make space for the current entry. This is important to
   291  // the RLS LB policy which would send a new picker on the channel to re-process
   292  // any RPCs queued as a result of this backoff timer.
   293  //
   294  // Return value ok indicates if the entry was successfully added to the cache.
   295  func (dc *dataCache) addEntry(key cacheKey, entry *cacheEntry) (backoffCancelled bool, ok bool) {
   296  	if dc.shutdown.HasFired() {
   297  		return false, false
   298  	}
   299  
   300  	// Handle the extremely unlikely case that a single entry is bigger than the
   301  	// size of the cache.
   302  	if entry.size > dc.maxSize {
   303  		return false, false
   304  	}
   305  	dc.entries[key] = entry
   306  	dc.currentSize += entry.size
   307  	dc.keys.addEntry(key)
   308  	// If the new entry makes the cache go over its configured size, remove some
   309  	// old entries.
   310  	if dc.currentSize > dc.maxSize {
   311  		backoffCancelled = dc.resize(dc.maxSize)
   312  	}
   313  	return backoffCancelled, true
   314  }
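
Again in the hypothetical test file, a sketch of addEntry overflowing the cache: the second insertion pushes currentSize past maxSize, so resize evicts the least recently used entry (whose zero-valued earliestEvictTime is already in the past).

    func TestAddEntryOverflow(t *testing.T) {
        dc := newDataCache(10, nil)
        old := cacheKey{path: "/old"}
        if _, ok := dc.addEntry(old, &cacheEntry{size: 6}); !ok {
            t.Fatal("addEntry(/old) failed")
        }
        fresh := cacheKey{path: "/fresh"}
        if _, ok := dc.addEntry(fresh, &cacheEntry{size: 6}); !ok {
            t.Fatal("addEntry(/fresh) failed")
        }
        // currentSize hit 12 > 10, so resize evicted the LRU entry.
        if dc.getEntry(old) != nil {
            t.Fatal("expected /old to be evicted")
        }
        if dc.getEntry(fresh) == nil {
            t.Fatal("expected /fresh to survive")
        }
    }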
   315  
   316  // updateEntrySize updates the size of a cache entry and the current size of the
   317  // data cache. An entry's size can change upon receipt of an RLS response.
   318  func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) {
   319  	dc.currentSize -= entry.size
   320  	entry.size = newSize
   321  	dc.currentSize += entry.size
   322  }
   323  
   324  func (dc *dataCache) getEntry(key cacheKey) *cacheEntry {
   325  	if dc.shutdown.HasFired() {
   326  		return nil
   327  	}
   328  
   329  	entry, ok := dc.entries[key]
   330  	if !ok {
   331  		return nil
   332  	}
   333  	dc.keys.makeRecent(key)
   334  	return entry
   335  }
   336  
   337  func (dc *dataCache) removeEntryForTesting(key cacheKey) {
   338  	entry, ok := dc.entries[key]
   339  	if !ok {
   340  		return
   341  	}
   342  	dc.deleteAndCleanup(key, entry)
   343  }
   344  
   345  // deleteAndCleanup performs actions required at the time of deleting an entry
   346  // from the data cache.
   347  // - the entry is removed from the map of entries
   348  // - the current size of the data cache is updated
   349  // - the key is removed from the LRU
   350  func (dc *dataCache) deleteAndCleanup(key cacheKey, entry *cacheEntry) {
   351  	delete(dc.entries, key)
   352  	dc.currentSize -= entry.size
   353  	dc.keys.removeEntry(key)
   354  }
   355  
   356  func (dc *dataCache) stop() {
   357  	for key, entry := range dc.entries {
   358  		dc.deleteAndCleanup(key, entry)
   359  	}
   360  	dc.shutdown.Fire()
   361  }
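
A final sketch for the same hypothetical test file, exercising evictExpiredEntries and stop: an entry whose expiryTime is in the past, and whose zero-valued backoffExpiryTime is also in the past, satisfies both eviction conditions.

    func TestEvictExpiredEntries(t *testing.T) {
        dc := newDataCache(100, nil)
        key := cacheKey{path: "/expired"}
        dc.addEntry(key, &cacheEntry{
            size:       1,
            expiryTime: time.Now().Add(-time.Minute), // data expired a minute ago
            // the zero-valued backoffExpiryTime is also in the past, so
            // both eviction conditions hold
        })
        if !dc.evictExpiredEntries() {
            t.Fatal("expected the expired entry to be evicted")
        }
        if dc.getEntry(key) != nil {
            t.Fatal("expected the entry to be gone")
        }
        dc.stop()
    }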
   362  
