...

Source file src/google.golang.org/grpc/balancer/rls/picker.go

Documentation: google.golang.org/grpc/balancer/rls

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package rls
    20  
    21  import (
    22  	"errors"
    23  	"fmt"
    24  	"strings"
    25  	"sync/atomic"
    26  	"time"
    27  
    28  	"google.golang.org/grpc/balancer"
    29  	"google.golang.org/grpc/balancer/rls/internal/keys"
    30  	"google.golang.org/grpc/codes"
    31  	"google.golang.org/grpc/connectivity"
    32  	internalgrpclog "google.golang.org/grpc/internal/grpclog"
    33  	rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
    34  	"google.golang.org/grpc/metadata"
    35  	"google.golang.org/grpc/status"
    36  )
    37  
var (
	// errRLSThrottled is the error handed to useDefaultPickIfPossible when an
	// RLS lookup is reported as throttled; the pick fails with it only when no
	// default target is configured.
	errRLSThrottled = errors.New("RLS call throttled at client side")

	// Function to compute data cache entry size. It is invoked through this
	// package-level variable rather than by calling dcEntrySize directly.
	// NOTE(review): presumably a variable so that tests can override it —
	// confirm.
	computeDataCacheEntrySize = dcEntrySize
)
    44  
// exitIdler wraps the only method on the BalancerGroup that the picker calls.
//
// Keeping the dependency down to this single-method interface means the picker
// does not need the full BalancerGroup type.
type exitIdler interface {
	// ExitIdleOne is given the id of a single child.
	// NOTE(review): presumably asks that child to leave the IDLE state —
	// confirm against the BalancerGroup implementation.
	ExitIdleOne(id string)
}
    49  
// rlsPicker selects the subConn to be used for a particular RPC. It does not
// manage subConns directly and delegates to pickers provided by child policies.
type rlsPicker struct {
	// The keyBuilder map used to generate RLS keys for the RPC. This is built
	// by the LB policy based on the received ServiceConfig.
	kbm keys.BuilderMap
	// Endpoint from the user's original dial target. Used to set the `host_key`
	// field in `extra_keys`.
	origEndpoint string

	// The owning RLS LB policy. The picker goes through it to reach the data
	// cache, the pending request map and the mutex (cacheMu) guarding them, to
	// acquire and release child policy references, and to request that a new
	// picker be sent.
	lb *rlsBalancer

	// The picker is given its own copy of the below fields from the RLS LB policy
	// to avoid having to grab the mutex on the latter.
	defaultPolicy *childPolicyWrapper // Child policy for the default target.
	ctrlCh        *controlChannel     // Control channel to the RLS server.
	maxAge        time.Duration       // Cache max age from LB config.
	staleAge      time.Duration       // Cache stale age from LB config.
	// NOTE(review): presumably used to move an idle child policy out of IDLE —
	// confirm against callers.
	bg exitIdler
	// Logger used to report RLS responses and cache bookkeeping.
	logger *internalgrpclog.PrefixLogger
}
    71  
    72  // isFullMethodNameValid return true if name is of the form `/service/method`.
    73  func isFullMethodNameValid(name string) bool {
    74  	return strings.HasPrefix(name, "/") && strings.Count(name, "/") == 2
    75  }
    76  
    77  // Pick makes the routing decision for every outbound RPC.
    78  func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    79  	if name := info.FullMethodName; !isFullMethodNameValid(name) {
    80  		return balancer.PickResult{}, fmt.Errorf("rls: method name %q is not of the form '/service/method", name)
    81  	}
    82  
    83  	// Build the request's keys using the key builders from LB config.
    84  	md, _ := metadata.FromOutgoingContext(info.Ctx)
    85  	reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)
    86  
    87  	p.lb.cacheMu.Lock()
    88  	defer p.lb.cacheMu.Unlock()
    89  
    90  	// Lookup data cache and pending request map using request path and keys.
    91  	cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
    92  	dcEntry := p.lb.dataCache.getEntry(cacheKey)
    93  	pendingEntry := p.lb.pendingMap[cacheKey]
    94  	now := time.Now()
    95  
    96  	switch {
    97  	// No data cache entry. No pending request.
    98  	case dcEntry == nil && pendingEntry == nil:
    99  		throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
   100  		if throttled {
   101  			return p.useDefaultPickIfPossible(info, errRLSThrottled)
   102  		}
   103  		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
   104  
   105  	// No data cache entry. Pending request exits.
   106  	case dcEntry == nil && pendingEntry != nil:
   107  		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
   108  
   109  	// Data cache hit. No pending request.
   110  	case dcEntry != nil && pendingEntry == nil:
   111  		if dcEntry.expiryTime.After(now) {
   112  			if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) {
   113  				p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
   114  			}
   115  			// Delegate to child policies.
   116  			res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
   117  			return res, err
   118  		}
   119  
   120  		// We get here only if the data cache entry has expired. If entry is in
   121  		// backoff, delegate to default target or fail the pick.
   122  		if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
   123  			// Avoid propagating the status code received on control plane RPCs to the
   124  			// data plane which can lead to unexpected outcomes as we do not control
   125  			// the status code sent by the control plane. Propagating the status
   126  			// message received from the control plane is still fine, as it could be
   127  			// useful for debugging purposes.
   128  			st := dcEntry.status
   129  			return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
   130  		}
   131  
   132  		// We get here only if the entry has expired and is not in backoff.
   133  		throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
   134  		if throttled {
   135  			return p.useDefaultPickIfPossible(info, errRLSThrottled)
   136  		}
   137  		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
   138  
   139  	// Data cache hit. Pending request exists.
   140  	default:
   141  		if dcEntry.expiryTime.After(now) {
   142  			res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
   143  			return res, err
   144  		}
   145  		// Data cache entry has expired and pending request exists. Queue pick.
   146  		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
   147  	}
   148  }
   149  
   150  // delegateToChildPoliciesLocked is a helper function which iterates through the
   151  // list of child policy wrappers in a cache entry and attempts to find a child
   152  // policy to which this RPC can be routed to. If all child policies are in
   153  // TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
   154  func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
   155  	const rlsDataHeaderName = "x-google-rls-data"
   156  	for i, cpw := range dcEntry.childPolicyWrappers {
   157  		state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
   158  		// Delegate to the child policy if it is not in TRANSIENT_FAILURE, or if
   159  		// it is the last one (which handles the case of delegating to the last
   160  		// child picker if all child polcies are in TRANSIENT_FAILURE).
   161  		if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 {
   162  			// Any header data received from the RLS server is stored in the
   163  			// cache entry and needs to be sent to the actual backend in the
   164  			// X-Google-RLS-Data header.
   165  			res, err := state.Picker.Pick(info)
   166  			if err != nil {
   167  				return res, err
   168  			}
   169  			if res.Metadata == nil {
   170  				res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
   171  			} else {
   172  				res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
   173  			}
   174  			return res, nil
   175  		}
   176  	}
   177  	// In the unlikely event that we have a cache entry with no targets, we end up
   178  	// queueing the RPC.
   179  	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
   180  }
   181  
   182  // useDefaultPickIfPossible is a helper method which delegates to the default
   183  // target if one is configured, or fails the pick with the given error.
   184  func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
   185  	if p.defaultPolicy != nil {
   186  		state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
   187  		return state.Picker.Pick(info)
   188  	}
   189  	return balancer.PickResult{}, errOnNoDefault
   190  }
   191  
   192  // sendRouteLookupRequestLocked adds an entry to the pending request map and
   193  // sends out an RLS request using the passed in arguments. Returns a value
   194  // indicating if the request was throttled by the client-side adaptive
   195  // throttler.
   196  func (p *rlsPicker) sendRouteLookupRequestLocked(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string) bool {
   197  	if p.lb.pendingMap[cacheKey] != nil {
   198  		return false
   199  	}
   200  
   201  	p.lb.pendingMap[cacheKey] = bs
   202  	throttled := p.ctrlCh.lookup(reqKeys, reason, staleHeaders, func(targets []string, headerData string, err error) {
   203  		p.handleRouteLookupResponse(cacheKey, targets, headerData, err)
   204  	})
   205  	if throttled {
   206  		delete(p.lb.pendingMap, cacheKey)
   207  	}
   208  	return throttled
   209  }
   210  
   211  // handleRouteLookupResponse is the callback invoked by the control channel upon
   212  // receipt of an RLS response. Modifies the data cache and pending requests map
   213  // and sends a new picker.
   214  //
   215  // Acquires the write-lock on the cache. Caller must not hold p.lb.cacheMu.
   216  func (p *rlsPicker) handleRouteLookupResponse(cacheKey cacheKey, targets []string, headerData string, err error) {
   217  	p.logger.Infof("Received RLS response for key %+v with targets %+v, headerData %q, err: %v", cacheKey, targets, headerData, err)
   218  
   219  	p.lb.cacheMu.Lock()
   220  	defer func() {
   221  		// Pending request map entry is unconditionally deleted since the request is
   222  		// no longer pending.
   223  		p.logger.Infof("Removing pending request entry for key %+v", cacheKey)
   224  		delete(p.lb.pendingMap, cacheKey)
   225  		p.lb.sendNewPicker()
   226  		p.lb.cacheMu.Unlock()
   227  	}()
   228  
   229  	// Lookup the data cache entry or create a new one.
   230  	dcEntry := p.lb.dataCache.getEntry(cacheKey)
   231  	if dcEntry == nil {
   232  		dcEntry = &cacheEntry{}
   233  		if _, ok := p.lb.dataCache.addEntry(cacheKey, dcEntry); !ok {
   234  			// This is a very unlikely case where we are unable to add a
   235  			// data cache entry. Log and leave.
   236  			p.logger.Warningf("Failed to add data cache entry for %+v", cacheKey)
   237  			return
   238  		}
   239  	}
   240  
   241  	// For failed requests, the data cache entry is modified as follows:
   242  	// - status is set to error returned from the control channel
   243  	// - current backoff state is available in the pending entry
   244  	//   - `retries` field is incremented and
   245  	//   - backoff state is moved to the data cache
   246  	// - backoffTime is set to the time indicated by the backoff state
   247  	// - backoffExpirationTime is set to twice the backoff time
   248  	// - backoffTimer is set to fire after backoffTime
   249  	//
   250  	// When a proactive cache refresh fails, this would leave the targets and the
   251  	// expiry time from the old entry unchanged. And this mean that the old valid
   252  	// entry would be used until expiration, and a new picker would be sent upon
   253  	// backoff expiry.
   254  	now := time.Now()
   255  
   256  	// "An RLS request is considered to have failed if it returns a non-OK
   257  	// status or the RLS response's targets list is non-empty." - RLS LB Policy
   258  	// design.
   259  	if len(targets) == 0 && err == nil {
   260  		err = fmt.Errorf("RLS response's target list does not contain any entries for key %+v", cacheKey)
   261  		// If err is set, rpc error from the control plane and no control plane
   262  		// configuration is why no targets were passed into this helper, no need
   263  		// to specify and tell the user this information.
   264  	}
   265  	if err != nil {
   266  		dcEntry.status = err
   267  		pendingEntry := p.lb.pendingMap[cacheKey]
   268  		pendingEntry.retries++
   269  		backoffTime := pendingEntry.bs.Backoff(pendingEntry.retries)
   270  		dcEntry.backoffState = pendingEntry
   271  		dcEntry.backoffTime = now.Add(backoffTime)
   272  		dcEntry.backoffExpiryTime = now.Add(2 * backoffTime)
   273  		if dcEntry.backoffState.timer != nil {
   274  			dcEntry.backoffState.timer.Stop()
   275  		}
   276  		dcEntry.backoffState.timer = time.AfterFunc(backoffTime, p.lb.sendNewPicker)
   277  		return
   278  	}
   279  
   280  	// For successful requests, the cache entry is modified as follows:
   281  	// - childPolicyWrappers is set to point to the child policy wrappers
   282  	//   associated with the targets specified in the received response
   283  	// - headerData is set to the value received in the response
   284  	// - expiryTime, stateTime and earliestEvictionTime are set
   285  	// - status is set to nil (OK status)
   286  	// - backoff state is cleared
   287  	p.setChildPolicyWrappersInCacheEntry(dcEntry, targets)
   288  	dcEntry.headerData = headerData
   289  	dcEntry.expiryTime = now.Add(p.maxAge)
   290  	if p.staleAge != 0 {
   291  		dcEntry.staleTime = now.Add(p.staleAge)
   292  	}
   293  	dcEntry.earliestEvictTime = now.Add(minEvictDuration)
   294  	dcEntry.status = nil
   295  	dcEntry.backoffState = &backoffState{bs: defaultBackoffStrategy}
   296  	dcEntry.backoffTime = time.Time{}
   297  	dcEntry.backoffExpiryTime = time.Time{}
   298  	p.lb.dataCache.updateEntrySize(dcEntry, computeDataCacheEntrySize(cacheKey, dcEntry))
   299  }
   300  
   301  // setChildPolicyWrappersInCacheEntry sets up the childPolicyWrappers field in
   302  // the cache entry to point to the child policy wrappers for the targets
   303  // specified in the RLS response.
   304  //
   305  // Caller must hold a write-lock on p.lb.cacheMu.
   306  func (p *rlsPicker) setChildPolicyWrappersInCacheEntry(dcEntry *cacheEntry, newTargets []string) {
   307  	// If the childPolicyWrappers field is already pointing to the right targets,
   308  	// then the field's value does not need to change.
   309  	targetsChanged := true
   310  	func() {
   311  		if cpws := dcEntry.childPolicyWrappers; cpws != nil {
   312  			if len(newTargets) != len(cpws) {
   313  				return
   314  			}
   315  			for i, target := range newTargets {
   316  				if cpws[i].target != target {
   317  					return
   318  				}
   319  			}
   320  			targetsChanged = false
   321  		}
   322  	}()
   323  	if !targetsChanged {
   324  		return
   325  	}
   326  
   327  	// If the childPolicyWrappers field is not already set to the right targets,
   328  	// then it must be reset. We construct a new list of child policies and
   329  	// then swap out the old list for the new one.
   330  	newChildPolicies := p.lb.acquireChildPolicyReferences(newTargets)
   331  	oldChildPolicyTargets := make([]string, len(dcEntry.childPolicyWrappers))
   332  	for i, cpw := range dcEntry.childPolicyWrappers {
   333  		oldChildPolicyTargets[i] = cpw.target
   334  	}
   335  	p.lb.releaseChildPolicyReferences(oldChildPolicyTargets)
   336  	dcEntry.childPolicyWrappers = newChildPolicies
   337  }
   338  
   339  func dcEntrySize(key cacheKey, entry *cacheEntry) int64 {
   340  	return int64(len(key.path) + len(key.keys) + len(entry.headerData))
   341  }
   342  

View as plain text