1
16
17 package source_test
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 . "github.com/onsi/ginkgo/v2"
25 . "github.com/onsi/gomega"
26
27 "sigs.k8s.io/controller-runtime/pkg/cache/informertest"
28 "sigs.k8s.io/controller-runtime/pkg/client"
29 "sigs.k8s.io/controller-runtime/pkg/event"
30 "sigs.k8s.io/controller-runtime/pkg/handler"
31 "sigs.k8s.io/controller-runtime/pkg/predicate"
32 "sigs.k8s.io/controller-runtime/pkg/source"
33
34 corev1 "k8s.io/api/core/v1"
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 "k8s.io/client-go/util/workqueue"
37 )
38
39 var _ = Describe("Source", func() {
40 Describe("Kind", func() {
41 var c chan struct{}
42 var p *corev1.Pod
43 var ic *informertest.FakeInformers
44
45 BeforeEach(func() {
46 ic = &informertest.FakeInformers{}
47 c = make(chan struct{})
48 p = &corev1.Pod{
49 Spec: corev1.PodSpec{
50 Containers: []corev1.Container{
51 {Name: "test", Image: "test"},
52 },
53 },
54 }
55 })
56
57 Context("for a Pod resource", func() {
58 It("should provide a Pod CreateEvent", func() {
59 c := make(chan struct{})
60 p := &corev1.Pod{
61 Spec: corev1.PodSpec{
62 Containers: []corev1.Container{
63 {Name: "test", Image: "test"},
64 },
65 },
66 }
67
68 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
69 instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{
70 CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) {
71 defer GinkgoRecover()
72 Expect(q2).To(Equal(q))
73 Expect(evt.Object).To(Equal(p))
74 close(c)
75 },
76 UpdateFunc: func(context.Context, event.TypedUpdateEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
77 defer GinkgoRecover()
78 Fail("Unexpected UpdateEvent")
79 },
80 DeleteFunc: func(context.Context, event.TypedDeleteEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
81 defer GinkgoRecover()
82 Fail("Unexpected DeleteEvent")
83 },
84 GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
85 defer GinkgoRecover()
86 Fail("Unexpected GenericEvent")
87 },
88 })
89 err := instance.Start(ctx, q)
90 Expect(err).NotTo(HaveOccurred())
91 Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())
92
93 i, err := ic.FakeInformerFor(ctx, &corev1.Pod{})
94 Expect(err).NotTo(HaveOccurred())
95
96 i.Add(p)
97 <-c
98 })
99
100 It("should provide a Pod UpdateEvent", func() {
101 p2 := p.DeepCopy()
102 p2.SetLabels(map[string]string{"biz": "baz"})
103
104 ic := &informertest.FakeInformers{}
105 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
106 instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{
107 CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) {
108 defer GinkgoRecover()
109 Fail("Unexpected CreateEvent")
110 },
111 UpdateFunc: func(ctx context.Context, evt event.TypedUpdateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) {
112 defer GinkgoRecover()
113 Expect(q2).To(BeIdenticalTo(q))
114 Expect(evt.ObjectOld).To(Equal(p))
115
116 Expect(evt.ObjectNew).To(Equal(p2))
117
118 close(c)
119 },
120 DeleteFunc: func(context.Context, event.TypedDeleteEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
121 defer GinkgoRecover()
122 Fail("Unexpected DeleteEvent")
123 },
124 GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
125 defer GinkgoRecover()
126 Fail("Unexpected GenericEvent")
127 },
128 })
129 err := instance.Start(ctx, q)
130 Expect(err).NotTo(HaveOccurred())
131 Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())
132
133 i, err := ic.FakeInformerFor(ctx, &corev1.Pod{})
134 Expect(err).NotTo(HaveOccurred())
135
136 i.Update(p, p2)
137 <-c
138 })
139
140 It("should provide a Pod DeletedEvent", func() {
141 c := make(chan struct{})
142 p := &corev1.Pod{
143 Spec: corev1.PodSpec{
144 Containers: []corev1.Container{
145 {Name: "test", Image: "test"},
146 },
147 },
148 }
149
150 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
151 instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{
152 CreateFunc: func(context.Context, event.TypedCreateEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
153 defer GinkgoRecover()
154 Fail("Unexpected DeleteEvent")
155 },
156 UpdateFunc: func(context.Context, event.TypedUpdateEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
157 defer GinkgoRecover()
158 Fail("Unexpected UpdateEvent")
159 },
160 DeleteFunc: func(ctx context.Context, evt event.TypedDeleteEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) {
161 defer GinkgoRecover()
162 Expect(q2).To(BeIdenticalTo(q))
163 Expect(evt.Object).To(Equal(p))
164 close(c)
165 },
166 GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
167 defer GinkgoRecover()
168 Fail("Unexpected GenericEvent")
169 },
170 })
171 err := instance.Start(ctx, q)
172 Expect(err).NotTo(HaveOccurred())
173 Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())
174
175 i, err := ic.FakeInformerFor(ctx, &corev1.Pod{})
176 Expect(err).NotTo(HaveOccurred())
177
178 i.Delete(p)
179 <-c
180 })
181 })
182
183 It("should return an error from Start cache was not provided", func() {
184 instance := source.Kind(nil, &corev1.Pod{}, nil)
185 err := instance.Start(ctx, nil)
186 Expect(err).To(HaveOccurred())
187 Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil cache"))
188 })
189
190 It("should return an error from Start if a type was not provided", func() {
191 instance := source.Kind[client.Object](ic, nil, nil)
192 err := instance.Start(ctx, nil)
193 Expect(err).To(HaveOccurred())
194 Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil object"))
195 })
196 It("should return an error from Start if a handler was not provided", func() {
197 instance := source.Kind(ic, &corev1.Pod{}, nil)
198 err := instance.Start(ctx, nil)
199 Expect(err).To(HaveOccurred())
200 Expect(err.Error()).To(ContainSubstring("must create Kind with non-nil handler"))
201 })
202
203 It("should return an error if syncing fails", func() {
204 f := false
205 instance := source.Kind[client.Object](&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.EnqueueRequestForObject{})
206 Expect(instance.Start(context.Background(), nil)).NotTo(HaveOccurred())
207 err := instance.WaitForSync(context.Background())
208 Expect(err).To(HaveOccurred())
209 Expect(err.Error()).To(Equal("cache did not sync"))
210
211 })
212
213 Context("for a Kind not in the cache", func() {
214 It("should return an error when WaitForSync is called", func() {
215 ic.Error = fmt.Errorf("test error")
216 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
217
218 ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
219 defer cancel()
220
221 instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{})
222 err := instance.Start(ctx, q)
223 Expect(err).NotTo(HaveOccurred())
224 Eventually(instance.WaitForSync).WithArguments(context.Background()).Should(HaveOccurred())
225 })
226 })
227
228 It("should return an error if syncing fails", func() {
229 f := false
230 instance := source.Kind[client.Object](&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.EnqueueRequestForObject{})
231 Expect(instance.Start(context.Background(), nil)).NotTo(HaveOccurred())
232 err := instance.WaitForSync(context.Background())
233 Expect(err).To(HaveOccurred())
234 Expect(err.Error()).To(Equal("cache did not sync"))
235
236 })
237 })
238
239 Describe("Func", func() {
240 It("should be called from Start", func() {
241 run := false
242 instance := source.Func(func(
243 context.Context,
244 workqueue.RateLimitingInterface) error {
245 run = true
246 return nil
247 })
248 Expect(instance.Start(ctx, nil)).NotTo(HaveOccurred())
249 Expect(run).To(BeTrue())
250
251 expected := fmt.Errorf("expected error: Func")
252 instance = source.Func(func(
253 context.Context,
254 workqueue.RateLimitingInterface) error {
255 return expected
256 })
257 Expect(instance.Start(ctx, nil)).To(Equal(expected))
258 })
259 })
260
261 Describe("Channel", func() {
262 var ctx context.Context
263 var cancel context.CancelFunc
264 var ch chan event.GenericEvent
265
266 BeforeEach(func() {
267 ctx, cancel = context.WithCancel(context.Background())
268 ch = make(chan event.GenericEvent)
269 })
270
271 AfterEach(func() {
272 cancel()
273 close(ch)
274 })
275
276 Context("for a source", func() {
277 It("should provide a GenericEvent", func() {
278 ch := make(chan event.GenericEvent)
279 c := make(chan struct{})
280 p := &corev1.Pod{
281 ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
282 }
283 evt := event.GenericEvent{
284 Object: p,
285 }
286
287 invalidEvt := event.GenericEvent{}
288
289
290 prct := predicate.Funcs{
291 GenericFunc: func(e event.GenericEvent) bool {
292 return e.Object != nil
293 },
294 }
295
296 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
297 instance := source.Channel(
298 ch,
299 handler.Funcs{
300 CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) {
301 defer GinkgoRecover()
302 Fail("Unexpected CreateEvent")
303 },
304 UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
305 defer GinkgoRecover()
306 Fail("Unexpected UpdateEvent")
307 },
308 DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
309 defer GinkgoRecover()
310 Fail("Unexpected DeleteEvent")
311 },
312 GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
313 defer GinkgoRecover()
314
315
316 Expect(q2).To(BeIdenticalTo(q))
317 Expect(evt.Object).To(Equal(p))
318 close(c)
319 },
320 },
321 source.WithPredicates(prct),
322 )
323 err := instance.Start(ctx, q)
324 Expect(err).NotTo(HaveOccurred())
325
326 ch <- invalidEvt
327 ch <- evt
328 <-c
329 })
330 It("should get pending events processed once channel unblocked", func() {
331 ch := make(chan event.GenericEvent)
332 unblock := make(chan struct{})
333 processed := make(chan struct{})
334 evt := event.GenericEvent{}
335 eventCount := 0
336
337 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
338
339 instance := source.Channel(
340 ch,
341 handler.Funcs{
342 CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) {
343 defer GinkgoRecover()
344 Fail("Unexpected CreateEvent")
345 },
346 UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
347 defer GinkgoRecover()
348 Fail("Unexpected UpdateEvent")
349 },
350 DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
351 defer GinkgoRecover()
352 Fail("Unexpected DeleteEvent")
353 },
354 GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
355 defer GinkgoRecover()
356
357 if eventCount == 0 {
358 <-unblock
359 }
360 eventCount++
361
362 if eventCount == 3 {
363 close(processed)
364 }
365 },
366 },
367 )
368 err := instance.Start(ctx, q)
369 Expect(err).NotTo(HaveOccurred())
370
371
372
373
374
375 ch <- evt
376 ch <- evt
377 ch <- evt
378
379
380 Expect(eventCount).To(Equal(0))
381
382 close(unblock)
383
384 <-processed
385
386
387 Expect(eventCount).To(Equal(3))
388 })
389 It("should be able to cope with events in the channel before the source is started", func() {
390 ch := make(chan event.GenericEvent, 1)
391 processed := make(chan struct{})
392 evt := event.GenericEvent{}
393 ch <- evt
394
395 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
396
397 instance := source.Channel(
398 ch,
399 handler.Funcs{
400 CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) {
401 defer GinkgoRecover()
402 Fail("Unexpected CreateEvent")
403 },
404 UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
405 defer GinkgoRecover()
406 Fail("Unexpected UpdateEvent")
407 },
408 DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
409 defer GinkgoRecover()
410 Fail("Unexpected DeleteEvent")
411 },
412 GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
413 defer GinkgoRecover()
414
415 close(processed)
416 },
417 },
418 )
419
420 err := instance.Start(ctx, q)
421 Expect(err).NotTo(HaveOccurred())
422
423 <-processed
424 })
425 It("should stop when the source channel is closed", func() {
426 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
427
428
429
430
431 By("creating a channel with one element, then closing it")
432 ch := make(chan event.GenericEvent, 1)
433 evt := event.GenericEvent{}
434 ch <- evt
435 close(ch)
436
437 By("feeding that channel to a channel source")
438 processed := make(chan struct{})
439 defer close(processed)
440 src := source.Channel(
441 ch,
442 handler.Funcs{
443 CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) {
444 defer GinkgoRecover()
445 Fail("Unexpected CreateEvent")
446 },
447 UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
448 defer GinkgoRecover()
449 Fail("Unexpected UpdateEvent")
450 },
451 DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
452 defer GinkgoRecover()
453 Fail("Unexpected DeleteEvent")
454 },
455 GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
456 defer GinkgoRecover()
457
458 processed <- struct{}{}
459 },
460 },
461 )
462
463 err := src.Start(ctx, q)
464 Expect(err).NotTo(HaveOccurred())
465
466 By("expecting to only get one event")
467 Eventually(processed).Should(Receive())
468 Consistently(processed).ShouldNot(Receive())
469 })
470 It("should get error if no source specified", func() {
471 q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
472 instance := source.Channel[string](nil, nil )
473 err := instance.Start(ctx, q)
474 Expect(err).To(Equal(fmt.Errorf("must specify Channel.Source")))
475 })
476 })
477 })
478 })
479
View as plain text