...

Source file src/github.com/go-kit/kit/sd/internal/instance/cache.go

Documentation: github.com/go-kit/kit/sd/internal/instance

     1  package instance
     2  
     3  import (
     4  	"reflect"
     5  	"sort"
     6  	"sync"
     7  
     8  	"github.com/go-kit/kit/sd"
     9  )
    10  
    11  // Cache keeps track of resource instances provided to it via Update method
    12  // and implements the Instancer interface
    13  type Cache struct {
    14  	mtx   sync.RWMutex
    15  	state sd.Event
    16  	reg   registry
    17  }
    18  
    19  // NewCache creates a new Cache.
    20  func NewCache() *Cache {
    21  	return &Cache{
    22  		reg: registry{},
    23  	}
    24  }
    25  
    26  // Update receives new instances from service discovery, stores them internally,
    27  // and notifies all registered listeners.
    28  func (c *Cache) Update(event sd.Event) {
    29  	c.mtx.Lock()
    30  	defer c.mtx.Unlock()
    31  
    32  	sort.Strings(event.Instances)
    33  	if reflect.DeepEqual(c.state, event) {
    34  		return // no need to broadcast the same instances
    35  	}
    36  
    37  	c.state = event
    38  	c.reg.broadcast(event)
    39  }
    40  
    41  // State returns the current state of discovery (instances or error) as sd.Event
    42  func (c *Cache) State() sd.Event {
    43  	c.mtx.RLock()
    44  	event := c.state
    45  	c.mtx.RUnlock()
    46  	eventCopy := copyEvent(event)
    47  	return eventCopy
    48  }
    49  
    50  // Stop implements Instancer. Since the cache is just a plain-old store of data,
    51  // Stop is a no-op.
    52  func (c *Cache) Stop() {}
    53  
    54  // Register implements Instancer.
    55  func (c *Cache) Register(ch chan<- sd.Event) {
    56  	c.mtx.Lock()
    57  	defer c.mtx.Unlock()
    58  	c.reg.register(ch)
    59  	event := c.state
    60  	eventCopy := copyEvent(event)
    61  	// always push the current state to new channels
    62  	ch <- eventCopy
    63  }
    64  
    65  // Deregister implements Instancer.
    66  func (c *Cache) Deregister(ch chan<- sd.Event) {
    67  	c.mtx.Lock()
    68  	defer c.mtx.Unlock()
    69  	c.reg.deregister(ch)
    70  }
    71  
    72  // registry is not goroutine-safe.
    73  type registry map[chan<- sd.Event]struct{}
    74  
    75  func (r registry) broadcast(event sd.Event) {
    76  	for c := range r {
    77  		eventCopy := copyEvent(event)
    78  		c <- eventCopy
    79  	}
    80  }
    81  
    82  func (r registry) register(c chan<- sd.Event) {
    83  	r[c] = struct{}{}
    84  }
    85  
    86  func (r registry) deregister(c chan<- sd.Event) {
    87  	delete(r, c)
    88  }
    89  
    90  // copyEvent does a deep copy on sd.Event
    91  func copyEvent(e sd.Event) sd.Event {
    92  	// observers all need their own copy of event
    93  	// because they can directly modify event.Instances
    94  	// for example, by calling sort.Strings
    95  	if e.Instances == nil {
    96  		return e
    97  	}
    98  	instances := make([]string, len(e.Instances))
    99  	copy(instances, e.Instances)
   100  	e.Instances = instances
   101  	return e
   102  }
   103  

View as plain text