1
16
17 package apimachinery
18
19 import (
20 "context"
21 "fmt"
22 "math/rand"
23 "strconv"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 apiequality "k8s.io/apimachinery/pkg/api/equality"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/apimachinery/pkg/watch"
31 cachetools "k8s.io/client-go/tools/cache"
32 watchtools "k8s.io/client-go/tools/watch"
33 "k8s.io/kubernetes/test/e2e/framework"
34 admissionapi "k8s.io/pod-security-admission/api"
35
36 "github.com/onsi/ginkgo/v2"
37 )
38
// watchConfigMapLabelKey is the label key every watch opened through
// watchConfigMaps selects on.
const watchConfigMapLabelKey = "watch-this-configmap"

// Values stored under watchConfigMapLabelKey. Each test case labels its
// configmaps with its own value(s) and watches for exactly those values.
const (
	multipleWatchersLabelValueA   = "multiple-watchers-A"
	multipleWatchersLabelValueB   = "multiple-watchers-B"
	fromResourceVersionLabelValue = "from-resource-version"
	watchRestartedLabelValue      = "watch-closed-and-restarted"
	toBeChangedLabelValue         = "label-changed-and-restored"
)
48
49 var _ = SIGDescribe("Watchers", func() {
50 f := framework.NewDefaultFramework("watch")
51 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
52
53
60 framework.ConformanceIt("should observe add, update, and delete watch notifications on configmaps", func(ctx context.Context) {
61 c := f.ClientSet
62 ns := f.Namespace.Name
63
64 ginkgo.By("creating a watch on configmaps with label A")
65 watchA, err := watchConfigMaps(ctx, f, "", multipleWatchersLabelValueA)
66 framework.ExpectNoError(err, "failed to create a watch on configmaps with label: %s", multipleWatchersLabelValueA)
67
68 ginkgo.By("creating a watch on configmaps with label B")
69 watchB, err := watchConfigMaps(ctx, f, "", multipleWatchersLabelValueB)
70 framework.ExpectNoError(err, "failed to create a watch on configmaps with label: %s", multipleWatchersLabelValueB)
71
72 ginkgo.By("creating a watch on configmaps with label A or B")
73 watchAB, err := watchConfigMaps(ctx, f, "", multipleWatchersLabelValueA, multipleWatchersLabelValueB)
74 framework.ExpectNoError(err, "failed to create a watch on configmaps with label %s or %s", multipleWatchersLabelValueA, multipleWatchersLabelValueB)
75
76 testConfigMapA := &v1.ConfigMap{
77 ObjectMeta: metav1.ObjectMeta{
78 Name: "e2e-watch-test-configmap-a",
79 Labels: map[string]string{
80 watchConfigMapLabelKey: multipleWatchersLabelValueA,
81 },
82 },
83 }
84 testConfigMapB := &v1.ConfigMap{
85 ObjectMeta: metav1.ObjectMeta{
86 Name: "e2e-watch-test-configmap-b",
87 Labels: map[string]string{
88 watchConfigMapLabelKey: multipleWatchersLabelValueB,
89 },
90 },
91 }
92
93 ginkgo.By("creating a configmap with label A and ensuring the correct watchers observe the notification")
94 testConfigMapA, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMapA, metav1.CreateOptions{})
95 framework.ExpectNoError(err, "failed to create a configmap with label %s in namespace: %s", multipleWatchersLabelValueA, ns)
96 expectEvent(watchA, watch.Added, testConfigMapA)
97 expectEvent(watchAB, watch.Added, testConfigMapA)
98
99 ginkgo.By("modifying configmap A and ensuring the correct watchers observe the notification")
100 testConfigMapA, err = updateConfigMap(ctx, c, ns, testConfigMapA.GetName(), func(cm *v1.ConfigMap) {
101 setConfigMapData(cm, "mutation", "1")
102 })
103 framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
104 expectEvent(watchA, watch.Modified, testConfigMapA)
105 expectEvent(watchAB, watch.Modified, testConfigMapA)
106
107 ginkgo.By("modifying configmap A again and ensuring the correct watchers observe the notification")
108 testConfigMapA, err = updateConfigMap(ctx, c, ns, testConfigMapA.GetName(), func(cm *v1.ConfigMap) {
109 setConfigMapData(cm, "mutation", "2")
110 })
111 framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
112 expectEvent(watchA, watch.Modified, testConfigMapA)
113 expectEvent(watchAB, watch.Modified, testConfigMapA)
114
115 ginkgo.By("deleting configmap A and ensuring the correct watchers observe the notification")
116 err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMapA.GetName(), metav1.DeleteOptions{})
117 framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", testConfigMapA.GetName(), ns)
118 expectEvent(watchA, watch.Deleted, nil)
119 expectEvent(watchAB, watch.Deleted, nil)
120
121 ginkgo.By("creating a configmap with label B and ensuring the correct watchers observe the notification")
122 testConfigMapB, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMapB, metav1.CreateOptions{})
123 framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", testConfigMapB, ns)
124 expectEvent(watchB, watch.Added, testConfigMapB)
125 expectEvent(watchAB, watch.Added, testConfigMapB)
126 expectNoEvent(watchA, watch.Added, testConfigMapB)
127
128 ginkgo.By("deleting configmap B and ensuring the correct watchers observe the notification")
129 err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMapB.GetName(), metav1.DeleteOptions{})
130 framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", testConfigMapB.GetName(), ns)
131 expectEvent(watchB, watch.Deleted, nil)
132 expectEvent(watchAB, watch.Deleted, nil)
133 expectNoEvent(watchA, watch.Deleted, nil)
134 })
135
136
142 framework.ConformanceIt("should be able to start watching from a specific resource version", func(ctx context.Context) {
143 c := f.ClientSet
144 ns := f.Namespace.Name
145
146 testConfigMap := &v1.ConfigMap{
147 ObjectMeta: metav1.ObjectMeta{
148 Name: "e2e-watch-test-resource-version",
149 Labels: map[string]string{
150 watchConfigMapLabelKey: fromResourceVersionLabelValue,
151 },
152 },
153 }
154
155 ginkgo.By("creating a new configmap")
156 testConfigMap, err := c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMap, metav1.CreateOptions{})
157 framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", testConfigMap.GetName(), ns)
158
159 ginkgo.By("modifying the configmap once")
160 testConfigMapFirstUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
161 setConfigMapData(cm, "mutation", "1")
162 })
163 framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", testConfigMap.GetName(), ns)
164
165 ginkgo.By("modifying the configmap a second time")
166 testConfigMapSecondUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
167 setConfigMapData(cm, "mutation", "2")
168 })
169 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a second time", testConfigMap.GetName(), ns)
170
171 ginkgo.By("deleting the configmap")
172 err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMap.GetName(), metav1.DeleteOptions{})
173 framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", testConfigMap.GetName(), ns)
174
175 ginkgo.By("creating a watch on configmaps from the resource version returned by the first update")
176 testWatch, err := watchConfigMaps(ctx, f, testConfigMapFirstUpdate.ObjectMeta.ResourceVersion, fromResourceVersionLabelValue)
177 framework.ExpectNoError(err, "failed to create a watch on configmaps from the resource version %s returned by the first update", testConfigMapFirstUpdate.ObjectMeta.ResourceVersion)
178
179 ginkgo.By("Expecting to observe notifications for all changes to the configmap after the first update")
180 expectEvent(testWatch, watch.Modified, testConfigMapSecondUpdate)
181 expectEvent(testWatch, watch.Deleted, nil)
182 })
183
184
191 framework.ConformanceIt("should be able to restart watching from the last resource version observed by the previous watch", func(ctx context.Context) {
192 c := f.ClientSet
193 ns := f.Namespace.Name
194
195 configMapName := "e2e-watch-test-watch-closed"
196 testConfigMap := &v1.ConfigMap{
197 ObjectMeta: metav1.ObjectMeta{
198 Name: configMapName,
199 Labels: map[string]string{
200 watchConfigMapLabelKey: watchRestartedLabelValue,
201 },
202 },
203 }
204
205 ginkgo.By("creating a watch on configmaps")
206 testWatchBroken, err := watchConfigMaps(ctx, f, "", watchRestartedLabelValue)
207 framework.ExpectNoError(err, "failed to create a watch on configmap with label: %s", watchRestartedLabelValue)
208
209 ginkgo.By("creating a new configmap")
210 testConfigMap, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMap, metav1.CreateOptions{})
211 framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", configMapName, ns)
212
213 ginkgo.By("modifying the configmap once")
214 _, err = updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
215 setConfigMapData(cm, "mutation", "1")
216 })
217 framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", configMapName, ns)
218
219 ginkgo.By("closing the watch once it receives two notifications")
220 expectEvent(testWatchBroken, watch.Added, testConfigMap)
221 lastEvent, ok := waitForEvent(testWatchBroken, watch.Modified, nil, 1*time.Minute)
222 if !ok {
223 framework.Failf("Timed out waiting for second watch notification")
224 }
225 testWatchBroken.Stop()
226
227 ginkgo.By("modifying the configmap a second time, while the watch is closed")
228 testConfigMapSecondUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
229 setConfigMapData(cm, "mutation", "2")
230 })
231 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a second time", configMapName, ns)
232
233 ginkgo.By("creating a new watch on configmaps from the last resource version observed by the first watch")
234 lastEventConfigMap, ok := lastEvent.Object.(*v1.ConfigMap)
235 if !ok {
236 framework.Failf("Expected last notification to refer to a configmap but got: %v", lastEvent)
237 }
238 testWatchRestarted, err := watchConfigMaps(ctx, f, lastEventConfigMap.ObjectMeta.ResourceVersion, watchRestartedLabelValue)
239 framework.ExpectNoError(err, "failed to create a new watch on configmaps from the last resource version %s observed by the first watch", lastEventConfigMap.ObjectMeta.ResourceVersion)
240
241 ginkgo.By("deleting the configmap")
242 err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMap.GetName(), metav1.DeleteOptions{})
243 framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", configMapName, ns)
244
245 ginkgo.By("Expecting to observe notifications for all changes to the configmap since the first watch closed")
246 expectEvent(testWatchRestarted, watch.Modified, testConfigMapSecondUpdate)
247 expectEvent(testWatchRestarted, watch.Deleted, nil)
248 })
249
250
257 framework.ConformanceIt("should observe an object deletion if it stops meeting the requirements of the selector", func(ctx context.Context) {
258 c := f.ClientSet
259 ns := f.Namespace.Name
260
261 configMapName := "e2e-watch-test-label-changed"
262 testConfigMap := &v1.ConfigMap{
263 ObjectMeta: metav1.ObjectMeta{
264 Name: configMapName,
265 Labels: map[string]string{
266 watchConfigMapLabelKey: toBeChangedLabelValue,
267 },
268 },
269 }
270
271 ginkgo.By("creating a watch on configmaps with a certain label")
272 testWatch, err := watchConfigMaps(ctx, f, "", toBeChangedLabelValue)
273 framework.ExpectNoError(err, "failed to create a watch on configmap with label: %s", toBeChangedLabelValue)
274
275 ginkgo.By("creating a new configmap")
276 testConfigMap, err = c.CoreV1().ConfigMaps(ns).Create(ctx, testConfigMap, metav1.CreateOptions{})
277 framework.ExpectNoError(err, "failed to create configmap %s in namespace: %s", configMapName, ns)
278
279 ginkgo.By("modifying the configmap once")
280 testConfigMapFirstUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
281 setConfigMapData(cm, "mutation", "1")
282 })
283 framework.ExpectNoError(err, "failed to update configmap %s in namespace: %s", configMapName, ns)
284
285 ginkgo.By("changing the label value of the configmap")
286 _, err = updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
287 cm.ObjectMeta.Labels[watchConfigMapLabelKey] = "wrong-value"
288 })
289 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s by changing label value", configMapName, ns)
290
291 ginkgo.By("Expecting to observe a delete notification for the watched object")
292 expectEvent(testWatch, watch.Added, testConfigMap)
293 expectEvent(testWatch, watch.Modified, testConfigMapFirstUpdate)
294 expectEvent(testWatch, watch.Deleted, nil)
295
296 ginkgo.By("modifying the configmap a second time")
297 testConfigMapSecondUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
298 setConfigMapData(cm, "mutation", "2")
299 })
300 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a second time", configMapName, ns)
301
302 ginkgo.By("Expecting not to observe a notification because the object no longer meets the selector's requirements")
303 expectNoEvent(testWatch, watch.Modified, testConfigMapSecondUpdate)
304
305 ginkgo.By("changing the label value of the configmap back")
306 testConfigMapLabelRestored, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
307 cm.ObjectMeta.Labels[watchConfigMapLabelKey] = toBeChangedLabelValue
308 })
309 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s by changing label value back", configMapName, ns)
310
311 ginkgo.By("modifying the configmap a third time")
312 testConfigMapThirdUpdate, err := updateConfigMap(ctx, c, ns, testConfigMap.GetName(), func(cm *v1.ConfigMap) {
313 setConfigMapData(cm, "mutation", "3")
314 })
315 framework.ExpectNoError(err, "failed to update configmap %s in namespace %s a third time", configMapName, ns)
316
317 ginkgo.By("deleting the configmap")
318 err = c.CoreV1().ConfigMaps(ns).Delete(ctx, testConfigMap.GetName(), metav1.DeleteOptions{})
319 framework.ExpectNoError(err, "failed to delete configmap %s in namespace: %s", configMapName, ns)
320
321 ginkgo.By("Expecting to observe an add notification for the watched object when the label value was restored")
322 expectEvent(testWatch, watch.Added, testConfigMapLabelRestored)
323 expectEvent(testWatch, watch.Modified, testConfigMapThirdUpdate)
324 expectEvent(testWatch, watch.Deleted, nil)
325 })
326
327
334 framework.ConformanceIt("should receive events on concurrent watches in same order", func(ctx context.Context) {
335 c := f.ClientSet
336 ns := f.Namespace.Name
337
338 iterations := 100
339
340 ginkgo.By("getting a starting resourceVersion")
341 configmaps, err := c.CoreV1().ConfigMaps(ns).List(ctx, metav1.ListOptions{})
342 framework.ExpectNoError(err, "Failed to list configmaps in the namespace %s", ns)
343 resourceVersion := configmaps.ResourceVersion
344
345 ginkgo.By("starting a background goroutine to produce watch events")
346 donec := make(chan struct{})
347 stopc := make(chan struct{})
348 go func() {
349 defer ginkgo.GinkgoRecover()
350 defer close(donec)
351 produceConfigMapEvents(ctx, f, stopc, 5*time.Millisecond)
352 }()
353
354 listWatcher := &cachetools.ListWatch{
355 WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
356 return c.CoreV1().ConfigMaps(ns).Watch(ctx, listOptions)
357 },
358 }
359
360 ginkgo.By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order")
361 wcs := []watch.Interface{}
362 for i := 0; i < iterations; i++ {
363 wc, err := watchtools.NewRetryWatcher(resourceVersion, listWatcher)
364 framework.ExpectNoError(err, "Failed to watch configmaps in the namespace %s", ns)
365 wcs = append(wcs, wc)
366 resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion
367 for _, wc := range wcs[1:] {
368 e := waitForNextConfigMapEvent(wc)
369 if resourceVersion != e.ResourceVersion {
370 framework.Failf("resource version mismatch, expected %s but got %s", resourceVersion, e.ResourceVersion)
371 }
372 }
373 }
374 close(stopc)
375 for _, wc := range wcs {
376 wc.Stop()
377 }
378 <-donec
379 })
380 })
381
382 func watchConfigMaps(ctx context.Context, f *framework.Framework, resourceVersion string, labels ...string) (watch.Interface, error) {
383 c := f.ClientSet
384 ns := f.Namespace.Name
385 opts := metav1.ListOptions{
386 ResourceVersion: resourceVersion,
387 LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{
388 MatchExpressions: []metav1.LabelSelectorRequirement{
389 {
390 Key: watchConfigMapLabelKey,
391 Operator: metav1.LabelSelectorOpIn,
392 Values: labels,
393 },
394 },
395 }),
396 }
397 return c.CoreV1().ConfigMaps(ns).Watch(ctx, opts)
398 }
399
// int64ptr converts i to int64 and returns a pointer to a fresh copy of it.
func int64ptr(i int) *int64 {
	p := new(int64)
	*p = int64(i)
	return p
}
404
405 func setConfigMapData(cm *v1.ConfigMap, key, value string) {
406 if cm.Data == nil {
407 cm.Data = make(map[string]string)
408 }
409 cm.Data[key] = value
410 }
411
412 func expectEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) {
413 if event, ok := waitForEvent(w, eventType, object, 1*time.Minute); !ok {
414 framework.Failf("Timed out waiting for expected watch notification: %v", event)
415 }
416 }
417
418 func expectNoEvent(w watch.Interface, eventType watch.EventType, object runtime.Object) {
419 if event, ok := waitForEvent(w, eventType, object, 10*time.Second); ok {
420 framework.Failf("Unexpected watch notification observed: %v", event)
421 }
422 }
423
424 func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject runtime.Object, duration time.Duration) (watch.Event, bool) {
425 stopTimer := time.NewTimer(duration)
426 defer stopTimer.Stop()
427 for {
428 select {
429 case actual, ok := <-w.ResultChan():
430 if ok {
431 framework.Logf("Got : %v %v", actual.Type, actual.Object)
432 } else {
433 framework.Failf("Watch closed unexpectedly")
434 }
435 if expectType == actual.Type && (expectObject == nil || apiequality.Semantic.DeepEqual(expectObject, actual.Object)) {
436 return actual, true
437 }
438 case <-stopTimer.C:
439 expected := watch.Event{
440 Type: expectType,
441 Object: expectObject,
442 }
443 return expected, false
444 }
445 }
446 }
447
448 func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap {
449 select {
450 case event, ok := <-watch.ResultChan():
451 if !ok {
452 framework.Failf("Watch closed unexpectedly")
453 }
454 if configMap, ok := event.Object.(*v1.ConfigMap); ok {
455 return configMap
456 }
457 framework.Failf("expected config map, got %T", event.Object)
458 case <-time.After(10 * time.Second):
459 framework.Failf("timed out waiting for watch event")
460 }
461 return nil
462 }
463
// Operations produceConfigMapEvents chooses between. The values must stay
// within the range of the rand.Intn(3) call that picks one of them.
const (
	createEvent = 0
	updateEvent = 1
	deleteEvent = 2
)
469
470 func produceConfigMapEvents(ctx context.Context, f *framework.Framework, stopc <-chan struct{}, minWaitBetweenEvents time.Duration) {
471 c := f.ClientSet
472 ns := f.Namespace.Name
473
474 name := func(i int) string {
475 return fmt.Sprintf("cm-%d", i)
476 }
477
478 existing := []int{}
479 tc := time.NewTicker(minWaitBetweenEvents)
480 defer tc.Stop()
481 i := 0
482 updates := 0
483 for range tc.C {
484 op := rand.Intn(3)
485 if len(existing) == 0 {
486 op = createEvent
487 }
488
489 switch op {
490 case createEvent:
491 cm := &v1.ConfigMap{
492 ObjectMeta: metav1.ObjectMeta{
493 Name: name(i),
494 },
495 }
496 _, err := c.CoreV1().ConfigMaps(ns).Create(ctx, cm, metav1.CreateOptions{})
497 framework.ExpectNoError(err, "Failed to create configmap %s in namespace %s", cm.Name, ns)
498 existing = append(existing, i)
499 i++
500 case updateEvent:
501 idx := rand.Intn(len(existing))
502 cm := &v1.ConfigMap{
503 ObjectMeta: metav1.ObjectMeta{
504 Name: name(existing[idx]),
505 Labels: map[string]string{
506 "mutated": strconv.Itoa(updates),
507 },
508 },
509 }
510 _, err := c.CoreV1().ConfigMaps(ns).Update(ctx, cm, metav1.UpdateOptions{})
511 framework.ExpectNoError(err, "Failed to update configmap %s in namespace %s", cm.Name, ns)
512 updates++
513 case deleteEvent:
514 idx := rand.Intn(len(existing))
515 err := c.CoreV1().ConfigMaps(ns).Delete(ctx, name(existing[idx]), metav1.DeleteOptions{})
516 framework.ExpectNoError(err, "Failed to delete configmap %s in namespace %s", name(existing[idx]), ns)
517 existing = append(existing[:idx], existing[idx+1:]...)
518 default:
519 framework.Failf("Unsupported event operation: %d", op)
520 }
521 select {
522 case <-stopc:
523 return
524 default:
525 }
526 }
527 }
528
View as plain text