...

Source file src/github.com/go-kit/kit/sd/dnssrv/instancer.go

Documentation: github.com/go-kit/kit/sd/dnssrv

     1  package dnssrv
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"net"
     7  	"time"
     8  
     9  	"github.com/go-kit/kit/sd"
    10  	"github.com/go-kit/kit/sd/internal/instance"
    11  	"github.com/go-kit/log"
    12  )
    13  
    14  // ErrPortZero is returned by the resolve machinery
    15  // when a DNS resolver returns an SRV record with its
    16  // port set to zero.
    17  var ErrPortZero = errors.New("resolver returned SRV record with port 0")
    18  
// Instancer yields instances from the named DNS SRV record. The name is
// resolved on a fixed schedule. Priorities and weights are ignored.
type Instancer struct {
	// cache receives an sd.Event after every resolution attempt and backs
	// Register and Deregister.
	cache  *instance.Cache
	// name is the DNS name passed to the lookup function on each resolve.
	name   string
	// logger records resolution errors (and the initial instance count).
	logger log.Logger
	// quit is closed by Stop; the refresh loop returns once it is closed.
	quit   chan struct{}
}
    27  
    28  // NewInstancer returns a DNS SRV instancer.
    29  func NewInstancer(
    30  	name string,
    31  	ttl time.Duration,
    32  	logger log.Logger,
    33  ) *Instancer {
    34  	return NewInstancerDetailed(name, time.NewTicker(ttl), net.LookupSRV, logger)
    35  }
    36  
    37  // NewInstancerDetailed is the same as NewInstancer, but allows users to
    38  // provide an explicit lookup refresh ticker instead of a TTL, and specify the
    39  // lookup function instead of using net.LookupSRV.
    40  func NewInstancerDetailed(
    41  	name string,
    42  	refresh *time.Ticker,
    43  	lookup Lookup,
    44  	logger log.Logger,
    45  ) *Instancer {
    46  	p := &Instancer{
    47  		cache:  instance.NewCache(),
    48  		name:   name,
    49  		logger: logger,
    50  		quit:   make(chan struct{}),
    51  	}
    52  
    53  	instances, err := p.resolve(lookup)
    54  	if err == nil {
    55  		logger.Log("name", name, "instances", len(instances))
    56  	} else {
    57  		logger.Log("name", name, "err", err)
    58  	}
    59  	p.cache.Update(sd.Event{Instances: instances, Err: err})
    60  
    61  	go p.loop(refresh, lookup)
    62  	return p
    63  }
    64  
// Stop terminates the Instancer by closing its quit channel, which makes the
// refresh loop return. The channel is closed unconditionally, so calling Stop
// more than once panics.
func (in *Instancer) Stop() {
	close(in.quit)
}
    69  
    70  func (in *Instancer) loop(t *time.Ticker, lookup Lookup) {
    71  	defer t.Stop()
    72  	for {
    73  		select {
    74  		case <-t.C:
    75  			instances, err := in.resolve(lookup)
    76  			if err != nil {
    77  				in.logger.Log("name", in.name, "err", err)
    78  				in.cache.Update(sd.Event{Err: err})
    79  				continue // don't replace potentially-good with bad
    80  			}
    81  			in.cache.Update(sd.Event{Instances: instances})
    82  
    83  		case <-in.quit:
    84  			return
    85  		}
    86  	}
    87  }
    88  
    89  func (in *Instancer) resolve(lookup Lookup) ([]string, error) {
    90  	_, addrs, err := lookup("", "", in.name)
    91  	if err != nil {
    92  		return nil, err
    93  	}
    94  	instances := make([]string, len(addrs))
    95  	for i, addr := range addrs {
    96  		if addr.Port == 0 {
    97  			return nil, ErrPortZero
    98  		}
    99  		instances[i] = net.JoinHostPort(addr.Target, fmt.Sprint(addr.Port))
   100  	}
   101  	return instances, nil
   102  }
   103  
// Register implements Instancer. It forwards ch to the Register method of the
// underlying instance cache.
func (in *Instancer) Register(ch chan<- sd.Event) {
	in.cache.Register(ch)
}
   108  
// Deregister implements Instancer. It forwards ch to the Deregister method of
// the underlying instance cache.
func (in *Instancer) Deregister(ch chan<- sd.Event) {
	in.cache.Deregister(ch)
}
   113  

View as plain text