...

Source file src/sigs.k8s.io/controller-runtime/pkg/source/source_test.go

Documentation: sigs.k8s.io/controller-runtime/pkg/source

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package source_test
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	. "github.com/onsi/ginkgo/v2"
    25  	. "github.com/onsi/gomega"
    26  
    27  	"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
    28  	"sigs.k8s.io/controller-runtime/pkg/client"
    29  	"sigs.k8s.io/controller-runtime/pkg/event"
    30  	"sigs.k8s.io/controller-runtime/pkg/handler"
    31  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    32  	"sigs.k8s.io/controller-runtime/pkg/source"
    33  
    34  	corev1 "k8s.io/api/core/v1"
    35  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    36  	"k8s.io/client-go/util/workqueue"
    37  )
    38  
    39  var _ = Describe("Source", func() {
    40  	Describe("Kind", func() {
    41  		var c chan struct{}
    42  		var p *corev1.Pod
    43  		var ic *informertest.FakeInformers
    44  
    45  		BeforeEach(func() {
    46  			ic = &informertest.FakeInformers{}
    47  			c = make(chan struct{})
    48  			p = &corev1.Pod{
    49  				Spec: corev1.PodSpec{
    50  					Containers: []corev1.Container{
    51  						{Name: "test", Image: "test"},
    52  					},
    53  				},
    54  			}
    55  		})
    56  
    57  		Context("for a Pod resource", func() {
    58  			It("should provide a Pod CreateEvent", func() {
    59  				c := make(chan struct{})
    60  				p := &corev1.Pod{
    61  					Spec: corev1.PodSpec{
    62  						Containers: []corev1.Container{
    63  							{Name: "test", Image: "test"},
    64  						},
    65  					},
    66  				}
    67  
    68  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
    69  				instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{
    70  					CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) {
    71  						defer GinkgoRecover()
    72  						Expect(q2).To(Equal(q))
    73  						Expect(evt.Object).To(Equal(p))
    74  						close(c)
    75  					},
    76  					UpdateFunc: func(context.Context, event.TypedUpdateEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
    77  						defer GinkgoRecover()
    78  						Fail("Unexpected UpdateEvent")
    79  					},
    80  					DeleteFunc: func(context.Context, event.TypedDeleteEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
    81  						defer GinkgoRecover()
    82  						Fail("Unexpected DeleteEvent")
    83  					},
    84  					GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
    85  						defer GinkgoRecover()
    86  						Fail("Unexpected GenericEvent")
    87  					},
    88  				})
    89  				err := instance.Start(ctx, q)
    90  				Expect(err).NotTo(HaveOccurred())
    91  				Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())
    92  
    93  				i, err := ic.FakeInformerFor(ctx, &corev1.Pod{})
    94  				Expect(err).NotTo(HaveOccurred())
    95  
    96  				i.Add(p)
    97  				<-c
    98  			})
    99  
   100  			It("should provide a Pod UpdateEvent", func() {
   101  				p2 := p.DeepCopy()
   102  				p2.SetLabels(map[string]string{"biz": "baz"})
   103  
   104  				ic := &informertest.FakeInformers{}
   105  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   106  				instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{
   107  					CreateFunc: func(ctx context.Context, evt event.TypedCreateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) {
   108  						defer GinkgoRecover()
   109  						Fail("Unexpected CreateEvent")
   110  					},
   111  					UpdateFunc: func(ctx context.Context, evt event.TypedUpdateEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) {
   112  						defer GinkgoRecover()
   113  						Expect(q2).To(BeIdenticalTo(q))
   114  						Expect(evt.ObjectOld).To(Equal(p))
   115  
   116  						Expect(evt.ObjectNew).To(Equal(p2))
   117  
   118  						close(c)
   119  					},
   120  					DeleteFunc: func(context.Context, event.TypedDeleteEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
   121  						defer GinkgoRecover()
   122  						Fail("Unexpected DeleteEvent")
   123  					},
   124  					GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
   125  						defer GinkgoRecover()
   126  						Fail("Unexpected GenericEvent")
   127  					},
   128  				})
   129  				err := instance.Start(ctx, q)
   130  				Expect(err).NotTo(HaveOccurred())
   131  				Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())
   132  
   133  				i, err := ic.FakeInformerFor(ctx, &corev1.Pod{})
   134  				Expect(err).NotTo(HaveOccurred())
   135  
   136  				i.Update(p, p2)
   137  				<-c
   138  			})
   139  
   140  			It("should provide a Pod DeletedEvent", func() {
   141  				c := make(chan struct{})
   142  				p := &corev1.Pod{
   143  					Spec: corev1.PodSpec{
   144  						Containers: []corev1.Container{
   145  							{Name: "test", Image: "test"},
   146  						},
   147  					},
   148  				}
   149  
   150  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   151  				instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{
   152  					CreateFunc: func(context.Context, event.TypedCreateEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
   153  						defer GinkgoRecover()
   154  						Fail("Unexpected DeleteEvent")
   155  					},
   156  					UpdateFunc: func(context.Context, event.TypedUpdateEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
   157  						defer GinkgoRecover()
   158  						Fail("Unexpected UpdateEvent")
   159  					},
   160  					DeleteFunc: func(ctx context.Context, evt event.TypedDeleteEvent[*corev1.Pod], q2 workqueue.RateLimitingInterface) {
   161  						defer GinkgoRecover()
   162  						Expect(q2).To(BeIdenticalTo(q))
   163  						Expect(evt.Object).To(Equal(p))
   164  						close(c)
   165  					},
   166  					GenericFunc: func(context.Context, event.TypedGenericEvent[*corev1.Pod], workqueue.RateLimitingInterface) {
   167  						defer GinkgoRecover()
   168  						Fail("Unexpected GenericEvent")
   169  					},
   170  				})
   171  				err := instance.Start(ctx, q)
   172  				Expect(err).NotTo(HaveOccurred())
   173  				Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())
   174  
   175  				i, err := ic.FakeInformerFor(ctx, &corev1.Pod{})
   176  				Expect(err).NotTo(HaveOccurred())
   177  
   178  				i.Delete(p)
   179  				<-c
   180  			})
   181  		})
   182  
   183  		It("should return an error from Start cache was not provided", func() {
   184  			instance := source.Kind(nil, &corev1.Pod{}, nil)
   185  			err := instance.Start(ctx, nil)
   186  			Expect(err).To(HaveOccurred())
   187  			Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil cache"))
   188  		})
   189  
   190  		It("should return an error from Start if a type was not provided", func() {
   191  			instance := source.Kind[client.Object](ic, nil, nil)
   192  			err := instance.Start(ctx, nil)
   193  			Expect(err).To(HaveOccurred())
   194  			Expect(err.Error()).To(ContainSubstring("must create Kind with a non-nil object"))
   195  		})
   196  		It("should return an error from Start if a handler was not provided", func() {
   197  			instance := source.Kind(ic, &corev1.Pod{}, nil)
   198  			err := instance.Start(ctx, nil)
   199  			Expect(err).To(HaveOccurred())
   200  			Expect(err.Error()).To(ContainSubstring("must create Kind with non-nil handler"))
   201  		})
   202  
   203  		It("should return an error if syncing fails", func() {
   204  			f := false
   205  			instance := source.Kind[client.Object](&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.EnqueueRequestForObject{})
   206  			Expect(instance.Start(context.Background(), nil)).NotTo(HaveOccurred())
   207  			err := instance.WaitForSync(context.Background())
   208  			Expect(err).To(HaveOccurred())
   209  			Expect(err.Error()).To(Equal("cache did not sync"))
   210  
   211  		})
   212  
   213  		Context("for a Kind not in the cache", func() {
   214  			It("should return an error when WaitForSync is called", func() {
   215  				ic.Error = fmt.Errorf("test error")
   216  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   217  
   218  				ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
   219  				defer cancel()
   220  
   221  				instance := source.Kind(ic, &corev1.Pod{}, handler.TypedFuncs[*corev1.Pod]{})
   222  				err := instance.Start(ctx, q)
   223  				Expect(err).NotTo(HaveOccurred())
   224  				Eventually(instance.WaitForSync).WithArguments(context.Background()).Should(HaveOccurred())
   225  			})
   226  		})
   227  
   228  		It("should return an error if syncing fails", func() {
   229  			f := false
   230  			instance := source.Kind[client.Object](&informertest.FakeInformers{Synced: &f}, &corev1.Pod{}, &handler.EnqueueRequestForObject{})
   231  			Expect(instance.Start(context.Background(), nil)).NotTo(HaveOccurred())
   232  			err := instance.WaitForSync(context.Background())
   233  			Expect(err).To(HaveOccurred())
   234  			Expect(err.Error()).To(Equal("cache did not sync"))
   235  
   236  		})
   237  	})
   238  
   239  	Describe("Func", func() {
   240  		It("should be called from Start", func() {
   241  			run := false
   242  			instance := source.Func(func(
   243  				context.Context,
   244  				workqueue.RateLimitingInterface) error {
   245  				run = true
   246  				return nil
   247  			})
   248  			Expect(instance.Start(ctx, nil)).NotTo(HaveOccurred())
   249  			Expect(run).To(BeTrue())
   250  
   251  			expected := fmt.Errorf("expected error: Func")
   252  			instance = source.Func(func(
   253  				context.Context,
   254  				workqueue.RateLimitingInterface) error {
   255  				return expected
   256  			})
   257  			Expect(instance.Start(ctx, nil)).To(Equal(expected))
   258  		})
   259  	})
   260  
   261  	Describe("Channel", func() {
   262  		var ctx context.Context
   263  		var cancel context.CancelFunc
   264  		var ch chan event.GenericEvent
   265  
   266  		BeforeEach(func() {
   267  			ctx, cancel = context.WithCancel(context.Background())
   268  			ch = make(chan event.GenericEvent)
   269  		})
   270  
   271  		AfterEach(func() {
   272  			cancel()
   273  			close(ch)
   274  		})
   275  
   276  		Context("for a source", func() {
   277  			It("should provide a GenericEvent", func() {
   278  				ch := make(chan event.GenericEvent)
   279  				c := make(chan struct{})
   280  				p := &corev1.Pod{
   281  					ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "bar"},
   282  				}
   283  				evt := event.GenericEvent{
   284  					Object: p,
   285  				}
   286  				// Event that should be filtered out by predicates
   287  				invalidEvt := event.GenericEvent{}
   288  
   289  				// Predicate to filter out empty event
   290  				prct := predicate.Funcs{
   291  					GenericFunc: func(e event.GenericEvent) bool {
   292  						return e.Object != nil
   293  					},
   294  				}
   295  
   296  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   297  				instance := source.Channel(
   298  					ch,
   299  					handler.Funcs{
   300  						CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) {
   301  							defer GinkgoRecover()
   302  							Fail("Unexpected CreateEvent")
   303  						},
   304  						UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
   305  							defer GinkgoRecover()
   306  							Fail("Unexpected UpdateEvent")
   307  						},
   308  						DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
   309  							defer GinkgoRecover()
   310  							Fail("Unexpected DeleteEvent")
   311  						},
   312  						GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
   313  							defer GinkgoRecover()
   314  							// The empty event should have been filtered out by the predicates,
   315  							// and will not be passed to the handler.
   316  							Expect(q2).To(BeIdenticalTo(q))
   317  							Expect(evt.Object).To(Equal(p))
   318  							close(c)
   319  						},
   320  					},
   321  					source.WithPredicates(prct),
   322  				)
   323  				err := instance.Start(ctx, q)
   324  				Expect(err).NotTo(HaveOccurred())
   325  
   326  				ch <- invalidEvt
   327  				ch <- evt
   328  				<-c
   329  			})
   330  			It("should get pending events processed once channel unblocked", func() {
   331  				ch := make(chan event.GenericEvent)
   332  				unblock := make(chan struct{})
   333  				processed := make(chan struct{})
   334  				evt := event.GenericEvent{}
   335  				eventCount := 0
   336  
   337  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   338  				// Add a handler to get distribution blocked
   339  				instance := source.Channel(
   340  					ch,
   341  					handler.Funcs{
   342  						CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) {
   343  							defer GinkgoRecover()
   344  							Fail("Unexpected CreateEvent")
   345  						},
   346  						UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
   347  							defer GinkgoRecover()
   348  							Fail("Unexpected UpdateEvent")
   349  						},
   350  						DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
   351  							defer GinkgoRecover()
   352  							Fail("Unexpected DeleteEvent")
   353  						},
   354  						GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
   355  							defer GinkgoRecover()
   356  							// Block for the first time
   357  							if eventCount == 0 {
   358  								<-unblock
   359  							}
   360  							eventCount++
   361  
   362  							if eventCount == 3 {
   363  								close(processed)
   364  							}
   365  						},
   366  					},
   367  				)
   368  				err := instance.Start(ctx, q)
   369  				Expect(err).NotTo(HaveOccurred())
   370  
   371  				// Write 3 events into the source channel.
   372  				// The 1st should be passed into the generic func of the handler;
   373  				// The 2nd should be fetched out of the source channel, and waiting to write into dest channel;
   374  				// The 3rd should be pending in the source channel.
   375  				ch <- evt
   376  				ch <- evt
   377  				ch <- evt
   378  
   379  				// Validate none of the events have been processed.
   380  				Expect(eventCount).To(Equal(0))
   381  
   382  				close(unblock)
   383  
   384  				<-processed
   385  
   386  				// Validate all of the events have been processed.
   387  				Expect(eventCount).To(Equal(3))
   388  			})
   389  			It("should be able to cope with events in the channel before the source is started", func() {
   390  				ch := make(chan event.GenericEvent, 1)
   391  				processed := make(chan struct{})
   392  				evt := event.GenericEvent{}
   393  				ch <- evt
   394  
   395  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   396  				// Add a handler to get distribution blocked
   397  				instance := source.Channel(
   398  					ch,
   399  					handler.Funcs{
   400  						CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) {
   401  							defer GinkgoRecover()
   402  							Fail("Unexpected CreateEvent")
   403  						},
   404  						UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
   405  							defer GinkgoRecover()
   406  							Fail("Unexpected UpdateEvent")
   407  						},
   408  						DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
   409  							defer GinkgoRecover()
   410  							Fail("Unexpected DeleteEvent")
   411  						},
   412  						GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
   413  							defer GinkgoRecover()
   414  
   415  							close(processed)
   416  						},
   417  					},
   418  				)
   419  
   420  				err := instance.Start(ctx, q)
   421  				Expect(err).NotTo(HaveOccurred())
   422  
   423  				<-processed
   424  			})
   425  			It("should stop when the source channel is closed", func() {
   426  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   427  				// if we didn't stop, we'd start spamming the queue with empty
   428  				// messages as we "received" a zero-valued GenericEvent from
   429  				// the source channel
   430  
   431  				By("creating a channel with one element, then closing it")
   432  				ch := make(chan event.GenericEvent, 1)
   433  				evt := event.GenericEvent{}
   434  				ch <- evt
   435  				close(ch)
   436  
   437  				By("feeding that channel to a channel source")
   438  				processed := make(chan struct{})
   439  				defer close(processed)
   440  				src := source.Channel(
   441  					ch,
   442  					handler.Funcs{
   443  						CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) {
   444  							defer GinkgoRecover()
   445  							Fail("Unexpected CreateEvent")
   446  						},
   447  						UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
   448  							defer GinkgoRecover()
   449  							Fail("Unexpected UpdateEvent")
   450  						},
   451  						DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
   452  							defer GinkgoRecover()
   453  							Fail("Unexpected DeleteEvent")
   454  						},
   455  						GenericFunc: func(ctx context.Context, evt event.GenericEvent, q2 workqueue.RateLimitingInterface) {
   456  							defer GinkgoRecover()
   457  
   458  							processed <- struct{}{}
   459  						},
   460  					},
   461  				)
   462  
   463  				err := src.Start(ctx, q)
   464  				Expect(err).NotTo(HaveOccurred())
   465  
   466  				By("expecting to only get one event")
   467  				Eventually(processed).Should(Receive())
   468  				Consistently(processed).ShouldNot(Receive())
   469  			})
   470  			It("should get error if no source specified", func() {
   471  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   472  				instance := source.Channel[string](nil, nil /*no source specified*/)
   473  				err := instance.Start(ctx, q)
   474  				Expect(err).To(Equal(fmt.Errorf("must specify Channel.Source")))
   475  			})
   476  		})
   477  	})
   478  })
   479  

View as plain text