...

Source file src/github.com/go-kit/kit/sd/endpointer.go

Documentation: github.com/go-kit/kit/sd

     1  package sd
     2  
     3  import (
     4  	"time"
     5  
     6  	"github.com/go-kit/kit/endpoint"
     7  	"github.com/go-kit/log"
     8  )
     9  
// Endpointer listens to a service discovery system and yields a set of
// identical endpoints on demand. An error indicates a problem with connectivity
// to the service discovery system, or within the system itself; an Endpointer
// may yield no endpoints without error.
type Endpointer interface {
	// Endpoints returns the set of endpoints currently known to the
	// Endpointer. An empty result with a nil error is valid; a non-nil error
	// signals a problem with the service discovery system.
	Endpoints() ([]endpoint.Endpoint, error)
}
    17  
// FixedEndpointer yields a fixed set of endpoints. Its underlying type is a
// slice of endpoint.Endpoint, so a value can be written as a slice literal or
// obtained by converting an existing slice.
type FixedEndpointer []endpoint.Endpoint
    20  
    21  // Endpoints implements Endpointer.
    22  func (s FixedEndpointer) Endpoints() ([]endpoint.Endpoint, error) { return s, nil }
    23  
    24  // NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
    25  // and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
    26  // keeps returning previously created Endpoints assuming they are still good, unless
    27  // this behavior is disabled via InvalidateOnError option.
    28  func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer {
    29  	opts := endpointerOptions{}
    30  	for _, opt := range options {
    31  		opt(&opts)
    32  	}
    33  	se := &DefaultEndpointer{
    34  		cache:     newEndpointCache(f, logger, opts),
    35  		instancer: src,
    36  		ch:        make(chan Event),
    37  	}
    38  	go se.receive()
    39  	src.Register(se.ch)
    40  	return se
    41  }
    42  
// EndpointerOption allows control of endpointCache behavior. Options are
// passed to NewEndpointer, which invokes each one on an endpointerOptions
// value before constructing the cache.
type EndpointerOption func(*endpointerOptions)
    45  
    46  // InvalidateOnError returns EndpointerOption that controls how the Endpointer
    47  // behaves when then Instancer publishes an Event containing an error.
    48  // Without this option the Endpointer continues returning the last known
    49  // endpoints. With this option, the Endpointer continues returning the last
    50  // known endpoints until the timeout elapses, then closes all active endpoints
    51  // and starts returning an error. Once the Instancer sends a new update with
    52  // valid resource instances, the normal operation is resumed.
    53  func InvalidateOnError(timeout time.Duration) EndpointerOption {
    54  	return func(opts *endpointerOptions) {
    55  		opts.invalidateOnError = true
    56  		opts.invalidateTimeout = timeout
    57  	}
    58  }
    59  
// endpointerOptions collects the settings that EndpointerOption functions
// modify. NewEndpointer builds one and passes it to newEndpointCache.
type endpointerOptions struct {
	// invalidateOnError is set to true by the InvalidateOnError option.
	invalidateOnError bool
	// invalidateTimeout is the timeout supplied to InvalidateOnError.
	invalidateTimeout time.Duration
}
    64  
// DefaultEndpointer implements an Endpointer interface.
// When created with NewEndpointer function, it automatically registers
// as a subscriber to events from the Instancer and maintains a list
// of active Endpoints.
type DefaultEndpointer struct {
	// cache is updated with every received Event and answers Endpoints calls.
	cache     *endpointCache
	// instancer is the event source; Close deregisters ch from it.
	instancer Instancer
	// ch is the channel registered with instancer; Close closes it.
	ch        chan Event
}
    74  
    75  func (de *DefaultEndpointer) receive() {
    76  	for event := range de.ch {
    77  		de.cache.Update(event)
    78  	}
    79  }
    80  
    81  // Close deregisters DefaultEndpointer from the Instancer and stops the internal go-routine.
    82  func (de *DefaultEndpointer) Close() {
    83  	de.instancer.Deregister(de.ch)
    84  	close(de.ch)
    85  }
    86  
    87  // Endpoints implements Endpointer.
    88  func (de *DefaultEndpointer) Endpoints() ([]endpoint.Endpoint, error) {
    89  	return de.cache.Endpoints()
    90  }
    91  

View as plain text