...

Source file src/github.com/go-kit/kit/sd/endpoint_cache.go

Documentation: github.com/go-kit/kit/sd

     1  package sd
     2  
     3  import (
     4  	"io"
     5  	"sort"
     6  	"sync"
     7  	"time"
     8  
     9  	"github.com/go-kit/kit/endpoint"
    10  	"github.com/go-kit/log"
    11  )
    12  
    13  // endpointCache collects the most recent set of instances from a service discovery
    14  // system, creates endpoints for them using a factory function, and makes
    15  // them available to consumers.
    16  type endpointCache struct {
    17  	options            endpointerOptions
    18  	mtx                sync.RWMutex
    19  	factory            Factory
    20  	cache              map[string]endpointCloser
    21  	err                error
    22  	endpoints          []endpoint.Endpoint
    23  	logger             log.Logger
    24  	invalidateDeadline time.Time
    25  	timeNow            func() time.Time
    26  }
    27  
    28  type endpointCloser struct {
    29  	endpoint.Endpoint
    30  	io.Closer
    31  }
    32  
    33  // newEndpointCache returns a new, empty endpointCache.
    34  func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache {
    35  	return &endpointCache{
    36  		options: options,
    37  		factory: factory,
    38  		cache:   map[string]endpointCloser{},
    39  		logger:  logger,
    40  		timeNow: time.Now,
    41  	}
    42  }
    43  
    44  // Update should be invoked by clients with a complete set of current instance
    45  // strings whenever that set changes. The cache manufactures new endpoints via
    46  // the factory, closes old endpoints when they disappear, and persists existing
    47  // endpoints if they survive through an update.
    48  func (c *endpointCache) Update(event Event) {
    49  	c.mtx.Lock()
    50  	defer c.mtx.Unlock()
    51  
    52  	// Happy path.
    53  	if event.Err == nil {
    54  		c.updateCache(event.Instances)
    55  		c.err = nil
    56  		return
    57  	}
    58  
    59  	// Sad path. Something's gone wrong in sd.
    60  	c.logger.Log("err", event.Err)
    61  	if !c.options.invalidateOnError {
    62  		return // keep returning the last known endpoints on error
    63  	}
    64  	if c.err != nil {
    65  		return // already in the error state, do nothing & keep original error
    66  	}
    67  	c.err = event.Err
    68  	// set new deadline to invalidate Endpoints unless non-error Event is received
    69  	c.invalidateDeadline = c.timeNow().Add(c.options.invalidateTimeout)
    70  	return
    71  }
    72  
    73  func (c *endpointCache) updateCache(instances []string) {
    74  	// Deterministic order (for later).
    75  	sort.Strings(instances)
    76  
    77  	// Produce the current set of services.
    78  	cache := make(map[string]endpointCloser, len(instances))
    79  	for _, instance := range instances {
    80  		// If it already exists, just copy it over.
    81  		if sc, ok := c.cache[instance]; ok {
    82  			cache[instance] = sc
    83  			delete(c.cache, instance)
    84  			continue
    85  		}
    86  
    87  		// If it doesn't exist, create it.
    88  		service, closer, err := c.factory(instance)
    89  		if err != nil {
    90  			c.logger.Log("instance", instance, "err", err)
    91  			continue
    92  		}
    93  		cache[instance] = endpointCloser{service, closer}
    94  	}
    95  
    96  	// Close any leftover endpoints.
    97  	for _, sc := range c.cache {
    98  		if sc.Closer != nil {
    99  			sc.Closer.Close()
   100  		}
   101  	}
   102  
   103  	// Populate the slice of endpoints.
   104  	endpoints := make([]endpoint.Endpoint, 0, len(cache))
   105  	for _, instance := range instances {
   106  		// A bad factory may mean an instance is not present.
   107  		if _, ok := cache[instance]; !ok {
   108  			continue
   109  		}
   110  		endpoints = append(endpoints, cache[instance].Endpoint)
   111  	}
   112  
   113  	// Swap and trigger GC for old copies.
   114  	c.endpoints = endpoints
   115  	c.cache = cache
   116  }
   117  
   118  // Endpoints yields the current set of (presumably identical) endpoints, ordered
   119  // lexicographically by the corresponding instance string.
   120  func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) {
   121  	// in the steady state we're going to have many goroutines calling Endpoints()
   122  	// concurrently, so to minimize contention we use a shared R-lock.
   123  	c.mtx.RLock()
   124  
   125  	if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
   126  		defer c.mtx.RUnlock()
   127  		return c.endpoints, nil
   128  	}
   129  
   130  	c.mtx.RUnlock()
   131  
   132  	// in case of an error, switch to an exclusive lock.
   133  	c.mtx.Lock()
   134  	defer c.mtx.Unlock()
   135  
   136  	// re-check condition due to a race between RUnlock() and Lock().
   137  	if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
   138  		return c.endpoints, nil
   139  	}
   140  
   141  	c.updateCache(nil) // close any remaining active endpoints
   142  	return nil, c.err
   143  }
   144  

View as plain text