package apps

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"reflect"
	"sort"
	"strings"
	"text/tabwriter"
	"time"

	"k8s.io/client-go/tools/cache"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/selection"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	watch "k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	watchtools "k8s.io/client-go/tools/watch"
	"k8s.io/client-go/util/retry"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/controller/daemon"
	"k8s.io/kubernetes/test/e2e/framework"
	e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
	admissionapi "k8s.io/pod-security-admission/api"
)

const (
	dsRetryPeriod  = 1 * time.Second
	dsRetryTimeout = 5 * time.Minute

	daemonsetLabelPrefix = "daemonset-"
	daemonsetNameLabel   = daemonsetLabelPrefix + "name"
	daemonsetColorLabel  = daemonsetLabelPrefix + "color"
)

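// NamespaceNodeSelectors lists the namespace annotation keys that carry namespace-wide
// node selectors; the tests patch these to the empty string so a namespace-level selector
// does not constrain where daemon pods may schedule.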
var NamespaceNodeSelectors = []string{"scheduler.alpha.kubernetes.io/node-selector"}

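// nonTerminalPhaseSelector matches pods whose status.phase is neither Failed nor Succeeded,
// so pod listings only return daemon pods that are still pending or running.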
var nonTerminalPhaseSelector = func() labels.Selector {
	var reqs []labels.Requirement
	for _, phase := range []v1.PodPhase{v1.PodFailed, v1.PodSucceeded} {
		req, _ := labels.NewRequirement("status.phase", selection.NotEquals, []string{string(phase)})
		reqs = append(reqs, *req)
	}
	selector := labels.NewSelector()
	return selector.Add(reqs...)
}()

type updateDSFunc func(*appsv1.DaemonSet)

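// updateDaemonSetWithRetries gets the named DaemonSet, applies applyUpdate to it, and retries
// the update on conflicts or other transient errors until it succeeds or the poll times out.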
func updateDaemonSetWithRetries(ctx context.Context, c clientset.Interface, namespace, name string, applyUpdate updateDSFunc) (ds *appsv1.DaemonSet, err error) {
	daemonsets := c.AppsV1().DaemonSets(namespace)
	var updateErr error
	pollErr := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 1*time.Minute, true, func(ctx context.Context) (bool, error) {
		if ds, err = daemonsets.Get(ctx, name, metav1.GetOptions{}); err != nil {
			return false, err
		}
		// Apply the update, then attempt to push it to the apiserver.
		applyUpdate(ds)
		if ds, err = daemonsets.Update(ctx, ds, metav1.UpdateOptions{}); err == nil {
			framework.Logf("Updating DaemonSet %s", name)
			return true, nil
		}
		updateErr = err
		return false, nil
	})
	if wait.Interrupted(pollErr) {
		pollErr = fmt.Errorf("couldn't apply the provided update to DaemonSet %q: %v", name, updateErr)
	}
	return ds, pollErr
}
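// These tests run with framework.WithSerial() because they relabel cluster nodes and patch
// namespace annotations, which would interfere with daemon pod scheduling in parallel tests.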
var _ = SIGDescribe("Daemon set", framework.WithSerial(), func() {
	var f *framework.Framework

	ginkgo.AfterEach(func(ctx context.Context) {
		daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(ctx, metav1.ListOptions{})
		framework.ExpectNoError(err, "unable to dump DaemonSets")
		if daemonsets != nil && len(daemonsets.Items) > 0 {
			for _, ds := range daemonsets.Items {
				ginkgo.By(fmt.Sprintf("Deleting DaemonSet %q", ds.Name))
				framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
				err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, &ds))
				framework.ExpectNoError(err, "error waiting for daemon pod to be reaped")
			}
		}
		if daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(ctx, metav1.ListOptions{}); err == nil {
			framework.Logf("daemonset: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), daemonsets))
		} else {
			framework.Logf("unable to dump daemonsets: %v", err)
		}
		if pods, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{}); err == nil {
			framework.Logf("pods: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), pods))
		} else {
			framework.Logf("unable to dump pods: %v", err)
		}
		err = clearDaemonSetNodeLabels(ctx, f.ClientSet)
		framework.ExpectNoError(err)
	})

	f = framework.NewDefaultFramework("daemonsets")
	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline

	image := WebserverImage
	dsName := "daemon-set"

	var ns string
	var c clientset.Interface

	ginkgo.BeforeEach(func(ctx context.Context) {
		ns = f.Namespace.Name

		c = f.ClientSet

		updatedNS, err := patchNamespaceAnnotations(ctx, c, ns)
		framework.ExpectNoError(err)

		ns = updatedNS.Name

		err = clearDaemonSetNodeLabels(ctx, c)
		framework.ExpectNoError(err)
	})

	framework.ConformanceIt("should run and stop simple daemon", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}

		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSet(dsName, image, label), metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Stop a daemon pod, check that the daemon pod is revived.")
		podList := listDaemonPods(ctx, c, ns, label)
		pod := podList.Items[0]
		err = c.CoreV1().Pods(ns).Delete(ctx, pod.Name, metav1.DeleteOptions{})
		framework.ExpectNoError(err)
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to revive")
	})

	framework.ConformanceIt("should run and stop complex daemon", func(ctx context.Context) {
		complexLabel := map[string]string{daemonsetNameLabel: dsName}
		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
		framework.Logf("Creating daemon %q with a node selector", dsName)
		ds := newDaemonSet(dsName, image, complexLabel)
		ds.Spec.Template.Spec.NodeSelector = nodeSelector
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")

		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
		framework.ExpectNoError(err)
		newNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
		framework.ExpectNoError(err, "error setting labels on node")
		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
		gomega.Expect(daemonSetLabels).To(gomega.HaveLen(1))
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{newNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Update the node label to green, and wait for daemons to be unscheduled")
		nodeSelector[daemonsetColorLabel] = "green"
		greenNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
		framework.ExpectNoError(err, "error removing labels on node")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")

		ginkgo.By("Update DaemonSet node selector to green, and change its update strategy to RollingUpdate")
		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"nodeSelector":{"%s":"%s"}}},"updateStrategy":{"type":"RollingUpdate"}}}`,
			daemonsetColorLabel, greenNode.Labels[daemonsetColorLabel])
		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err, "error patching daemon set")
		daemonSetLabels, _ = separateDaemonSetNodeLabels(greenNode.Labels)
		gomega.Expect(daemonSetLabels).To(gomega.HaveLen(1))
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{greenNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)
	})

	ginkgo.It("should run and stop complex daemon with node affinity", func(ctx context.Context) {
		complexLabel := map[string]string{daemonsetNameLabel: dsName}
		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
		framework.Logf("Creating daemon %q with a node affinity", dsName)
		ds := newDaemonSet(dsName, image, complexLabel)
		ds.Spec.Template.Spec.Affinity = &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      daemonsetColorLabel,
									Operator: v1.NodeSelectorOpIn,
									Values:   []string{nodeSelector[daemonsetColorLabel]},
								},
							},
						},
					},
				},
			},
		}
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")

		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
		framework.ExpectNoError(err)
		newNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
		framework.ExpectNoError(err, "error setting labels on node")
		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
		gomega.Expect(daemonSetLabels).To(gomega.HaveLen(1))
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{newNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Remove the node label and wait for daemons to be unscheduled")
		_, err = setDaemonSetNodeLabels(ctx, c, node.Name, map[string]string{})
		framework.ExpectNoError(err, "error removing labels on node")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")
	})

	framework.ConformanceIt("should retry creating failed daemon pods", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}

		ginkgo.By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName))
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSet(dsName, image, label), metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Set a daemon pod's phase to 'Failed', check that the daemon pod is revived.")
		podList := listDaemonPods(ctx, c, ns, label)
		pod := podList.Items[0]
		pod.ResourceVersion = ""
		pod.Status.Phase = v1.PodFailed
		_, err = c.CoreV1().Pods(ns).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
		framework.ExpectNoError(err, "error failing a daemon pod")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to revive")

		ginkgo.By("Wait for the failed daemon pod to be completely deleted.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, waitFailedDaemonPodDeleted(c, &pod))
		framework.ExpectNoError(err, "error waiting for the failed daemon pod to be completely deleted")
	})

	ginkgo.It("should not update pod when spec was updated and update strategy is OnDelete", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}

		framework.Logf("Creating simple daemon set %s", dsName)
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType}
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 1)
		first := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		firstHash := first.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(first.Revision).To(gomega.Equal(int64(1)))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), firstHash)

		ginkgo.By("Update daemon pods image.")
		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods images aren't updated.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkDaemonPodsImageAndAvailability(c, ds, image, 0))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 2)
		cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
		gomega.Expect(cur.Labels).NotTo(gomega.HaveKeyWithValue(appsv1.DefaultDaemonSetUniqueLabelKey, firstHash))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), firstHash)
	})

	framework.ConformanceIt("should update pod when spec was updated and update strategy is RollingUpdate", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}

		framework.Logf("Creating simple daemon set %s", dsName)
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 1)
		cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(1)))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)

		ginkgo.By("Update daemon pods image.")
		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err)

		// A rolling update touches every node, so scale the timeout with the cluster size.
		nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
		framework.ExpectNoError(err)
		nodeCount := len(nodes.Items)
		retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second

		ginkgo.By("Check that daemon pods images are updated.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, retryTimeout, true, checkDaemonPodsImageAndAvailability(c, ds, AgnhostImage, 1))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 2)
		cur = curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
	})

	framework.ConformanceIt("should rollback without unnecessary restarts", func(ctx context.Context) {
		schedulableNodes, err := e2enode.GetReadySchedulableNodes(ctx, c)
		framework.ExpectNoError(err)
		gomega.Expect(len(schedulableNodes.Items)).To(gomega.BeNumerically(">", 1), "Conformance test suite needs a cluster with at least 2 nodes.")
		framework.Logf("Create a RollingUpdate DaemonSet")
		label := map[string]string{daemonsetNameLabel: dsName}
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
		ds, err = c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		framework.Logf("Check that daemon pods launch on every node of the cluster")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		framework.Logf("Update the DaemonSet to trigger a rollout")
		// Use an image that can never be pulled, so the rollout stalls partway through.
		newImage := "foo:non-existent"
		newDS, err := updateDaemonSetWithRetries(ctx, c, ns, ds.Name, func(update *appsv1.DaemonSet) {
			update.Spec.Template.Spec.Containers[0].Image = newImage
		})
		framework.ExpectNoError(err)

		// Make sure we are in the middle of the rollout: at least one new pod exists.
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkAtLeastOneNewPod(c, ns, label, newImage))
		framework.ExpectNoError(err)

		pods := listDaemonPods(ctx, c, ns, label)
		var existingPods, newPods []*v1.Pod
		for i := range pods.Items {
			pod := pods.Items[i]
			image := pod.Spec.Containers[0].Image
			switch image {
			case ds.Spec.Template.Spec.Containers[0].Image:
				existingPods = append(existingPods, &pod)
			case newDS.Spec.Template.Spec.Containers[0].Image:
				newPods = append(newPods, &pod)
			default:
				framework.Failf("unexpected pod found, image = %s", image)
			}
		}
		schedulableNodes, err = e2enode.GetReadySchedulableNodes(ctx, c)
		framework.ExpectNoError(err)
		if len(schedulableNodes.Items) < 2 {
			gomega.Expect(existingPods).To(gomega.BeEmpty())
		} else {
			gomega.Expect(existingPods).NotTo(gomega.BeEmpty())
		}
		gomega.Expect(newPods).NotTo(gomega.BeEmpty())

		framework.Logf("Roll back the DaemonSet before rollout is complete")
		rollbackDS, err := updateDaemonSetWithRetries(ctx, c, ns, ds.Name, func(update *appsv1.DaemonSet) {
			update.Spec.Template.Spec.Containers[0].Image = image
		})
		framework.ExpectNoError(err)

		framework.Logf("Make sure DaemonSet rollback is complete")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkDaemonPodsImageAndAvailability(c, rollbackDS, image, 1))
		framework.ExpectNoError(err)

		// Pods that were already running the old image must not have been restarted by the rollback.
		pods = listDaemonPods(ctx, c, ns, label)
		rollbackPods := map[string]bool{}
		for _, pod := range pods.Items {
			rollbackPods[pod.Name] = true
		}
		for _, pod := range existingPods {
			if !rollbackPods[pod.Name] {
				framework.Failf("unexpected pod %s was restarted", pod.Name)
			}
		}
	})

	ginkgo.It("should surge pods onto nodes when spec was updated and update strategy is RollingUpdate", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}

		framework.Logf("Creating surge daemon set %s", dsName)
		maxSurgeOverlap := 60 * time.Second
		maxSurge := 1
		surgePercent := intstr.FromString("20%")
		zero := intstr.FromInt32(0)
		oldVersion := "1"
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.Template.Spec.Containers[0].Env = []v1.EnvVar{
			{Name: "VERSION", Value: oldVersion},
		}
		// A pre-stop hook keeps terminating pods around long enough for overlap to be observed.
		ds.Spec.Template.Spec.Containers[0].Lifecycle = &v1.Lifecycle{
			PreStop: &v1.LifecycleHandler{
				Exec: &v1.ExecAction{
					Command: []string{"/bin/sh", "-c", "sleep 15"},
				},
			},
		}
		// The readiness probe passes only while /var/tmp/ready is empty, which lets the test
		// flip a pod to unready from inside the container later on.
		ds.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{
			ProbeHandler: v1.ProbeHandler{
				Exec: &v1.ExecAction{
					Command: []string{"/bin/sh", "-ec", `touch /var/tmp/ready; [[ "$( cat /var/tmp/ready )" == "" ]]`},
				},
			},
			InitialDelaySeconds: 7,
			PeriodSeconds:       3,
			SuccessThreshold:    1,
			FailureThreshold:    1,
		}

		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
			Type: appsv1.RollingUpdateDaemonSetStrategyType,
			RollingUpdate: &appsv1.RollingUpdateDaemonSet{
				MaxUnavailable: &zero,
				MaxSurge:       &surgePercent,
			},
		}

		ds.Spec.MinReadySeconds = 10

		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 1)
		cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(1)))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)

		newVersion := "2"
		ginkgo.By("Update daemon pods environment var")
		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%s"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, newVersion)
		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err)

		// The rollout touches every node, so scale the timeout with the cluster size.
		nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
		framework.ExpectNoError(err)
		nodeCount := len(nodes.Items)
		retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second

		ginkgo.By("Check that daemon pods surge and invariants are preserved during that rollout")
		ageOfOldPod := make(map[string]time.Time)
		deliberatelyDeletedPods := sets.NewString()
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, retryTimeout, true, func(ctx context.Context) (bool, error) {
			podList, err := c.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{})
			if err != nil {
				return false, err
			}
			pods := podList.Items

			var buf bytes.Buffer
			pw := tabwriter.NewWriter(&buf, 1, 1, 1, ' ', 0)
			fmt.Fprint(pw, "Node\tVersion\tName\tUID\tDeleted\tReady\n")

			now := time.Now()
			podUIDs := sets.NewString()
			deletedPodUIDs := sets.NewString()
			nodes := sets.NewString()
			versions := sets.NewString()
			nodesToVersions := make(map[string]map[string]int)
			nodesToDeletedVersions := make(map[string]map[string]int)
			var surgeCount, newUnavailableCount, newDeliberatelyDeletedCount, oldUnavailableCount, nodesWithoutOldVersion int
			for _, pod := range pods {
				if !metav1.IsControlledBy(&pod, ds) {
					continue
				}
				nodeName := pod.Spec.NodeName
				nodes.Insert(nodeName)
				podVersion := pod.Spec.Containers[0].Env[0].Value
				if pod.DeletionTimestamp != nil {
					if !deliberatelyDeletedPods.Has(string(pod.UID)) {
						versions := nodesToDeletedVersions[nodeName]
						if versions == nil {
							versions = make(map[string]int)
							nodesToDeletedVersions[nodeName] = versions
						}
						versions[podVersion]++
					}
				} else {
					versions := nodesToVersions[nodeName]
					if versions == nil {
						versions = make(map[string]int)
						nodesToVersions[nodeName] = versions
					}
					versions[podVersion]++
				}

				ready := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now())
				if podVersion == newVersion {
					surgeCount++
					if !ready || pod.DeletionTimestamp != nil {
						if deliberatelyDeletedPods.Has(string(pod.UID)) {
							newDeliberatelyDeletedCount++
						}
						newUnavailableCount++
					}
				} else {
					if !ready || pod.DeletionTimestamp != nil {
						oldUnavailableCount++
					}
				}
				fmt.Fprintf(pw, "%s\t%s\t%s\t%s\t%t\t%t\n", pod.Spec.NodeName, podVersion, pod.Name, pod.UID, pod.DeletionTimestamp != nil, ready)
			}

			// Log a sorted summary of the daemon pods on each poll.
			pw.Flush()
			lines := strings.Split(buf.String(), "\n")
			lines = lines[:len(lines)-1]
			sort.Strings(lines[1:])
			for _, line := range lines {
				framework.Logf("%s", line)
			}

			// Track, per node, how many pods the controller is deleting, and how long each old
			// pod has been co-scheduled with a new-version pod on the same node.
			deletedPerNode := make(map[string]int)
			for _, pod := range pods {
				if !metav1.IsControlledBy(&pod, ds) {
					continue
				}

				if pod.DeletionTimestamp != nil {
					deletedPodUIDs.Insert(string(pod.UID))
					if !deliberatelyDeletedPods.Has(string(pod.UID)) {
						deletedPerNode[pod.Spec.NodeName]++
					}
					continue
				}
				podUIDs.Insert(string(pod.UID))
				podVersion := pod.Spec.Containers[0].Env[0].Value
				if podVersion == newVersion {
					continue
				}

				// Record when an old pod first shares its node with a new-version pod.
				if nodesToVersions[pod.Spec.NodeName][newVersion] > 0 {
					if _, ok := ageOfOldPod[string(pod.UID)]; !ok {
						ageOfOldPod[string(pod.UID)] = now
					}
				} else {
					delete(ageOfOldPod, string(pod.UID))
				}
			}

			// Forget old pods that no longer exist.
			for uid := range ageOfOldPod {
				if !podUIDs.Has(uid) {
					delete(ageOfOldPod, uid)
				}
			}
			deliberatelyDeletedPods = deliberatelyDeletedPods.Intersection(deletedPodUIDs)

			for _, versions := range nodesToVersions {
				if versions[oldVersion] == 0 {
					nodesWithoutOldVersion++
				}
			}

			var errs []string

			// Invariant: the controller should not be deleting more than one pod per node at a time.
			for node, count := range deletedPerNode {
				if count > 1 {
					errs = append(errs, fmt.Sprintf("Node %s has %d deleted pods, which may indicate a problem on the node or a controller race condition", node, count))
				}
			}

			// Invariant: an old pod must be cleaned up within maxSurgeOverlap of a new-version pod
			// appearing on the same node.
			for uid, firstSeen := range ageOfOldPod {
				if now.Sub(firstSeen) > maxSurgeOverlap {
					errs = append(errs, fmt.Sprintf("An old pod with UID %s has been running alongside a newer version for longer than %s", uid, maxSurgeOverlap))
				}
			}

			// Invariant: unavailable new pods must stay within the surge budget, allowing for old
			// unavailable pods, deliberately deleted pods, and nodes that never had an old pod.
			if newUnavailableCount > (maxSurge + oldUnavailableCount + newDeliberatelyDeletedCount + nodesWithoutOldVersion) {
				errs = append(errs, fmt.Sprintf("observed %d new unavailable pods greater than (surge count %d + old unavailable count %d + deliberately deleted new count %d + nodes without old version %d), may be infrastructure flake", newUnavailableCount, maxSurge, oldUnavailableCount, newDeliberatelyDeletedCount, nodesWithoutOldVersion))
			}
			// Invariant: at most two versions may be running at the same time.
			if versions.Len() > 2 {
				errs = append(errs, fmt.Sprintf("observed %d versions running simultaneously, must have max 2", versions.Len()))
			}
			for _, node := range nodes.List() {
				// Ignore pods that have not been scheduled to a node yet.
				if len(node) == 0 {
					continue
				}
				versionCount := make(map[string]int)
				// Invariant: no more than one running instance of a given version per node.
				for version, count := range nodesToVersions[node] {
					if count > 1 {
						errs = append(errs, fmt.Sprintf("node %s has %d instances of version %s running simultaneously, must have max 1", node, count, version))
					}
					versionCount[version] += count
				}
				// Invariant: no more than two terminating instances of a given version per node.
				for version, count := range nodesToDeletedVersions[node] {
					if count > 2 {
						errs = append(errs, fmt.Sprintf("node %s has %d deleted instances of version %s running simultaneously, must have max 1", node, count, version))
					}
					versionCount[version] += count
				}
				// Invariant: no more than two total instances of a given version per node.
				for version, count := range versionCount {
					if count > 2 {
						errs = append(errs, fmt.Sprintf("node %s has %d total instances of version %s running simultaneously, must have max 2 (one deleted and one running)", node, count, version))
					}
				}
			}

			if len(errs) > 0 {
				sort.Strings(errs)
				return false, fmt.Errorf("invariants were violated during daemonset update:\n%s", strings.Join(errs, "\n"))
			}

			// Check whether the rollout is complete on every schedulable node; while it is not,
			// occasionally disrupt it and keep polling.
			nodeNames := e2edaemonset.SchedulableNodes(ctx, c, ds)
			for _, node := range nodeNames {
				switch {
				case
					// The node is still missing a pod of the new version.
					nodesToVersions[node][newVersion] == 0,
					// The node is still running more than one version.
					len(nodesToVersions[node]) > 1,
					// The node still has pods that are being deleted.
					len(nodesToDeletedVersions[node]) > 0,
					// Some new pods are still unavailable.
					newUnavailableCount > 0:

					// The rollout is not finished; occasionally inject a disruption so the
					// invariants above are exercised under failure conditions.
					switch rand.Intn(25) {
					case 0:
						// Mark a random old pod as unready by breaking its readiness-probe file.
						if pod := randomPod(pods, func(pod *v1.Pod) bool {
							return pod.DeletionTimestamp == nil && oldVersion == pod.Spec.Containers[0].Env[0].Value
						}); pod != nil {
							if _, err := e2ekubectl.RunKubectl(pod.Namespace, "exec", "-c", pod.Spec.Containers[0].Name, pod.Name, "--", "/bin/sh", "-ec", "echo 0 > /var/tmp/ready"); err != nil {
								framework.Logf("Failed to mark pod %s as unready via exec: %v", pod.Name, err)
							} else {
								framework.Logf("Marked old pod %s as unready", pod.Name)
							}
						}
					case 1:
						// Delete a random pod before the controller gets to it.
						if pod := randomPod(pods, func(pod *v1.Pod) bool {
							return pod.DeletionTimestamp == nil
						}); pod != nil {
							if err := c.CoreV1().Pods(ds.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
								framework.Logf("Failed to delete pod %s early: %v", pod.Name, err)
							} else {
								framework.Logf("Deleted pod %s prematurely", pod.Name)
								deliberatelyDeletedPods.Insert(string(pod.UID))
							}
						}
					}

					// At least one node is not fully updated; poll again.
					return false, nil
				}
			}
			return true, nil
		})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 2)
		cur = curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
	})

	framework.ConformanceIt("should list and delete a collection of DaemonSets", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}
		labelSelector := labels.SelectorFromSet(label).String()

		dsClient := f.ClientSet.AppsV1().DaemonSets(ns)
		cs := f.ClientSet
		one := int64(1)

		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
		testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, testDaemonset))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("listing all DaemonSets")
		dsList, err := cs.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "failed to list DaemonSets")
		gomega.Expect(dsList.Items).To(gomega.HaveLen(1), "filtered list wasn't found")

		ginkgo.By("DeleteCollection of the DaemonSets")
		err = dsClient.DeleteCollection(ctx, metav1.DeleteOptions{GracePeriodSeconds: &one}, metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "failed to delete DaemonSets")

		ginkgo.By("Verify that DaemonSets have been deleted")
		dsList, err = c.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "failed to list DaemonSets")
		gomega.Expect(dsList.Items).To(gomega.BeEmpty(), "filtered list should have no daemonset")
	})

	framework.ConformanceIt("should verify changes to a daemon set status", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}
		labelSelector := labels.SelectorFromSet(label).String()

		dsClient := f.ClientSet.AppsV1().DaemonSets(ns)
		cs := f.ClientSet

		w := &cache.ListWatch{
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				options.LabelSelector = labelSelector
				return dsClient.Watch(ctx, options)
			},
		}

		dsList, err := cs.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "failed to list DaemonSets")

		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
		testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, testDaemonset))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Getting /status")
		dsResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}
		dsStatusUnstructured, err := f.DynamicClient.Resource(dsResource).Namespace(ns).Get(ctx, dsName, metav1.GetOptions{}, "status")
		framework.ExpectNoError(err, "Failed to fetch the status of daemon set %s in namespace %s", dsName, ns)
		dsStatusBytes, err := json.Marshal(dsStatusUnstructured)
		framework.ExpectNoError(err, "Failed to marshal unstructured response. %v", err)

		var dsStatus appsv1.DaemonSet
		err = json.Unmarshal(dsStatusBytes, &dsStatus)
		framework.ExpectNoError(err, "Failed to unmarshal JSON bytes to a daemon set object type")
		framework.Logf("Daemon Set %s has Conditions: %v", dsName, dsStatus.Status.Conditions)

		ginkgo.By("updating the DaemonSet Status")
		var statusToUpdate, updatedStatus *appsv1.DaemonSet

		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
			statusToUpdate, err = dsClient.Get(ctx, dsName, metav1.GetOptions{})
			framework.ExpectNoError(err, "Unable to retrieve daemon set %s", dsName)

			statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, appsv1.DaemonSetCondition{
				Type:    "StatusUpdate",
				Status:  "True",
				Reason:  "E2E",
				Message: "Set from e2e test",
			})

			updatedStatus, err = dsClient.UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{})
			return err
		})
		framework.ExpectNoError(err, "Failed to update status. %v", err)
		framework.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions)

		ginkgo.By("watching for the daemon set status to be updated")
		ctxUntil, cancel := context.WithTimeout(ctx, dsRetryTimeout)
		defer cancel()
		_, err = watchtools.Until(ctxUntil, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
			if ds, ok := event.Object.(*appsv1.DaemonSet); ok {
				found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name &&
					ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace &&
					ds.Labels[daemonsetNameLabel] == dsName
				if !found {
					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
					return false, nil
				}
				for _, cond := range ds.Status.Conditions {
					if cond.Type == "StatusUpdate" &&
						cond.Reason == "E2E" &&
						cond.Message == "Set from e2e test" {
						framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, ds.Status.Conditions)
						return found, nil
					}
					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
				}
			}
			object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0]
			framework.Logf("Observed %v event: %+v", object, event.Type)
			return false, nil
		})
		framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns)
		framework.Logf("Daemon set %s has an updated status", dsName)

		ginkgo.By("patching the DaemonSet Status")
		daemonSetStatusPatch := appsv1.DaemonSet{
			Status: appsv1.DaemonSetStatus{
				Conditions: []appsv1.DaemonSetCondition{
					{
						Type:   "StatusPatched",
						Status: "True",
					},
				},
			},
		}

		payload, err := json.Marshal(daemonSetStatusPatch)
		framework.ExpectNoError(err, "Failed to marshal JSON. %v", err)
		_, err = dsClient.Patch(ctx, dsName, types.MergePatchType, payload, metav1.PatchOptions{}, "status")
		framework.ExpectNoError(err, "Failed to patch daemon set status: %v", err)

		ginkgo.By("watching for the daemon set status to be patched")
		ctxUntil, cancel = context.WithTimeout(ctx, dsRetryTimeout)
		defer cancel()
		_, err = watchtools.Until(ctxUntil, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
			if ds, ok := event.Object.(*appsv1.DaemonSet); ok {
				found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name &&
					ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace &&
					ds.Labels[daemonsetNameLabel] == dsName
				if !found {
					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
					return false, nil
				}
				for _, cond := range ds.Status.Conditions {
					if cond.Type == "StatusPatched" {
						framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, ds.Status.Conditions)
						return found, nil
					}
					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
				}
			}
			object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0]
			framework.Logf("Observed %v event: %v", object, event.Type)
			return false, nil
		})
		framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns)
		framework.Logf("Daemon set %s has a patched status", dsName)
	})
})

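// randomPod returns a pointer to a pseudo-randomly chosen pod from pods for which fn
// returns true, or nil if no matching pod is found.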
func randomPod(pods []v1.Pod, fn func(p *v1.Pod) bool) *v1.Pod {
	podCount := len(pods)
	for offset, i := rand.Intn(podCount), 0; i < (podCount - 1); i++ {
		pod := &pods[(offset+i)%podCount]
		if fn(pod) {
			return pod
		}
	}
	return nil
}

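// getDaemonSetImagePatch returns a strategic-merge patch that updates the image of the
// named container in a DaemonSet's pod template.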
func getDaemonSetImagePatch(containerName, containerImage string) string {
	return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage)
}

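// newDaemonSet builds a test DaemonSet whose pods carry the given label but whose own
// object metadata carries no labels.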
func newDaemonSet(dsName, image string, label map[string]string) *appsv1.DaemonSet {
	ds := newDaemonSetWithLabel(dsName, image, label)
	ds.ObjectMeta.Labels = nil
	return ds
}

func newDaemonSetWithLabel(dsName, image string, label map[string]string) *appsv1.DaemonSet {
	return e2edaemonset.NewDaemonSet(dsName, image, label, nil, nil, []v1.ContainerPort{{ContainerPort: 9376}})
}

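// listDaemonPods lists the non-terminal pods in ns that match the given label set and
// fails the test if the call errors or the list is empty.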
func listDaemonPods(ctx context.Context, c clientset.Interface, ns string, label map[string]string) *v1.PodList {
	selector := labels.Set(label).AsSelector()
	options := metav1.ListOptions{
		LabelSelector: selector.String(),
		FieldSelector: nonTerminalPhaseSelector.String(),
	}
	podList, err := c.CoreV1().Pods(ns).List(ctx, options)
	framework.ExpectNoError(err)
	gomega.Expect(podList.Items).ToNot(gomega.BeEmpty())
	return podList
}

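// separateDaemonSetNodeLabels splits a node's labels into those created by these tests
// (prefixed with daemonsetLabelPrefix) and all remaining labels.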
func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
	daemonSetLabels := map[string]string{}
	otherLabels := map[string]string{}
	for k, v := range labels {
		if strings.HasPrefix(k, daemonsetLabelPrefix) {
			daemonSetLabels[k] = v
		} else {
			otherLabels[k] = v
		}
	}
	return daemonSetLabels, otherLabels
}

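// clearDaemonSetNodeLabels removes the test-owned daemonset- labels from every ready and
// schedulable node.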
func clearDaemonSetNodeLabels(ctx context.Context, c clientset.Interface) error {
	nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
	if err != nil {
		return err
	}
	for _, node := range nodeList.Items {
		_, err := setDaemonSetNodeLabels(ctx, c, node.Name, map[string]string{})
		if err != nil {
			return err
		}
	}
	return nil
}

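// patchNamespaceAnnotations sets the node-selector annotations listed in
// NamespaceNodeSelectors to the empty string on the given namespace.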
func patchNamespaceAnnotations(ctx context.Context, c clientset.Interface, nsName string) (*v1.Namespace, error) {
	nsClient := c.CoreV1().Namespaces()

	annotations := make(map[string]string)
	for _, n := range NamespaceNodeSelectors {
		annotations[n] = ""
	}
	nsPatch, err := json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": annotations,
		},
	})
	if err != nil {
		return nil, err
	}

	return nsClient.Patch(ctx, nsName, types.StrategicMergePatchType, nsPatch, metav1.PatchOptions{})
}

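// setDaemonSetNodeLabels replaces the test-owned daemonset- labels on the named node with
// the given set, retrying on resource-version conflicts, and returns the updated node.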
func setDaemonSetNodeLabels(ctx context.Context, c clientset.Interface, nodeName string, labels map[string]string) (*v1.Node, error) {
	nodeClient := c.CoreV1().Nodes()
	var newNode *v1.Node
	var newLabels map[string]string
	err := wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, func(ctx context.Context) (bool, error) {
		node, err := nodeClient.Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}

		// Keep all non-test labels and overwrite the test-owned ones with the desired set.
		daemonSetLabels, otherLabels := separateDaemonSetNodeLabels(node.Labels)
		if reflect.DeepEqual(daemonSetLabels, labels) {
			newNode = node
			return true, nil
		}
		node.Labels = otherLabels
		for k, v := range labels {
			node.Labels[k] = v
		}
		newNode, err = nodeClient.Update(ctx, node, metav1.UpdateOptions{})
		if err == nil {
			newLabels, _ = separateDaemonSetNodeLabels(newNode.Labels)
			return true, err
		}
		if se, ok := err.(*apierrors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
			framework.Logf("failed to update node due to resource version conflict")
			return false, nil
		}
		return false, err
	})
	if err != nil {
		return nil, err
	} else if len(newLabels) != len(labels) {
		return nil, fmt.Errorf("could not set daemon set test labels as expected")
	}

	return newNode, nil
}

func checkRunningOnAllNodes(f *framework.Framework, ds *appsv1.DaemonSet) func(ctx context.Context) (bool, error) {
	return func(ctx context.Context) (bool, error) {
		return e2edaemonset.CheckRunningOnAllNodes(ctx, f, ds)
	}
}

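// checkAtLeastOneNewPod returns a poll condition that reports whether at least one daemon
// pod is already running the new image.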
func checkAtLeastOneNewPod(c clientset.Interface, ns string, label map[string]string, newImage string) func(ctx context.Context) (bool, error) {
	return func(ctx context.Context) (bool, error) {
		pods := listDaemonPods(ctx, c, ns, label)
		for _, pod := range pods.Items {
			if pod.Spec.Containers[0].Image == newImage {
				return true, nil
			}
		}
		return false, nil
	}
}

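// checkRunningOnNoNodes returns a poll condition that succeeds once the DaemonSet has no
// pods running on any node.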
func checkRunningOnNoNodes(f *framework.Framework, ds *appsv1.DaemonSet) func(ctx context.Context) (bool, error) {
	return e2edaemonset.CheckDaemonPodOnNodes(f, ds, make([]string, 0))
}

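// checkDaemonPodsImageAndAvailability returns a poll condition that verifies every
// schedulable node runs a daemon pod with the expected image and that no more than
// maxUnavailable daemon pods are unavailable.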
func checkDaemonPodsImageAndAvailability(c clientset.Interface, ds *appsv1.DaemonSet, image string, maxUnavailable int) func(ctx context.Context) (bool, error) {
	return func(ctx context.Context) (bool, error) {
		podList, err := c.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		pods := podList.Items

		unavailablePods := 0
		nodesToUpdatedPodCount := make(map[string]int)
		for _, pod := range pods {
			// Ignore pods that are being deleted or are not owned by this DaemonSet.
			if pod.DeletionTimestamp != nil {
				continue
			}
			if !metav1.IsControlledBy(&pod, ds) {
				continue
			}
			podImage := pod.Spec.Containers[0].Image
			if podImage != image {
				framework.Logf("Wrong image for pod: %s. Expected: %s, got: %s.", pod.Name, image, podImage)
			} else {
				nodesToUpdatedPodCount[pod.Spec.NodeName]++
			}
			if !podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) {
				framework.Logf("Pod %s is not available", pod.Name)
				unavailablePods++
			}
		}
		if unavailablePods > maxUnavailable {
			return false, fmt.Errorf("number of unavailable pods: %d is greater than maxUnavailable: %d", unavailablePods, maxUnavailable)
		}
		// Every schedulable node must be running at least one updated pod.
		nodeNames := e2edaemonset.SchedulableNodes(ctx, c, ds)
		for _, node := range nodeNames {
			if nodesToUpdatedPodCount[node] == 0 {
				return false, nil
			}
		}
		return true, nil
	}
}

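// checkDaemonSetPodsLabels asserts that every listed, non-terminating daemon pod carries a
// non-empty controller-revision hash label, and that it matches hash when hash is non-empty.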
func checkDaemonSetPodsLabels(podList *v1.PodList, hash string) {
	for _, pod := range podList.Items {
		// Ignore pods that are already being deleted.
		if pod.DeletionTimestamp != nil {
			continue
		}
		podHash := pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(podHash).ToNot(gomega.BeEmpty())
		if len(hash) > 0 {
			gomega.Expect(podHash).To(gomega.Equal(hash), "unexpected hash for pod %s", pod.Name)
		}
	}
}

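// waitForHistoryCreated waits until exactly numHistory ControllerRevisions matching label
// exist in ns.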
func waitForHistoryCreated(ctx context.Context, c clientset.Interface, ns string, label map[string]string, numHistory int) {
	listHistoryFn := func(ctx context.Context) (bool, error) {
		selector := labels.Set(label).AsSelector()
		options := metav1.ListOptions{LabelSelector: selector.String()}
		historyList, err := c.AppsV1().ControllerRevisions(ns).List(ctx, options)
		if err != nil {
			return false, err
		}
		if len(historyList.Items) == numHistory {
			return true, nil
		}
		framework.Logf("%d/%d controllerrevisions created.", len(historyList.Items), numHistory)
		return false, nil
	}
	err := wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, listHistoryFn)
	framework.ExpectNoError(err, "error waiting for controllerrevisions to be created")
}

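// listDaemonHistories lists the ControllerRevisions in ns matching label and fails the test
// if none are found.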
func listDaemonHistories(ctx context.Context, c clientset.Interface, ns string, label map[string]string) *appsv1.ControllerRevisionList {
	selector := labels.Set(label).AsSelector()
	options := metav1.ListOptions{LabelSelector: selector.String()}
	historyList, err := c.AppsV1().ControllerRevisions(ns).List(ctx, options)
	framework.ExpectNoError(err)
	gomega.Expect(historyList.Items).ToNot(gomega.BeEmpty())
	return historyList
}

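// curHistory returns the single ControllerRevision that matches the DaemonSet's current pod
// template, failing the test if there is not exactly one match.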
func curHistory(historyList *appsv1.ControllerRevisionList, ds *appsv1.DaemonSet) *appsv1.ControllerRevision {
	var curHistory *appsv1.ControllerRevision
	foundCurHistories := 0
	for i := range historyList.Items {
		history := &historyList.Items[i]
		// Every history should have the hash label.
		gomega.Expect(history.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]).ToNot(gomega.BeEmpty())
		match, err := daemon.Match(ds, history)
		framework.ExpectNoError(err)
		if match {
			curHistory = history
			foundCurHistories++
		}
	}
	gomega.Expect(foundCurHistories).To(gomega.Equal(1))
	gomega.Expect(curHistory).NotTo(gomega.BeNil())
	return curHistory
}

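// waitFailedDaemonPodDeleted returns a poll condition that succeeds once the given pod no
// longer exists.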
func waitFailedDaemonPodDeleted(c clientset.Interface, pod *v1.Pod) func(ctx context.Context) (bool, error) {
	return func(ctx context.Context) (bool, error) {
		if _, err := c.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err != nil {
			if apierrors.IsNotFound(err) {
				return true, nil
			}
			return false, fmt.Errorf("failed to get failed daemon pod %q: %w", pod.Name, err)
		}
		return false, nil
	}
}