...

Source file src/github.com/letsencrypt/boulder/redis/lookup.go

Documentation: github.com/letsencrypt/boulder/redis

     1  package redis
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"net"
     8  	"strings"
     9  	"time"
    10  
    11  	"github.com/letsencrypt/boulder/cmd"
    12  	blog "github.com/letsencrypt/boulder/log"
    13  	"github.com/prometheus/client_golang/prometheus"
    14  
    15  	"github.com/redis/go-redis/v9"
    16  )
    17  
    18  var ErrNoShardsResolved = errors.New("0 shards were resolved")
    19  
    20  // lookup wraps a Redis ring client by reference and keeps the Redis ring shards
    21  // up to date via periodic SRV lookups.
    22  type lookup struct {
    23  	// srvLookups is a list of SRV records to be looked up.
    24  	srvLookups []cmd.ServiceDomain
    25  
    26  	// updateFrequency is the frequency of periodic SRV lookups. Defaults to 30
    27  	// seconds.
    28  	updateFrequency time.Duration
    29  
    30  	// updateTimeout is the timeout for each SRV lookup. Defaults to 90% of the
    31  	// update frequency.
    32  	updateTimeout time.Duration
    33  
    34  	// dnsAuthority is the single <hostname|IPv4|[IPv6]>:<port> of the DNS
    35  	// server to be used for SRV lookups. If the address contains a hostname it
    36  	// will be resolved via the system DNS. If the port is left unspecified it
    37  	// will default to '53'. If this field is left unspecified the system DNS
    38  	// will be used for resolution.
    39  	dnsAuthority string
    40  
    41  	// stop is a context.CancelFunc that can be used to stop the goroutine
    42  	// responsible for performing periodic SRV lookups.
    43  	stop context.CancelFunc
    44  
    45  	resolver *net.Resolver
    46  	ring     *redis.Ring
    47  	logger   blog.Logger
    48  	stats    prometheus.Registerer
    49  }
    50  
    51  // newLookup constructs and returns a new lookup instance. An initial SRV lookup
    52  // is performed to populate the Redis ring shards. If this lookup fails or
    53  // otherwise results in an empty set of resolved shards, an error is returned.
    54  func newLookup(srvLookups []cmd.ServiceDomain, dnsAuthority string, frequency time.Duration, ring *redis.Ring, logger blog.Logger, stats prometheus.Registerer) (*lookup, error) {
    55  	updateFrequency := frequency
    56  	if updateFrequency <= 0 {
    57  		// Set default frequency.
    58  		updateFrequency = 30 * time.Second
    59  	}
    60  	// Set default timeout to 90% of the update frequency.
    61  	updateTimeout := updateFrequency - updateFrequency/10
    62  
    63  	lookup := &lookup{
    64  		srvLookups:      srvLookups,
    65  		ring:            ring,
    66  		logger:          logger,
    67  		stats:           stats,
    68  		updateFrequency: updateFrequency,
    69  		updateTimeout:   updateTimeout,
    70  		dnsAuthority:    dnsAuthority,
    71  	}
    72  
    73  	if dnsAuthority == "" {
    74  		// Use the system DNS resolver.
    75  		lookup.resolver = net.DefaultResolver
    76  	} else {
    77  		// Setup a custom DNS resolver.
    78  		host, port, err := net.SplitHostPort(dnsAuthority)
    79  		if err != nil {
    80  			// Assume only hostname or IPv4 address was specified.
    81  			host = dnsAuthority
    82  			port = "53"
    83  		}
    84  		lookup.dnsAuthority = net.JoinHostPort(host, port)
    85  		lookup.resolver = &net.Resolver{
    86  			PreferGo: true,
    87  			Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
    88  				// The custom resolver closes over the lookup.dnsAuthority field
    89  				// so it can be swapped out in testing.
    90  				return net.Dial(network, lookup.dnsAuthority)
    91  			},
    92  		}
    93  	}
    94  
    95  	ctx, cancel := context.WithTimeout(context.Background(), updateTimeout)
    96  	defer cancel()
    97  	tempErr, nonTempErr := lookup.updateNow(ctx)
    98  	if tempErr != nil {
    99  		// Log and discard temporary errors, as they're likely to be transient
   100  		// (e.g. network connectivity issues).
   101  		logger.Warningf("resolving ring shards: %s", tempErr)
   102  	}
   103  	if nonTempErr != nil && errors.Is(nonTempErr, ErrNoShardsResolved) {
   104  		// Non-temporary errors are always logged inside of updateNow(), so we
   105  		// only need return the error here if it's ErrNoShardsResolved.
   106  		return nil, nonTempErr
   107  	}
   108  
   109  	return lookup, nil
   110  }
   111  
   112  // updateNow resolves and updates the Redis ring shards accordingly. If all
   113  // lookups fail or otherwise result in an empty set of resolved shards, the
   114  // Redis ring is left unmodified and any errors are returned. If at least one
   115  // lookup succeeds, the Redis ring is updated, and all errors are discarded.
   116  // Non-temporary DNS errors are always logged as they occur, as they're likely
   117  // to be indicative of a misconfiguration.
   118  func (look *lookup) updateNow(ctx context.Context) (tempError, nonTempError error) {
   119  	var tempErrs []error
   120  	handleDNSError := func(err error, srv cmd.ServiceDomain) {
   121  		var dnsErr *net.DNSError
   122  		if errors.As(err, &dnsErr) && (dnsErr.IsTimeout || dnsErr.IsTemporary) {
   123  			tempErrs = append(tempErrs, err)
   124  			return
   125  		}
   126  		// Log non-temporary DNS errors as they occur, as they're likely to be
   127  		// indicative of misconfiguration.
   128  		look.logger.Errf("resolving service _%s._tcp.%s: %s", srv.Service, srv.Domain, err)
   129  	}
   130  
   131  	nextAddrs := make(map[string]string)
   132  	for _, srv := range look.srvLookups {
   133  		_, targets, err := look.resolver.LookupSRV(ctx, srv.Service, "tcp", srv.Domain)
   134  		if err != nil {
   135  			handleDNSError(err, srv)
   136  			// Skip to the next SRV lookup.
   137  			continue
   138  		}
   139  		if len(targets) <= 0 {
   140  			tempErrs = append(tempErrs, fmt.Errorf("0 targets resolved for service \"_%s._tcp.%s\"", srv.Service, srv.Domain))
   141  			// Skip to the next SRV lookup.
   142  			continue
   143  		}
   144  
   145  		for _, target := range targets {
   146  			host := strings.TrimRight(target.Target, ".")
   147  			if look.dnsAuthority != "" {
   148  				// Lookup A/AAAA records for the SRV target using the custom DNS
   149  				// authority.
   150  				hostAddrs, err := look.resolver.LookupHost(ctx, host)
   151  				if err != nil {
   152  					handleDNSError(err, srv)
   153  					// Skip to the next A/AAAA lookup.
   154  					continue
   155  				}
   156  				if len(hostAddrs) <= 0 {
   157  					tempErrs = append(tempErrs, fmt.Errorf("0 addrs resolved for target %q of service \"_%s._tcp.%s\"", host, srv.Service, srv.Domain))
   158  					// Skip to the next A/AAAA lookup.
   159  					continue
   160  				}
   161  				// Use the first resolved IP address.
   162  				host = hostAddrs[0]
   163  			}
   164  			addr := fmt.Sprintf("%s:%d", host, target.Port)
   165  			nextAddrs[addr] = addr
   166  		}
   167  	}
   168  
   169  	// Only return errors if we failed to resolve any shards.
   170  	if len(nextAddrs) <= 0 {
   171  		return errors.Join(tempErrs...), ErrNoShardsResolved
   172  	}
   173  
   174  	// Some shards were resolved, update the Redis ring and discard all errors.
   175  	look.ring.SetAddrs(nextAddrs)
   176  
   177  	// Update the Redis client metrics.
   178  	MustRegisterClientMetricsCollector(look.ring, look.stats, nextAddrs, look.ring.Options().Username)
   179  
   180  	return nil, nil
   181  }
   182  
   183  // start starts a goroutine that keeps the Redis ring shards up-to-date by
   184  // periodically performing SRV lookups.
   185  func (look *lookup) start() {
   186  	var lookupCtx context.Context
   187  	lookupCtx, look.stop = context.WithCancel(context.Background())
   188  	go func() {
   189  		ticker := time.NewTicker(look.updateFrequency)
   190  		defer ticker.Stop()
   191  		for {
   192  			// Check for context cancellation before we do any work.
   193  			if lookupCtx.Err() != nil {
   194  				return
   195  			}
   196  
   197  			timeoutCtx, cancel := context.WithTimeout(lookupCtx, look.updateTimeout)
   198  			tempErrs, nonTempErrs := look.updateNow(timeoutCtx)
   199  			cancel()
   200  			if tempErrs != nil {
   201  				look.logger.Warningf("resolving ring shards, temporary errors: %s", tempErrs)
   202  				continue
   203  			}
   204  			if nonTempErrs != nil {
   205  				look.logger.Errf("resolving ring shards, non-temporary errors: %s", nonTempErrs)
   206  				continue
   207  			}
   208  
   209  			select {
   210  			case <-ticker.C:
   211  				continue
   212  
   213  			case <-lookupCtx.Done():
   214  				return
   215  			}
   216  		}
   217  	}()
   218  }
   219  

View as plain text