1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package cache_test
17
18 import (
19 "context"
20 "fmt"
21 "reflect"
22 "sync"
23 "testing"
24 "time"
25
26 core "github.com/datawire/ambassador/v2/pkg/api/envoy/config/core/v3"
27 discovery "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v3"
28 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
29 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3"
30 rsrc "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/resource/v3"
31 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/test/resource/v3"
32 )
33
34 type group struct{}
35
36 const (
37 key = "node"
38 )
39
40 func (group) ID(node *core.Node) string {
41 if node != nil {
42 return node.Id
43 }
44 return key
45 }
46
47 var (
48 version = "x"
49 version2 = "y"
50
51 snapshot = cache.NewSnapshot(version,
52 []types.Resource{testEndpoint},
53 []types.Resource{testCluster},
54 []types.Resource{testRoute},
55 []types.Resource{testListener},
56 []types.Resource{testRuntime},
57 []types.Resource{testSecret[0]})
58
59 ttl = 2 * time.Second
60 heartbeat = time.Second
61
62 snapshotWithTtl = cache.NewSnapshotWithTtls(version,
63 []types.ResourceWithTtl{{Resource: testEndpoint, Ttl: &ttl}},
64 []types.ResourceWithTtl{{Resource: testCluster}},
65 []types.ResourceWithTtl{{Resource: testRoute}},
66 []types.ResourceWithTtl{{Resource: testListener}},
67 []types.ResourceWithTtl{{Resource: testRuntime}},
68 []types.ResourceWithTtl{{Resource: testSecret[0]}})
69
70 names = map[string][]string{
71 rsrc.EndpointType: {clusterName},
72 rsrc.ClusterType: nil,
73 rsrc.RouteType: {routeName},
74 rsrc.ListenerType: nil,
75 rsrc.RuntimeType: nil,
76 }
77
78 testTypes = []string{
79 rsrc.EndpointType,
80 rsrc.ClusterType,
81 rsrc.RouteType,
82 rsrc.ListenerType,
83 rsrc.RuntimeType,
84 }
85 )
86
87 type logger struct {
88 t *testing.T
89 }
90
91 func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) }
92 func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) }
93 func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) }
94 func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }
95
96 func TestSnapshotCacheWithTtl(t *testing.T) {
97 ctx, cancel := context.WithCancel(context.Background())
98 defer cancel()
99 c := cache.NewSnapshotCacheWithHeartbeating(ctx, true, group{}, logger{t: t}, time.Second)
100
101 if _, err := c.GetSnapshot(key); err == nil {
102 t.Errorf("unexpected snapshot found for key %q", key)
103 }
104
105 if err := c.SetSnapshot(key, snapshotWithTtl); err != nil {
106 t.Fatal(err)
107 }
108
109 snap, err := c.GetSnapshot(key)
110 if err != nil {
111 t.Fatal(err)
112 }
113 if !reflect.DeepEqual(snap, snapshotWithTtl) {
114 t.Errorf("expect snapshot: %v, got: %v", snapshotWithTtl, snap)
115 }
116
117 wg := sync.WaitGroup{}
118
119 for _, typ := range testTypes {
120 wg.Add(1)
121 t.Run(typ, func(t *testing.T) {
122 defer wg.Done()
123 value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
124 select {
125 case out := <-value:
126 if gotVersion, _ := out.GetVersion(); gotVersion != version {
127 t.Errorf("got version %q, want %q", gotVersion, version)
128 }
129 if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) {
130 t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResourcesAndTtl(typ))
131 }
132 case <-time.After(2 * time.Second):
133 t.Errorf("failed to receive snapshot response")
134 }
135 })
136 }
137 wg.Wait()
138
139
140 wg = sync.WaitGroup{}
141 updatesByType := map[string]int{}
142 for _, typ := range testTypes {
143 wg.Add(1)
144 go func(typ string) {
145 defer wg.Done()
146
147 end := time.After(5 * time.Second)
148 for {
149 value, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version})
150
151 select {
152 case out := <-value:
153 if gotVersion, _ := out.GetVersion(); gotVersion != version {
154 t.Errorf("got version %q, want %q", gotVersion, version)
155 }
156 if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) {
157 t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResources(typ))
158 }
159
160 if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) {
161 t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResources(typ))
162 }
163
164 updatesByType[typ]++
165 case <-end:
166 cancel()
167 return
168 }
169 }
170 }(typ)
171 }
172
173 wg.Wait()
174
175 if len(updatesByType) != 1 {
176 t.Errorf("expected to only receive updates for TTL'd type, got %v", updatesByType)
177 }
178
179 if updatesByType[rsrc.EndpointType] < 2 {
180 t.Errorf("expected at least two TTL updates for endpoints, got %d", updatesByType[rsrc.EndpointType])
181 }
182 }
183
184 func TestSnapshotCache(t *testing.T) {
185 c := cache.NewSnapshotCache(true, group{}, logger{t: t})
186
187 if _, err := c.GetSnapshot(key); err == nil {
188 t.Errorf("unexpected snapshot found for key %q", key)
189 }
190
191 if err := c.SetSnapshot(key, snapshot); err != nil {
192 t.Fatal(err)
193 }
194
195 snap, err := c.GetSnapshot(key)
196 if err != nil {
197 t.Fatal(err)
198 }
199 if !reflect.DeepEqual(snap, snapshot) {
200 t.Errorf("expect snapshot: %v, got: %v", snapshot, snap)
201 }
202
203
204
205 value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}})
206 select {
207 case out := <-value:
208 t.Errorf("watch for endpoints and mismatched names => got %v, want none", out)
209 case <-time.After(time.Second / 4):
210 }
211
212 for _, typ := range testTypes {
213 t.Run(typ, func(t *testing.T) {
214 value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
215 select {
216 case out := <-value:
217 if gotVersion, _ := out.GetVersion(); gotVersion != version {
218 t.Errorf("got version %q, want %q", gotVersion, version)
219 }
220 if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) {
221 t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ))
222 }
223 case <-time.After(time.Second):
224 t.Fatal("failed to receive snapshot response")
225 }
226 })
227 }
228 }
229
230 func TestSnapshotCacheFetch(t *testing.T) {
231 c := cache.NewSnapshotCache(true, group{}, logger{t: t})
232 if err := c.SetSnapshot(key, snapshot); err != nil {
233 t.Fatal(err)
234 }
235
236 for _, typ := range testTypes {
237 t.Run(typ, func(t *testing.T) {
238 resp, err := c.Fetch(context.Background(), &discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
239 if err != nil || resp == nil {
240 t.Fatal("unexpected error or null response")
241 }
242 if gotVersion, _ := resp.GetVersion(); gotVersion != version {
243 t.Errorf("got version %q, want %q", gotVersion, version)
244 }
245 })
246 }
247
248
249 if resp, err := c.Fetch(context.Background(),
250 &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType, Node: &core.Node{Id: "oof"}}); resp != nil || err == nil {
251 t.Errorf("missing snapshot: response is not nil %v", resp)
252 }
253
254
255 if resp, err := c.Fetch(context.Background(),
256 &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType, VersionInfo: version}); resp != nil || err == nil {
257 t.Errorf("latest version: response is not nil %v", resp)
258 }
259 }
260
261 func TestSnapshotCacheWatch(t *testing.T) {
262 c := cache.NewSnapshotCache(true, group{}, logger{t: t})
263 watches := make(map[string]chan cache.Response)
264 for _, typ := range testTypes {
265 watches[typ], _ = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
266 }
267 if err := c.SetSnapshot(key, snapshot); err != nil {
268 t.Fatal(err)
269 }
270 for _, typ := range testTypes {
271 t.Run(typ, func(t *testing.T) {
272 select {
273 case out := <-watches[typ]:
274 if gotVersion, _ := out.GetVersion(); gotVersion != version {
275 t.Errorf("got version %q, want %q", gotVersion, version)
276 }
277 if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) {
278 t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ))
279 }
280 case <-time.After(time.Second):
281 t.Fatal("failed to receive snapshot response")
282 }
283 })
284 }
285
286
287 for _, typ := range testTypes {
288 watches[typ], _ = c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version})
289 }
290 if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) {
291 t.Errorf("watches should be created for the latest version: %d", count)
292 }
293
294
295 snapshot2 := snapshot
296 snapshot2.Resources[types.Endpoint] = cache.NewResources(version2, []types.Resource{resource.MakeEndpoint(clusterName, 9090)})
297 if err := c.SetSnapshot(key, snapshot2); err != nil {
298 t.Fatal(err)
299 }
300 if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes)-1 {
301 t.Errorf("watches should be preserved for all but one: %d", count)
302 }
303
304
305 select {
306 case out := <-watches[rsrc.EndpointType]:
307 if gotVersion, _ := out.GetVersion(); gotVersion != version2 {
308 t.Errorf("got version %q, want %q", gotVersion, version2)
309 }
310 if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot2.Resources[types.Endpoint].Items) {
311 t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot2.Resources[types.Endpoint].Items)
312 }
313 case <-time.After(time.Second):
314 t.Fatal("failed to receive snapshot response")
315 }
316 }
317
318 func TestConcurrentSetWatch(t *testing.T) {
319 c := cache.NewSnapshotCache(false, group{}, logger{t: t})
320 for i := 0; i < 50; i++ {
321 func(i int) {
322 t.Run(fmt.Sprintf("worker%d", i), func(t *testing.T) {
323 t.Parallel()
324 id := fmt.Sprintf("%d", i%2)
325 var cancel func()
326 if i < 25 {
327 snap := cache.Snapshot{}
328 snap.Resources[types.Endpoint] = cache.NewResources(fmt.Sprintf("v%d", i), []types.Resource{resource.MakeEndpoint(clusterName, uint32(i))})
329 c.SetSnapshot(id, snap)
330 } else {
331 if cancel != nil {
332 cancel()
333 }
334 _, cancel = c.CreateWatch(&discovery.DiscoveryRequest{
335 Node: &core.Node{Id: id},
336 TypeUrl: rsrc.EndpointType,
337 })
338 }
339 })
340 }(i)
341 }
342 }
343
344 func TestSnapshotCacheWatchCancel(t *testing.T) {
345 c := cache.NewSnapshotCache(true, group{}, logger{t: t})
346 for _, typ := range testTypes {
347 _, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]})
348 cancel()
349 }
350
351 if keys := c.GetStatusKeys(); len(keys) == 0 {
352 t.Error("got 0, want status info for the node")
353 }
354
355 for _, typ := range testTypes {
356 if count := c.GetStatusInfo(key).GetNumWatches(); count > 0 {
357 t.Errorf("watches should be released for %s", typ)
358 }
359 }
360
361 if empty := c.GetStatusInfo("missing"); empty != nil {
362 t.Errorf("should not return a status for unknown key: got %#v", empty)
363 }
364 }
365
366 func TestSnapshotClear(t *testing.T) {
367 c := cache.NewSnapshotCache(true, group{}, logger{t: t})
368 if err := c.SetSnapshot(key, snapshot); err != nil {
369 t.Fatal(err)
370 }
371 c.ClearSnapshot(key)
372 if empty := c.GetStatusInfo(key); empty != nil {
373 t.Errorf("cache should be cleared")
374 }
375 if keys := c.GetStatusKeys(); len(keys) != 0 {
376 t.Errorf("keys should be empty")
377 }
378 }
379
View as plain text