...

Source file src/github.com/go-kit/kit/sd/consul/instancer.go

Documentation: github.com/go-kit/kit/sd/consul

     1  package consul
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"time"
     7  
     8  	consul "github.com/hashicorp/consul/api"
     9  
    10  	"github.com/go-kit/kit/sd"
    11  	"github.com/go-kit/kit/sd/internal/instance"
    12  	"github.com/go-kit/kit/util/conn"
    13  	"github.com/go-kit/log"
    14  )
    15  
    16  const defaultIndex = 0
    17  
    18  // errStopped notifies the loop to quit. aka stopped via quitc
    19  var errStopped = errors.New("quit and closed consul instancer")
    20  
    21  // Instancer yields instances for a service in Consul.
    22  type Instancer struct {
    23  	cache       *instance.Cache
    24  	client      Client
    25  	logger      log.Logger
    26  	service     string
    27  	tags        []string
    28  	passingOnly bool
    29  	quitc       chan struct{}
    30  }
    31  
    32  // NewInstancer returns a Consul instancer that publishes instances for the
    33  // requested service. It only returns instances for which all of the passed tags
    34  // are present.
    35  func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
    36  	s := &Instancer{
    37  		cache:       instance.NewCache(),
    38  		client:      client,
    39  		logger:      log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
    40  		service:     service,
    41  		tags:        tags,
    42  		passingOnly: passingOnly,
    43  		quitc:       make(chan struct{}),
    44  	}
    45  
    46  	instances, index, err := s.getInstances(defaultIndex, nil)
    47  	if err == nil {
    48  		s.logger.Log("instances", len(instances))
    49  	} else {
    50  		s.logger.Log("err", err)
    51  	}
    52  
    53  	s.cache.Update(sd.Event{Instances: instances, Err: err})
    54  	go s.loop(index)
    55  	return s
    56  }
    57  
    58  // Stop terminates the instancer.
    59  func (s *Instancer) Stop() {
    60  	close(s.quitc)
    61  }
    62  
    63  func (s *Instancer) loop(lastIndex uint64) {
    64  	var (
    65  		instances []string
    66  		err       error
    67  		d         time.Duration = 10 * time.Millisecond
    68  		index     uint64
    69  	)
    70  	for {
    71  		instances, index, err = s.getInstances(lastIndex, s.quitc)
    72  		switch {
    73  		case errors.Is(err, errStopped):
    74  			return // stopped via quitc
    75  		case err != nil:
    76  			s.logger.Log("err", err)
    77  			time.Sleep(d)
    78  			d = conn.Exponential(d)
    79  			s.cache.Update(sd.Event{Err: err})
    80  		case index == defaultIndex:
    81  			s.logger.Log("err", "index is not sane")
    82  			time.Sleep(d)
    83  			d = conn.Exponential(d)
    84  		case index < lastIndex:
    85  			s.logger.Log("err", "index is less than previous; resetting to default")
    86  			lastIndex = defaultIndex
    87  			time.Sleep(d)
    88  			d = conn.Exponential(d)
    89  		default:
    90  			lastIndex = index
    91  			s.cache.Update(sd.Event{Instances: instances})
    92  			d = 10 * time.Millisecond
    93  		}
    94  	}
    95  }
    96  
    97  func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
    98  	tag := ""
    99  	if len(s.tags) > 0 {
   100  		tag = s.tags[0]
   101  	}
   102  
   103  	// Consul doesn't support more than one tag in its service query method.
   104  	// https://github.com/hashicorp/consul/issues/294
   105  	// Hashi suggest prepared queries, but they don't support blocking.
   106  	// https://www.consul.io/docs/agent/http/query.html#execute
   107  	// If we want blocking for efficiency, we must filter tags manually.
   108  
   109  	type response struct {
   110  		instances []string
   111  		index     uint64
   112  	}
   113  
   114  	var (
   115  		errc = make(chan error, 1)
   116  		resc = make(chan response, 1)
   117  	)
   118  
   119  	go func() {
   120  		entries, meta, err := s.client.Service(s.service, tag, s.passingOnly, &consul.QueryOptions{
   121  			WaitIndex: lastIndex,
   122  		})
   123  		if err != nil {
   124  			errc <- err
   125  			return
   126  		}
   127  		if len(s.tags) > 1 {
   128  			entries = filterEntries(entries, s.tags[1:]...)
   129  		}
   130  		resc <- response{
   131  			instances: makeInstances(entries),
   132  			index:     meta.LastIndex,
   133  		}
   134  	}()
   135  
   136  	select {
   137  	case err := <-errc:
   138  		return nil, 0, err
   139  	case res := <-resc:
   140  		return res.instances, res.index, nil
   141  	case <-interruptc:
   142  		return nil, 0, errStopped
   143  	}
   144  }
   145  
   146  // Register implements Instancer.
   147  func (s *Instancer) Register(ch chan<- sd.Event) {
   148  	s.cache.Register(ch)
   149  }
   150  
   151  // Deregister implements Instancer.
   152  func (s *Instancer) Deregister(ch chan<- sd.Event) {
   153  	s.cache.Deregister(ch)
   154  }
   155  
   156  func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
   157  	var es []*consul.ServiceEntry
   158  
   159  ENTRIES:
   160  	for _, entry := range entries {
   161  		ts := make(map[string]struct{}, len(entry.Service.Tags))
   162  		for _, tag := range entry.Service.Tags {
   163  			ts[tag] = struct{}{}
   164  		}
   165  
   166  		for _, tag := range tags {
   167  			if _, ok := ts[tag]; !ok {
   168  				continue ENTRIES
   169  			}
   170  		}
   171  		es = append(es, entry)
   172  	}
   173  
   174  	return es
   175  }
   176  
   177  func makeInstances(entries []*consul.ServiceEntry) []string {
   178  	instances := make([]string, len(entries))
   179  	for i, entry := range entries {
   180  		addr := entry.Node.Address
   181  		if entry.Service.Address != "" {
   182  			addr = entry.Service.Address
   183  		}
   184  		instances[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port)
   185  	}
   186  	return instances
   187  }
   188  

View as plain text