1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package cache
17
18 import (
19 "context"
20 "fmt"
21 "sync"
22 "sync/atomic"
23 "time"
24
25 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
26 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/log"
27 )
28
29
30
31
32
33
34
35
36
37
38
39
40 type SnapshotCache interface {
41 Cache
42
43
44
45
46
47
48
49 SetSnapshot(node string, snapshot Snapshot) error
50
51
52 GetSnapshot(node string) (Snapshot, error)
53
54
55 ClearSnapshot(node string)
56
57
58 GetStatusInfo(string) StatusInfo
59
60
61 GetStatusKeys() []string
62 }
63
64 type heartbeatHandle struct {
65 cancel func()
66 }
67
68 type snapshotCache struct {
69
70
71
72
73 watchCount int64
74
75 log log.Logger
76
77
78 ads bool
79
80
81 snapshots map[string]Snapshot
82
83
84 status map[string]*statusInfo
85
86
87 hash NodeHash
88
89 mu sync.RWMutex
90 }
91
92
93
94
95
96
97
98
99
100
101
102
103 func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache {
104 return newSnapshotCache(ads, hash, logger)
105 }
106
107 func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache {
108 cache := &snapshotCache{
109 log: logger,
110 ads: ads,
111 snapshots: make(map[string]Snapshot),
112 status: make(map[string]*statusInfo),
113 hash: hash,
114 }
115
116 return cache
117 }
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134 func NewSnapshotCacheWithHeartbeating(ctx context.Context, ads bool, hash NodeHash, logger log.Logger, heartbeatInterval time.Duration) SnapshotCache {
135 cache := newSnapshotCache(ads, hash, logger)
136 go func() {
137 t := time.NewTicker(heartbeatInterval)
138
139 for {
140 select {
141 case <-t.C:
142 cache.mu.Lock()
143 for node := range cache.status {
144
145 cache.sendHeartbeats(ctx, node)
146 }
147 cache.mu.Unlock()
148 case <-ctx.Done():
149 return
150 }
151 }
152 }()
153 return cache
154 }
155
156 func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
157 snapshot := cache.snapshots[node]
158 if info, ok := cache.status[node]; ok {
159 info.mu.Lock()
160 for id, watch := range info.watches {
161
162 version := snapshot.GetVersion(watch.Request.TypeUrl)
163 resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl)
164
165
166 resourcesWithTtl := map[string]types.ResourceWithTtl{}
167 for k, v := range resources {
168 if v.Ttl != nil {
169 resourcesWithTtl[k] = v
170 }
171 }
172
173 if len(resourcesWithTtl) == 0 {
174 continue
175 }
176 if cache.log != nil {
177 cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version)
178 }
179
180 cache.respond(watch.Request, watch.Response, resourcesWithTtl, version, true)
181
182
183 delete(info.watches, id)
184 }
185 info.mu.Unlock()
186 }
187 }
188
189
190 func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error {
191 cache.mu.Lock()
192 defer cache.mu.Unlock()
193
194
195 cache.snapshots[node] = snapshot
196
197
198 if info, ok := cache.status[node]; ok {
199 info.mu.Lock()
200 for id, watch := range info.watches {
201 version := snapshot.GetVersion(watch.Request.TypeUrl)
202 if version != watch.Request.VersionInfo {
203 if cache.log != nil {
204 cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version)
205 }
206 resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl)
207 cache.respond(watch.Request, watch.Response, resources, version, false)
208
209
210 delete(info.watches, id)
211 }
212 }
213 info.mu.Unlock()
214 }
215
216 return nil
217 }
218
219
220 func (cache *snapshotCache) GetSnapshot(node string) (Snapshot, error) {
221 cache.mu.RLock()
222 defer cache.mu.RUnlock()
223
224 snap, ok := cache.snapshots[node]
225 if !ok {
226 return Snapshot{}, fmt.Errorf("no snapshot found for node %s", node)
227 }
228 return snap, nil
229 }
230
231
232 func (cache *snapshotCache) ClearSnapshot(node string) {
233 cache.mu.Lock()
234 defer cache.mu.Unlock()
235
236 delete(cache.snapshots, node)
237 delete(cache.status, node)
238 }
239
240
241 func nameSet(names []string) map[string]bool {
242 set := make(map[string]bool)
243 for _, name := range names {
244 set[name] = true
245 }
246 return set
247 }
248
249
250 func superset(names map[string]bool, resources map[string]types.ResourceWithTtl) error {
251 for resourceName := range resources {
252 if _, exists := names[resourceName]; !exists {
253 return fmt.Errorf("%q not listed", resourceName)
254 }
255 }
256 return nil
257 }
258
259
260 func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func()) {
261 nodeID := cache.hash.ID(request.Node)
262
263 cache.mu.Lock()
264 defer cache.mu.Unlock()
265
266 info, ok := cache.status[nodeID]
267 if !ok {
268 info = newStatusInfo(request.Node)
269 cache.status[nodeID] = info
270 }
271
272
273 info.mu.Lock()
274 info.lastWatchRequestTime = time.Now()
275 info.mu.Unlock()
276
277
278 value := make(chan Response, 1)
279
280 snapshot, exists := cache.snapshots[nodeID]
281 version := snapshot.GetVersion(request.TypeUrl)
282
283
284 if !exists || request.VersionInfo == version {
285 watchID := cache.nextWatchID()
286 if cache.log != nil {
287 cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID,
288 request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo)
289 }
290 info.mu.Lock()
291 info.watches[watchID] = ResponseWatch{Request: request, Response: value}
292 info.mu.Unlock()
293 return value, cache.cancelWatch(nodeID, watchID)
294 }
295
296
297 resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
298 cache.respond(request, value, resources, version, false)
299
300 return value, nil
301 }
302
303 func (cache *snapshotCache) nextWatchID() int64 {
304 return atomic.AddInt64(&cache.watchCount, 1)
305 }
306
307
308 func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() {
309 return func() {
310
311 cache.mu.Lock()
312 defer cache.mu.Unlock()
313 if info, ok := cache.status[nodeID]; ok {
314 info.mu.Lock()
315 delete(info.watches, watchID)
316 info.mu.Unlock()
317 }
318 }
319 }
320
321
322
323 func (cache *snapshotCache) respond(request *Request, value chan Response, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) {
324
325
326 if len(request.ResourceNames) != 0 && cache.ads {
327 if err := superset(nameSet(request.ResourceNames), resources); err != nil {
328 if cache.log != nil {
329 cache.log.Debugf("ADS mode: not responding to request: %v", err)
330 }
331 return
332 }
333 }
334 if cache.log != nil {
335 cache.log.Debugf("respond %s%v version %q with version %q",
336 request.TypeUrl, request.ResourceNames, request.VersionInfo, version)
337 }
338
339 value <- createResponse(request, resources, version, heartbeat)
340 }
341
342 func createResponse(request *Request, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) Response {
343 filtered := make([]types.ResourceWithTtl, 0, len(resources))
344
345
346
347
348 if len(request.ResourceNames) != 0 {
349 set := nameSet(request.ResourceNames)
350 for name, resource := range resources {
351 if set[name] {
352 filtered = append(filtered, resource)
353 }
354 }
355 } else {
356 for _, resource := range resources {
357 filtered = append(filtered, resource)
358 }
359 }
360
361 return &RawResponse{
362 Request: request,
363 Version: version,
364 Resources: filtered,
365 Heartbeat: heartbeat,
366 }
367 }
368
369
370
371 func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Response, error) {
372 nodeID := cache.hash.ID(request.Node)
373
374 cache.mu.RLock()
375 defer cache.mu.RUnlock()
376
377 if snapshot, exists := cache.snapshots[nodeID]; exists {
378
379
380 version := snapshot.GetVersion(request.TypeUrl)
381 if request.VersionInfo == version {
382 if cache.log != nil {
383 cache.log.Warnf("skip fetch: version up to date")
384 }
385 return nil, &types.SkipFetchError{}
386 }
387
388 resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
389 out := createResponse(request, resources, version, false)
390 return out, nil
391 }
392
393 return nil, fmt.Errorf("missing snapshot for %q", nodeID)
394 }
395
396
397 func (cache *snapshotCache) GetStatusInfo(node string) StatusInfo {
398 cache.mu.RLock()
399 defer cache.mu.RUnlock()
400
401 info, exists := cache.status[node]
402 if !exists {
403 if cache.log != nil {
404 cache.log.Warnf("node does not exist")
405 }
406 return nil
407 }
408
409 return info
410 }
411
412
413 func (cache *snapshotCache) GetStatusKeys() []string {
414 cache.mu.RLock()
415 defer cache.mu.RUnlock()
416
417 out := make([]string, 0, len(cache.status))
418 for id := range cache.status {
419 out = append(out, id)
420 }
421
422 return out
423 }
424
View as plain text