...

Source file src/github.com/go-kit/kit/sd/eureka/instancer.go

Documentation: github.com/go-kit/kit/sd/eureka

     1  package eureka
     2  
     3  import (
     4  	"fmt"
     5  
     6  	"github.com/hudl/fargo"
     7  
     8  	"github.com/go-kit/kit/sd"
     9  	"github.com/go-kit/kit/sd/internal/instance"
    10  	"github.com/go-kit/log"
    11  )
    12  
// Instancer yields instances stored in the Eureka registry for the given app.
// Changes in that app are watched and will update the subscribers.
//
// Create one with NewInstancer and release it with Stop.
type Instancer struct {
	cache  *instance.Cache    // receives every update; backs Register, Deregister and state
	conn   fargoConnection    // connection used for ScheduleAppUpdates and GetApp
	app    string             // name of the app being watched
	logger log.Logger         // logger with the "app" key attached by NewInstancer
	quitc  chan chan struct{} // Stop sends an ack channel here; loop closes it and exits
}
    22  
    23  // NewInstancer returns a Eureka Instancer. It will start watching the given
    24  // app string for changes, and update the subscribers accordingly.
    25  func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instancer {
    26  	logger = log.With(logger, "app", app)
    27  
    28  	s := &Instancer{
    29  		cache:  instance.NewCache(),
    30  		conn:   conn,
    31  		app:    app,
    32  		logger: logger,
    33  		quitc:  make(chan chan struct{}),
    34  	}
    35  
    36  	done := make(chan struct{})
    37  	updates := conn.ScheduleAppUpdates(app, true, done)
    38  	s.consume(<-updates)
    39  	go s.loop(updates, done)
    40  	return s
    41  }
    42  
    43  // Stop terminates the Instancer.
    44  func (s *Instancer) Stop() {
    45  	q := make(chan struct{})
    46  	s.quitc <- q
    47  	<-q
    48  	s.quitc = nil
    49  }
    50  
    51  func (s *Instancer) consume(update fargo.AppUpdate) {
    52  	if update.Err != nil {
    53  		s.logger.Log("during", "Update", "err", update.Err)
    54  		s.cache.Update(sd.Event{Err: update.Err})
    55  		return
    56  	}
    57  	instances := convertFargoAppToInstances(update.App)
    58  	s.logger.Log("instances", len(instances))
    59  	s.cache.Update(sd.Event{Instances: instances})
    60  }
    61  
    62  func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) {
    63  	defer close(done)
    64  
    65  	for {
    66  		select {
    67  		case update := <-updates:
    68  			s.consume(update)
    69  		case q := <-s.quitc:
    70  			close(q)
    71  			return
    72  		}
    73  	}
    74  }
    75  
    76  func (s *Instancer) getInstances() ([]string, error) {
    77  	app, err := s.conn.GetApp(s.app)
    78  	if err != nil {
    79  		return nil, err
    80  	}
    81  	return convertFargoAppToInstances(app), nil
    82  }
    83  
    84  func convertFargoAppToInstances(app *fargo.Application) []string {
    85  	instances := make([]string, len(app.Instances))
    86  	for i, inst := range app.Instances {
    87  		instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port)
    88  	}
    89  	return instances
    90  }
    91  
// Register implements Instancer. It passes ch to the Register method of the
// underlying instance cache.
func (s *Instancer) Register(ch chan<- sd.Event) {
	s.cache.Register(ch)
}
    96  
// Deregister implements Instancer. It passes ch to the Deregister method of
// the underlying instance cache.
func (s *Instancer) Deregister(ch chan<- sd.Event) {
	s.cache.Deregister(ch)
}
   101  
// state returns the event reported by the State method of the underlying
// instance cache. It exists only for use in tests.
func (s *Instancer) state() sd.Event {
	return s.cache.State()
}
   106  

View as plain text