1
16
17 package source_test
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "sigs.k8s.io/controller-runtime/pkg/client"
25 "sigs.k8s.io/controller-runtime/pkg/event"
26 "sigs.k8s.io/controller-runtime/pkg/handler"
27 "sigs.k8s.io/controller-runtime/pkg/source"
28
29 . "github.com/onsi/ginkgo/v2"
30 . "github.com/onsi/gomega"
31 appsv1 "k8s.io/api/apps/v1"
32 corev1 "k8s.io/api/core/v1"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 kubeinformers "k8s.io/client-go/informers"
35 toolscache "k8s.io/client-go/tools/cache"
36 "k8s.io/client-go/util/workqueue"
37 )
38
39 var _ = Describe("Source", func() {
40 var instance1, instance2 source.Source
41 var obj client.Object
42 var q workqueue.RateLimitingInterface
43 var c1, c2 chan interface{}
44 var ns string
45 count := 0
46
47 BeforeEach(func() {
48
49 ns = fmt.Sprintf("controller-source-kindsource-%v", count)
50 count++
51 _, err := clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
52 ObjectMeta: metav1.ObjectMeta{Name: ns},
53 }, metav1.CreateOptions{})
54 Expect(err).NotTo(HaveOccurred())
55
56 q = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
57 c1 = make(chan interface{})
58 c2 = make(chan interface{})
59 })
60
61 AfterEach(func() {
62 err := clientset.CoreV1().Namespaces().Delete(ctx, ns, metav1.DeleteOptions{})
63 Expect(err).NotTo(HaveOccurred())
64 close(c1)
65 close(c2)
66 })
67
68 Describe("Kind", func() {
69 Context("for a Deployment resource", func() {
70 obj = &appsv1.Deployment{}
71
72 It("should provide Deployment Events", func() {
73 var created, updated, deleted *appsv1.Deployment
74 var err error
75
76
77 client := clientset.AppsV1().Deployments(ns)
78 deployment := &appsv1.Deployment{
79 ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"},
80 Spec: appsv1.DeploymentSpec{
81 Selector: &metav1.LabelSelector{
82 MatchLabels: map[string]string{"foo": "bar"},
83 },
84 Template: corev1.PodTemplateSpec{
85 ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
86 Spec: corev1.PodSpec{
87 Containers: []corev1.Container{
88 {
89 Name: "nginx",
90 Image: "nginx",
91 },
92 },
93 },
94 },
95 },
96 }
97
98
99 newHandler := func(c chan interface{}) handler.Funcs {
100 return handler.Funcs{
101 CreateFunc: func(ctx context.Context, evt event.CreateEvent, rli workqueue.RateLimitingInterface) {
102 defer GinkgoRecover()
103 Expect(rli).To(Equal(q))
104 c <- evt
105 },
106 UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, rli workqueue.RateLimitingInterface) {
107 defer GinkgoRecover()
108 Expect(rli).To(Equal(q))
109 c <- evt
110 },
111 DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, rli workqueue.RateLimitingInterface) {
112 defer GinkgoRecover()
113 Expect(rli).To(Equal(q))
114 c <- evt
115 },
116 }
117 }
118 handler1 := newHandler(c1)
119 handler2 := newHandler(c2)
120
121
122 instance1 = source.Kind(icache, obj, handler1)
123 instance2 = source.Kind(icache, obj, handler2)
124 Expect(instance1.Start(ctx, q)).To(Succeed())
125 Expect(instance2.Start(ctx, q)).To(Succeed())
126
127 By("Creating a Deployment and expecting the CreateEvent.")
128 created, err = client.Create(ctx, deployment, metav1.CreateOptions{})
129 Expect(err).NotTo(HaveOccurred())
130 Expect(created).NotTo(BeNil())
131
132
133 evt := <-c1
134 createEvt, ok := evt.(event.CreateEvent)
135 Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.CreateEvent{}))
136 Expect(createEvt.Object).To(Equal(created))
137
138
139 evt = <-c2
140 createEvt, ok = evt.(event.CreateEvent)
141 Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.CreateEvent{}))
142 Expect(createEvt.Object).To(Equal(created))
143
144 By("Updating a Deployment and expecting the UpdateEvent.")
145 updated = created.DeepCopy()
146 updated.Labels = map[string]string{"biz": "buz"}
147 updated, err = client.Update(ctx, updated, metav1.UpdateOptions{})
148 Expect(err).NotTo(HaveOccurred())
149
150
151 evt = <-c1
152 updateEvt, ok := evt.(event.UpdateEvent)
153 Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.UpdateEvent{}))
154
155 Expect(updateEvt.ObjectNew).To(Equal(updated))
156
157 Expect(updateEvt.ObjectOld).To(Equal(created))
158
159
160 evt = <-c2
161 updateEvt, ok = evt.(event.UpdateEvent)
162 Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.UpdateEvent{}))
163
164 Expect(updateEvt.ObjectNew).To(Equal(updated))
165
166 Expect(updateEvt.ObjectOld).To(Equal(created))
167
168 By("Deleting a Deployment and expecting the Delete.")
169 deleted = updated.DeepCopy()
170 err = client.Delete(ctx, created.Name, metav1.DeleteOptions{})
171 Expect(err).NotTo(HaveOccurred())
172
173 deleted.SetResourceVersion("")
174 evt = <-c1
175 deleteEvt, ok := evt.(event.DeleteEvent)
176 Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.DeleteEvent{}))
177 deleteEvt.Object.SetResourceVersion("")
178 Expect(deleteEvt.Object).To(Equal(deleted))
179
180 evt = <-c2
181 deleteEvt, ok = evt.(event.DeleteEvent)
182 Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.DeleteEvent{}))
183 deleteEvt.Object.SetResourceVersion("")
184 Expect(deleteEvt.Object).To(Equal(deleted))
185 })
186 })
187
188
189 PContext("for a Foo CRD resource", func() {
190 It("should provide Foo Events", func() {
191
192 })
193 })
194 })
195
196 Describe("Informer", func() {
197 var c chan struct{}
198 var rs *appsv1.ReplicaSet
199 var depInformer toolscache.SharedIndexInformer
200 var informerFactory kubeinformers.SharedInformerFactory
201 var stopTest chan struct{}
202
203 BeforeEach(func() {
204 stopTest = make(chan struct{})
205 informerFactory = kubeinformers.NewSharedInformerFactory(clientset, time.Second*30)
206 depInformer = informerFactory.Apps().V1().ReplicaSets().Informer()
207 informerFactory.Start(stopTest)
208 Eventually(depInformer.HasSynced).Should(BeTrue())
209
210 c = make(chan struct{})
211 rs = &appsv1.ReplicaSet{
212 ObjectMeta: metav1.ObjectMeta{Name: "informer-rs-name"},
213 Spec: appsv1.ReplicaSetSpec{
214 Selector: &metav1.LabelSelector{
215 MatchLabels: map[string]string{"foo": "bar"},
216 },
217 Template: corev1.PodTemplateSpec{
218 ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
219 Spec: corev1.PodSpec{
220 Containers: []corev1.Container{
221 {
222 Name: "nginx",
223 Image: "nginx",
224 },
225 },
226 },
227 },
228 },
229 }
230 })
231
232 AfterEach(func() {
233 close(stopTest)
234 })
235
236 Context("for a ReplicaSet resource", func() {
237 It("should provide a ReplicaSet CreateEvent", func() {
238 c := make(chan struct{})
239
240 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
241 instance := &source.Informer{
242 Informer: depInformer,
243 Handler: handler.Funcs{
244 CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
245 defer GinkgoRecover()
246 var err error
247 rs, err := clientset.AppsV1().ReplicaSets("default").Get(ctx, rs.Name, metav1.GetOptions{})
248 Expect(err).NotTo(HaveOccurred())
249
250 Expect(q2).To(BeIdenticalTo(q))
251 Expect(evt.Object).To(Equal(rs))
252 close(c)
253 },
254 UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
255 defer GinkgoRecover()
256 Fail("Unexpected UpdateEvent")
257 },
258 DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
259 defer GinkgoRecover()
260 Fail("Unexpected DeleteEvent")
261 },
262 GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) {
263 defer GinkgoRecover()
264 Fail("Unexpected GenericEvent")
265 },
266 },
267 }
268 err := instance.Start(ctx, q)
269 Expect(err).NotTo(HaveOccurred())
270
271 _, err = clientset.AppsV1().ReplicaSets("default").Create(ctx, rs, metav1.CreateOptions{})
272 Expect(err).NotTo(HaveOccurred())
273 <-c
274 })
275
276 It("should provide a ReplicaSet UpdateEvent", func() {
277 var err error
278 rs, err = clientset.AppsV1().ReplicaSets("default").Get(ctx, rs.Name, metav1.GetOptions{})
279 Expect(err).NotTo(HaveOccurred())
280
281 rs2 := rs.DeepCopy()
282 rs2.SetLabels(map[string]string{"biz": "baz"})
283
284 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
285 instance := &source.Informer{
286 Informer: depInformer,
287 Handler: handler.Funcs{
288 CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
289 },
290 UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) {
291 defer GinkgoRecover()
292 var err error
293 rs2, err := clientset.AppsV1().ReplicaSets("default").Get(ctx, rs.Name, metav1.GetOptions{})
294 Expect(err).NotTo(HaveOccurred())
295
296 Expect(q2).To(Equal(q))
297 Expect(evt.ObjectOld).To(Equal(rs))
298
299 Expect(evt.ObjectNew).To(Equal(rs2))
300
301 close(c)
302 },
303 DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
304 defer GinkgoRecover()
305 Fail("Unexpected DeleteEvent")
306 },
307 GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) {
308 defer GinkgoRecover()
309 Fail("Unexpected GenericEvent")
310 },
311 },
312 }
313 err = instance.Start(ctx, q)
314 Expect(err).NotTo(HaveOccurred())
315
316 _, err = clientset.AppsV1().ReplicaSets("default").Update(ctx, rs2, metav1.UpdateOptions{})
317 Expect(err).NotTo(HaveOccurred())
318 <-c
319 })
320
321 It("should provide a ReplicaSet DeletedEvent", func() {
322 c := make(chan struct{})
323
324 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
325 instance := &source.Informer{
326 Informer: depInformer,
327 Handler: handler.Funcs{
328 CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) {
329 },
330 UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
331 },
332 DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) {
333 defer GinkgoRecover()
334 Expect(q2).To(Equal(q))
335 Expect(evt.Object.GetName()).To(Equal(rs.Name))
336 close(c)
337 },
338 GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) {
339 defer GinkgoRecover()
340 Fail("Unexpected GenericEvent")
341 },
342 },
343 }
344 err := instance.Start(ctx, q)
345 Expect(err).NotTo(HaveOccurred())
346
347 err = clientset.AppsV1().ReplicaSets("default").Delete(ctx, rs.Name, metav1.DeleteOptions{})
348 Expect(err).NotTo(HaveOccurred())
349 <-c
350 })
351 })
352 })
353 })
354
View as plain text