17 package apimachinery
19 import (
20 "context"
21 "fmt"
22 "math/rand"
23 "strconv"
24 "time"
26 v1 "k8s.io/api/core/v1"
27 apiequality "k8s.io/apimachinery/pkg/api/equality"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/apimachinery/pkg/watch"
31 cachetools "k8s.io/client-go/tools/cache"
32 watchtools "k8s.io/client-go/tools/watch"
33 "k8s.io/kubernetes/test/e2e/framework"
34 admissionapi "k8s.io/pod-security-admission/api"
36 "github.com/onsi/ginkgo/v2"
37 )
const (
	// watchConfigMapLabelKey is the label key that watchConfigMaps selects on;
	// the values below are the label values used by the individual tests.
	watchConfigMapLabelKey = "watch-this-configmap"

	// multipleWatchersLabelValueA labels configmap "a" in the multiple-watchers test.
	multipleWatchersLabelValueA = "multiple-watchers-A"

	// multipleWatchersLabelValueB labels configmap "b" in the multiple-watchers test.
	multipleWatchersLabelValueB = "multiple-watchers-B"

	// fromResourceVersionLabelValue labels the configmap of the test that
	// starts a watch from a specific resource version.
	fromResourceVersionLabelValue = "from-resource-version"

	// watchRestartedLabelValue labels the configmap of the test that closes a
	// watch and restarts it from the last observed resource version.
	watchRestartedLabelValue = "watch-closed-and-restarted"

	// toBeChangedLabelValue is the label value that the selector test changes
	// to a non-matching value and later restores.
	toBeChangedLabelValue = "label-changed-and-restored"
)
49 var _ = SIGDescribe("Watchers", func() {
50 f := framework.NewDefaultFramework("watch")
51 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
60 framework.ConformanceIt("should observe add, update, and delete watch notifications on configmaps", func(ctx context.Context) {
61 c := f.ClientSet
62 ns := f.Namespace.Name
64 ginkgo.By("creating a watch on configmaps with label A")
65 watchA, err := watchConfigMaps(ctx, f, "", multipleWatchersLabelValueA)
66 framework.ExpectNoError(err, "failed to create a watch on configmaps with label: %s", multipleWatchersLabelValueA)
68 ginkgo.By("creating a watch on configmaps with label B")
69 watchB, err := watchConfigMaps(ctx, f, "", multipleWatchersLabelValueB)
70 framework.ExpectNoError(err, "failed to create a watch on configmaps with label: %s", multipleWatchersLabelValueB)
72 ginkgo.By("creating a watch on configmaps with label A or B")
73 watchAB, err := watchConfigMaps(ctx, f, "", multipleWatchersLabelValueA, multipleWatchersLabelValueB)
74 framework.ExpectNoError(err, "failed to create a watch on configmaps with label %s or %s", multipleWatchersLabelValueA, multipleWatchersLabelValueB)
76 testConfigMapA := &v1.ConfigMap{
77 ObjectMeta: metav1.ObjectMeta{
78 Name: "e2e-watch-test-configmap-a",
79 Labels: map[string]string{
80 watchConfigMapLabelKey: multipleWatchersLabelValueA,
81 },
82 },
83 }
84 testConfigMapB := &v1.ConfigMap{
85 ObjectMeta: metav1.ObjectMeta{
86 Name: "e2e-watch-test-configmap-b",
87 Labels: map[string]string{
88 watchConfigMapLabelKey: multipleWatchersLabelValueB,
89 },
90 },
91 }
93 ginkgo.By("creating a configmap with label A and ensuring the correct watchers observe the notification")
94 testConfigMapA, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMapA, metav1.CreateOptions{})
95 framework.ExpectNoError(err, "failed to create a configmap with label %s in namespace: %s", multipleWatchersLabelValueA, ns)
96 expectEvent(watchA, watch.Added, testConfigMapA)
97 expectEvent(watchAB, watch.Added, testConfigMapA)
99 ginkgo.By("modifying configmap A and ensuring the correct watchers observe the notification")
100 testConfigMapA, err = updateConfigMap(ctx, c, ns, testConfigMapA.GetName(), func(cm *v1.ConfigMap) {
101 setConfigMapData(cm, "mutation", "1")
102 })
103 framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
104 expectEvent(watchA, watch.Modified, testConfigMapA)
105 expectEvent(watchAB, watch.Modified, testConfigMapA)
107 ginkgo.By("modifying configmap A again and ensuring the correct watchers observe the notification")
108 testConfigMapA, err = updateConfigMap(ctx, c, ns, testConfigMapA.GetName(), func(cm *v1.ConfigMap) {
109 setConfigMapData(cm, "mutation", "2")
110 })
111 framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
112 expectEvent(watchA, watch.Modified, testConfigMapA)
113 expectEvent(watchAB, watch.Modified, testConfigMapA)
115 ginkgo.By("deleting configmap A and ensuring the correct watchers observe the notification")
116 err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMapA.GetName(), metav1.DeleteOptions{})
117 framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
118 expectEvent(watchA, watch.Deleted, nil)
119 expectEvent(watchAB, watch.Deleted, nil)
121 ginkgo.By("creating a configmap with label B and ensuring the correct watchers observe the notification")
122 testConfigMapB, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMapB, metav1.CreateOptions{})
123 framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", testConfigMapB, ns)
124 expectEvent(watchB, watch.Added, testConfigMapB)
125 expectEvent(watchAB, watch.Added, testConfigMapB)
126 expectNoEvent(watchA, watch.Added, testConfigMapB)
128 ginkgo.By("deleting configmap B and ensuring the correct watchers observe the notification")
129 err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMapB.GetName(), metav1.DeleteOptions{})
130 framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", testConfigMapB.GetName(), ns)
131 expectEvent(watchB, watch.Deleted, nil)
132 expectEvent(watchAB, watch.Deleted, nil)
133 expectNoEvent(watchA, watch.Deleted, nil)
134 })
142 framework.ConformanceIt("should be able to start watching from a specific resource version", func(ctx context.Context) {
143 c := f.ClientSet
144 ns := f.Namespace.Name
146 testConfigMap := &v1.ConfigMap{
147 ObjectMeta: metav1.ObjectMeta{
148 Name: "e2e-watch-test-resource-version",
149 Labels: map[string]string{
150 watchConfigMapLabelKey: fromResourceVersionLabelValue,
151 },
152 },
153 }
155 ginkgo.By("creating a new configmap")
156 testConfigMap, err := c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMap, metav1.CreateOptions{})
157 framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", testConfigMap.GetName(), ns)
159 ginkgo.By("modifying the configmap once")
160 testConfigMapFirstUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
161 setConfigMapData(cm, "mutation", "1")
162 })
163 framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", testConfigMap.GetName(), ns)
165 ginkgo.By("modifying the configmap a second time")
166 testConfigMapSecondUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
167 setConfigMapData(cm, "mutation", "2")
168 })
169 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a second time", testConfigMap.GetName(), ns)
171 ginkgo.By("deleting the configmap")
172 err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMap.GetName(), metav1.DeleteOptions{})
173 framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", testConfigMap.GetName(), ns)
175 ginkgo.By("creating a watch on configmaps from the resource version returned by the first update")
176 testWatch, err := watchConfigMaps(ctx, f, testConfigMapFirstUpdate.ObjectMeta.ResourceVersion, fromResourceVersionLabelValue)
177 framework.ExpectNoError(err, "failed to create a watch on configmaps from the resource version %s returned by the first update", testConfigMapFirstUpdate.ObjectMeta.ResourceVersion)
179 ginkgo.By("Expecting to observe notifications for all changes to the configmap after the first update")
180 expectEvent(testWatch, watch.Modified, testConfigMapSecondUpdate)
181 expectEvent(testWatch, watch.Deleted, nil)
182 })
191 framework.ConformanceIt("should be able to restart watching from the last resource version observed by the previous watch", func(ctx context.Context) {
192 c := f.ClientSet
193 ns := f.Namespace.Name
195 configMapName := "e2e-watch-test-watch-closed"
196 testConfigMap := &v1.ConfigMap{
197 ObjectMeta: metav1.ObjectMeta{
198 Name: configMapName,
199 Labels: map[string]string{
200 watchConfigMapLabelKey: watchRestartedLabelValue,
201 },
202 },
203 }
205 ginkgo.By("creating a watch on configmaps")
206 testWatchBroken, err := watchConfigMaps(ctx, f, "", watchRestartedLabelValue)
207 framework.ExpectNoError(err, "failed to create a watch on configmap with label: %s", watchRestartedLabelValue)
209 ginkgo.By("creating a new configmap")
210 testConfigMap, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMap, metav1.CreateOptions{})
211 framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", configMapName, ns)
213 ginkgo.By("modifying the configmap once")
214 _, err = updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
215 setConfigMapData(cm, "mutation", "1")
216 })
217 framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", configMapName, ns)
219 ginkgo.By("closing the watch once it receives two notifications")
220 expectEvent(testWatchBroken, watch.Added, testConfigMap)
221 lastEvent, ok := waitForEvent(testWatchBroken, watch.Modified, nil, 1*time.Minute)
222 if !ok {
223 framework.Failf("Timed out waiting for second watch notification")
224 }
225 testWatchBroken.Stop()
227 ginkgo.By("modifying the configmap a second time, while the watch is closed")
228 testConfigMapSecondUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
229 setConfigMapData(cm, "mutation", "2")
230 })
231 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a second time", configMapName, ns)
233 ginkgo.By("creating a new watch on configmaps from the last resource version observed by the first watch")
234 lastEventConfigMap, ok := lastEvent.Object.(*v1.ConfigMap)
235 if !ok {
236 framework.Failf("Expected last notification to refer to a configmap but got: %v", lastEvent)
237 }
238 testWatchRestarted, err := watchConfigMaps(ctx, f, lastEventConfigMap.ObjectMeta.ResourceVersion, watchRestartedLabelValue)
239 framework.ExpectNoError(err, "failed to create a new watch on configmaps from the last resource version %s observed by the first watch", lastEventConfigMap.ObjectMeta.ResourceVersion)
241 ginkgo.By("deleting the configmap")
242 err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMap.GetName(), metav1.DeleteOptions{})
243 framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", configMapName, ns)
245 ginkgo.By("Expecting to observe notifications for all changes to the configmap since the first watch closed")
246 expectEvent(testWatchRestarted, watch.Modified, testConfigMapSecondUpdate)
247 expectEvent(testWatchRestarted, watch.Deleted, nil)
248 })
257 framework.ConformanceIt("should observe an object deletion if it stops meeting the requirements of the selector", func(ctx context.Context) {
258 c := f.ClientSet
259 ns := f.Namespace.Name
261 configMapName := "e2e-watch-test-label-changed"
262 testConfigMap := &v1.ConfigMap{
263 ObjectMeta: metav1.ObjectMeta{
264 Name: configMapName,
265 Labels: map[string]string{
266 watchConfigMapLabelKey: toBeChangedLabelValue,
267 },
268 },
269 }
271 ginkgo.By("creating a watch on configmaps with a certain label")
272 testWatch, err := watchConfigMaps(ctx, f, "", toBeChangedLabelValue)
273 framework.ExpectNoError(err, "failed to create a watch on configmap with label: %s", toBeChangedLabelValue)
275 ginkgo.By("creating a new configmap")
276 testConfigMap, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMap, metav1.CreateOptions{})
277 framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", configMapName, ns)
279 ginkgo.By("modifying the configmap once")
280 testConfigMapFirstUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
281 setConfigMapData(cm, "mutation", "1")
282 })
283 framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", configMapName, ns)
285 ginkgo.By("changing the label value of the configmap")
286 _, err = updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
287 cm.ObjectMeta.Labels[watchConfigMapLabelKey] = "wrong-value"
288 })
289 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s by changing label value", configMapName, ns)
291 ginkgo.By("Expecting to observe a delete notification for the watched object")
292 expectEvent(testWatch, watch.Added, testConfigMap)
293 expectEvent(testWatch, watch.Modified, testConfigMapFirstUpdate)
294 expectEvent(testWatch, watch.Deleted, nil)
296 ginkgo.By("modifying the configmap a second time")
297 testConfigMapSecondUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
298 setConfigMapData(cm, "mutation", "2")
299 })
300 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a second time", configMapName, ns)
302 ginkgo.By("Expecting not to observe a notification because the object no longer meets the selector's requirements")
303 expectNoEvent(testWatch, watch.Modified, testConfigMapSecondUpdate)
305 ginkgo.By("changing the label value of the configmap back")
306 testConfigMapLabelRestored, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
307 cm.ObjectMeta.Labels[watchConfigMapLabelKey] = toBeChangedLabelValue
308 })
309 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s by changing label value back", configMapName, ns)
311 ginkgo.By("modifying the configmap a third time")
312 testConfigMapThirdUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
313 setConfigMapData(cm, "mutation", "3")
314 })
315 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a third time", configMapName, ns)
317 ginkgo.By("deleting the configmap")
318 err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMap.GetName(), metav1.DeleteOptions{})
319 framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", configMapName, ns)
321 ginkgo.By("Expecting to observe an add notification for the watched object when the label value was restored")
322 expectEvent(testWatch, watch.Added, testConfigMapLabelRestored)
323 expectEvent(testWatch, watch.Modified, testConfigMapThirdUpdate)
324 expectEvent(testWatch, watch.Deleted, nil)
325 })
334 framework.ConformanceIt("should receive events on concurrent watches in same order", func(ctx context.Context) {
335 c := f.ClientSet
336 ns := f.Namespace.Name
338 iterations := 100
340 ginkgo.By("getting a starting resourceVersion")
341 configmaps, err := c.CoreV1().ConfigMaps(ns).List(ctx, metav1.ListOptions{})
342 framework.ExpectNoError(err, "Failed to list configmaps in the namespace %s", ns)
343 resourceVersion := configmaps.ResourceVersion
345 ginkgo.By("starting a background goroutine to produce watch events")
346 donec := make(chan struct{})
347 stopc := make(chan struct{})
348 go func() {
349 defer ginkgo.GinkgoRecover()
350 defer close(donec)
351 produceConfigMapEvents(ctx, f, stopc, 5*time.Millisecond)
352 }()
354 listWatcher := &cachetools.ListWatch{
355 WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
356 return c.CoreV1().ConfigMaps(ns).Watch(ctx, listOptions)
357 },
358 }
360 ginkgo.By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order")
361 wcs := []watch.Interface{}
362 for i := 0; i < iterations; i++ {
363 wc, err := watchtools.NewRetryWatcher(resourceVersion, listWatcher)
364 framework.ExpectNoError(err, "Failed to watch configmaps in the namespace %s", ns)
365 wcs = append(wcs, wc)
366 resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion
367 for _, wc := range wcs[1:] {
368 e := waitForNextConfigMapEvent(wc)
369 if resourceVersion != e.ResourceVersion {
370 framework.Failf("resource version mismatch, expected %s but got %s", resourceVersion, e.ResourceVersion)
371 }
372 }
373 }
374 close(stopc)
375 for _, wc := range wcs {
376 wc.Stop()
377 }
378 <-donec
379 })
380 })
382 func watchConfigMaps(ctx context.Context, f *framework.Framework, resourceVersion string, labels ...string) (watch.Interface, error) {
383 c := f.ClientSet
384 ns := f.Namespace.Name
385 opts := metav1.ListOptions{
386 ResourceVersion: resourceVersion,
387 LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{
388 MatchExpressions: []metav1.LabelSelectorRequirement{
389 {
390 Key: watchConfigMapLabelKey,
391 Operator: metav1.LabelSelectorOpIn,
392 Values: labels,
393 },
394 },
395 }),
396 }
397 return c.CoreV1().ConfigMaps(ns).Watch(ctx, opts)
398 }
// int64ptr converts i to int64 and returns a pointer to a freshly allocated
// copy of the result.
func int64ptr(i int) *int64 {
	p := new(int64)
	*p = int64(i)
	return p
}
405 func setConfigMapData(cm *v1.ConfigMap, key, value string) {
406 if cm.Data == nil {
407 cm.Data = make(map[string]string)
408 }
409 cm.Data[key] = value
410 }
412 func expectEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) {
413 if event, ok := waitForEvent(w, eventType, object, 1*time.Minute); !ok {
414 framework.Failf("Timed out waiting for expected watch notification: %v", event)
415 }
416 }
418 func expectNoEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) {
419 if event, ok := waitForEvent(w, eventType, object, 10*time.Second); ok {
420 framework.Failf("Unexpected watch notification observed: %v", event)
421 }
422 }
424 func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject runtime.Object, duration time.Duration) (watch.Event, bool) {
425 stopTimer := time.NewTimer(duration)
426 defer stopTimer.Stop()
427 for {
428 select {
429 case actual, ok := <-w.ResultChan():
430 if ok {
431 framework.Logf("Got : %v %v", actual.Type, actual.Object)
432 } else {
433 framework.Failf("Watch closed unexpectedly")
434 }
435 if expectType == actual.Type && (expectObject == nil || apiequality.Semantic.DeepEqual(expectObject, actual.Object)) {
436 return actual, true
437 }
438 case <-stopTimer.C:
439 expected := watch.Event{
440 Type: expectType,
441 Object: expectObject,
442 }
443 return expected, false
444 }
445 }
446 }
448 func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap {
449 select {
450 case event, ok := <-watch.ResultChan():
451 if !ok {
452 framework.Failf("Watch closed unexpectedly")
453 }
454 if configMap, ok := event.Object.(*v1.ConfigMap); ok {
455 return configMap
456 }
457 framework.Failf("expected config map, got %T", event.Object)
458 case <-time.After(10 * time.Second):
459 framework.Failf("timed out waiting for watch event")
460 }
461 return nil
462 }
// Operations that produceConfigMapEvents chooses between with rand.Intn(3).
const (
	// createEvent creates a new configmap; it is also forced whenever no
	// configmap currently exists.
	createEvent = iota

	// updateEvent updates a randomly chosen existing configmap.
	updateEvent

	// deleteEvent deletes a randomly chosen existing configmap.
	deleteEvent
)
470 func produceConfigMapEvents(ctx context.Context, f *framework.Framework, stopc <-chan struct{}, minWaitBetweenEvents time.Duration) {
471 c := f.ClientSet
472 ns := f.Namespace.Name
474 name := func(i int) string {
475 return fmt.Sprintf("cm-%d", i)
476 }
478 existing := []int{}
479 tc := time.NewTicker(minWaitBetweenEvents)
480 defer tc.Stop()
481 i := 0
482 updates := 0
483 for range tc.C {
484 op := rand.Intn(3)
485 if len(existing) == 0 {
486 op = createEvent
487 }
489 switch op {
490 case createEvent:
491 cm := &v1.ConfigMap{
492 ObjectMeta: metav1.ObjectMeta{
493 Name: name(i),
494 },
495 }
496 _, err := c.CoreV1().ConfigMaps(ns).Create(ctx, cm, metav1.CreateOptions{})
497 framework.ExpectNoError(err, "Failed to create configmap %s in namespace %s", cm.Name, ns)
498 existing = append(existing, i)
499 i++
500 case updateEvent:
501 idx := rand.Intn(len(existing))
502 cm := &v1.ConfigMap{
503 ObjectMeta: metav1.ObjectMeta{
504 Name: name(existing[idx]),
505 Labels: map[string]string{
506 "mutated": strconv.Itoa(updates),
507 },
508 },
509 }
510 _, err := c.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{})
511 framework.ExpectNoError(err, "Failed to update configmap %s in namespace %s", cm.Name, ns)
512 updates++
513 case deleteEvent:
514 idx := rand.Intn(len(existing))
515 err := c.CoreV1().ConfigMaps(ns).Delete(ctx, name(existing[idx]), metav1.DeleteOptions{})
516 framework.ExpectNoError(err, "Failed to delete configmap %s in namespace %s", name(existing[idx]), ns)
517 existing = append(existing[:idx], existing[idx+1:]...)
518 default:
519 framework.Failf("Unsupported event operation: %d", op)
520 }
521 select {
522 case <-stopc:
523 return
524 default:
525 }
526 }
527 }
View as plain text