...

Source file src/google.golang.org/grpc/xds/internal/server/rds_handler.go

Documentation: google.golang.org/grpc/xds/internal/server

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package server
    20  
    21  import (
    22  	"sync"
    23  
    24  	igrpclog "google.golang.org/grpc/internal/grpclog"
    25  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    26  )
    27  
    28  // rdsHandler handles any RDS queries that need to be started for a given server
    29  // side listeners Filter Chains (i.e. not inline). It persists rdsWatcher
    30  // updates for later use and also determines whether all the rdsWatcher updates
    31  // needed have been received or not.
    32  type rdsHandler struct {
    33  	xdsC   XDSClient
    34  	logger *igrpclog.PrefixLogger
    35  
    36  	callback func(string, rdsWatcherUpdate)
    37  
    38  	// updates is a map from routeName to rdsWatcher update, including
    39  	// RouteConfiguration resources and any errors received. If not written in
    40  	// this map, no RouteConfiguration or error for that route name yet. If
    41  	// update set in value, use that as valid route configuration, otherwise
    42  	// treat as an error case and fail at L7 level.
    43  	updates map[string]rdsWatcherUpdate
    44  
    45  	mu      sync.Mutex
    46  	cancels map[string]func()
    47  }
    48  
    49  // newRDSHandler creates a new rdsHandler to watch for RouteConfiguration
    50  // resources. listenerWrapper updates the list of route names to watch by
    51  // calling updateRouteNamesToWatch() upon receipt of new Listener configuration.
    52  func newRDSHandler(cb func(string, rdsWatcherUpdate), xdsC XDSClient, logger *igrpclog.PrefixLogger) *rdsHandler {
    53  	return &rdsHandler{
    54  		xdsC:     xdsC,
    55  		logger:   logger,
    56  		callback: cb,
    57  		updates:  make(map[string]rdsWatcherUpdate),
    58  		cancels:  make(map[string]func()),
    59  	}
    60  }
    61  
    62  // updateRouteNamesToWatch handles a list of route names to watch for a given
    63  // server side listener (if a filter chain specifies dynamic
    64  // RouteConfiguration). This function handles all the logic with respect to any
    65  // routes that may have been added or deleted as compared to what was previously
    66  // present. Must be called within an xDS Client callback.
    67  func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool) {
    68  	rh.mu.Lock()
    69  	defer rh.mu.Unlock()
    70  	// Add and start watches for any new routes in routeNamesToWatch.
    71  	for routeName := range routeNamesToWatch {
    72  		if _, ok := rh.cancels[routeName]; !ok {
    73  			// The xDS client keeps a reference to the watcher until the cancel
    74  			// func is invoked. So, we don't need to keep a reference for fear
    75  			// of it being garbage collected.
    76  			w := &rdsWatcher{parent: rh, routeName: routeName}
    77  			cancel := xdsresource.WatchRouteConfig(rh.xdsC, routeName, w)
    78  			// Set bit on cancel function to eat any RouteConfiguration calls
    79  			// for this watcher after it has been canceled.
    80  			rh.cancels[routeName] = func() {
    81  				w.mu.Lock()
    82  				w.canceled = true
    83  				w.mu.Unlock()
    84  				cancel()
    85  			}
    86  		}
    87  	}
    88  
    89  	// Delete and cancel watches for any routes from persisted routeNamesToWatch
    90  	// that are no longer present.
    91  	for routeName := range rh.cancels {
    92  		if _, ok := routeNamesToWatch[routeName]; !ok {
    93  			rh.cancels[routeName]()
    94  			delete(rh.cancels, routeName)
    95  			delete(rh.updates, routeName)
    96  		}
    97  	}
    98  }
    99  
   100  // determines if all dynamic RouteConfiguration needed has received
   101  // configuration or update. Must be called from an xDS Client Callback.
   102  func (rh *rdsHandler) determineRouteConfigurationReady() bool {
   103  	// Safe to read cancels because only written to in other parts of xDS Client
   104  	// Callbacks, which are sync.
   105  	return len(rh.updates) == len(rh.cancels)
   106  }
   107  
   108  // Must be called from an xDS Client Callback.
   109  func (rh *rdsHandler) handleRouteUpdate(routeName string, update rdsWatcherUpdate) {
   110  	rwu := rh.updates[routeName]
   111  
   112  	// Accept the new update if any of the following are true:
   113  	// 1. we had no valid update data.
   114  	// 2. the update is valid.
   115  	// 3. the update error is ResourceNotFound.
   116  	if rwu.data == nil || update.err == nil || xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
   117  		rwu = update
   118  	}
   119  	rh.updates[routeName] = rwu
   120  	rh.callback(routeName, rwu)
   121  }
   122  
   123  // close() is meant to be called by wrapped listener when the wrapped listener
   124  // is closed, and it cleans up resources by canceling all the active RDS
   125  // watches.
   126  func (rh *rdsHandler) close() {
   127  	rh.mu.Lock()
   128  	defer rh.mu.Unlock()
   129  	for _, cancel := range rh.cancels {
   130  		cancel()
   131  	}
   132  }
   133  
   134  type rdsWatcherUpdate struct {
   135  	data *xdsresource.RouteConfigUpdate
   136  	err  error
   137  }
   138  
   139  // rdsWatcher implements the xdsresource.RouteConfigWatcher interface and is
   140  // passed to the WatchRouteConfig API.
   141  type rdsWatcher struct {
   142  	parent    *rdsHandler
   143  	logger    *igrpclog.PrefixLogger
   144  	routeName string
   145  
   146  	mu       sync.Mutex
   147  	canceled bool // eats callbacks if true
   148  }
   149  
   150  func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) {
   151  	rw.mu.Lock()
   152  	if rw.canceled {
   153  		rw.mu.Unlock()
   154  		return
   155  	}
   156  	rw.mu.Unlock()
   157  	if rw.logger.V(2) {
   158  		rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource)
   159  	}
   160  	rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{data: &update.Resource})
   161  }
   162  
   163  func (rw *rdsWatcher) OnError(err error) {
   164  	rw.mu.Lock()
   165  	if rw.canceled {
   166  		rw.mu.Unlock()
   167  		return
   168  	}
   169  	rw.mu.Unlock()
   170  	if rw.logger.V(2) {
   171  		rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err)
   172  	}
   173  	rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
   174  }
   175  
   176  func (rw *rdsWatcher) OnResourceDoesNotExist() {
   177  	rw.mu.Lock()
   178  	if rw.canceled {
   179  		rw.mu.Unlock()
   180  		return
   181  	}
   182  	rw.mu.Unlock()
   183  	if rw.logger.V(2) {
   184  		rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName)
   185  	}
   186  	err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName)
   187  	rw.parent.handleRouteUpdate(rw.routeName, rdsWatcherUpdate{err: err})
   188  }
   189  

View as plain text