...

Source file src/k8s.io/kubernetes/test/e2e/apimachinery/watch.go

Documentation: k8s.io/kubernetes/test/e2e/apimachinery

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apimachinery
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math/rand"
    23  	"strconv"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/runtime"
    30  	"k8s.io/apimachinery/pkg/watch"
    31  	cachetools "k8s.io/client-go/tools/cache"
    32  	watchtools "k8s.io/client-go/tools/watch"
    33  	"k8s.io/kubernetes/test/e2e/framework"
    34  	admissionapi "k8s.io/pod-security-admission/api"
    35  
    36  	"github.com/onsi/ginkgo/v2"
    37  )
    38  
// Label key and values used by the watch tests in this file.
const (
	// watchConfigMapLabelKey is the label key that watchConfigMaps builds its
	// "In" selector on.
	watchConfigMapLabelKey = "watch-this-configmap"

	// Label values; each test labels its configmaps with its own value(s) and
	// opens watches selecting on them.
	multipleWatchersLabelValueA   = "multiple-watchers-A"
	multipleWatchersLabelValueB   = "multiple-watchers-B"
	fromResourceVersionLabelValue = "from-resource-version"
	watchRestartedLabelValue      = "watch-closed-and-restarted"
	toBeChangedLabelValue         = "label-changed-and-restored"
)
    48  
    49  var _ = SIGDescribe("Watchers", func() {
    50  	f := framework.NewDefaultFramework("watch")
    51  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    52  
    53  	/*
    54  		    Release: v1.11
    55  		    Testname: watch-configmaps-with-multiple-watchers
    56  		    Description: Ensure that multiple watchers are able to receive all add,
    57  			update, and delete notifications on configmaps that match a label selector and do
    58  			not receive notifications for configmaps which do not match that label selector.
    59  	*/
    60  	framework.ConformanceIt("should observe add, update, and delete watch notifications on configmaps", func(ctx context.Context) {
    61  		c := f.ClientSet
    62  		ns := f.Namespace.Name
    63  
    64  		ginkgo.By("creating a watch on configmaps with label A")
    65  		watchA, err := watchConfigMaps(ctx, f, "", multipleWatchersLabelValueA)
    66  		framework.ExpectNoError(err, "failed to create a watch on configmaps with label: %s", multipleWatchersLabelValueA)
    67  
    68  		ginkgo.By("creating a watch on configmaps with label B")
    69  		watchB, err := watchConfigMaps(ctx, f, "", multipleWatchersLabelValueB)
    70  		framework.ExpectNoError(err, "failed to create a watch on configmaps with label: %s", multipleWatchersLabelValueB)
    71  
    72  		ginkgo.By("creating a watch on configmaps with label A or B")
    73  		watchAB, err := watchConfigMaps(ctx, f, "", multipleWatchersLabelValueA, multipleWatchersLabelValueB)
    74  		framework.ExpectNoError(err, "failed to create a watch on configmaps with label %s or %s", multipleWatchersLabelValueA, multipleWatchersLabelValueB)
    75  
    76  		testConfigMapA := &v1.ConfigMap{
    77  			ObjectMeta: metav1.ObjectMeta{
    78  				Name: "e2e-watch-test-configmap-a",
    79  				Labels: map[string]string{
    80  					watchConfigMapLabelKey: multipleWatchersLabelValueA,
    81  				},
    82  			},
    83  		}
    84  		testConfigMapB := &v1.ConfigMap{
    85  			ObjectMeta: metav1.ObjectMeta{
    86  				Name: "e2e-watch-test-configmap-b",
    87  				Labels: map[string]string{
    88  					watchConfigMapLabelKey: multipleWatchersLabelValueB,
    89  				},
    90  			},
    91  		}
    92  
    93  		ginkgo.By("creating a configmap with label A and ensuring the correct watchers observe the notification")
    94  		testConfigMapA, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMapA, metav1.CreateOptions{})
    95  		framework.ExpectNoError(err, "failed to create a configmap with label %s in namespace: %s", multipleWatchersLabelValueA, ns)
    96  		expectEvent(watchA, watch.Added, testConfigMapA)
    97  		expectEvent(watchAB, watch.Added, testConfigMapA)
    98  
    99  		ginkgo.By("modifying configmap A and ensuring the correct watchers observe the notification")
   100  		testConfigMapA, err = updateConfigMap(ctx, c, ns, testConfigMapA.GetName(), func(cm *v1.ConfigMap) {
   101  			setConfigMapData(cm, "mutation", "1")
   102  		})
   103  		framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
   104  		expectEvent(watchA, watch.Modified, testConfigMapA)
   105  		expectEvent(watchAB, watch.Modified, testConfigMapA)
   106  
   107  		ginkgo.By("modifying configmap A again and ensuring the correct watchers observe the notification")
   108  		testConfigMapA, err = updateConfigMap(ctx, c, ns, testConfigMapA.GetName(), func(cm *v1.ConfigMap) {
   109  			setConfigMapData(cm, "mutation", "2")
   110  		})
   111  		framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
   112  		expectEvent(watchA, watch.Modified, testConfigMapA)
   113  		expectEvent(watchAB, watch.Modified, testConfigMapA)
   114  
   115  		ginkgo.By("deleting configmap A and ensuring the correct watchers observe the notification")
   116  		err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMapA.GetName(), metav1.DeleteOptions{})
   117  		framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
   118  		expectEvent(watchA, watch.Deleted, nil)
   119  		expectEvent(watchAB, watch.Deleted, nil)
   120  
   121  		ginkgo.By("creating a configmap with label B and ensuring the correct watchers observe the notification")
   122  		testConfigMapB, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMapB, metav1.CreateOptions{})
   123  		framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", testConfigMapB, ns)
   124  		expectEvent(watchB, watch.Added, testConfigMapB)
   125  		expectEvent(watchAB, watch.Added, testConfigMapB)
   126  		expectNoEvent(watchA, watch.Added, testConfigMapB)
   127  
   128  		ginkgo.By("deleting configmap B and ensuring the correct watchers observe the notification")
   129  		err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMapB.GetName(), metav1.DeleteOptions{})
   130  		framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", testConfigMapB.GetName(), ns)
   131  		expectEvent(watchB, watch.Deleted, nil)
   132  		expectEvent(watchAB, watch.Deleted, nil)
   133  		expectNoEvent(watchA, watch.Deleted, nil)
   134  	})
   135  
   136  	/*
   137  		    Release: v1.11
   138  		    Testname: watch-configmaps-from-resource-version
   139  		    Description: Ensure that a watch can be opened from a particular resource version
   140  			in the past and only notifications happening after that resource version are observed.
   141  	*/
   142  	framework.ConformanceIt("should be able to start watching from a specific resource version", func(ctx context.Context) {
   143  		c := f.ClientSet
   144  		ns := f.Namespace.Name
   145  
   146  		testConfigMap := &v1.ConfigMap{
   147  			ObjectMeta: metav1.ObjectMeta{
   148  				Name: "e2e-watch-test-resource-version",
   149  				Labels: map[string]string{
   150  					watchConfigMapLabelKey: fromResourceVersionLabelValue,
   151  				},
   152  			},
   153  		}
   154  
   155  		ginkgo.By("creating a new configmap")
   156  		testConfigMap, err := c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMap, metav1.CreateOptions{})
   157  		framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", testConfigMap.GetName(), ns)
   158  
   159  		ginkgo.By("modifying the configmap once")
   160  		testConfigMapFirstUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
   161  			setConfigMapData(cm, "mutation", "1")
   162  		})
   163  		framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", testConfigMap.GetName(), ns)
   164  
   165  		ginkgo.By("modifying the configmap a second time")
   166  		testConfigMapSecondUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
   167  			setConfigMapData(cm, "mutation", "2")
   168  		})
   169  		framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a second time", testConfigMap.GetName(), ns)
   170  
   171  		ginkgo.By("deleting the configmap")
   172  		err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMap.GetName(), metav1.DeleteOptions{})
   173  		framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", testConfigMap.GetName(), ns)
   174  
   175  		ginkgo.By("creating a watch on configmaps from the resource version returned by the first update")
   176  		testWatch, err := watchConfigMaps(ctx, f, testConfigMapFirstUpdate.ObjectMeta.ResourceVersion, fromResourceVersionLabelValue)
   177  		framework.ExpectNoError(err, "failed to create a watch on configmaps from the resource version %s returned by the first update", testConfigMapFirstUpdate.ObjectMeta.ResourceVersion)
   178  
   179  		ginkgo.By("Expecting to observe notifications for all changes to the configmap after the first update")
   180  		expectEvent(testWatch, watch.Modified, testConfigMapSecondUpdate)
   181  		expectEvent(testWatch, watch.Deleted, nil)
   182  	})
   183  
	/*
		    Release: v1.11
		    Testname: watch-configmaps-closed-and-restarted
		    Description: Ensure that a watch can be reopened from the last resource version
			observed by the previous watch, and it will continue delivering notifications from
			that point in time.
	*/
	framework.ConformanceIt("should be able to restart watching from the last resource version observed by the previous watch", func(ctx context.Context) {
		c := f.ClientSet
		ns := f.Namespace.Name

		configMapName := "e2e-watch-test-watch-closed"
		testConfigMap := &v1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{
				Name: configMapName,
				Labels: map[string]string{
					watchConfigMapLabelKey: watchRestartedLabelValue,
				},
			},
		}

		// The first watch is opened before any write so that it sees both the
		// create and the first update.
		ginkgo.By("creating a watch on configmaps")
		testWatchBroken, err := watchConfigMaps(ctx, f, "", watchRestartedLabelValue)
		framework.ExpectNoError(err, "failed to create a watch on configmap with label: %s", watchRestartedLabelValue)

		ginkgo.By("creating a new configmap")
		testConfigMap, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMap, metav1.CreateOptions{})
		framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", configMapName, ns)

		ginkgo.By("modifying the configmap once")
		_, err = updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
			setConfigMapData(cm, "mutation", "1")
		})
		framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", configMapName, ns)

		ginkgo.By("closing the watch once it receives two notifications")
		expectEvent(testWatchBroken, watch.Added, testConfigMap)
		// waitForEvent is called directly (with a nil object, i.e. matching on
		// type only) because the returned event is needed below: its object
		// supplies the resource version the second watch starts from.
		lastEvent, ok := waitForEvent(testWatchBroken, watch.Modified, nil, 1*time.Minute)
		if !ok {
			framework.Failf("Timed out waiting for second watch notification")
		}
		testWatchBroken.Stop()

		// This update happens while no watch is open; the restarted watch is
		// expected to replay it.
		ginkgo.By("modifying the configmap a second time, while the watch is closed")
		testConfigMapSecondUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
			setConfigMapData(cm, "mutation", "2")
		})
		framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a second time", configMapName, ns)

		ginkgo.By("creating a new watch on configmaps from the last resource version observed by the first watch")
		lastEventConfigMap, ok := lastEvent.Object.(*v1.ConfigMap)
		if !ok {
			framework.Failf("Expected last notification to refer to a configmap but got: %v", lastEvent)
		}
		testWatchRestarted, err := watchConfigMaps(ctx, f, lastEventConfigMap.ObjectMeta.ResourceVersion, watchRestartedLabelValue)
		framework.ExpectNoError(err, "failed to create a new watch on configmaps from the last resource version %s observed by the first watch", lastEventConfigMap.ObjectMeta.ResourceVersion)

		ginkgo.By("deleting the configmap")
		err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMap.GetName(), metav1.DeleteOptions{})
		framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", configMapName, ns)

		ginkgo.By("Expecting to observe notifications for all changes to the configmap since the first watch closed")
		expectEvent(testWatchRestarted, watch.Modified, testConfigMapSecondUpdate)
		// A nil expected object matches any Deleted event.
		expectEvent(testWatchRestarted, watch.Deleted, nil)
	})
   249  
	/*
		    Release: v1.11
		    Testname: watch-configmaps-label-changed
		    Description: Ensure that a watched object stops meeting the requirements of
			a watch's selector, the watch will observe a delete, and will not observe
			notifications for that object until it meets the selector's requirements again.
	*/
	framework.ConformanceIt("should observe an object deletion if it stops meeting the requirements of the selector", func(ctx context.Context) {
		c := f.ClientSet
		ns := f.Namespace.Name

		configMapName := "e2e-watch-test-label-changed"
		testConfigMap := &v1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{
				Name: configMapName,
				Labels: map[string]string{
					watchConfigMapLabelKey: toBeChangedLabelValue,
				},
			},
		}

		// A single watch is used for the whole test; it selects on the
		// original label value only.
		ginkgo.By("creating a watch on configmaps with a certain label")
		testWatch, err := watchConfigMaps(ctx, f, "", toBeChangedLabelValue)
		framework.ExpectNoError(err, "failed to create a watch on configmap with label: %s", toBeChangedLabelValue)

		ginkgo.By("creating a new configmap")
		testConfigMap, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMap, metav1.CreateOptions{})
		framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", configMapName, ns)

		ginkgo.By("modifying the configmap once")
		testConfigMapFirstUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
			setConfigMapData(cm, "mutation", "1")
		})
		framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", configMapName, ns)

		// Relabel the object so it no longer matches the watch's selector.
		ginkgo.By("changing the label value of the configmap")
		_, err = updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
			cm.ObjectMeta.Labels[watchConfigMapLabelKey] = "wrong-value"
		})
		framework.ExpectNoError(err, "failed to update configmap %s in namespace %s by changing label value", configMapName, ns)

		// The events for the three writes above are only consumed here. The
		// relabel is expected to surface as a Deleted event; nil matches any
		// Deleted event regardless of object.
		ginkgo.By("Expecting to observe a delete notification for the watched object")
		expectEvent(testWatch, watch.Added, testConfigMap)
		expectEvent(testWatch, watch.Modified, testConfigMapFirstUpdate)
		expectEvent(testWatch, watch.Deleted, nil)

		ginkgo.By("modifying the configmap a second time")
		testConfigMapSecondUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
			setConfigMapData(cm, "mutation", "2")
		})
		framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a second time", configMapName, ns)

		// expectNoEvent waits 10 seconds for a matching event and fails the
		// test if one arrives.
		ginkgo.By("Expecting not to observe a notification because the object no longer meets the selector's requirements")
		expectNoEvent(testWatch, watch.Modified, testConfigMapSecondUpdate)

		// Restore the original label value so the object matches the selector
		// again.
		ginkgo.By("changing the label value of the configmap back")
		testConfigMapLabelRestored, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
			cm.ObjectMeta.Labels[watchConfigMapLabelKey] = toBeChangedLabelValue
		})
		framework.ExpectNoError(err, "failed to update configmap %s in namespace %s by changing label value back", configMapName, ns)

		ginkgo.By("modifying the configmap a third time")
		testConfigMapThirdUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
			setConfigMapData(cm, "mutation", "3")
		})
		framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a third time", configMapName, ns)

		ginkgo.By("deleting the configmap")
		err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMap.GetName(), metav1.DeleteOptions{})
		framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", configMapName, ns)

		// The label restore is expected to surface as an Added event carrying
		// the restored object, followed by the third update and the delete.
		ginkgo.By("Expecting to observe an add notification for the watched object when the label value was restored")
		expectEvent(testWatch, watch.Added, testConfigMapLabelRestored)
		expectEvent(testWatch, watch.Modified, testConfigMapThirdUpdate)
		expectEvent(testWatch, watch.Deleted, nil)
	})
   326  
   327  	/*
   328  	   Release: v1.15
   329  	   Testname: watch-consistency
   330  	   Description: Ensure that concurrent watches are consistent with each other by initiating an additional watch
   331  	   for events received from the first watch, initiated at the resource version of the event, and checking that all
   332  	   resource versions of all events match. Events are produced from writes on a background goroutine.
   333  	*/
   334  	framework.ConformanceIt("should receive events on concurrent watches in same order", func(ctx context.Context) {
   335  		c := f.ClientSet
   336  		ns := f.Namespace.Name
   337  
   338  		iterations := 100
   339  
   340  		ginkgo.By("getting a starting resourceVersion")
   341  		configmaps, err := c.CoreV1().ConfigMaps(ns).List(ctx, metav1.ListOptions{})
   342  		framework.ExpectNoError(err, "Failed to list configmaps in the namespace %s", ns)
   343  		resourceVersion := configmaps.ResourceVersion
   344  
   345  		ginkgo.By("starting a background goroutine to produce watch events")
   346  		donec := make(chan struct{})
   347  		stopc := make(chan struct{})
   348  		go func() {
   349  			defer ginkgo.GinkgoRecover()
   350  			defer close(donec)
   351  			produceConfigMapEvents(ctx, f, stopc, 5*time.Millisecond)
   352  		}()
   353  
   354  		listWatcher := &cachetools.ListWatch{
   355  			WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
   356  				return c.CoreV1().ConfigMaps(ns).Watch(ctx, listOptions)
   357  			},
   358  		}
   359  
   360  		ginkgo.By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order")
   361  		wcs := []watch.Interface{}
   362  		for i := 0; i < iterations; i++ {
   363  			wc, err := watchtools.NewRetryWatcher(resourceVersion, listWatcher)
   364  			framework.ExpectNoError(err, "Failed to watch configmaps in the namespace %s", ns)
   365  			wcs = append(wcs, wc)
   366  			resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion
   367  			for _, wc := range wcs[1:] {
   368  				e := waitForNextConfigMapEvent(wc)
   369  				if resourceVersion != e.ResourceVersion {
   370  					framework.Failf("resource version mismatch, expected %s but got %s", resourceVersion, e.ResourceVersion)
   371  				}
   372  			}
   373  		}
   374  		close(stopc)
   375  		for _, wc := range wcs {
   376  			wc.Stop()
   377  		}
   378  		<-donec
   379  	})
   380  })
   381  
   382  func watchConfigMaps(ctx context.Context, f *framework.Framework, resourceVersion string, labels ...string) (watch.Interface, error) {
   383  	c := f.ClientSet
   384  	ns := f.Namespace.Name
   385  	opts := metav1.ListOptions{
   386  		ResourceVersion: resourceVersion,
   387  		LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{
   388  			MatchExpressions: []metav1.LabelSelectorRequirement{
   389  				{
   390  					Key:      watchConfigMapLabelKey,
   391  					Operator: metav1.LabelSelectorOpIn,
   392  					Values:   labels,
   393  				},
   394  			},
   395  		}),
   396  	}
   397  	return c.CoreV1().ConfigMaps(ns).Watch(ctx, opts)
   398  }
   399  
   400  func int64ptr(i int) *int64 {
   401  	i64 := int64(i)
   402  	return &i64
   403  }
   404  
   405  func setConfigMapData(cm *v1.ConfigMap, key, value string) {
   406  	if cm.Data == nil {
   407  		cm.Data = make(map[string]string)
   408  	}
   409  	cm.Data[key] = value
   410  }
   411  
   412  func expectEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) {
   413  	if event, ok := waitForEvent(w, eventType, object, 1*time.Minute); !ok {
   414  		framework.Failf("Timed out waiting for expected watch notification: %v", event)
   415  	}
   416  }
   417  
   418  func expectNoEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) {
   419  	if event, ok := waitForEvent(w, eventType, object, 10*time.Second); ok {
   420  		framework.Failf("Unexpected watch notification observed: %v", event)
   421  	}
   422  }
   423  
   424  func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject runtime.Object, duration time.Duration) (watch.Event, bool) {
   425  	stopTimer := time.NewTimer(duration)
   426  	defer stopTimer.Stop()
   427  	for {
   428  		select {
   429  		case actual, ok := <-w.ResultChan():
   430  			if ok {
   431  				framework.Logf("Got : %v %v", actual.Type, actual.Object)
   432  			} else {
   433  				framework.Failf("Watch closed unexpectedly")
   434  			}
   435  			if expectType == actual.Type && (expectObject == nil || apiequality.Semantic.DeepEqual(expectObject, actual.Object)) {
   436  				return actual, true
   437  			}
   438  		case <-stopTimer.C:
   439  			expected := watch.Event{
   440  				Type:   expectType,
   441  				Object: expectObject,
   442  			}
   443  			return expected, false
   444  		}
   445  	}
   446  }
   447  
   448  func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap {
   449  	select {
   450  	case event, ok := <-watch.ResultChan():
   451  		if !ok {
   452  			framework.Failf("Watch closed unexpectedly")
   453  		}
   454  		if configMap, ok := event.Object.(*v1.ConfigMap); ok {
   455  			return configMap
   456  		}
   457  		framework.Failf("expected config map, got %T", event.Object)
   458  	case <-time.After(10 * time.Second):
   459  		framework.Failf("timed out waiting for watch event")
   460  	}
   461  	return nil // should never happen
   462  }
   463  
// Operation kinds that produceConfigMapEvents chooses between; the values
// 0..2 line up with the result of rand.Intn(3) used there.
const (
	// createEvent creates a new configmap.
	createEvent = iota
	// updateEvent updates an existing configmap.
	updateEvent
	// deleteEvent deletes an existing configmap.
	deleteEvent
)
   469  
   470  func produceConfigMapEvents(ctx context.Context, f *framework.Framework, stopc <-chan struct{}, minWaitBetweenEvents time.Duration) {
   471  	c := f.ClientSet
   472  	ns := f.Namespace.Name
   473  
   474  	name := func(i int) string {
   475  		return fmt.Sprintf("cm-%d", i)
   476  	}
   477  
   478  	existing := []int{}
   479  	tc := time.NewTicker(minWaitBetweenEvents)
   480  	defer tc.Stop()
   481  	i := 0
   482  	updates := 0
   483  	for range tc.C {
   484  		op := rand.Intn(3)
   485  		if len(existing) == 0 {
   486  			op = createEvent
   487  		}
   488  
   489  		switch op {
   490  		case createEvent:
   491  			cm := &v1.ConfigMap{
   492  				ObjectMeta: metav1.ObjectMeta{
   493  					Name: name(i),
   494  				},
   495  			}
   496  			_, err := c.CoreV1().ConfigMaps(ns).Create(ctx, cm, metav1.CreateOptions{})
   497  			framework.ExpectNoError(err, "Failed to create configmap %s in namespace %s", cm.Name, ns)
   498  			existing = append(existing, i)
   499  			i++
   500  		case updateEvent:
   501  			idx := rand.Intn(len(existing))
   502  			cm := &v1.ConfigMap{
   503  				ObjectMeta: metav1.ObjectMeta{
   504  					Name: name(existing[idx]),
   505  					Labels: map[string]string{
   506  						"mutated": strconv.Itoa(updates),
   507  					},
   508  				},
   509  			}
   510  			_, err := c.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{})
   511  			framework.ExpectNoError(err, "Failed to update configmap %s in namespace %s", cm.Name, ns)
   512  			updates++
   513  		case deleteEvent:
   514  			idx := rand.Intn(len(existing))
   515  			err := c.CoreV1().ConfigMaps(ns).Delete(ctx, name(existing[idx]), metav1.DeleteOptions{})
   516  			framework.ExpectNoError(err, "Failed to delete configmap %s in namespace %s", name(existing[idx]), ns)
   517  			existing = append(existing[:idx], existing[idx+1:]...)
   518  		default:
   519  			framework.Failf("Unsupported event operation: %d", op)
   520  		}
   521  		select {
   522  		case <-stopc:
   523  			return
   524  		default:
   525  		}
   526  	}
   527  }
   528  

View as plain text