/* Copyright 2015 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package apps import ( "bytes" "context" "encoding/json" "fmt" "math/rand" "reflect" "sort" "strings" "text/tabwriter" "time" "k8s.io/client-go/tools/cache" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" watch "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" watchtools "k8s.io/client-go/tools/watch" "k8s.io/client-go/util/retry" podutil "k8s.io/kubernetes/pkg/api/v1/pod" extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/test/e2e/framework" e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset" e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl" e2enode "k8s.io/kubernetes/test/e2e/framework/node" e2eresource "k8s.io/kubernetes/test/e2e/framework/resource" admissionapi "k8s.io/pod-security-admission/api" ) const ( // this should not be a multiple of 5, because node status updates // every 5 seconds. 
See https://github.com/kubernetes/kubernetes/pull/14915. dsRetryPeriod = 1 * time.Second dsRetryTimeout = 5 * time.Minute daemonsetLabelPrefix = "daemonset-" daemonsetNameLabel = daemonsetLabelPrefix + "name" daemonsetColorLabel = daemonsetLabelPrefix + "color" ) // NamespaceNodeSelectors the annotation key scheduler.alpha.kubernetes.io/node-selector is for assigning // node selectors labels to namespaces var NamespaceNodeSelectors = []string{"scheduler.alpha.kubernetes.io/node-selector"} var nonTerminalPhaseSelector = func() labels.Selector { var reqs []labels.Requirement for _, phase := range []v1.PodPhase{v1.PodFailed, v1.PodSucceeded} { req, _ := labels.NewRequirement("status.phase", selection.NotEquals, []string{string(phase)}) reqs = append(reqs, *req) } selector := labels.NewSelector() return selector.Add(reqs...) }() type updateDSFunc func(*appsv1.DaemonSet) // updateDaemonSetWithRetries updates daemonsets with the given applyUpdate func // until it succeeds or a timeout expires. func updateDaemonSetWithRetries(ctx context.Context, c clientset.Interface, namespace, name string, applyUpdate updateDSFunc) (ds *appsv1.DaemonSet, err error) { daemonsets := c.AppsV1().DaemonSets(namespace) var updateErr error pollErr := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 1*time.Minute, true, func(ctx context.Context) (bool, error) { if ds, err = daemonsets.Get(ctx, name, metav1.GetOptions{}); err != nil { return false, err } // Apply the update, then attempt to push it to the apiserver. applyUpdate(ds) if ds, err = daemonsets.Update(ctx, ds, metav1.UpdateOptions{}); err == nil { framework.Logf("Updating DaemonSet %s", name) return true, nil } updateErr = err return false, nil }) if wait.Interrupted(pollErr) { pollErr = fmt.Errorf("couldn't apply the provided updated to DaemonSet %q: %v", name, updateErr) } return ds, pollErr } // This test must be run in serial because it assumes the Daemon Set pods will // always get scheduled. 
// If we run other tests in parallel, this may not
// happen. In the future, running in parallel may work if we have an eviction
// model which lets the DS controller kick out other pods to make room.
// See https://issues.k8s.io/21767 for more details
var _ = SIGDescribe("Daemon set", framework.WithSerial(), func() {
	// f is assigned further down by framework.NewDefaultFramework; the closures
	// below only dereference it when they run.
	var f *framework.Framework

	// After every spec: delete each DaemonSet found in the test namespace and wait
	// until checkRunningOnNoNodes reports success for it, then log whatever
	// DaemonSets and pods are still listed, and finally call
	// clearDaemonSetNodeLabels.
	ginkgo.AfterEach(func(ctx context.Context) {
		// Clean up
		daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(ctx, metav1.ListOptions{})
		framework.ExpectNoError(err, "unable to dump DaemonSets")
		if daemonsets != nil && len(daemonsets.Items) > 0 {
			for _, ds := range daemonsets.Items {
				ginkgo.By(fmt.Sprintf("Deleting DaemonSet %q", ds.Name))
				framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
				err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, &ds))
				framework.ExpectNoError(err, "error waiting for daemon pod to be reaped")
			}
		}
		// The two dumps below are best effort: a list failure is only logged.
		if daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(ctx, metav1.ListOptions{}); err == nil {
			framework.Logf("daemonset: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), daemonsets))
		} else {
			framework.Logf("unable to dump daemonsets: %v", err)
		}
		if pods, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{}); err == nil {
			framework.Logf("pods: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), pods))
		} else {
			framework.Logf("unable to dump pods: %v", err)
		}
		err = clearDaemonSetNodeLabels(ctx, f.ClientSet)
		framework.ExpectNoError(err)
	})

	f = framework.NewDefaultFramework("daemonsets")
	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline

	// Image and DaemonSet name shared by the specs below.
	image := WebserverImage
	dsName := "daemon-set"

	// ns and c are populated in BeforeEach and captured by every spec.
	var ns string
	var c clientset.Interface

	// Before every spec: record namespace and client, patch the namespace
	// annotations (patchNamespaceAnnotations), adopt the name of the returned
	// namespace, and call clearDaemonSetNodeLabels.
	ginkgo.BeforeEach(func(ctx context.Context) {
		ns = f.Namespace.Name

		c = f.ClientSet

		updatedNS, err := patchNamespaceAnnotations(ctx, c, ns)
		framework.ExpectNoError(err)

		ns = updatedNS.Name

		err = clearDaemonSetNodeLabels(ctx, c)
		framework.ExpectNoError(err)
	})

	/*
	  Release: v1.10
	  Testname: DaemonSet-Creation
	  Description: A conformant Kubernetes distribution MUST support the creation of DaemonSets. When a DaemonSet
	  Pod is deleted, the DaemonSet controller MUST create a replacement Pod.
	*/
	framework.ConformanceIt("should run and stop simple daemon", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}

		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSet(dsName, image, label), metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Stop a daemon pod, check that the daemon pod is revived.")
		podList := listDaemonPods(ctx, c, ns, label)
		// Delete the first listed daemon pod, then wait for checkRunningOnAllNodes
		// to succeed again.
		pod := podList.Items[0]
		err = c.CoreV1().Pods(ns).Delete(ctx, pod.Name, metav1.DeleteOptions{})
		framework.ExpectNoError(err)
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to revive")
	})

	/*
	  Release: v1.10
	  Testname: DaemonSet-NodeSelection
	  Description: A conformant Kubernetes distribution MUST support DaemonSet Pod node selection via label
	  selectors.
*/
	framework.ConformanceIt("should run and stop complex daemon", func(ctx context.Context) {
		complexLabel := map[string]string{daemonsetNameLabel: dsName}
		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
		framework.Logf("Creating daemon %q with a node selector", dsName)
		ds := newDaemonSet(dsName, image, complexLabel)
		ds.Spec.Template.Spec.NodeSelector = nodeSelector
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")

		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
		framework.ExpectNoError(err)
		newNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
		framework.ExpectNoError(err, "error setting labels on node")
		// Exactly one daemonset-* label (the color) is expected on the node.
		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
		gomega.Expect(daemonSetLabels).To(gomega.HaveLen(1))
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{newNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Update the node label to green, and wait for daemons to be unscheduled")
		// The same map is reused with a new color; the DaemonSet still selects "blue".
		nodeSelector[daemonsetColorLabel] = "green"
		greenNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
		// This step rewrites the label value rather than removing the label, so the
		// failure message says "updating".
		framework.ExpectNoError(err, "error updating labels on node")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")

		ginkgo.By("Update DaemonSet node selector to green, and change its update strategy to RollingUpdate")
		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"nodeSelector":{"%s":"%s"}}},"updateStrategy":{"type":"RollingUpdate"}}}`,
			daemonsetColorLabel, greenNode.Labels[daemonsetColorLabel])
		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err, "error patching daemon set")
		daemonSetLabels, _ = separateDaemonSetNodeLabels(greenNode.Labels)
		gomega.Expect(daemonSetLabels).To(gomega.HaveLen(1))
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{greenNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)
	})

	// We defer adding this test to conformance pending the disposition of moving DaemonSet scheduling logic to the
	// default scheduler.
	ginkgo.It("should run and stop complex daemon with node affinity", func(ctx context.Context) {
		complexLabel := map[string]string{daemonsetNameLabel: dsName}
		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
		framework.Logf("Creating daemon %q with a node affinity", dsName)
		ds := newDaemonSet(dsName, image, complexLabel)
		// Required node affinity: the color label must be In {"blue"}.
		ds.Spec.Template.Spec.Affinity = &v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchExpressions: []v1.NodeSelectorRequirement{
								{
									Key:      daemonsetColorLabel,
									Operator: v1.NodeSelectorOpIn,
									Values:   []string{nodeSelector[daemonsetColorLabel]},
								},
							},
						},
					},
				},
			},
		}
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")

		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
		framework.ExpectNoError(err)
		newNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
		framework.ExpectNoError(err, "error setting labels on node")
		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
		gomega.Expect(daemonSetLabels).To(gomega.HaveLen(1))
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{newNode.Name}))
		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Remove the node label and wait for daemons to be unscheduled")
		// An empty label map is passed here, so this step really does remove the label.
		_, err = setDaemonSetNodeLabels(ctx, c, node.Name, map[string]string{})
		framework.ExpectNoError(err, "error removing labels on node")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")
	})

	/*
	  Release: v1.10
	  Testname: DaemonSet-FailedPodCreation
	  Description: A conformant Kubernetes distribution MUST create new DaemonSet Pods when they fail.
*/
	framework.ConformanceIt("should retry creating failed daemon pods", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}

		ginkgo.By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName))
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSet(dsName, image, label), metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")
		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
		framework.ExpectNoError(err)

		ginkgo.By("Set a daemon pod's phase to 'Failed', check that the daemon pod is revived.")
		podList := listDaemonPods(ctx, c, ns, label)
		// Work on a copy of the first listed pod and push a Failed phase through
		// the status subresource.
		pod := podList.Items[0]
		// NOTE(review): the resource version is blanked before the status update,
		// presumably so the write is not rejected on a version conflict — confirm.
		pod.ResourceVersion = ""
		pod.Status.Phase = v1.PodFailed
		_, err = c.CoreV1().Pods(ns).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
		framework.ExpectNoError(err, "error failing a daemon pod")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to revive")

		ginkgo.By("Wait for the failed daemon pod to be completely deleted.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, waitFailedDaemonPodDeleted(c, &pod))
		framework.ExpectNoError(err, "error waiting for the failed daemon pod to be completely deleted")
	})

	// This test should not be added to conformance. We will consider deprecating OnDelete when the
	// extensions/v1beta1 and apps/v1beta1 are removed.
ginkgo.It("should not update pod when spec was updated and update strategy is OnDelete", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}

		framework.Logf("Creating simple daemon set %s", dsName)
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType}
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels: exactly one history entry at revision 1, and
		// the pods are checked against its hash label.
		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 1)
		first := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		firstHash := first.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(first.Revision).To(gomega.Equal(int64(1)))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), firstHash)

		ginkgo.By("Update daemon pods image.")
		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods images aren't updated.")
		// The pods are checked against the ORIGINAL image here, not AgnhostImage.
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkDaemonPodsImageAndAvailability(c, ds, image, 0))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels: a second history entry (revision 2) with a
		// different hash must exist, while the pods are still checked against the
		// first hash.
		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 2)
		cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
		gomega.Expect(cur.Labels).NotTo(gomega.HaveKeyWithValue(appsv1.DefaultDaemonSetUniqueLabelKey, firstHash))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), firstHash)
	})

	/*
	  Release: v1.10
	  Testname: DaemonSet-RollingUpdate
	  Description: A conformant Kubernetes distribution MUST support DaemonSet RollingUpdates.
	*/
	framework.ConformanceIt("should update pod when spec was updated and update strategy is RollingUpdate", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}

		framework.Logf("Creating simple daemon set %s", dsName)
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 1)
		cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(1)))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)

		ginkgo.By("Update daemon pods image.")
		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err)

		// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
		// Get the number of nodes, and set the timeout appropriately.
		// The base timeout is extended by 30 seconds per listed node.
		nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
		framework.ExpectNoError(err)
		nodeCount := len(nodes.Items)
		retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second

		ginkgo.By("Check that daemon pods images are updated.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, retryTimeout, true, checkDaemonPodsImageAndAvailability(c, ds, AgnhostImage, 1))
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels: after the rollout the pods are checked against
		// the hash of revision 2.
		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 2)
		cur = curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
	})

	/*
	  Release: v1.10
	  Testname: DaemonSet-Rollback
	  Description: A conformant Kubernetes distribution MUST support automated, minimally disruptive
	  rollback of updates to a DaemonSet.
*/
	framework.ConformanceIt("should rollback without unnecessary restarts", func(ctx context.Context) {
		schedulableNodes, err := e2enode.GetReadySchedulableNodes(ctx, c)
		framework.ExpectNoError(err)
		gomega.Expect(len(schedulableNodes.Items)).To(gomega.BeNumerically(">", 1), "Conformance test suite needs a cluster with at least 2 nodes.")
		framework.Logf("Create a RollingUpdate DaemonSet")
		label := map[string]string{daemonsetNameLabel: dsName}
		ds := newDaemonSet(dsName, image, label)
		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
		ds, err = c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		framework.Logf("Check that daemon pods launch on every node of the cluster")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		framework.Logf("Update the DaemonSet to trigger a rollout")
		// We use a nonexistent image here, so that we make sure it won't finish
		newImage := "foo:non-existent"
		newDS, err := updateDaemonSetWithRetries(ctx, c, ns, ds.Name, func(update *appsv1.DaemonSet) {
			update.Spec.Template.Spec.Containers[0].Image = newImage
		})
		framework.ExpectNoError(err)

		// Make sure we're in the middle of a rollout
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkAtLeastOneNewPod(c, ns, label, newImage))
		framework.ExpectNoError(err)

		// Split the current daemon pods by image: ds still carries the original
		// image, newDS carries the nonexistent one.
		pods := listDaemonPods(ctx, c, ns, label)
		var existingPods, newPods []*v1.Pod
		for i := range pods.Items {
			pod := pods.Items[i]
			image := pod.Spec.Containers[0].Image
			switch image {
			case ds.Spec.Template.Spec.Containers[0].Image:
				existingPods = append(existingPods, &pod)
			case newDS.Spec.Template.Spec.Containers[0].Image:
				newPods = append(newPods, &pod)
			default:
				framework.Failf("unexpected pod found, image = %s", image)
			}
		}
		schedulableNodes, err = e2enode.GetReadySchedulableNodes(ctx, c)
		framework.ExpectNoError(err)
		if len(schedulableNodes.Items) < 2 {
			gomega.Expect(existingPods).To(gomega.BeEmpty())
		} else {
			gomega.Expect(existingPods).NotTo(gomega.BeEmpty())
		}
		gomega.Expect(newPods).NotTo(gomega.BeEmpty())

		framework.Logf("Roll back the DaemonSet before rollout is complete")
		rollbackDS, err := updateDaemonSetWithRetries(ctx, c, ns, ds.Name, func(update *appsv1.DaemonSet) {
			update.Spec.Template.Spec.Containers[0].Image = image
		})
		framework.ExpectNoError(err)

		framework.Logf("Make sure DaemonSet rollback is complete")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkDaemonPodsImageAndAvailability(c, rollbackDS, image, 1))
		framework.ExpectNoError(err)

		// After rollback is done, compare current pods with previous old pods during rollout, to make sure they're not restarted
		pods = listDaemonPods(ctx, c, ns, label)
		rollbackPods := map[string]bool{}
		for _, pod := range pods.Items {
			rollbackPods[pod.Name] = true
		}
		for _, pod := range existingPods {
			if !rollbackPods[pod.Name] {
				framework.Failf("unexpected pod %s be restarted", pod.Name)
			}
		}
	})

	// TODO: This test is expected to be promoted to conformance after the feature is promoted
	ginkgo.It("should surge pods onto nodes when spec was updated and update strategy is RollingUpdate", func(ctx context.Context) {
		label := map[string]string{daemonsetNameLabel: dsName}

		framework.Logf("Creating surge daemon set %s", dsName)
		maxSurgeOverlap := 60 * time.Second
		maxSurge := 1
		surgePercent := intstr.FromString("20%")
		zero := intstr.FromInt32(0)
		oldVersion := "1"
		ds := newDaemonSet(dsName, image, label)
		// The VERSION env var is the first env entry of the first container; the
		// checks below read it back from exactly that position.
		ds.Spec.Template.Spec.Containers[0].Env = []v1.EnvVar{
			{Name: "VERSION", Value: oldVersion},
		}
		// delay shutdown by 15s to allow containers to overlap in time
		ds.Spec.Template.Spec.Containers[0].Lifecycle = &v1.Lifecycle{
			PreStop: &v1.LifecycleHandler{
				Exec: &v1.ExecAction{
					Command: []string{"/bin/sh", "-c", "sleep 15"},
				},
			},
		}
		// use a readiness probe that can be forced to fail (by changing the contents of /var/tmp/ready)
		ds.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{
			ProbeHandler: v1.ProbeHandler{
				Exec: &v1.ExecAction{
					Command: []string{"/bin/sh", "-ec", `touch /var/tmp/ready; [[ "$( cat /var/tmp/ready )" == "" ]]`},
				},
			},
			InitialDelaySeconds: 7,
			PeriodSeconds:       3,
			SuccessThreshold:    1,
			FailureThreshold:    1,
		}
		// use a simple surge strategy
		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
			Type: appsv1.RollingUpdateDaemonSetStrategyType,
			RollingUpdate: &appsv1.RollingUpdateDaemonSet{
				MaxUnavailable: &zero,
				MaxSurge:       &surgePercent,
			},
		}
		// The pod must be ready for at least 10s before we delete the old pod
		ds.Spec.MinReadySeconds = 10

		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 1)
		cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(1)))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)

		newVersion := "2"
		ginkgo.By("Update daemon pods environment var")
		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%s"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, newVersion)
		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
		framework.ExpectNoError(err)

		// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
		// Get the number of nodes, and set the timeout appropriately.
		nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
		framework.ExpectNoError(err)
		nodeCount := len(nodes.Items)
		retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second

		ginkgo.By("Check that daemon pods surge and invariants are preserved during that rollout")
		// State carried across poll iterations:
		//   ageOfOldPod:             old-version pod UID -> first time it was seen next to a new-version pod
		//   deliberatelyDeletedPods: UIDs of pods this test deleted itself
		ageOfOldPod := make(map[string]time.Time)
		deliberatelyDeletedPods := sets.NewString()
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, retryTimeout, true, func(ctx context.Context) (bool, error) {
			podList, err := c.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{})
			if err != nil {
				return false, err
			}
			pods := podList.Items

			var buf bytes.Buffer
			pw := tabwriter.NewWriter(&buf, 1, 1, 1, ' ', 0)
			fmt.Fprint(pw, "Node\tVersion\tName\tUID\tDeleted\tReady\n")

			now := time.Now()
			podUIDs := sets.NewString()
			deletedPodUIDs := sets.NewString()
			nodes := sets.NewString()
			versions := sets.NewString()
			nodesToVersions := make(map[string]map[string]int)
			nodesToDeletedVersions := make(map[string]map[string]int)
			var surgeCount, newUnavailableCount, newDeliberatelyDeletedCount, oldUnavailableCount, nodesWithoutOldVersion int
			for _, pod := range pods {
				if !metav1.IsControlledBy(&pod, ds) {
					continue
				}
				nodeName := pod.Spec.NodeName
				nodes.Insert(nodeName)
				podVersion := pod.Spec.Containers[0].Env[0].Value
				// Record every version observed so the "max 2 versions" invariant
				// below has data to check. This must happen before the inner
				// per-node maps shadow the name "versions".
				versions.Insert(podVersion)
				if pod.DeletionTimestamp != nil {
					if !deliberatelyDeletedPods.Has(string(pod.UID)) {
						versions := nodesToDeletedVersions[nodeName]
						if versions == nil {
							versions = make(map[string]int)
							nodesToDeletedVersions[nodeName] = versions
						}
						versions[podVersion]++
					}
				} else {
					versions := nodesToVersions[nodeName]
					if versions == nil {
						versions = make(map[string]int)
						nodesToVersions[nodeName] = versions
					}
					versions[podVersion]++
				}

				ready := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now())
				if podVersion == newVersion {
					surgeCount++
					if !ready || pod.DeletionTimestamp != nil {
						if deliberatelyDeletedPods.Has(string(pod.UID)) {
							newDeliberatelyDeletedCount++
						}
						newUnavailableCount++
					}
				} else {
					if !ready || pod.DeletionTimestamp != nil {
						oldUnavailableCount++
					}
				}
				fmt.Fprintf(pw, "%s\t%s\t%s\t%s\t%t\t%t\n", pod.Spec.NodeName, podVersion, pod.Name, pod.UID, pod.DeletionTimestamp != nil, ready)
			}

			// print a stable sorted list of pods by node for debugging
			pw.Flush()
			lines := strings.Split(buf.String(), "\n")
			// drop the empty element after the final newline; keep the header first
			lines = lines[:len(lines)-1]
			sort.Strings(lines[1:])
			for _, line := range lines {
				framework.Logf("%s", line)
			}

			// if there is an old and new pod at the same time, record a timestamp
			deletedPerNode := make(map[string]int)
			for _, pod := range pods {
				if !metav1.IsControlledBy(&pod, ds) {
					continue
				}
				// ignore deleted pods
				if pod.DeletionTimestamp != nil {
					deletedPodUIDs.Insert(string(pod.UID))
					if !deliberatelyDeletedPods.Has(string(pod.UID)) {
						deletedPerNode[pod.Spec.NodeName]++
					}
					continue
				}
				podUIDs.Insert(string(pod.UID))
				podVersion := pod.Spec.Containers[0].Env[0].Value
				if podVersion == newVersion {
					continue
				}
				// if this is a pod in an older version AND there is a new version of this pod, record when
				// we started seeing this, otherwise delete the record (perhaps the node was drained)
				if nodesToVersions[pod.Spec.NodeName][newVersion] > 0 {
					if _, ok := ageOfOldPod[string(pod.UID)]; !ok {
						ageOfOldPod[string(pod.UID)] = now
					}
				} else {
					delete(ageOfOldPod, string(pod.UID))
				}
			}
			// purge the old pods list of any deleted pods
			for uid := range ageOfOldPod {
				if !podUIDs.Has(uid) {
					delete(ageOfOldPod, uid)
				}
			}
			// forget deliberately deleted pods once they are no longer listed as terminating
			deliberatelyDeletedPods = deliberatelyDeletedPods.Intersection(deletedPodUIDs)

			for _, versions := range nodesToVersions {
				if versions[oldVersion] == 0 {
					nodesWithoutOldVersion++
				}
			}

			var errs []string

			// invariant: we should not see more than 1 deleted pod per node unless a severe node problem is occurring or the controller is misbehaving
			for node, count := range deletedPerNode {
				if count > 1 {
					errs = append(errs, fmt.Sprintf("Node %s has %d deleted pods, which may indicate a problem on the node or a controller race condition", node, count))
				}
			}

			// invariant: the controller must react to the new pod becoming ready within a reasonable timeframe (2x grace period)
			for uid, firstSeen := range ageOfOldPod {
				if now.Sub(firstSeen) > maxSurgeOverlap {
					errs = append(errs, fmt.Sprintf("An old pod with UID %s has been running alongside a newer version for longer than %s", uid, maxSurgeOverlap))
				}
			}

			// invariant: we should never have more than maxSurge + oldUnavailableCount instances of the new version unready unless a flake in the infrastructure happens, or
			// if we deliberately deleted one of the new pods
			if newUnavailableCount > (maxSurge + oldUnavailableCount + newDeliberatelyDeletedCount + nodesWithoutOldVersion) {
				errs = append(errs, fmt.Sprintf("observed %d new unavailable pods greater than (surge count %d + old unavailable count %d + deliberately deleted new count %d + nodes without old version %d), may be infrastructure flake", newUnavailableCount, maxSurge, oldUnavailableCount, newDeliberatelyDeletedCount, nodesWithoutOldVersion))
			}
			// invariant: the total number of versions created should be 2
			if versions.Len() > 2 {
				errs = append(errs, fmt.Sprintf("observed %d versions running simultaneously, must have max 2", versions.Len()))
			}
			for _, node := range nodes.List() {
				// ignore pods that haven't been scheduled yet
				if len(node) == 0 {
					continue
				}
				versionCount := make(map[string]int)
				// invariant: surge should never have more than one instance of a pod per node running
				for version, count := range nodesToVersions[node] {
					if count > 1 {
						errs = append(errs, fmt.Sprintf("node %s has %d instances of version %s running simultaneously, must have max 1", node, count, version))
					}
					versionCount[version] += count
				}
				// invariant: when surging, the most number of pods we should allow to be deleted is 2 (if we are getting evicted)
				for version, count := range nodesToDeletedVersions[node] {
					if count > 2 {
						// The reported limit matches the threshold actually checked above.
						errs = append(errs, fmt.Sprintf("node %s has %d deleted instances of version %s running simultaneously, must have max 2", node, count, version))
					}
					versionCount[version] += count
				}
				// invariant: on any node, we should never have more than two instances of a version (if we are getting evicted)
				for version, count := range versionCount {
					if count > 2 {
						errs = append(errs, fmt.Sprintf("node %s has %d total instances of version %s running simultaneously, must have max 2 (one deleted and one running)", node, count, version))
					}
				}
			}

			if len(errs) > 0 {
				sort.Strings(errs)
				return false, fmt.Errorf("invariants were violated during daemonset update:\n%s", strings.Join(errs, "\n"))
			}

			// Make sure every daemon pod on the node has been updated
			nodeNames := e2edaemonset.SchedulableNodes(ctx, c, ds)
			for _, node := range nodeNames {
				switch {
				case
					// if we don't have the new version yet
					nodesToVersions[node][newVersion] == 0,
					// if there are more than one version on a node
					len(nodesToVersions[node]) > 1,
					// if there are still any deleted pods
					len(nodesToDeletedVersions[node]) > 0,
					// if any of the new pods are unavailable
					newUnavailableCount > 0:

					// inject a failure randomly to ensure the controller recovers
					switch rand.Intn(25) {
					// cause a random old pod to go unready
					case 0:
						// select a not-deleted pod of the old version
						if pod := randomPod(pods, func(pod *v1.Pod) bool {
							return pod.DeletionTimestamp == nil && oldVersion == pod.Spec.Containers[0].Env[0].Value
						}); pod != nil {
							// write non-empty content into /var/tmp/ready; the readiness probe
							// only succeeds while that file is empty, so readiness will fail
							if _, err := e2ekubectl.RunKubectl(pod.Namespace, "exec", "-c", pod.Spec.Containers[0].Name, pod.Name, "--", "/bin/sh", "-ec", "echo 0 > /var/tmp/ready"); err != nil {
								framework.Logf("Failed to mark pod %s as unready via exec: %v", pod.Name, err)
							} else {
								framework.Logf("Marked old pod %s as unready", pod.Name)
							}
						}
					case 1:
						// delete a random pod
						if pod := randomPod(pods, func(pod *v1.Pod) bool {
							return pod.DeletionTimestamp == nil
						}); pod != nil {
							if err := c.CoreV1().Pods(ds.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
								framework.Logf("Failed to delete pod %s early: %v", pod.Name, err)
							} else {
								framework.Logf("Deleted pod %s prematurely", pod.Name)
								deliberatelyDeletedPods.Insert(string(pod.UID))
							}
						}
					}

					// then wait
					return false, nil
				}
			}

			return true, nil
		})
		framework.ExpectNoError(err)

		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
		framework.ExpectNoError(err, "error waiting for daemon pod to start")

		// Check history and labels
		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		waitForHistoryCreated(ctx, c, ns, label, 2)
		cur = curHistory(listDaemonHistories(ctx, c, ns, label), ds)
		hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
	})

	/*
	  Release: v1.22
	  Testname: DaemonSet, list and delete a collection of DaemonSets
	  Description: When a DaemonSet is created it MUST succeed. It
	  MUST succeed when listing DaemonSets via a label selector. It
	  MUST succeed when deleting the DaemonSet via deleteCollection.
*/ framework.ConformanceIt("should list and delete a collection of DaemonSets", func(ctx context.Context) { label := map[string]string{daemonsetNameLabel: dsName} labelSelector := labels.SelectorFromSet(label).String() dsClient := f.ClientSet.AppsV1().DaemonSets(ns) cs := f.ClientSet one := int64(1) ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName)) testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{}) framework.ExpectNoError(err) ginkgo.By("Check that daemon pods launch on every node of the cluster.") err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, testDaemonset)) framework.ExpectNoError(err, "error waiting for daemon pod to start") err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName) framework.ExpectNoError(err) ginkgo.By("listing all DaemonSets") dsList, err := cs.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) framework.ExpectNoError(err, "failed to list Daemon Sets") gomega.Expect(dsList.Items).To(gomega.HaveLen(1), "filtered list wasn't found") ginkgo.By("DeleteCollection of the DaemonSets") err = dsClient.DeleteCollection(ctx, metav1.DeleteOptions{GracePeriodSeconds: &one}, metav1.ListOptions{LabelSelector: labelSelector}) framework.ExpectNoError(err, "failed to delete DaemonSets") ginkgo.By("Verify that ReplicaSets have been deleted") dsList, err = c.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) framework.ExpectNoError(err, "failed to list DaemonSets") gomega.Expect(dsList.Items).To(gomega.BeEmpty(), "filtered list should have no daemonset") }) /* Release: v1.22 Testname: DaemonSet, status sub-resource Description: When a DaemonSet is created it MUST succeed. Attempt to read, update and patch its status sub-resource; all mutating sub-resource operations MUST be visible to subsequent reads. 
*/ framework.ConformanceIt("should verify changes to a daemon set status", func(ctx context.Context) { label := map[string]string{daemonsetNameLabel: dsName} labelSelector := labels.SelectorFromSet(label).String() dsClient := f.ClientSet.AppsV1().DaemonSets(ns) cs := f.ClientSet w := &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { options.LabelSelector = labelSelector return dsClient.Watch(ctx, options) }, } dsList, err := cs.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector}) framework.ExpectNoError(err, "failed to list Daemon Sets") ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName)) testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{}) framework.ExpectNoError(err) ginkgo.By("Check that daemon pods launch on every node of the cluster.") err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, testDaemonset)) framework.ExpectNoError(err, "error waiting for daemon pod to start") err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName) framework.ExpectNoError(err) ginkgo.By("Getting /status") dsResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"} dsStatusUnstructured, err := f.DynamicClient.Resource(dsResource).Namespace(ns).Get(ctx, dsName, metav1.GetOptions{}, "status") framework.ExpectNoError(err, "Failed to fetch the status of daemon set %s in namespace %s", dsName, ns) dsStatusBytes, err := json.Marshal(dsStatusUnstructured) framework.ExpectNoError(err, "Failed to marshal unstructured response. 
%v", err) var dsStatus appsv1.DaemonSet err = json.Unmarshal(dsStatusBytes, &dsStatus) framework.ExpectNoError(err, "Failed to unmarshal JSON bytes to a daemon set object type") framework.Logf("Daemon Set %s has Conditions: %v", dsName, dsStatus.Status.Conditions) ginkgo.By("updating the DaemonSet Status") var statusToUpdate, updatedStatus *appsv1.DaemonSet err = retry.RetryOnConflict(retry.DefaultRetry, func() error { statusToUpdate, err = dsClient.Get(ctx, dsName, metav1.GetOptions{}) framework.ExpectNoError(err, "Unable to retrieve daemon set %s", dsName) statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, appsv1.DaemonSetCondition{ Type: "StatusUpdate", Status: "True", Reason: "E2E", Message: "Set from e2e test", }) updatedStatus, err = dsClient.UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{}) return err }) framework.ExpectNoError(err, "Failed to update status. %v", err) framework.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions) ginkgo.By("watching for the daemon set status to be updated") ctxUntil, cancel := context.WithTimeout(ctx, dsRetryTimeout) defer cancel() _, err = watchtools.Until(ctxUntil, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) { if ds, ok := event.Object.(*appsv1.DaemonSet); ok { found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name && ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace && ds.Labels[daemonsetNameLabel] == dsName if !found { framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions) return false, nil } for _, cond := range ds.Status.Conditions { if cond.Type == "StatusUpdate" && cond.Reason == "E2E" && cond.Message == "Set from e2e test" { framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, 
ds.Status.Conditions) return found, nil } framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions) } } object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0] framework.Logf("Observed %v event: %+v", object, event.Type) return false, nil }) framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns) framework.Logf("Daemon set %s has an updated status", dsName) ginkgo.By("patching the DaemonSet Status") daemonSetStatusPatch := appsv1.DaemonSet{ Status: appsv1.DaemonSetStatus{ Conditions: []appsv1.DaemonSetCondition{ { Type: "StatusPatched", Status: "True", }, }, }, } payload, err := json.Marshal(daemonSetStatusPatch) framework.ExpectNoError(err, "Failed to marshal JSON. %v", err) _, err = dsClient.Patch(ctx, dsName, types.MergePatchType, payload, metav1.PatchOptions{}, "status") framework.ExpectNoError(err, "Failed to patch daemon set status", err) ginkgo.By("watching for the daemon set status to be patched") ctxUntil, cancel = context.WithTimeout(ctx, dsRetryTimeout) defer cancel() _, err = watchtools.Until(ctxUntil, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) { if ds, ok := event.Object.(*appsv1.DaemonSet); ok { found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name && ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace && ds.Labels[daemonsetNameLabel] == dsName if !found { framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions) return false, nil } for _, cond := range ds.Status.Conditions { if cond.Type == "StatusPatched" { framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, 
ds.Status.Conditions)
						return found, nil
					}
					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
				}
			}
			object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0]
			framework.Logf("Observed %v event: %v", object, event.Type)
			return false, nil
		})
		framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns)
		framework.Logf("Daemon set %s has a patched status", dsName)
	})
})

// randomPod selects a random pod within pods that causes fn to return true, or nil
// if no pod can be found matching the criteria. An empty pods slice yields nil.
func randomPod(pods []v1.Pod, fn func(p *v1.Pod) bool) *v1.Pod {
	podCount := len(pods)
	if podCount == 0 {
		// rand.Intn panics when its argument is not positive.
		return nil
	}
	// Start at a random offset and wrap around so that every pod is examined
	// exactly once before giving up.
	offset := rand.Intn(podCount)
	for i := 0; i < podCount; i++ {
		pod := &pods[(offset+i)%podCount]
		if fn(pod) {
			return pod
		}
	}
	return nil
}

// getDaemonSetImagePatch generates a patch for updating a DaemonSet's container image
func getDaemonSetImagePatch(containerName, containerImage string) string {
	return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage)
}

// newDaemonSet builds a DaemonSet through newDaemonSetWithLabel and then
// clears the labels on the DaemonSet object's own metadata.
func newDaemonSet(dsName, image string, label map[string]string) *appsv1.DaemonSet {
	ds := newDaemonSetWithLabel(dsName, image, label)
	ds.ObjectMeta.Labels = nil
	return ds
}

// newDaemonSetWithLabel builds a DaemonSet via e2edaemonset.NewDaemonSet from
// the given name, image and label, passing nil for the next two arguments and
// a single container port, 9376.
func newDaemonSetWithLabel(dsName, image string, label map[string]string) *appsv1.DaemonSet {
	return e2edaemonset.NewDaemonSet(dsName, image, label, nil, nil, []v1.ContainerPort{{ContainerPort: 9376}})
}

// listDaemonPods lists the pods in namespace ns that match label and the
// package-level nonTerminalPhaseSelector field selector. It fails the test if
// the list call errors or if the resulting list is empty.
func listDaemonPods(ctx context.Context, c clientset.Interface, ns string, label map[string]string) *v1.PodList {
	selector := labels.Set(label).AsSelector()
	options := metav1.ListOptions{
		LabelSelector: selector.String(),
		FieldSelector: nonTerminalPhaseSelector.String(),
	}
	podList, err := c.CoreV1().Pods(ns).List(ctx, options)
	framework.ExpectNoError(err)
	gomega.Expect(podList.Items).ToNot(gomega.BeEmpty())
	return podList
}

// separateDaemonSetNodeLabels splits labels into two freshly allocated maps:
// the first holds the entries whose key starts with daemonsetLabelPrefix, the
// second holds all remaining entries. Both results are non-nil even when the
// input is nil or empty.
func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
	daemonSetLabels := map[string]string{}
	otherLabels := map[string]string{}
	for k, v := range labels {
		if strings.HasPrefix(k, daemonsetLabelPrefix) {
			daemonSetLabels[k] = v
		} else {
			otherLabels[k] = v
		}
	}
	return daemonSetLabels, otherLabels
}

// clearDaemonSetNodeLabels calls setDaemonSetNodeLabels with an empty label
// set for every node returned by e2enode.GetReadySchedulableNodes, which
// strips the labels carrying daemonsetLabelPrefix from those nodes. It stops
// at, and returns, the first error encountered.
func clearDaemonSetNodeLabels(ctx context.Context, c clientset.Interface) error {
	nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
	if err != nil {
		return err
	}
	for _, node := range nodeList.Items {
		_, err := setDaemonSetNodeLabels(ctx, c, node.Name, map[string]string{})
		if err != nil {
			return err
		}
	}
	return nil
}

// patchNamespaceAnnotations sets node selectors related annotations on tests namespaces to empty
//
// Every key listed in NamespaceNodeSelectors is written with the value "" and
// the result is sent as a strategic merge patch against namespace nsName. The
// patched namespace, or the marshalling/patch error, is returned.
func patchNamespaceAnnotations(ctx context.Context, c clientset.Interface, nsName string) (*v1.Namespace, error) {
	nsClient := c.CoreV1().Namespaces()

	annotations := make(map[string]string)
	for _, n := range NamespaceNodeSelectors {
		annotations[n] = ""
	}
	nsPatch, err := json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": annotations,
		},
	})
	if err != nil {
		return nil, err
	}

	return nsClient.Patch(ctx, nsName, types.StrategicMergePatchType, nsPatch, metav1.PatchOptions{})
}

// setDaemonSetNodeLabels makes the set of daemonsetLabelPrefix-prefixed labels
// on node nodeName equal to labels, leaving all other node labels untouched.
//
// The node is re-read and the update retried every dsRetryPeriod, for up to
// dsRetryTimeout, while the update fails with a resource version conflict; any
// other error aborts immediately. On success the node as last read or updated
// is returned. An error is returned if the number of prefixed labels on the
// resulting node differs from len(labels).
func setDaemonSetNodeLabels(ctx context.Context, c clientset.Interface, nodeName string, labels map[string]string) (*v1.Node, error) {
	nodeClient := c.CoreV1().Nodes()
	var newNode *v1.Node
	var newLabels map[string]string
	err := wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, func(ctx context.Context) (bool, error) {
		node, err := nodeClient.Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}

		// remove all labels this test is creating
		daemonSetLabels, otherLabels := separateDaemonSetNodeLabels(node.Labels)
		if reflect.DeepEqual(daemonSetLabels, labels) {
			// Nothing to change. Record the labels that are already present so
			// the length check after the poll does not reject this case.
			newNode = node
			newLabels = daemonSetLabels
			return true, nil
		}
		node.Labels = otherLabels
		for k, v := range labels {
			node.Labels[k] = v
		}
		newNode, err = nodeClient.Update(ctx, node, metav1.UpdateOptions{})
		if err == nil {
			newLabels, _ = separateDaemonSetNodeLabels(newNode.Labels)
			return true, nil
		}
		// A conflict means the node changed between Get and Update; poll again
		// so the next attempt starts from a fresh copy.
		if se, ok := err.(*apierrors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
			framework.Logf("failed to update node due to resource version conflict")
			return false, nil
		}
		return false, err
	})
	if err != nil {
		return nil, err
	}
	if len(newLabels) != len(labels) {
		return nil, fmt.Errorf("could not set daemon set test labels as expected")
	}

	return newNode, nil
}

// checkRunningOnAllNodes adapts e2edaemonset.CheckRunningOnAllNodes for f and
// ds into a condition function usable with the wait.Poll* helpers.
func checkRunningOnAllNodes(f *framework.Framework, ds *appsv1.DaemonSet) func(ctx context.Context) (bool, error) {
	return func(ctx context.Context) (bool, error) {
		return e2edaemonset.CheckRunningOnAllNodes(ctx, f, ds)
	}
}

// checkAtLeastOneNewPod returns a condition function that reports true as soon
// as any pod returned by listDaemonPods for ns and label has newImage as the
// image of its first container. It never returns an error itself.
func checkAtLeastOneNewPod(c clientset.Interface, ns string, label map[string]string, newImage string) func(ctx context.Context) (bool, error) {
	return func(ctx context.Context) (bool, error) {
		pods := listDaemonPods(ctx, c, ns, label)
		for _, pod := range pods.Items {
			if pod.Spec.Containers[0].Image == newImage {
				return true, nil
			}
		}
		return false, nil
	}
}

// checkRunningOnNoNodes returns the condition function produced by
// e2edaemonset.CheckDaemonPodOnNodes for f and ds with an empty node-name list.
// NOTE(review): presumably this succeeds once no daemon pods are running on
// any node; confirm against CheckDaemonPodOnNodes.
func checkRunningOnNoNodes(f *framework.Framework, ds *appsv1.DaemonSet) func(ctx context.Context) (bool, error) {
	return e2edaemonset.CheckDaemonPodOnNodes(f, ds, make([]string, 0))
}

// checkDaemonPodsImageAndAvailability returns a condition function that
// inspects the pods in ds.Namespace that are controlled by ds and have no
// deletion timestamp.
//
// The condition fails with an error when more than maxUnavailable of those
// pods are not available according to podutil.IsPodAvailable (using
// ds.Spec.MinReadySeconds). Otherwise it reports true only when every node
// returned by e2edaemonset.SchedulableNodes has at least one such pod whose
// first container runs image.
func checkDaemonPodsImageAndAvailability(c clientset.Interface, ds *appsv1.DaemonSet, image string, maxUnavailable int) func(ctx context.Context) (bool, error) {
	return func(ctx context.Context) (bool, error) {
		podList, err := c.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		pods := podList.Items

		unavailablePods := 0
		nodesToUpdatedPodCount := make(map[string]int)
		for _, pod := range pods {
			// Ignore the pod on the node that is supposed to be deleted
			if pod.DeletionTimestamp != nil {
				continue
			}
			// The list is not filtered by label, so skip pods owned by
			// something other than this DaemonSet.
			if !metav1.IsControlledBy(&pod, ds) {
				continue
			}
			podImage := pod.Spec.Containers[0].Image
			if podImage != image {
				framework.Logf("Wrong image for pod: %s. Expected: %s, got: %s.", pod.Name, image, podImage)
			} else {
				nodesToUpdatedPodCount[pod.Spec.NodeName]++
			}
			if !podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) {
				framework.Logf("Pod %s is not available", pod.Name)
				unavailablePods++
			}
		}
		if unavailablePods > maxUnavailable {
			return false, fmt.Errorf("number of unavailable pods: %d is greater than maxUnavailable: %d", unavailablePods, maxUnavailable)
		}
		// Make sure every daemon pod on the node has been updated
		nodeNames := e2edaemonset.SchedulableNodes(ctx, c, ds)
		for _, node := range nodeNames {
			if nodesToUpdatedPodCount[node] == 0 {
				return false, nil
			}
		}
		return true, nil
	}
}

// checkDaemonSetPodsLabels asserts that every pod in podList without a
// deletion timestamp carries a non-empty appsv1.DefaultDaemonSetUniqueLabelKey
// label and, when hash is non-empty, that the label value equals hash.
func checkDaemonSetPodsLabels(podList *v1.PodList, hash string) {
	for _, pod := range podList.Items {
		// Ignore all the DS pods that will be deleted
		if pod.DeletionTimestamp != nil {
			continue
		}
		podHash := pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
		gomega.Expect(podHash).ToNot(gomega.BeEmpty())
		if len(hash) > 0 {
			gomega.Expect(podHash).To(gomega.Equal(hash), "unexpected hash for pod %s", pod.Name)
		}
	}
}

// waitForHistoryCreated polls every dsRetryPeriod, for up to dsRetryTimeout,
// until exactly numHistory ControllerRevisions matching label exist in ns. It
// fails the test if listing errors or the count is not reached in time.
func waitForHistoryCreated(ctx context.Context, c clientset.Interface, ns string, label map[string]string, numHistory int) {
	listHistoryFn := func(ctx context.Context) (bool, error) {
		selector := labels.Set(label).AsSelector()
		options := metav1.ListOptions{LabelSelector: selector.String()}
		historyList, err := c.AppsV1().ControllerRevisions(ns).List(ctx, options)
		if err != nil {
			return false, err
		}
		if len(historyList.Items) == numHistory {
			return true, nil
		}
		framework.Logf("%d/%d controllerrevisions created.", len(historyList.Items), numHistory)
		return false, nil
	}
	err := wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, listHistoryFn)
	framework.ExpectNoError(err, "error waiting for controllerrevisions to be created")
}

// listDaemonHistories lists the ControllerRevisions in ns that match label. It
// fails the test if the list call errors or if no revision is found.
func listDaemonHistories(ctx context.Context, c clientset.Interface, ns string, label map[string]string) *appsv1.ControllerRevisionList {
	selector := labels.Set(label).AsSelector()
	options := metav1.ListOptions{LabelSelector: selector.String()}
	historyList, err := c.AppsV1().ControllerRevisions(ns).List(ctx, options)
	framework.ExpectNoError(err)
	gomega.Expect(historyList.Items).ToNot(gomega.BeEmpty())
	return historyList
}

// curHistory returns the single ControllerRevision in historyList for which
// daemon.Match(ds, history) reports a match. It fails the test if any revision
// lacks the appsv1.DefaultDaemonSetUniqueLabelKey label, if matching errors,
// or if the number of matching revisions is not exactly one.
func curHistory(historyList *appsv1.ControllerRevisionList, ds *appsv1.DaemonSet) *appsv1.ControllerRevision {
	var curHistory *appsv1.ControllerRevision
	foundCurHistories := 0
	for i := range historyList.Items {
		// Index into the slice so the returned pointer refers to the list
		// element rather than to a loop-variable copy.
		history := &historyList.Items[i]
		// Every history should have the hash label
		gomega.Expect(history.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]).ToNot(gomega.BeEmpty())
		match, err := daemon.Match(ds, history)
		framework.ExpectNoError(err)
		if match {
			curHistory = history
			foundCurHistories++
		}
	}
	gomega.Expect(foundCurHistories).To(gomega.Equal(1))
	gomega.Expect(curHistory).NotTo(gomega.BeNil())
	return curHistory
}

// waitFailedDaemonPodDeleted returns a condition function that reports true
// once getting pod by namespace and name yields a NotFound error. Any other
// error from the Get is wrapped and returned; a successful Get means the pod
// still exists and the condition reports false.
func waitFailedDaemonPodDeleted(c clientset.Interface, pod *v1.Pod) func(ctx context.Context) (bool, error) {
	return func(ctx context.Context) (bool, error) {
		if _, err := c.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err != nil {
			if apierrors.IsNotFound(err) {
				return true, nil
			}
			return false, fmt.Errorf("failed to get failed daemon pod %q: %w", pod.Name, err)
		}
		return false, nil
	}
}