...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterresolver

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package clusterresolver
    20  
    21  import (
    22  	"context"
    23  	"sync"
    24  
    25  	"google.golang.org/grpc/internal/grpclog"
    26  	"google.golang.org/grpc/internal/grpcsync"
    27  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    28  )
    29  
    30  // resourceUpdate is a combined update from all the resources, in the order of
    31  // priority. For example, it can be {EDS, EDS, DNS}.
    32  type resourceUpdate struct {
    33  	priorities []priorityConfig
    34  	err        error
    35  }
    36  
    37  // topLevelResolver is used by concrete endpointsResolver implementations for
    38  // reporting updates and errors. The `resourceResolver` type implements this
    39  // interface and takes appropriate actions upon receipt of updates and errors
    40  // from underlying concrete resolvers.
    41  type topLevelResolver interface {
    42  	onUpdate()
    43  }
    44  
    45  // endpointsResolver wraps the functionality to resolve a given resource name to
    46  // a set of endpoints. The mechanism used by concrete implementations depend on
    47  // the supported discovery mechanism type.
    48  type endpointsResolver interface {
    49  	// lastUpdate returns endpoint results from the most recent resolution.
    50  	//
    51  	// The type of the first return result is dependent on the resolver
    52  	// implementation.
    53  	//
    54  	// The second return result indicates whether the resolver was able to
    55  	// successfully resolve the resource name to endpoints. If set to false, the
    56  	// first return result is invalid and must not be used.
    57  	lastUpdate() (any, bool)
    58  
    59  	// resolverNow triggers re-resolution of the resource.
    60  	resolveNow()
    61  
    62  	// stop stops resolution of the resource. Implementations must not invoke
    63  	// any methods on the topLevelResolver interface once `stop()` returns.
    64  	stop()
    65  }
    66  
    67  // discoveryMechanismKey is {type+resource_name}, it's used as the map key, so
    68  // that the same resource resolver can be reused (e.g. when there are two
    69  // mechanisms, both for the same EDS resource, but has different circuit
    70  // breaking config.
    71  type discoveryMechanismKey struct {
    72  	typ  DiscoveryMechanismType
    73  	name string
    74  }
    75  
    76  // discoveryMechanismAndResolver is needed to keep the resolver and the
    77  // discovery mechanism together, because resolvers can be shared. And we need
    78  // the mechanism for fields like circuit breaking, LRS etc when generating the
    79  // balancer config.
    80  type discoveryMechanismAndResolver struct {
    81  	dm DiscoveryMechanism
    82  	r  endpointsResolver
    83  
    84  	childNameGen *nameGenerator
    85  }
    86  
    87  type resourceResolver struct {
    88  	parent           *clusterResolverBalancer
    89  	logger           *grpclog.PrefixLogger
    90  	updateChannel    chan *resourceUpdate
    91  	serializer       *grpcsync.CallbackSerializer
    92  	serializerCancel context.CancelFunc
    93  
    94  	// mu protects the slice and map, and content of the resolvers in the slice.
    95  	mu         sync.Mutex
    96  	mechanisms []DiscoveryMechanism
    97  	children   []discoveryMechanismAndResolver
    98  	// childrenMap's value only needs the resolver implementation (type
    99  	// discoveryMechanism) and the childNameGen. The other two fields are not
   100  	// used.
   101  	//
   102  	// TODO(cleanup): maybe we can make a new type with just the necessary
   103  	// fields, and use it here instead.
   104  	childrenMap map[discoveryMechanismKey]discoveryMechanismAndResolver
   105  	// Each new discovery mechanism needs a child name generator to reuse child
   106  	// policy names. But to make sure the names across discover mechanism
   107  	// doesn't conflict, we need a seq ID. This ID is incremented for each new
   108  	// discover mechanism.
   109  	childNameGeneratorSeqID uint64
   110  }
   111  
   112  func newResourceResolver(parent *clusterResolverBalancer, logger *grpclog.PrefixLogger) *resourceResolver {
   113  	rr := &resourceResolver{
   114  		parent:        parent,
   115  		logger:        logger,
   116  		updateChannel: make(chan *resourceUpdate, 1),
   117  		childrenMap:   make(map[discoveryMechanismKey]discoveryMechanismAndResolver),
   118  	}
   119  	ctx, cancel := context.WithCancel(context.Background())
   120  	rr.serializer = grpcsync.NewCallbackSerializer(ctx)
   121  	rr.serializerCancel = cancel
   122  	return rr
   123  }
   124  
   125  func equalDiscoveryMechanisms(a, b []DiscoveryMechanism) bool {
   126  	if len(a) != len(b) {
   127  		return false
   128  	}
   129  	for i, aa := range a {
   130  		bb := b[i]
   131  		if !aa.Equal(bb) {
   132  			return false
   133  		}
   134  	}
   135  	return true
   136  }
   137  
   138  func discoveryMechanismToKey(dm DiscoveryMechanism) discoveryMechanismKey {
   139  	switch dm.Type {
   140  	case DiscoveryMechanismTypeEDS:
   141  		nameToWatch := dm.EDSServiceName
   142  		if nameToWatch == "" {
   143  			nameToWatch = dm.Cluster
   144  		}
   145  		return discoveryMechanismKey{typ: dm.Type, name: nameToWatch}
   146  	case DiscoveryMechanismTypeLogicalDNS:
   147  		return discoveryMechanismKey{typ: dm.Type, name: dm.DNSHostname}
   148  	default:
   149  		return discoveryMechanismKey{}
   150  	}
   151  }
   152  
   153  func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) {
   154  	rr.mu.Lock()
   155  	defer rr.mu.Unlock()
   156  	if equalDiscoveryMechanisms(rr.mechanisms, mechanisms) {
   157  		return
   158  	}
   159  	rr.mechanisms = mechanisms
   160  	rr.children = make([]discoveryMechanismAndResolver, len(mechanisms))
   161  	newDMs := make(map[discoveryMechanismKey]bool)
   162  
   163  	// Start one watch for each new discover mechanism {type+resource_name}.
   164  	for i, dm := range mechanisms {
   165  		dmKey := discoveryMechanismToKey(dm)
   166  		newDMs[dmKey] = true
   167  		dmAndResolver, ok := rr.childrenMap[dmKey]
   168  		if ok {
   169  			// If this is not new, keep the fields (especially childNameGen),
   170  			// and only update the DiscoveryMechanism.
   171  			//
   172  			// Note that the same dmKey doesn't mean the same
   173  			// DiscoveryMechanism. There are fields (e.g.
   174  			// MaxConcurrentRequests) in DiscoveryMechanism that are not copied
   175  			// to dmKey, we need to keep those updated.
   176  			dmAndResolver.dm = dm
   177  			rr.children[i] = dmAndResolver
   178  			continue
   179  		}
   180  
   181  		// Create resolver for a newly seen resource.
   182  		var resolver endpointsResolver
   183  		switch dm.Type {
   184  		case DiscoveryMechanismTypeEDS:
   185  			resolver = newEDSResolver(dmKey.name, rr.parent.xdsClient, rr, rr.logger)
   186  		case DiscoveryMechanismTypeLogicalDNS:
   187  			resolver = newDNSResolver(dmKey.name, rr, rr.logger)
   188  		}
   189  		dmAndResolver = discoveryMechanismAndResolver{
   190  			dm:           dm,
   191  			r:            resolver,
   192  			childNameGen: newNameGenerator(rr.childNameGeneratorSeqID),
   193  		}
   194  		rr.childrenMap[dmKey] = dmAndResolver
   195  		rr.children[i] = dmAndResolver
   196  		rr.childNameGeneratorSeqID++
   197  	}
   198  
   199  	// Stop the resources that were removed.
   200  	for dm, r := range rr.childrenMap {
   201  		if !newDMs[dm] {
   202  			delete(rr.childrenMap, dm)
   203  			go r.r.stop()
   204  		}
   205  	}
   206  	// Regenerate even if there's no change in discovery mechanism, in case
   207  	// priority order changed.
   208  	rr.generateLocked()
   209  }
   210  
   211  // resolveNow is typically called to trigger re-resolve of DNS. The EDS
   212  // resolveNow() is a noop.
   213  func (rr *resourceResolver) resolveNow() {
   214  	rr.mu.Lock()
   215  	defer rr.mu.Unlock()
   216  	for _, r := range rr.childrenMap {
   217  		r.r.resolveNow()
   218  	}
   219  }
   220  
   221  func (rr *resourceResolver) stop(closing bool) {
   222  	rr.mu.Lock()
   223  
   224  	// Save the previous childrenMap to stop the children outside the mutex,
   225  	// and reinitialize the map.  We only need to reinitialize to allow for the
   226  	// policy to be reused if the resource comes back.  In practice, this does
   227  	// not happen as the parent LB policy will also be closed, causing this to
   228  	// be removed entirely, but a future use case might want to reuse the
   229  	// policy instead.
   230  	cm := rr.childrenMap
   231  	rr.childrenMap = make(map[discoveryMechanismKey]discoveryMechanismAndResolver)
   232  	rr.mechanisms = nil
   233  	rr.children = nil
   234  
   235  	rr.mu.Unlock()
   236  
   237  	for _, r := range cm {
   238  		r.r.stop()
   239  	}
   240  
   241  	if closing {
   242  		rr.serializerCancel()
   243  		<-rr.serializer.Done()
   244  	}
   245  
   246  	// stop() is called when the LB policy is closed or when the underlying
   247  	// cluster resource is removed by the management server. In the latter case,
   248  	// an empty config update needs to be pushed to the child policy to ensure
   249  	// that a picker that fails RPCs is sent up to the channel.
   250  	//
   251  	// Resource resolver implementations are expected to not send any updates
   252  	// after they are stopped. Therefore, we don't have to worry about another
   253  	// write to this channel happening at the same time as this one.
   254  	select {
   255  	case <-rr.updateChannel:
   256  	default:
   257  	}
   258  	rr.updateChannel <- &resourceUpdate{}
   259  }
   260  
   261  // generateLocked collects updates from all resolvers. It pushes the combined
   262  // result on the update channel if all child resolvers have received at least
   263  // one update. Otherwise it returns early.
   264  //
   265  // caller must hold rr.mu.
   266  func (rr *resourceResolver) generateLocked() {
   267  	var ret []priorityConfig
   268  	for _, rDM := range rr.children {
   269  		u, ok := rDM.r.lastUpdate()
   270  		if !ok {
   271  			// Don't send updates to parent until all resolvers have update to
   272  			// send.
   273  			return
   274  		}
   275  		switch uu := u.(type) {
   276  		case xdsresource.EndpointsUpdate:
   277  			ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu, childNameGen: rDM.childNameGen})
   278  		case []string:
   279  			ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu, childNameGen: rDM.childNameGen})
   280  		}
   281  	}
   282  	select {
   283  	case <-rr.updateChannel:
   284  	default:
   285  	}
   286  	rr.updateChannel <- &resourceUpdate{priorities: ret}
   287  }
   288  
   289  func (rr *resourceResolver) onUpdate() {
   290  	rr.serializer.Schedule(func(context.Context) {
   291  		rr.mu.Lock()
   292  		rr.generateLocked()
   293  		rr.mu.Unlock()
   294  	})
   295  }
   296  

View as plain text