1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package cache
17
18 import (
19 "fmt"
20 "testing"
21
22 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
23 wrappers "github.com/golang/protobuf/ptypes/wrappers"
24 )
25
// testType is the type URL used for the cache, the requests and the expected
// responses throughout these tests.
const testType = "google.protobuf.StringValue"
29
30 func testResource(s string) types.Resource {
31 return &wrappers.StringValue{Value: s}
32 }
33
34 func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) {
35 t.Helper()
36 r := <-ch
37 if r.GetRequest().TypeUrl != testType {
38 t.Errorf("unexpected empty request type URL: %q", r.GetRequest().TypeUrl)
39 }
40 out, err := r.GetDiscoveryResponse()
41 if err != nil {
42 t.Fatal(err)
43 }
44 if out.VersionInfo == "" {
45 t.Error("unexpected response empty version")
46 }
47 if n := len(out.Resources); n != num {
48 t.Errorf("unexpected number of responses: got %d, want %d", n, num)
49 }
50 if version != "" && out.VersionInfo != version {
51 t.Errorf("unexpected version: got %q, want %q", out.VersionInfo, version)
52 }
53 if out.TypeUrl != testType {
54 t.Errorf("unexpected type URL: %q", out.TypeUrl)
55 }
56 }
57
58 func checkWatchCount(t *testing.T, c *LinearCache, name string, count int) {
59 t.Helper()
60 if i := c.NumWatches(name); i != count {
61 t.Errorf("unexpected number of watches for %q: got %d, want %d", name, i, count)
62 }
63 }
64
65 func mustBlock(t *testing.T, w <-chan Response) {
66 select {
67 case <-w:
68 t.Error("watch must block")
69 default:
70 }
71 }
72
73 func TestLinearInitialResources(t *testing.T) {
74 c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
75 w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType})
76 verifyResponse(t, w, "0", 1)
77 w, _ = c.CreateWatch(&Request{TypeUrl: testType})
78 verifyResponse(t, w, "0", 2)
79 }
80
81 func TestLinearCornerCases(t *testing.T) {
82 c := NewLinearCache(testType)
83 err := c.UpdateResource("a", nil)
84 if err == nil {
85 t.Error("expected error on nil resource")
86 }
87
88 w, _ := c.CreateWatch(&Request{TypeUrl: "test"})
89 select {
90 case _, more := <-w:
91 if more {
92 t.Error("should be closed by the producer")
93 }
94 default:
95 t.Error("channel should be closed")
96 }
97 }
98
99 func TestLinearBasic(t *testing.T) {
100 c := NewLinearCache(testType)
101
102
103 w1, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
104 mustBlock(t, w1)
105 w, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
106 mustBlock(t, w)
107 checkWatchCount(t, c, "a", 2)
108 checkWatchCount(t, c, "b", 1)
109 c.UpdateResource("a", testResource("a"))
110 checkWatchCount(t, c, "a", 0)
111 checkWatchCount(t, c, "b", 0)
112 verifyResponse(t, w1, "1", 1)
113 verifyResponse(t, w, "1", 1)
114
115
116 w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
117 checkWatchCount(t, c, "a", 0)
118 verifyResponse(t, w, "1", 1)
119 w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
120 checkWatchCount(t, c, "a", 0)
121 verifyResponse(t, w, "1", 1)
122
123
124 c.UpdateResource("b", testResource("b"))
125 c.UpdateResource("a", testResource("aa"))
126 w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
127 verifyResponse(t, w, "3", 1)
128 w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
129 verifyResponse(t, w, "3", 2)
130 }
131
132 func TestLinearVersionPrefix(t *testing.T) {
133 c := NewLinearCache(testType, WithVersionPrefix("instance1-"))
134
135 w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
136 verifyResponse(t, w, "instance1-0", 0)
137
138 c.UpdateResource("a", testResource("a"))
139 w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
140 verifyResponse(t, w, "instance1-1", 1)
141
142 w, _ = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"})
143 mustBlock(t, w)
144 checkWatchCount(t, c, "a", 1)
145 }
146
147 func TestLinearDeletion(t *testing.T) {
148 c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
149 w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"})
150 mustBlock(t, w)
151 checkWatchCount(t, c, "a", 1)
152 c.DeleteResource("a")
153 verifyResponse(t, w, "1", 0)
154 checkWatchCount(t, c, "a", 0)
155 w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
156 verifyResponse(t, w, "1", 1)
157 checkWatchCount(t, c, "b", 0)
158 c.DeleteResource("b")
159 w, _ = c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
160 verifyResponse(t, w, "2", 0)
161 checkWatchCount(t, c, "b", 0)
162 }
163
164 func TestLinearWatchTwo(t *testing.T) {
165 c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
166 w, _ := c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"})
167 mustBlock(t, w)
168 w1, _ := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"})
169 mustBlock(t, w1)
170 c.UpdateResource("a", testResource("aa"))
171
172 verifyResponse(t, w, "1", 1)
173 verifyResponse(t, w1, "1", 2)
174 }
175
176 func TestLinearCancel(t *testing.T) {
177 c := NewLinearCache(testType)
178 c.UpdateResource("a", testResource("a"))
179
180
181 w, cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
182 mustBlock(t, w)
183 checkWatchCount(t, c, "a", 1)
184 cancel()
185 checkWatchCount(t, c, "a", 0)
186
187
188 w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
189 mustBlock(t, w)
190 checkWatchCount(t, c, "a", 1)
191 cancel()
192 checkWatchCount(t, c, "a", 0)
193
194
195 w, cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"})
196 w2, cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"})
197 w3, cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
198 w4, cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"})
199 mustBlock(t, w)
200 mustBlock(t, w2)
201 mustBlock(t, w3)
202 mustBlock(t, w4)
203 checkWatchCount(t, c, "a", 3)
204 checkWatchCount(t, c, "b", 3)
205 cancel()
206 checkWatchCount(t, c, "a", 2)
207 checkWatchCount(t, c, "b", 3)
208 cancel3()
209 checkWatchCount(t, c, "a", 1)
210 checkWatchCount(t, c, "b", 2)
211 cancel2()
212 cancel4()
213 checkWatchCount(t, c, "a", 0)
214 checkWatchCount(t, c, "b", 0)
215 }
216
217 func TestLinearConcurrentSetWatch(t *testing.T) {
218 c := NewLinearCache(testType)
219 n := 50
220 for i := 0; i < 2*n; i++ {
221 func(i int) {
222 t.Run(fmt.Sprintf("worker%d", i), func(t *testing.T) {
223 t.Parallel()
224 id := fmt.Sprintf("%d", i)
225 if i%2 == 0 {
226 t.Logf("update resource %q", id)
227 c.UpdateResource(id, testResource(id))
228 } else {
229 id2 := fmt.Sprintf("%d", i-1)
230 t.Logf("request resources %q and %q", id, id2)
231 value, _ := c.CreateWatch(&Request{
232
233 ResourceNames: []string{id, id2},
234 VersionInfo: "0",
235 TypeUrl: testType,
236 })
237
238 verifyResponse(t, value, "", 1)
239 }
240 })
241 }(i)
242 }
243 }
244
View as plain text