...

Source file src/sigs.k8s.io/controller-runtime/pkg/source/source_integration_test.go

Documentation: sigs.k8s.io/controller-runtime/pkg/source

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package source_test
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	"sigs.k8s.io/controller-runtime/pkg/client"
    25  	"sigs.k8s.io/controller-runtime/pkg/event"
    26  	"sigs.k8s.io/controller-runtime/pkg/handler"
    27  	"sigs.k8s.io/controller-runtime/pkg/source"
    28  
    29  	. "github.com/onsi/ginkgo/v2"
    30  	. "github.com/onsi/gomega"
    31  	appsv1 "k8s.io/api/apps/v1"
    32  	corev1 "k8s.io/api/core/v1"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	kubeinformers "k8s.io/client-go/informers"
    35  	toolscache "k8s.io/client-go/tools/cache"
    36  	"k8s.io/client-go/util/workqueue"
    37  )
    38  
    39  var _ = Describe("Source", func() {
    40  	var instance1, instance2 source.Source
    41  	var obj client.Object
    42  	var q workqueue.RateLimitingInterface
    43  	var c1, c2 chan interface{}
    44  	var ns string
    45  	count := 0
    46  
    47  	BeforeEach(func() {
    48  		// Create the namespace for the test
    49  		ns = fmt.Sprintf("controller-source-kindsource-%v", count)
    50  		count++
    51  		_, err := clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
    52  			ObjectMeta: metav1.ObjectMeta{Name: ns},
    53  		}, metav1.CreateOptions{})
    54  		Expect(err).NotTo(HaveOccurred())
    55  
    56  		q = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
    57  		c1 = make(chan interface{})
    58  		c2 = make(chan interface{})
    59  	})
    60  
    61  	AfterEach(func() {
    62  		err := clientset.CoreV1().Namespaces().Delete(ctx, ns, metav1.DeleteOptions{})
    63  		Expect(err).NotTo(HaveOccurred())
    64  		close(c1)
    65  		close(c2)
    66  	})
    67  
    68  	Describe("Kind", func() {
    69  		Context("for a Deployment resource", func() {
    70  			obj = &appsv1.Deployment{}
    71  
    72  			It("should provide Deployment Events", func() {
    73  				var created, updated, deleted *appsv1.Deployment
    74  				var err error
    75  
    76  				// Get the client and Deployment used to create events
    77  				client := clientset.AppsV1().Deployments(ns)
    78  				deployment := &appsv1.Deployment{
    79  					ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"},
    80  					Spec: appsv1.DeploymentSpec{
    81  						Selector: &metav1.LabelSelector{
    82  							MatchLabels: map[string]string{"foo": "bar"},
    83  						},
    84  						Template: corev1.PodTemplateSpec{
    85  							ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
    86  							Spec: corev1.PodSpec{
    87  								Containers: []corev1.Container{
    88  									{
    89  										Name:  "nginx",
    90  										Image: "nginx",
    91  									},
    92  								},
    93  							},
    94  						},
    95  					},
    96  				}
    97  
    98  				// Create an event handler to verify the events
    99  				newHandler := func(c chan interface{}) handler.Funcs {
   100  					return handler.Funcs{
   101  						CreateFunc: func(ctx context.Context, evt event.CreateEvent, rli workqueue.RateLimitingInterface) {
   102  							defer GinkgoRecover()
   103  							Expect(rli).To(Equal(q))
   104  							c <- evt
   105  						},
   106  						UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, rli workqueue.RateLimitingInterface) {
   107  							defer GinkgoRecover()
   108  							Expect(rli).To(Equal(q))
   109  							c <- evt
   110  						},
   111  						DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, rli workqueue.RateLimitingInterface) {
   112  							defer GinkgoRecover()
   113  							Expect(rli).To(Equal(q))
   114  							c <- evt
   115  						},
   116  					}
   117  				}
   118  				handler1 := newHandler(c1)
   119  				handler2 := newHandler(c2)
   120  
   121  				// Create 2 instances
   122  				instance1 = source.Kind(icache, obj, handler1)
   123  				instance2 = source.Kind(icache, obj, handler2)
   124  				Expect(instance1.Start(ctx, q)).To(Succeed())
   125  				Expect(instance2.Start(ctx, q)).To(Succeed())
   126  
   127  				By("Creating a Deployment and expecting the CreateEvent.")
   128  				created, err = client.Create(ctx, deployment, metav1.CreateOptions{})
   129  				Expect(err).NotTo(HaveOccurred())
   130  				Expect(created).NotTo(BeNil())
   131  
   132  				// Check first CreateEvent
   133  				evt := <-c1
   134  				createEvt, ok := evt.(event.CreateEvent)
   135  				Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.CreateEvent{}))
   136  				Expect(createEvt.Object).To(Equal(created))
   137  
   138  				// Check second CreateEvent
   139  				evt = <-c2
   140  				createEvt, ok = evt.(event.CreateEvent)
   141  				Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.CreateEvent{}))
   142  				Expect(createEvt.Object).To(Equal(created))
   143  
   144  				By("Updating a Deployment and expecting the UpdateEvent.")
   145  				updated = created.DeepCopy()
   146  				updated.Labels = map[string]string{"biz": "buz"}
   147  				updated, err = client.Update(ctx, updated, metav1.UpdateOptions{})
   148  				Expect(err).NotTo(HaveOccurred())
   149  
   150  				// Check first UpdateEvent
   151  				evt = <-c1
   152  				updateEvt, ok := evt.(event.UpdateEvent)
   153  				Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.UpdateEvent{}))
   154  
   155  				Expect(updateEvt.ObjectNew).To(Equal(updated))
   156  
   157  				Expect(updateEvt.ObjectOld).To(Equal(created))
   158  
   159  				// Check second UpdateEvent
   160  				evt = <-c2
   161  				updateEvt, ok = evt.(event.UpdateEvent)
   162  				Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.UpdateEvent{}))
   163  
   164  				Expect(updateEvt.ObjectNew).To(Equal(updated))
   165  
   166  				Expect(updateEvt.ObjectOld).To(Equal(created))
   167  
   168  				By("Deleting a Deployment and expecting the Delete.")
   169  				deleted = updated.DeepCopy()
   170  				err = client.Delete(ctx, created.Name, metav1.DeleteOptions{})
   171  				Expect(err).NotTo(HaveOccurred())
   172  
   173  				deleted.SetResourceVersion("")
   174  				evt = <-c1
   175  				deleteEvt, ok := evt.(event.DeleteEvent)
   176  				Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.DeleteEvent{}))
   177  				deleteEvt.Object.SetResourceVersion("")
   178  				Expect(deleteEvt.Object).To(Equal(deleted))
   179  
   180  				evt = <-c2
   181  				deleteEvt, ok = evt.(event.DeleteEvent)
   182  				Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.DeleteEvent{}))
   183  				deleteEvt.Object.SetResourceVersion("")
   184  				Expect(deleteEvt.Object).To(Equal(deleted))
   185  			})
   186  		})
   187  
   188  		// TODO(pwittrock): Write this test
   189  		PContext("for a Foo CRD resource", func() {
   190  			It("should provide Foo Events", func() {
   191  
   192  			})
   193  		})
   194  	})
   195  
   196  	Describe("Informer", func() {
   197  		var c chan struct{}
   198  		var rs *appsv1.ReplicaSet
   199  		var depInformer toolscache.SharedIndexInformer
   200  		var informerFactory kubeinformers.SharedInformerFactory
   201  		var stopTest chan struct{}
   202  
   203  		BeforeEach(func() {
   204  			stopTest = make(chan struct{})
   205  			informerFactory = kubeinformers.NewSharedInformerFactory(clientset, time.Second*30)
   206  			depInformer = informerFactory.Apps().V1().ReplicaSets().Informer()
   207  			informerFactory.Start(stopTest)
   208  			Eventually(depInformer.HasSynced).Should(BeTrue())
   209  
   210  			c = make(chan struct{})
   211  			rs = &appsv1.ReplicaSet{
   212  				ObjectMeta: metav1.ObjectMeta{Name: "informer-rs-name"},
   213  				Spec: appsv1.ReplicaSetSpec{
   214  					Selector: &metav1.LabelSelector{
   215  						MatchLabels: map[string]string{"foo": "bar"},
   216  					},
   217  					Template: corev1.PodTemplateSpec{
   218  						ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
   219  						Spec: corev1.PodSpec{
   220  							Containers: []corev1.Container{
   221  								{
   222  									Name:  "nginx",
   223  									Image: "nginx",
   224  								},
   225  							},
   226  						},
   227  					},
   228  				},
   229  			}
   230  		})
   231  
   232  		AfterEach(func() {
   233  			close(stopTest)
   234  		})
   235  
   236  		Context("for a ReplicaSet resource", func() {
   237  			It("should provide a ReplicaSet CreateEvent", func() {
   238  				c := make(chan struct{})
   239  
   240  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   241  				instance := &source.Informer{
   242  					Informer: depInformer,
   243  					Handler: handler.Funcs{
   244  						CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
   245  							defer GinkgoRecover()
   246  							var err error
   247  							rs, err := clientset.AppsV1().ReplicaSets("default").Get(ctx, rs.Name, metav1.GetOptions{})
   248  							Expect(err).NotTo(HaveOccurred())
   249  
   250  							Expect(q2).To(BeIdenticalTo(q))
   251  							Expect(evt.Object).To(Equal(rs))
   252  							close(c)
   253  						},
   254  						UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
   255  							defer GinkgoRecover()
   256  							Fail("Unexpected UpdateEvent")
   257  						},
   258  						DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
   259  							defer GinkgoRecover()
   260  							Fail("Unexpected DeleteEvent")
   261  						},
   262  						GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) {
   263  							defer GinkgoRecover()
   264  							Fail("Unexpected GenericEvent")
   265  						},
   266  					},
   267  				}
   268  				err := instance.Start(ctx, q)
   269  				Expect(err).NotTo(HaveOccurred())
   270  
   271  				_, err = clientset.AppsV1().ReplicaSets("default").Create(ctx, rs, metav1.CreateOptions{})
   272  				Expect(err).NotTo(HaveOccurred())
   273  				<-c
   274  			})
   275  
   276  			It("should provide a ReplicaSet UpdateEvent", func() {
   277  				var err error
   278  				rs, err = clientset.AppsV1().ReplicaSets("default").Get(ctx, rs.Name, metav1.GetOptions{})
   279  				Expect(err).NotTo(HaveOccurred())
   280  
   281  				rs2 := rs.DeepCopy()
   282  				rs2.SetLabels(map[string]string{"biz": "baz"})
   283  
   284  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   285  				instance := &source.Informer{
   286  					Informer: depInformer,
   287  					Handler: handler.Funcs{
   288  						CreateFunc: func(ctx context.Context, evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
   289  						},
   290  						UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) {
   291  							defer GinkgoRecover()
   292  							var err error
   293  							rs2, err := clientset.AppsV1().ReplicaSets("default").Get(ctx, rs.Name, metav1.GetOptions{})
   294  							Expect(err).NotTo(HaveOccurred())
   295  
   296  							Expect(q2).To(Equal(q))
   297  							Expect(evt.ObjectOld).To(Equal(rs))
   298  
   299  							Expect(evt.ObjectNew).To(Equal(rs2))
   300  
   301  							close(c)
   302  						},
   303  						DeleteFunc: func(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface) {
   304  							defer GinkgoRecover()
   305  							Fail("Unexpected DeleteEvent")
   306  						},
   307  						GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) {
   308  							defer GinkgoRecover()
   309  							Fail("Unexpected GenericEvent")
   310  						},
   311  					},
   312  				}
   313  				err = instance.Start(ctx, q)
   314  				Expect(err).NotTo(HaveOccurred())
   315  
   316  				_, err = clientset.AppsV1().ReplicaSets("default").Update(ctx, rs2, metav1.UpdateOptions{})
   317  				Expect(err).NotTo(HaveOccurred())
   318  				<-c
   319  			})
   320  
   321  			It("should provide a ReplicaSet DeletedEvent", func() {
   322  				c := make(chan struct{})
   323  
   324  				q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
   325  				instance := &source.Informer{
   326  					Informer: depInformer,
   327  					Handler: handler.Funcs{
   328  						CreateFunc: func(context.Context, event.CreateEvent, workqueue.RateLimitingInterface) {
   329  						},
   330  						UpdateFunc: func(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface) {
   331  						},
   332  						DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, q2 workqueue.RateLimitingInterface) {
   333  							defer GinkgoRecover()
   334  							Expect(q2).To(Equal(q))
   335  							Expect(evt.Object.GetName()).To(Equal(rs.Name))
   336  							close(c)
   337  						},
   338  						GenericFunc: func(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) {
   339  							defer GinkgoRecover()
   340  							Fail("Unexpected GenericEvent")
   341  						},
   342  					},
   343  				}
   344  				err := instance.Start(ctx, q)
   345  				Expect(err).NotTo(HaveOccurred())
   346  
   347  				err = clientset.AppsV1().ReplicaSets("default").Delete(ctx, rs.Name, metav1.DeleteOptions{})
   348  				Expect(err).NotTo(HaveOccurred())
   349  				<-c
   350  			})
   351  		})
   352  	})
   353  })
   354  

View as plain text