...

Source file src/github.com/go-kit/kit/sd/etcd/instancer.go

Documentation: github.com/go-kit/kit/sd/etcd

     1  package etcd
     2  
     3  import (
     4  	"github.com/go-kit/kit/sd"
     5  	"github.com/go-kit/kit/sd/internal/instance"
     6  	"github.com/go-kit/log"
     7  )
     8  
     9  // Instancer yields instances stored in a certain etcd keyspace. Any kind of
    10  // change in that keyspace is watched and will update the Instancer's Instancers.
    11  type Instancer struct {
    12  	cache  *instance.Cache
    13  	client Client
    14  	prefix string
    15  	logger log.Logger
    16  	quitc  chan struct{}
    17  }
    18  
    19  // NewInstancer returns an etcd instancer. It will start watching the given
    20  // prefix for changes, and update the subscribers.
    21  func NewInstancer(c Client, prefix string, logger log.Logger) (*Instancer, error) {
    22  	s := &Instancer{
    23  		client: c,
    24  		prefix: prefix,
    25  		cache:  instance.NewCache(),
    26  		logger: logger,
    27  		quitc:  make(chan struct{}),
    28  	}
    29  
    30  	instances, err := s.client.GetEntries(s.prefix)
    31  	if err == nil {
    32  		logger.Log("prefix", s.prefix, "instances", len(instances))
    33  	} else {
    34  		logger.Log("prefix", s.prefix, "err", err)
    35  	}
    36  	s.cache.Update(sd.Event{Instances: instances, Err: err})
    37  
    38  	go s.loop()
    39  	return s, nil
    40  }
    41  
    42  func (s *Instancer) loop() {
    43  	ch := make(chan struct{})
    44  	go s.client.WatchPrefix(s.prefix, ch)
    45  	for {
    46  		select {
    47  		case <-ch:
    48  			instances, err := s.client.GetEntries(s.prefix)
    49  			if err != nil {
    50  				s.logger.Log("msg", "failed to retrieve entries", "err", err)
    51  				s.cache.Update(sd.Event{Err: err})
    52  				continue
    53  			}
    54  			s.cache.Update(sd.Event{Instances: instances})
    55  
    56  		case <-s.quitc:
    57  			return
    58  		}
    59  	}
    60  }
    61  
    62  // Stop terminates the Instancer.
    63  func (s *Instancer) Stop() {
    64  	close(s.quitc)
    65  }
    66  
    67  // Register implements Instancer.
    68  func (s *Instancer) Register(ch chan<- sd.Event) {
    69  	s.cache.Register(ch)
    70  }
    71  
    72  // Deregister implements Instancer.
    73  func (s *Instancer) Deregister(ch chan<- sd.Event) {
    74  	s.cache.Deregister(ch)
    75  }
    76  

View as plain text