...

Source file src/github.com/go-kit/kit/sd/etcdv3/instancer.go

Documentation: github.com/go-kit/kit/sd/etcdv3

     1  package etcdv3
     2  
     3  import (
     4  	"github.com/go-kit/kit/sd"
     5  	"github.com/go-kit/kit/sd/internal/instance"
     6  	"github.com/go-kit/log"
     7  )
     8  
     9  // Instancer yields instances stored in a certain etcd keyspace. Any kind of
    10  // change in that keyspace is watched and will update the Instancer's Instancers.
    11  type Instancer struct {
    12  	cache  *instance.Cache
    13  	client Client
    14  	prefix string
    15  	logger log.Logger
    16  	quitc  chan struct{}
    17  }
    18  
    19  // NewInstancer returns an etcd instancer. It will start watching the given
    20  // prefix for changes, and update the subscribers.
    21  func NewInstancer(c Client, prefix string, logger log.Logger) (*Instancer, error) {
    22  	s := &Instancer{
    23  		client: c,
    24  		prefix: prefix,
    25  		cache:  instance.NewCache(),
    26  		logger: logger,
    27  		quitc:  make(chan struct{}),
    28  	}
    29  
    30  	instances, err := s.client.GetEntries(s.prefix)
    31  	if err == nil {
    32  		logger.Log("prefix", s.prefix, "instances", len(instances))
    33  	} else {
    34  		logger.Log("prefix", s.prefix, "err", err)
    35  	}
    36  	s.cache.Update(sd.Event{Instances: instances, Err: err})
    37  
    38  	go s.loop()
    39  	return s, nil
    40  }
    41  
    42  func (s *Instancer) loop() {
    43  	ch := make(chan struct{})
    44  	go s.client.WatchPrefix(s.prefix, ch)
    45  
    46  	for {
    47  		select {
    48  		case <-ch:
    49  			instances, err := s.client.GetEntries(s.prefix)
    50  			if err != nil {
    51  				s.logger.Log("msg", "failed to retrieve entries", "err", err)
    52  				s.cache.Update(sd.Event{Err: err})
    53  				continue
    54  			}
    55  			s.cache.Update(sd.Event{Instances: instances})
    56  
    57  		case <-s.quitc:
    58  			return
    59  		}
    60  	}
    61  }
    62  
    63  // Stop terminates the Instancer.
    64  func (s *Instancer) Stop() {
    65  	close(s.quitc)
    66  }
    67  
    68  // Register implements Instancer.
    69  func (s *Instancer) Register(ch chan<- sd.Event) {
    70  	s.cache.Register(ch)
    71  }
    72  
    73  // Deregister implements Instancer.
    74  func (s *Instancer) Deregister(ch chan<- sd.Event) {
    75  	s.cache.Deregister(ch)
    76  }
    77  

View as plain text