...

Source file src/github.com/go-kit/kit/sd/eureka/registrar.go

Documentation: github.com/go-kit/kit/sd/eureka

     1  package eureka
     2  
     3  import (
     4  	"fmt"
     5  	"net/http"
     6  	"sync"
     7  	"time"
     8  
     9  	"github.com/hudl/fargo"
    10  
    11  	"github.com/go-kit/kit/sd"
    12  	"github.com/go-kit/log"
    13  )
    14  
    15  // Matches official Netflix Java client default.
    16  const defaultRenewalInterval = 30 * time.Second
    17  
    18  // The methods of fargo.Connection used in this package.
    19  type fargoConnection interface {
    20  	RegisterInstance(instance *fargo.Instance) error
    21  	DeregisterInstance(instance *fargo.Instance) error
    22  	ReregisterInstance(instance *fargo.Instance) error
    23  	HeartBeatInstance(instance *fargo.Instance) error
    24  	ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
    25  	GetApp(name string) (*fargo.Application, error)
    26  }
    27  
    28  type fargoUnsuccessfulHTTPResponse struct {
    29  	statusCode    int
    30  	messagePrefix string
    31  }
    32  
    33  func (u *fargoUnsuccessfulHTTPResponse) Error() string {
    34  	return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
    35  }
    36  
    37  // Registrar maintains service instance liveness information in Eureka.
    38  type Registrar struct {
    39  	conn     fargoConnection
    40  	instance *fargo.Instance
    41  	logger   log.Logger
    42  	quitc    chan chan struct{}
    43  	sync.Mutex
    44  }
    45  
    46  var _ sd.Registrar = (*Registrar)(nil)
    47  
    48  // NewRegistrar returns an Eureka Registrar acting on behalf of the provided
    49  // Fargo connection and instance. See the integration test for usage examples.
    50  func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
    51  	return &Registrar{
    52  		conn:     conn,
    53  		instance: instance,
    54  		logger:   log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
    55  	}
    56  }
    57  
    58  // Register implements sd.Registrar.
    59  func (r *Registrar) Register() {
    60  	r.Lock()
    61  	defer r.Unlock()
    62  
    63  	if r.quitc != nil {
    64  		return // Already in the registration loop.
    65  	}
    66  
    67  	if err := r.conn.RegisterInstance(r.instance); err != nil {
    68  		r.logger.Log("during", "Register", "err", err)
    69  	}
    70  
    71  	r.quitc = make(chan chan struct{})
    72  	go r.loop()
    73  }
    74  
    75  // Deregister implements sd.Registrar.
    76  func (r *Registrar) Deregister() {
    77  	r.Lock()
    78  	defer r.Unlock()
    79  
    80  	if r.quitc == nil {
    81  		return // Already deregistered.
    82  	}
    83  
    84  	q := make(chan struct{})
    85  	r.quitc <- q
    86  	<-q
    87  	r.quitc = nil
    88  }
    89  
    90  func (r *Registrar) loop() {
    91  	var renewalInterval time.Duration
    92  	if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
    93  		renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
    94  	} else {
    95  		renewalInterval = defaultRenewalInterval
    96  	}
    97  	ticker := time.NewTicker(renewalInterval)
    98  	defer ticker.Stop()
    99  
   100  	for {
   101  		select {
   102  		case <-ticker.C:
   103  			if err := r.heartbeat(); err != nil {
   104  				r.logger.Log("during", "heartbeat", "err", err)
   105  			}
   106  
   107  		case q := <-r.quitc:
   108  			if err := r.conn.DeregisterInstance(r.instance); err != nil {
   109  				r.logger.Log("during", "Deregister", "err", err)
   110  			}
   111  			close(q)
   112  			return
   113  		}
   114  	}
   115  }
   116  
   117  func httpResponseStatusCode(err error) (code int, present bool) {
   118  	if code, ok := fargo.HTTPResponseStatusCode(err); ok {
   119  		return code, true
   120  	}
   121  	// Allow injection of errors for testing.
   122  	if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok {
   123  		return u.statusCode, true
   124  	}
   125  	return 0, false
   126  }
   127  
   128  func isNotFound(err error) bool {
   129  	code, ok := httpResponseStatusCode(err)
   130  	return ok && code == http.StatusNotFound
   131  }
   132  
   133  func (r *Registrar) heartbeat() error {
   134  	err := r.conn.HeartBeatInstance(r.instance)
   135  	if err == nil {
   136  		return nil
   137  	}
   138  	if isNotFound(err) {
   139  		// Instance expired (e.g. network partition). Re-register.
   140  		return r.conn.ReregisterInstance(r.instance)
   141  	}
   142  	return err
   143  }
   144  

View as plain text