...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterresolver/resource_resolver_dns.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterresolver

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package clusterresolver
    20  
    21  import (
    22  	"fmt"
    23  	"net/url"
    24  	"sync"
    25  
    26  	"google.golang.org/grpc/internal/grpclog"
    27  	"google.golang.org/grpc/internal/pretty"
    28  	"google.golang.org/grpc/resolver"
    29  	"google.golang.org/grpc/serviceconfig"
    30  )
    31  
    32  var (
    33  	newDNS = func(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    34  		// The dns resolver is registered by the grpc package. So, this call to
    35  		// resolver.Get() is never expected to return nil.
    36  		return resolver.Get("dns").Build(target, cc, opts)
    37  	}
    38  )
    39  
// dnsDiscoveryMechanism watches updates for the given DNS hostname.
//
// It implements the resolver.ClientConn interface to work with the DNS
// resolver: the resolver built by newDNS is handed this struct as its
// ClientConn and reports results through UpdateState/ReportError/NewAddress.
type dnsDiscoveryMechanism struct {
	// target is the hostname passed to newDNSResolver. It is resolved as
	// "dns:///" + target and is also used in log messages.
	target           string
	// topLevelResolver is notified through onUpdate() whenever this mechanism
	// records a new update or a first error.
	topLevelResolver topLevelResolver
	// dnsR is the underlying DNS resolver. It stays nil when newDNSResolver
	// fails to parse the target or to build the resolver, so every access
	// must be nil-checked.
	dnsR             resolver.Resolver
	// logger is used for verbosity-level-2 informational logging.
	logger           *grpclog.PrefixLogger

	// mu guards addrs and updateReceived.
	mu             sync.Mutex
	// addrs holds the address strings from the most recent update. It is nil
	// when the only thing recorded so far is an error.
	addrs          []string
	// updateReceived is true once an update or an error has been recorded;
	// lastUpdate() reports no result until then.
	updateReceived bool
}
    53  
    54  // newDNSResolver creates an endpoints resolver which uses a DNS resolver under
    55  // the hood.
    56  //
    57  // An error in parsing the provided target string or an error in creating a DNS
    58  // resolver means that we will never be able to resolve the provided target
    59  // strings to endpoints. The topLevelResolver propagates address updates to the
    60  // clusterresolver LB policy **only** after it receives updates from all its
    61  // child resolvers. Therefore, an error here means that the topLevelResolver
    62  // will never send address updates to the clusterresolver LB policy.
    63  //
    64  // Calling the onError() callback will ensure that this error is
    65  // propagated to the child policy which eventually move the channel to
    66  // transient failure.
    67  //
    68  // The `dnsR` field is unset if we run into errors in this function. Therefore, a
    69  // nil check is required wherever we access that field.
    70  func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *grpclog.PrefixLogger) *dnsDiscoveryMechanism {
    71  	ret := &dnsDiscoveryMechanism{
    72  		target:           target,
    73  		topLevelResolver: topLevelResolver,
    74  		logger:           logger,
    75  	}
    76  	u, err := url.Parse("dns:///" + target)
    77  	if err != nil {
    78  		if ret.logger.V(2) {
    79  			ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target)
    80  		}
    81  		ret.updateReceived = true
    82  		ret.topLevelResolver.onUpdate()
    83  		return ret
    84  	}
    85  
    86  	r, err := newDNS(resolver.Target{URL: *u}, ret, resolver.BuildOptions{})
    87  	if err != nil {
    88  		if ret.logger.V(2) {
    89  			ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err)
    90  		}
    91  		ret.updateReceived = true
    92  		ret.topLevelResolver.onUpdate()
    93  		return ret
    94  	}
    95  	ret.dnsR = r
    96  	return ret
    97  }
    98  
    99  func (dr *dnsDiscoveryMechanism) lastUpdate() (any, bool) {
   100  	dr.mu.Lock()
   101  	defer dr.mu.Unlock()
   102  
   103  	if !dr.updateReceived {
   104  		return nil, false
   105  	}
   106  	return dr.addrs, true
   107  }
   108  
   109  func (dr *dnsDiscoveryMechanism) resolveNow() {
   110  	if dr.dnsR != nil {
   111  		dr.dnsR.ResolveNow(resolver.ResolveNowOptions{})
   112  	}
   113  }
   114  
   115  // The definition of stop() mentions that implementations must not invoke any
   116  // methods on the topLevelResolver once the call to `stop()` returns. The
   117  // underlying dns resolver does not send any updates to the resolver.ClientConn
   118  // interface passed to it (implemented by dnsDiscoveryMechanism in this case)
   119  // after its `Close()` returns. Therefore, we can guarantee that no methods of
   120  // the topLevelResolver are invoked after we return from this method.
   121  func (dr *dnsDiscoveryMechanism) stop() {
   122  	if dr.dnsR != nil {
   123  		dr.dnsR.Close()
   124  	}
   125  }
   126  
   127  // dnsDiscoveryMechanism needs to implement resolver.ClientConn interface to receive
   128  // updates from the real DNS resolver.
   129  
   130  func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
   131  	if dr.logger.V(2) {
   132  		dr.logger.Infof("DNS discovery mechanism for resource %q reported an update: %s", dr.target, pretty.ToJSON(state))
   133  	}
   134  
   135  	dr.mu.Lock()
   136  	var addrs []string
   137  	if len(state.Endpoints) > 0 {
   138  		// Assume 1 address per endpoint, which is how DNS is expected to
   139  		// behave.  The slice will grow as needed, however.
   140  		addrs = make([]string, 0, len(state.Endpoints))
   141  		for _, e := range state.Endpoints {
   142  			for _, a := range e.Addresses {
   143  				addrs = append(addrs, a.Addr)
   144  			}
   145  		}
   146  	} else {
   147  		addrs = make([]string, len(state.Addresses))
   148  		for i, a := range state.Addresses {
   149  			addrs[i] = a.Addr
   150  		}
   151  	}
   152  	dr.addrs = addrs
   153  	dr.updateReceived = true
   154  	dr.mu.Unlock()
   155  
   156  	dr.topLevelResolver.onUpdate()
   157  	return nil
   158  }
   159  
   160  func (dr *dnsDiscoveryMechanism) ReportError(err error) {
   161  	if dr.logger.V(2) {
   162  		dr.logger.Infof("DNS discovery mechanism for resource %q reported error: %v", dr.target, err)
   163  	}
   164  
   165  	dr.mu.Lock()
   166  	// If a previous good update was received, suppress the error and continue
   167  	// using the previous update. If RPCs were succeeding prior to this, they
   168  	// will continue to do so. Also suppress errors if we previously received an
   169  	// error, since there will be no downstream effects of propagating this
   170  	// error.
   171  	if dr.updateReceived {
   172  		dr.mu.Unlock()
   173  		return
   174  	}
   175  	dr.addrs = nil
   176  	dr.updateReceived = true
   177  	dr.mu.Unlock()
   178  
   179  	dr.topLevelResolver.onUpdate()
   180  }
   181  
   182  func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) {
   183  	dr.UpdateState(resolver.State{Addresses: addresses})
   184  }
   185  
   186  func (dr *dnsDiscoveryMechanism) ParseServiceConfig(string) *serviceconfig.ParseResult {
   187  	return &serviceconfig.ParseResult{Err: fmt.Errorf("service config not supported")}
   188  }
   189  

View as plain text