...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver_eds.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterresolver

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package clusterresolver
    20  
    21  import (
    22  	"sync"
    23  
    24  	"google.golang.org/grpc/internal/grpclog"
    25  	"google.golang.org/grpc/internal/grpcsync"
    26  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    27  )
    28  
    29  type edsDiscoveryMechanism struct {
    30  	nameToWatch      string
    31  	cancelWatch      func()
    32  	topLevelResolver topLevelResolver
    33  	stopped          *grpcsync.Event
    34  	logger           *grpclog.PrefixLogger
    35  
    36  	mu     sync.Mutex
    37  	update *xdsresource.EndpointsUpdate // Nil indicates no update received so far.
    38  }
    39  
    40  func (er *edsDiscoveryMechanism) lastUpdate() (any, bool) {
    41  	er.mu.Lock()
    42  	defer er.mu.Unlock()
    43  
    44  	if er.update == nil {
    45  		return nil, false
    46  	}
    47  	return *er.update, true
    48  }
    49  
    50  func (er *edsDiscoveryMechanism) resolveNow() {
    51  }
    52  
    53  // The definition of stop() mentions that implementations must not invoke any
    54  // methods on the topLevelResolver once the call to `stop()` returns.
    55  func (er *edsDiscoveryMechanism) stop() {
    56  	// Canceling a watch with the xDS client can race with an xDS response
    57  	// received around the same time, and can result in the watch callback being
    58  	// invoked after the watch is canceled. Callers need to handle this race,
    59  	// and we fire the stopped event here to ensure that a watch callback
    60  	// invocation around the same time becomes a no-op.
    61  	er.stopped.Fire()
    62  	er.cancelWatch()
    63  }
    64  
    65  // newEDSResolver returns an implementation of the endpointsResolver interface
    66  // that uses EDS to resolve the given name to endpoints.
    67  func newEDSResolver(nameToWatch string, producer xdsresource.Producer, topLevelResolver topLevelResolver, logger *grpclog.PrefixLogger) *edsDiscoveryMechanism {
    68  	ret := &edsDiscoveryMechanism{
    69  		nameToWatch:      nameToWatch,
    70  		topLevelResolver: topLevelResolver,
    71  		logger:           logger,
    72  		stopped:          grpcsync.NewEvent(),
    73  	}
    74  	ret.cancelWatch = xdsresource.WatchEndpoints(producer, nameToWatch, ret)
    75  	return ret
    76  }
    77  
    78  // OnUpdate is invoked to report an update for the resource being watched.
    79  func (er *edsDiscoveryMechanism) OnUpdate(update *xdsresource.EndpointsResourceData) {
    80  	if er.stopped.HasFired() {
    81  		return
    82  	}
    83  
    84  	er.mu.Lock()
    85  	er.update = &update.Resource
    86  	er.mu.Unlock()
    87  
    88  	er.topLevelResolver.onUpdate()
    89  }
    90  
    91  func (er *edsDiscoveryMechanism) OnError(err error) {
    92  	if er.stopped.HasFired() {
    93  		return
    94  	}
    95  
    96  	if er.logger.V(2) {
    97  		er.logger.Infof("EDS discovery mechanism for resource %q reported error: %v", er.nameToWatch, err)
    98  	}
    99  
   100  	er.mu.Lock()
   101  	if er.update != nil {
   102  		// Continue using a previously received good configuration if one
   103  		// exists.
   104  		er.mu.Unlock()
   105  		return
   106  	}
   107  
   108  	// Else report an empty update that would result in no priority child being
   109  	// created for this discovery mechanism. This would result in the priority
   110  	// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
   111  	// localities) if this was the only discovery mechanism, or would result in
   112  	// the priority LB policy using a lower priority discovery mechanism when
   113  	// that becomes available.
   114  	er.update = &xdsresource.EndpointsUpdate{}
   115  	er.mu.Unlock()
   116  
   117  	er.topLevelResolver.onUpdate()
   118  }
   119  
   120  func (er *edsDiscoveryMechanism) OnResourceDoesNotExist() {
   121  	if er.stopped.HasFired() {
   122  		return
   123  	}
   124  
   125  	if er.logger.V(2) {
   126  		er.logger.Infof("EDS discovery mechanism for resource %q reported resource-does-not-exist error", er.nameToWatch)
   127  	}
   128  
   129  	// Report an empty update that would result in no priority child being
   130  	// created for this discovery mechanism. This would result in the priority
   131  	// LB policy reporting TRANSIENT_FAILURE (as there would be no priorities or
   132  	// localities) if this was the only discovery mechanism, or would result in
   133  	// the priority LB policy using a lower priority discovery mechanism when
   134  	// that becomes available.
   135  	er.mu.Lock()
   136  	er.update = &xdsresource.EndpointsUpdate{}
   137  	er.mu.Unlock()
   138  
   139  	er.topLevelResolver.onUpdate()
   140  }
   141  

View as plain text