1 package kates
2
3 import (
4 "context"
5 "fmt"
6 "sync"
7 "testing"
8 "time"
9
10 "github.com/stretchr/testify/assert"
11 "github.com/stretchr/testify/require"
12 )
13
// Snap is the snapshot value the tests in this file pass to acc.Update.
type Snap struct {
	// ConfigMaps holds the watched ConfigMap objects after a successful
	// Update; the tests assert on its length. Every Watch query in this file
	// uses Name "ConfigMaps".
	// NOTE(review): presumably Update fills this field by matching the query
	// name to the field name — confirm against the accumulator code.
	ConfigMaps []*ConfigMap
}
17
18
19
20 func TestBootstrapNoNotifyBeforeSync(t *testing.T) {
21
22 ctx, cli := testClient(t, nil)
23 var cms [10]*ConfigMap
24 for i := 0; i < 10; i++ {
25 cm := &ConfigMap{
26 TypeMeta: TypeMeta{
27 Kind: "ConfigMap",
28 },
29 ObjectMeta: ObjectMeta{
30 Name: fmt.Sprintf("test-bootstrap-%d", i),
31 Labels: map[string]string{
32 "test": "test-bootstrap",
33 },
34 },
35 }
36 err := cli.Upsert(ctx, cm, cm, &cm)
37 require.NoError(t, err)
38 cms[i] = cm
39 }
40
41
42 _, cli2 := testClient(t, nil)
43
44
45 cli2.watchAdded = func(old *Unstructured, new *Unstructured) {
46 time.Sleep(1 * time.Second)
47 }
48 acc, err := cli2.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-bootstrap"})
49 require.NoError(t, err)
50
51 snap := &Snap{}
52 for {
53 <-acc.Changed()
54 updated, err := acc.Update(ctx, snap)
55 require.NoError(t, err)
56 if updated {
57 break
58 }
59 }
60
61
62
63
64 assert.Equal(t, 10, len(snap.ConfigMaps))
65
66 t.Cleanup(func() {
67 for _, cm := range cms {
68 if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
69 t.Error(err)
70 }
71 }
72 })
73 }
74
75
76 func TestBootstrapNotifyEvenOnEmptyWatch(t *testing.T) {
77 ctx, cli := testClient(t, nil)
78
79
80 acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "nonexistent-label"})
81 require.NoError(t, err)
82
83 snap := &Snap{}
84 for {
85 <-acc.Changed()
86 updated, err := acc.Update(ctx, snap)
87 require.NoError(t, err)
88 if updated {
89 break
90 }
91 }
92
93
94
95 assert.Equal(t, 0, len(snap.ConfigMaps))
96 }
97
98
99
100 func TestBatchChangesBeforeNotify(t *testing.T) {
101 ctx, cli := testClient(t, nil)
102
103 err := cli.MaxAccumulatorInterval(10 * time.Second)
104 require.NoError(t, err)
105 acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-batch"})
106 require.NoError(t, err)
107
108 snap := &Snap{}
109
110
111
112
113 <-acc.Changed()
114 updated, err := acc.Update(ctx, snap)
115 require.NoError(t, err)
116 if !updated {
117 t.Error("Expected snapshot to be successfully updated after receiving first change event")
118 }
119 assert.Equal(t, 0, len(snap.ConfigMaps))
120
121
122 _, cli2 := testClient(t, nil)
123
124
125
126 var cms [10]*ConfigMap
127 for i := 0; i < 10; i++ {
128 cm := &ConfigMap{
129 TypeMeta: TypeMeta{
130 Kind: "ConfigMap",
131 },
132 ObjectMeta: ObjectMeta{
133 Name: fmt.Sprintf("test-batch-%d", i),
134 Labels: map[string]string{
135 "test": "test-batch",
136 },
137 },
138 }
139 err := cli2.Upsert(ctx, cm, cm, &cm)
140 require.NoError(t, err)
141 cms[i] = cm
142 }
143
144 <-acc.Changed()
145 updated, err = acc.Update(ctx, snap)
146 require.NoError(t, err)
147 if !updated {
148 t.Error("Expected snapshot to be successfully updated after receiving second change event")
149 }
150
151
152
153 assert.Equal(t, 10, len(snap.ConfigMaps))
154
155 t.Cleanup(func() {
156 for _, cm := range cms {
157 if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
158 t.Error(err)
159 }
160 }
161 })
162 }
163
164
165
166
167 func TestNotifyNotInfinitelyBlocked(t *testing.T) {
168 ctx, cli := testClient(t, nil)
169 err := cli.MaxAccumulatorInterval(5 * time.Second)
170 require.NoError(t, err)
171 acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-batch-max"})
172 require.NoError(t, err)
173
174 snap := &Snap{}
175
176 <-acc.Changed()
177 updated, err := acc.Update(ctx, snap)
178 require.NoError(t, err)
179 if !updated {
180 t.Error("Expected snapshot to be successfully updated after receiving first change event")
181 }
182 assert.Equal(t, 0, len(snap.ConfigMaps))
183
184 var cms []*ConfigMap
185 ctx2, cli2 := testClient(t, nil)
186 ctx2, cancel := context.WithCancel(ctx2)
187 var wg sync.WaitGroup
188
189 go func() {
190 wg.Add(1)
191 defer wg.Done()
192 var i int
193 ticker := time.NewTicker(2 * time.Second)
194 for {
195 select {
196 case <-ticker.C:
197 cm := &ConfigMap{
198 TypeMeta: TypeMeta{
199 Kind: "ConfigMap",
200 },
201 ObjectMeta: ObjectMeta{
202 Name: fmt.Sprintf("test-batch-%d", i),
203 Labels: map[string]string{
204 "test": "test-batch-max",
205 },
206 },
207 }
208 err := cli2.Upsert(ctx, cm, cm, &cm)
209 require.NoError(t, err)
210 cms = append(cms, cm)
211 i++
212 case <-ctx2.Done():
213 return
214 }
215 }
216 }()
217
218
219
220
221
222
223
224 select {
225 case <-acc.Changed():
226 updated, err = acc.Update(ctx, snap)
227 require.NoError(t, err)
228 if !updated {
229 t.Error("Expected snapshot to be successfully updated after receiving second change event")
230 }
231 assert.Greater(t, len(snap.ConfigMaps), 0)
232 cancel()
233 wg.Wait()
234 case <-time.After(10 * time.Second):
235 cancel()
236 wg.Wait()
237 t.Error("Timeout after 10s listening for second change. It's possible it's infinitely blocked")
238 }
239
240 t.Cleanup(func() {
241 for _, cm := range cms {
242 if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
243 t.Error(err)
244 }
245 }
246 })
247 }
248
249
250 func TestNotifyOnUpdate(t *testing.T) {
251 ctx, cli := testClient(t, nil)
252 err := cli.MaxAccumulatorInterval(2 * time.Second)
253 require.NoError(t, err)
254 acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap", LabelSelector: "test=test-isolated"})
255 require.NoError(t, err)
256
257 snap := &Snap{}
258
259 waitForChange := func() {
260 <-acc.Changed()
261 updated, err := acc.Update(ctx, snap)
262 require.NoError(t, err)
263 if !updated {
264 t.Error("Expected snapshot to be successfully updated after receiving change event")
265 }
266 }
267
268 waitForChange()
269 assert.Equal(t, 0, len(snap.ConfigMaps))
270
271 var cms [2]*ConfigMap
272
273 cm := &ConfigMap{
274 TypeMeta: TypeMeta{
275 Kind: "ConfigMap",
276 },
277 ObjectMeta: ObjectMeta{
278 Name: "test-isolated-1",
279 Labels: map[string]string{
280 "test": "test-isolated",
281 },
282 },
283 }
284 err = cli.Upsert(ctx, cm, cm, &cm)
285 require.NoError(t, err)
286 cms[0] = cm
287
288 waitForChange()
289 assert.Equal(t, 1, len(snap.ConfigMaps))
290
291
292 time.Sleep(3)
293
294 cm = &ConfigMap{
295 TypeMeta: TypeMeta{
296 Kind: "ConfigMap",
297 },
298 ObjectMeta: ObjectMeta{
299 Name: "test-isolated-2",
300 Labels: map[string]string{
301 "test": "test-isolated",
302 },
303 },
304 }
305 err = cli.Upsert(ctx, cm, cm, &cm)
306 require.NoError(t, err)
307 cms[1] = cm
308
309 waitForChange()
310 assert.Equal(t, 2, len(snap.ConfigMaps))
311
312 t.Cleanup(func() {
313 for _, cm := range cms {
314 if err := cli.Delete(ctx, cm, nil); err != nil && !IsNotFound(err) {
315 t.Error(err)
316 }
317 }
318 })
319 }
320
View as plain text