...
1 package sd
2
3 import (
4 "io"
5 "sort"
6 "sync"
7 "time"
8
9 "github.com/go-kit/kit/endpoint"
10 "github.com/go-kit/log"
11 )
12
13
14
15
16 type endpointCache struct {
17 options endpointerOptions
18 mtx sync.RWMutex
19 factory Factory
20 cache map[string]endpointCloser
21 err error
22 endpoints []endpoint.Endpoint
23 logger log.Logger
24 invalidateDeadline time.Time
25 timeNow func() time.Time
26 }
27
28 type endpointCloser struct {
29 endpoint.Endpoint
30 io.Closer
31 }
32
33
34 func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache {
35 return &endpointCache{
36 options: options,
37 factory: factory,
38 cache: map[string]endpointCloser{},
39 logger: logger,
40 timeNow: time.Now,
41 }
42 }
43
44
45
46
47
48 func (c *endpointCache) Update(event Event) {
49 c.mtx.Lock()
50 defer c.mtx.Unlock()
51
52
53 if event.Err == nil {
54 c.updateCache(event.Instances)
55 c.err = nil
56 return
57 }
58
59
60 c.logger.Log("err", event.Err)
61 if !c.options.invalidateOnError {
62 return
63 }
64 if c.err != nil {
65 return
66 }
67 c.err = event.Err
68
69 c.invalidateDeadline = c.timeNow().Add(c.options.invalidateTimeout)
70 return
71 }
72
73 func (c *endpointCache) updateCache(instances []string) {
74
75 sort.Strings(instances)
76
77
78 cache := make(map[string]endpointCloser, len(instances))
79 for _, instance := range instances {
80
81 if sc, ok := c.cache[instance]; ok {
82 cache[instance] = sc
83 delete(c.cache, instance)
84 continue
85 }
86
87
88 service, closer, err := c.factory(instance)
89 if err != nil {
90 c.logger.Log("instance", instance, "err", err)
91 continue
92 }
93 cache[instance] = endpointCloser{service, closer}
94 }
95
96
97 for _, sc := range c.cache {
98 if sc.Closer != nil {
99 sc.Closer.Close()
100 }
101 }
102
103
104 endpoints := make([]endpoint.Endpoint, 0, len(cache))
105 for _, instance := range instances {
106
107 if _, ok := cache[instance]; !ok {
108 continue
109 }
110 endpoints = append(endpoints, cache[instance].Endpoint)
111 }
112
113
114 c.endpoints = endpoints
115 c.cache = cache
116 }
117
118
119
120 func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) {
121
122
123 c.mtx.RLock()
124
125 if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
126 defer c.mtx.RUnlock()
127 return c.endpoints, nil
128 }
129
130 c.mtx.RUnlock()
131
132
133 c.mtx.Lock()
134 defer c.mtx.Unlock()
135
136
137 if c.err == nil || c.timeNow().Before(c.invalidateDeadline) {
138 return c.endpoints, nil
139 }
140
141 c.updateCache(nil)
142 return nil, c.err
143 }
144
View as plain text