...
1 package instance
2
3 import (
4 "reflect"
5 "sort"
6 "sync"
7
8 "github.com/go-kit/kit/sd"
9 )
10
11
12
13 type Cache struct {
14 mtx sync.RWMutex
15 state sd.Event
16 reg registry
17 }
18
19
20 func NewCache() *Cache {
21 return &Cache{
22 reg: registry{},
23 }
24 }
25
26
27
28 func (c *Cache) Update(event sd.Event) {
29 c.mtx.Lock()
30 defer c.mtx.Unlock()
31
32 sort.Strings(event.Instances)
33 if reflect.DeepEqual(c.state, event) {
34 return
35 }
36
37 c.state = event
38 c.reg.broadcast(event)
39 }
40
41
42 func (c *Cache) State() sd.Event {
43 c.mtx.RLock()
44 event := c.state
45 c.mtx.RUnlock()
46 eventCopy := copyEvent(event)
47 return eventCopy
48 }
49
50
51
52 func (c *Cache) Stop() {}
53
54
55 func (c *Cache) Register(ch chan<- sd.Event) {
56 c.mtx.Lock()
57 defer c.mtx.Unlock()
58 c.reg.register(ch)
59 event := c.state
60 eventCopy := copyEvent(event)
61
62 ch <- eventCopy
63 }
64
65
66 func (c *Cache) Deregister(ch chan<- sd.Event) {
67 c.mtx.Lock()
68 defer c.mtx.Unlock()
69 c.reg.deregister(ch)
70 }
71
72
73 type registry map[chan<- sd.Event]struct{}
74
75 func (r registry) broadcast(event sd.Event) {
76 for c := range r {
77 eventCopy := copyEvent(event)
78 c <- eventCopy
79 }
80 }
81
82 func (r registry) register(c chan<- sd.Event) {
83 r[c] = struct{}{}
84 }
85
86 func (r registry) deregister(c chan<- sd.Event) {
87 delete(r, c)
88 }
89
90
91 func copyEvent(e sd.Event) sd.Event {
92
93
94
95 if e.Instances == nil {
96 return e
97 }
98 instances := make([]string, len(e.Instances))
99 copy(instances, e.Instances)
100 e.Instances = instances
101 return e
102 }
103
View as plain text