1 package kates
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "strconv"
8 "strings"
9 "sync"
10 "testing"
11 "time"
12
13 "github.com/stretchr/testify/assert"
14 "github.com/stretchr/testify/require"
15 "k8s.io/apimachinery/pkg/types"
16
17 "github.com/datawire/dlib/dlog"
18 dtest_k3s "github.com/datawire/dtest"
19 )
20
21 func testClient(t *testing.T, ctx context.Context) (context.Context, *Client) {
22 if ctx == nil {
23 ctx = dlog.NewTestContext(t, false)
24 }
25 cli, err := NewClient(ClientConfig{Kubeconfig: dtest_k3s.KubeVersionConfig(ctx, dtest_k3s.Kube22)})
26 require.NoError(t, err)
27 return ctx, cli
28 }
29
30 func TestCRUD(t *testing.T) {
31 ctx, cli := testClient(t, nil)
32
33 cm := &ConfigMap{
34 TypeMeta: TypeMeta{
35 Kind: "ConfigMap",
36 },
37 ObjectMeta: ObjectMeta{
38 Name: "test-crud-configmap",
39 },
40 }
41
42 assert.Equal(t, cm.GetResourceVersion(), "")
43
44 err := cli.Get(ctx, cm, nil)
45 assert.Error(t, err, "expecting not found error")
46 if !IsNotFound(err) {
47 t.Error(err)
48 return
49 }
50
51 created := &ConfigMap{}
52 err = cli.Create(ctx, cm, created)
53 assert.NoError(t, err)
54 assert.NotEqual(t, created.GetResourceVersion(), "")
55
56 created.Labels = map[string]string{"foo": "bar"}
57 updated := &ConfigMap{}
58 err = cli.Update(ctx, created, updated)
59 assert.NoError(t, err)
60
61 gotten := &ConfigMap{}
62 err = cli.Get(ctx, cm, gotten)
63 assert.NoError(t, err)
64 assert.Equal(t, gotten.GetName(), cm.GetName())
65 assert.Equal(t, gotten.Labels["foo"], "bar")
66
67 err = cli.Delete(ctx, cm, nil)
68 assert.NoError(t, err)
69
70 err = cli.Get(ctx, cm, nil)
71 assert.Error(t, err, "expecting not found error")
72 assert.True(t, IsNotFound(err), "expecting not found error")
73 }
74
75 func TestUpsert(t *testing.T) {
76 ctx, cli := testClient(t, nil)
77
78 cm := &ConfigMap{
79 TypeMeta: TypeMeta{
80 Kind: "ConfigMap",
81 },
82 ObjectMeta: ObjectMeta{
83 Name: "test-upsert-configmap",
84 Labels: map[string]string{
85 "foo": "bar",
86 },
87 },
88 }
89
90 defer func() {
91 assert.NoError(t, cli.Delete(ctx, cm, nil))
92 }()
93
94 err := cli.Upsert(ctx, cm, cm, cm)
95 assert.NoError(t, err)
96 assert.NotEqual(t, "", cm.GetResourceVersion())
97
98 src := &ConfigMap{
99 TypeMeta: TypeMeta{
100 Kind: "ConfigMap",
101 },
102 ObjectMeta: ObjectMeta{
103 Name: "test-upsert-configmap",
104 Labels: map[string]string{
105 "foo": "baz",
106 },
107 },
108 }
109
110 err = cli.Upsert(ctx, cm, src, cm)
111 assert.NoError(t, err)
112 assert.Equal(t, "baz", cm.Labels["foo"])
113 }
114
115 func TestPatch(t *testing.T) {
116 ctx, cli := testClient(t, nil)
117
118 cm := &ConfigMap{
119 TypeMeta: TypeMeta{
120 Kind: "ConfigMap",
121 },
122 ObjectMeta: ObjectMeta{
123 Name: "test-patch-configmap",
124 Labels: map[string]string{
125 "foo": "bar",
126 },
127 },
128 }
129
130 err := cli.Create(ctx, cm, cm)
131 assert.NoError(t, err)
132
133 defer func() {
134 assert.NoError(t, cli.Delete(ctx, cm, nil))
135 }()
136
137 err = cli.Patch(ctx, cm, StrategicMergePatchType, []byte(`{"metadata": {"annotations": {"moo": "arf"}}}`), cm)
138 assert.NoError(t, err)
139 assert.Equal(t, "arf", cm.GetAnnotations()["moo"])
140 }
141
142 func TestList(t *testing.T) {
143 ctx, cli := testClient(t, nil)
144
145 namespaces := make([]*Namespace, 0)
146
147 err := cli.List(ctx, Query{Kind: "namespaces"}, &namespaces)
148 assert.NoError(t, err)
149
150
151
152 assert.True(t, len(namespaces) > 0)
153
154 found := false
155 for _, ns := range namespaces {
156 if ns.GetName() == "default" {
157 found = true
158 break
159 }
160 }
161
162 assert.True(t, found)
163 }
164
165 func TestListSelector(t *testing.T) {
166 ctx, cli := testClient(t, nil)
167
168 myns := &Namespace{
169 TypeMeta: TypeMeta{
170 Kind: "namespace",
171 },
172 ObjectMeta: ObjectMeta{
173 Name: "test-list-selector-namespace",
174 Labels: map[string]string{
175 "foo": "bar",
176 },
177 },
178 }
179
180 err := cli.Create(ctx, myns, myns)
181 assert.NoError(t, err)
182
183 namespaces := make([]*Namespace, 0)
184
185 err = cli.List(ctx, Query{Kind: "namespaces", LabelSelector: "foo=bar"}, &namespaces)
186 assert.NoError(t, err)
187
188 assert.Equal(t, len(namespaces), 1)
189
190 if len(namespaces) == 1 {
191 assert.Equal(t, namespaces[0].GetName(), myns.GetName())
192 }
193
194 err = cli.Delete(ctx, myns, myns)
195 assert.NoError(t, err)
196 }
197
198 func TestShortcut(t *testing.T) {
199 ctx, cli := testClient(t, nil)
200
201 cm := &ConfigMap{
202 TypeMeta: TypeMeta{
203 Kind: "cm",
204 },
205 ObjectMeta: ObjectMeta{
206 Name: "test-shortcut-configmap",
207 },
208 }
209
210 created := &ConfigMap{}
211 err := cli.Create(ctx, cm, created)
212 assert.NoError(t, err)
213
214 err = cli.Delete(ctx, created, nil)
215 assert.NoError(t, err)
216 }
217
218 type TestSnapshot struct {
219 ConfigMaps []*ConfigMap
220 Secrets []*Secret
221 }
222
223
224
225
226
227 func TestCoherence(t *testing.T) {
228 ctx, cli := testClient(t, nil)
229 ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
230 defer cancel()
231
232
233
234 cli.watchUpdated = func(_, obj *Unstructured) {
235 if obj.GetKind() == "ConfigMap" {
236 time.Sleep(5 * time.Second)
237 }
238 }
239
240
241
242
243
244 cm := &ConfigMap{
245 TypeMeta: TypeMeta{
246 Kind: "ConfigMap",
247 },
248 ObjectMeta: ObjectMeta{
249 Name: "test-coherence",
250 Labels: map[string]string{},
251 },
252 }
253
254
255
256
257 secret := &Secret{
258 TypeMeta: TypeMeta{
259 Kind: "Secret",
260 },
261 ObjectMeta: ObjectMeta{
262 Name: "test-coherence",
263 Labels: map[string]string{},
264 },
265 }
266
267 defer func() {
268 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
269 defer cancel()
270 err := cli.Delete(ctx, cm, nil)
271 if err != nil {
272 t.Log(err)
273 }
274 err = cli.Delete(ctx, secret, nil)
275 if err != nil {
276 t.Log(err)
277 }
278 }()
279
280 err := cli.Get(ctx, cm, nil)
281 assert.Error(t, err, "expecting not found error")
282 if !IsNotFound(err) {
283 t.Error(err)
284 return
285 }
286
287 err = cli.Get(ctx, secret, nil)
288 assert.Error(t, err, "expecting not found error")
289 if !IsNotFound(err) {
290 t.Error(err)
291 return
292 }
293
294 acc, err := cli.Watch(ctx,
295 Query{Name: "ConfigMaps", Kind: "ConfigMap"},
296 Query{Name: "Secrets", Kind: "Secret"})
297 require.NoError(t, err)
298 snap := &TestSnapshot{}
299
300 COUNT := 25
301
302
303
304
305
306 mutex := &sync.Mutex{}
307 lastSentByUpsert := 0
308 lastSeenByWatch := 0
309
310 done := make(chan struct{})
311 go func() {
312 defer cancel()
313 defer close(done)
314
315 for {
316 var deltas []*Delta
317 select {
318 case <-acc.Changed():
319 mutex.Lock()
320 updated, err := acc.UpdateWithDeltas(ctx, snap, &deltas)
321 assert.NoError(t, err)
322 if !updated {
323 mutex.Unlock()
324 continue
325 }
326 case <-ctx.Done():
327 return
328 }
329
330 for _, delta := range deltas {
331 bytes, err := json.Marshal(delta)
332 assert.NoError(t, err)
333 t.Log(string(bytes))
334 }
335
336 func() {
337 defer mutex.Unlock()
338
339 var cmFromWatch *ConfigMap
340 for _, c := range snap.ConfigMaps {
341 if c.GetName() == "test-coherence" {
342 cmFromWatch = c
343 break
344 }
345 }
346
347 if lastSentByUpsert > 0 {
348 assert.NotNil(t, cmFromWatch)
349 if cmFromWatch != nil {
350 lbl := cmFromWatch.GetLabels()["counter"]
351 parts := strings.Split(lbl, "-")
352 require.Equal(t, 2, len(parts))
353 i, err := strconv.Atoi(parts[1])
354 require.NoError(t, err)
355 lastSeenByWatch = i
356
357
358
359
360
361 assert.Equal(t, lastSentByUpsert, lastSeenByWatch)
362 }
363 }
364
365 if lastSeenByWatch == COUNT {
366 cancel()
367 }
368 }()
369 }
370 }()
371
372
373 for counter := 0; counter <= COUNT; counter += 1 {
374 mutex.Lock()
375 func() {
376 defer mutex.Unlock()
377 lbl := fmt.Sprintf("upsert-%d", counter)
378 t.Log(lbl)
379
380 labels := cm.GetLabels()
381 labels["counter"] = lbl
382 cm.SetLabels(labels)
383
384 err := cli.Upsert(ctx, cm, cm, nil)
385 require.NoError(t, err)
386
387 labels = secret.GetLabels()
388 labels["counter"] = lbl
389 secret.SetLabels(labels)
390 err = cli.Upsert(ctx, secret, secret, nil)
391 require.NoError(t, err)
392
393 lastSentByUpsert = counter
394 }()
395 }
396
397 <-done
398 }
399
400 func TestDeltas(t *testing.T) {
401 doDeltaTest(t, 0, func(_, _ *Unstructured) {})
402 }
403
404 func TestDeltasWithLocalDelay(t *testing.T) {
405 doDeltaTest(t, 3*time.Second, func(_, _ *Unstructured) {})
406 }
407
408 func TestDeltasWithRemoteDelay(t *testing.T) {
409 doDeltaTest(t, 0, func(old, new *Unstructured) {
410
411 obj := new
412 if obj == nil {
413 obj = old
414 }
415
416 if strings.HasPrefix(obj.GetName(), "test-deltas") {
417 time.Sleep(3 * time.Second)
418 }
419 })
420 }
421
422 func doDeltaTest(t *testing.T, localDelay time.Duration, watchHook func(*Unstructured, *Unstructured)) {
423 _ctx, cli := testClient(t, nil)
424 var (
425 _cm1 = &ConfigMap{
426 TypeMeta: TypeMeta{
427 Kind: "ConfigMap",
428 },
429 ObjectMeta: ObjectMeta{
430 Name: "test-deltas-1",
431 Labels: map[string]string{},
432 },
433 }
434 _cm2 = &ConfigMap{
435 TypeMeta: TypeMeta{
436 Kind: "ConfigMap",
437 },
438 ObjectMeta: ObjectMeta{
439 Name: "test-deltas-2",
440 Labels: map[string]string{},
441 },
442 }
443 )
444 t.Cleanup(func() {
445 if err := cli.Delete(_ctx, _cm1, nil); err != nil && !IsNotFound(err) {
446 t.Error(err)
447 }
448 if err := cli.Delete(_ctx, _cm2, nil); err != nil && !IsNotFound(err) {
449 t.Error(err)
450 }
451 })
452
453 ctx, cancel := context.WithTimeout(_ctx, 30*time.Second)
454 defer cancel()
455
456 cli.watchAdded = watchHook
457 cli.watchUpdated = watchHook
458 cli.watchDeleted = watchHook
459
460 cm1, cm2 := _cm1, _cm2
461
462 defer func() {
463 ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
464 defer cancel()
465 if cm1 != nil {
466 err := cli.Delete(ctx, cm1, nil)
467 if err != nil {
468 t.Log(err)
469 }
470 }
471 err := cli.Delete(ctx, cm2, nil)
472 if err != nil {
473 t.Log(err)
474 }
475 }()
476
477 err := cli.Get(ctx, cm1, nil)
478 assert.Error(t, err, "expecting not found error")
479 if !IsNotFound(err) {
480 t.Error(err)
481 return
482 }
483
484 err = cli.Get(ctx, cm2, nil)
485 assert.Error(t, err, "expecting not found error")
486 if !IsNotFound(err) {
487 t.Error(err)
488 return
489 }
490
491 acc, err := cli.Watch(ctx, Query{Name: "ConfigMaps", Kind: "ConfigMap"})
492 require.NoError(t, err)
493 snap := &TestSnapshot{}
494
495 err = cli.Upsert(ctx, cm1, cm1, nil)
496 require.NoError(t, err)
497 err = cli.Upsert(ctx, cm2, cm2, nil)
498 require.NoError(t, err)
499
500 time.Sleep(localDelay)
501
502 for {
503 <-acc.Changed()
504 var deltas []*Delta
505 updated, err := acc.UpdateWithDeltas(ctx, snap, &deltas)
506 require.NoError(t, err)
507 if !updated {
508 continue
509 }
510
511 checkForDelta(t, ObjectAdd, "test-deltas-1", deltas)
512 checkForDelta(t, ObjectAdd, "test-deltas-2", deltas)
513 break
514 }
515
516 cm1.SetLabels(map[string]string{"foo": "bar"})
517 err = cli.Upsert(ctx, cm1, cm1, nil)
518 require.NoError(t, err)
519
520 for {
521 <-acc.Changed()
522 var deltas []*Delta
523 updated, err := acc.UpdateWithDeltas(ctx, snap, &deltas)
524 require.NoError(t, err)
525 if !updated {
526 continue
527 }
528
529 checkForDelta(t, ObjectUpdate, "test-deltas-1", deltas)
530 checkNoDelta(t, "test-deltas-2", deltas)
531 break
532 }
533
534 err = cli.Delete(ctx, cm1, nil)
535 require.NoError(t, err)
536 cm1 = nil
537
538 time.Sleep(localDelay)
539
540 for {
541 <-acc.Changed()
542 var deltas []*Delta
543 updated, err := acc.UpdateWithDeltas(ctx, snap, &deltas)
544 require.NoError(t, err)
545 if !updated {
546 continue
547 }
548
549 checkForDelta(t, ObjectDelete, "test-deltas-1", deltas)
550 checkNoDelta(t, "test-deltas-2", deltas)
551 break
552 }
553
554 cancel()
555 }
556
557 func checkForDelta(t *testing.T, dt DeltaType, name string, deltas []*Delta) {
558 for _, delta := range deltas {
559 if delta.DeltaType == dt && delta.GetName() == name {
560 return
561 }
562 }
563
564 assert.Fail(t, fmt.Sprintf("could not find delta %d %s", dt, name))
565 }
566
567 func checkNoDelta(t *testing.T, name string, deltas []*Delta) {
568 for _, delta := range deltas {
569 if delta.GetName() == name {
570 assert.Fail(t, fmt.Sprintf("found delta %s: %d", name, delta.DeltaType))
571 return
572 }
573 }
574 }
575
576
577
578
579
580
581
582 func TestPatchWatch(t *testing.T) {
583 require := require.New(t)
584 assert := assert.New(t)
585
586 ctx := context.Background()
587
588 cli, err := NewClient(ClientConfig{})
589 require.NoError(err)
590
591
592 field, err := cli.newField(Query{Name: "Pods", Kind: "pods"})
593 require.NoError(err)
594
595
596 makePod := func(namespace, name string, version int) *Unstructured {
597 un := &Unstructured{}
598 un.SetGroupVersionKind(field.mapping.GroupVersionKind)
599 un.SetNamespace(namespace)
600 un.SetName(name)
601 un.SetUID(types.UID(fmt.Sprintf("UID:%s.%s", namespace, name)))
602 un.SetResourceVersion(fmt.Sprintf("%d", version))
603 return un
604 }
605
606
607
608
609
610
611
612
613
614 p1 := makePod("default", "foo", 1)
615 p1Key := unKey(p1)
616
617 p1Newer := makePod("default", "foo", 2)
618 require.Equal(p1Key, unKey(p1Newer))
619
620
621 cli.canonical[p1Key] = p1
622 delete(field.values, p1Key)
623 err = cli.patchWatch(ctx, field)
624 require.NoError(err)
625 assert.Equal(p1, field.values[p1Key])
626
627
628 cli.canonical[p1Key] = p1Newer
629 field.values[p1Key] = p1
630 err = cli.patchWatch(ctx, field)
631 require.NoError(err)
632 assert.Equal(p1Newer, field.values[p1Key])
633
634
635 cli.canonical[p1Key] = p1
636 field.values[p1Key] = p1Newer
637 err = cli.patchWatch(ctx, field)
638 require.NoError(err)
639 assert.Equal(p1Newer, field.values[p1Key])
640 assert.NotContains(cli.canonical, p1Key)
641
642
643 cli.canonical[p1Key] = nil
644 field.values[p1Key] = p1Newer
645 err = cli.patchWatch(ctx, field)
646 require.NoError(err)
647 assert.NotContains(field.values, p1Key)
648 }
649
View as plain text