1 package k8s_test
2
3 import (
4 "context"
5 "testing"
6 "time"
7
8 "github.com/stretchr/testify/require"
9
10 "github.com/datawire/dlib/dlog"
11 "github.com/emissary-ingress/emissary/v3/pkg/dtest"
12 "github.com/emissary-ingress/emissary/v3/pkg/k8s"
13 )
14
15 const (
16 delay = 10 * time.Second
17 )
18
19 func fetch(ctx context.Context, w *k8s.Watcher, resource, qname string) (result k8s.Resource, err error) {
20 go func() {
21 time.Sleep(delay)
22 w.Stop()
23 }()
24
25 err = w.WatchQuery(k8s.Query{Kind: resource, Namespace: k8s.NamespaceAll}, func(w *k8s.Watcher) error {
26 list, err := w.List(resource)
27 if err != nil {
28 return err
29 }
30 for _, r := range list {
31 if r.QName() == qname {
32 result = r
33 w.Stop()
34 }
35 }
36 return nil
37 })
38 if err != nil {
39 return nil, err
40 }
41
42 if err := w.Wait(ctx); err != nil {
43 return nil, err
44 }
45 return result, nil
46 }
47
48 func info(ctx context.Context) *k8s.KubeInfo {
49 return k8s.NewKubeInfo(dtest.KubeVersionConfig(ctx, dtest.Kube22), "", "")
50 }
51
52 func TestUpdateStatus(t *testing.T) {
53 t.Parallel()
54 ctx := dlog.NewTestContext(t, false)
55 w, err := k8s.NewWatcher(info(ctx))
56 require.NoError(t, err)
57
58 svc, err := fetch(ctx, w, "services", "kubernetes.default")
59 require.NoError(t, err)
60 svc.Status()["loadBalancer"].(map[string]interface{})["ingress"] = []map[string]interface{}{{"hostname": "foo", "ip": "1.2.3.4"}}
61 result, err := w.UpdateStatus(ctx, svc)
62 if err != nil {
63 t.Error(err)
64 return
65 } else {
66 t.Logf("updated %s status, result: %v\n", svc.QName(), result.ResourceVersion())
67 }
68
69 w2, err := k8s.NewWatcher(info(ctx))
70 require.NoError(t, err)
71 svc, err = fetch(ctx, w2, "services", "kubernetes.default")
72 require.NoError(t, err)
73 ingresses := svc.Status()["loadBalancer"].(map[string]interface{})["ingress"].([]interface{})
74 ingress := ingresses[0].(map[string]interface{})
75 if ingress["hostname"] != "foo" {
76 t.Error("expected foo")
77 }
78
79 if ingress["ip"] != "1.2.3.4" {
80 t.Error("expected 1.2.3.4")
81 }
82 }
83
84 func TestWatchCustom(t *testing.T) {
85 t.Parallel()
86 ctx := dlog.NewTestContext(t, false)
87 w, err := k8s.NewWatcher(info(ctx))
88 require.NoError(t, err)
89
90
91
92 xmas, err := fetch(ctx, w, "customs", "xmas.default")
93 require.NoError(t, err)
94 if xmas == nil {
95 t.Error("couldn't find xmas")
96 } else {
97 spec := xmas.Spec()
98 if spec["deck"] != "the halls" {
99 t.Errorf("expected the halls, got %v", spec["deck"])
100 }
101 }
102 }
103
104 func TestWatchCustomCollision(t *testing.T) {
105 t.Parallel()
106 ctx := dlog.NewTestContext(t, false)
107 w, err := k8s.NewWatcher(info(ctx))
108 require.NoError(t, err)
109
110 easter, err := fetch(ctx, w, "csrv", "easter.default")
111 require.NoError(t, err)
112 require.NotNil(t, easter)
113 t.Logf("easter: %#v", easter)
114 require.Equal(t, "the lawn", easter.Spec()["deck"])
115 }
116
117 func TestWatchQuery(t *testing.T) {
118 t.Skip("FIXME(lukeshu): This test is notoriously flakey, and the code under test hasn't changed in ages. Write better tests!")
119 t.Parallel()
120 ctx := dlog.NewTestContext(t, false)
121 w, err := k8s.NewWatcher(info(ctx))
122 require.NoError(t, err)
123
124 services := []string{}
125 err = w.WatchQuery(k8s.Query{
126 Kind: "services",
127 Namespace: k8s.NamespaceAll,
128 FieldSelector: "metadata.name=kubernetes",
129 }, func(w *k8s.Watcher) error {
130 list, err := w.List("services")
131 if err != nil {
132 return err
133 }
134 for _, r := range list {
135 services = append(services, r.QName())
136 }
137 return nil
138 })
139 require.NoError(t, err)
140 time.AfterFunc(1*time.Second, func() {
141 w.Stop()
142 })
143 require.NoError(t, w.Wait(ctx))
144 require.Equal(t, services, []string{"kubernetes.default"})
145 }
146
View as plain text