...

Source file src/k8s.io/kubernetes/test/e2e/apps/daemon_set.go

Documentation: k8s.io/kubernetes/test/e2e/apps

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apps
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"encoding/json"
    23  	"fmt"
    24  	"math/rand"
    25  	"reflect"
    26  	"sort"
    27  	"strings"
    28  	"text/tabwriter"
    29  	"time"
    30  
    31  	"k8s.io/client-go/tools/cache"
    32  
    33  	"github.com/onsi/ginkgo/v2"
    34  	"github.com/onsi/gomega"
    35  	appsv1 "k8s.io/api/apps/v1"
    36  	v1 "k8s.io/api/core/v1"
    37  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    38  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    39  	"k8s.io/apimachinery/pkg/labels"
    40  	"k8s.io/apimachinery/pkg/runtime"
    41  	"k8s.io/apimachinery/pkg/runtime/schema"
    42  	"k8s.io/apimachinery/pkg/selection"
    43  	"k8s.io/apimachinery/pkg/types"
    44  	"k8s.io/apimachinery/pkg/util/intstr"
    45  	"k8s.io/apimachinery/pkg/util/sets"
    46  	"k8s.io/apimachinery/pkg/util/wait"
    47  	watch "k8s.io/apimachinery/pkg/watch"
    48  	clientset "k8s.io/client-go/kubernetes"
    49  	"k8s.io/client-go/kubernetes/scheme"
    50  	watchtools "k8s.io/client-go/tools/watch"
    51  	"k8s.io/client-go/util/retry"
    52  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    53  	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"
    54  	"k8s.io/kubernetes/pkg/controller/daemon"
    55  	"k8s.io/kubernetes/test/e2e/framework"
    56  	e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
    57  	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
    58  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    59  	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
    60  	admissionapi "k8s.io/pod-security-admission/api"
    61  )
    62  
const (
	// dsRetryPeriod is the polling interval used by the specs in this file
	// when waiting on DaemonSet state.
	// this should not be a multiple of 5, because node status updates
	// every 5 seconds. See https://github.com/kubernetes/kubernetes/pull/14915.
	dsRetryPeriod  = 1 * time.Second
	// dsRetryTimeout is the default upper bound for those polls.
	dsRetryTimeout = 5 * time.Minute

	// daemonsetLabelPrefix is the common prefix of the label keys below.
	daemonsetLabelPrefix = "daemonset-"
	// daemonsetNameLabel is the pod label key the specs use to select daemon pods.
	daemonsetNameLabel   = daemonsetLabelPrefix + "name"
	// daemonsetColorLabel is the node label key the specs use for node selection.
	daemonsetColorLabel  = daemonsetLabelPrefix + "color"
)
    73  
// NamespaceNodeSelectors lists the namespace annotation keys (currently only
// scheduler.alpha.kubernetes.io/node-selector) that are used for assigning
// node selector labels to namespaces.
var NamespaceNodeSelectors = []string{"scheduler.alpha.kubernetes.io/node-selector"}
    77  
    78  var nonTerminalPhaseSelector = func() labels.Selector {
    79  	var reqs []labels.Requirement
    80  	for _, phase := range []v1.PodPhase{v1.PodFailed, v1.PodSucceeded} {
    81  		req, _ := labels.NewRequirement("status.phase", selection.NotEquals, []string{string(phase)})
    82  		reqs = append(reqs, *req)
    83  	}
    84  	selector := labels.NewSelector()
    85  	return selector.Add(reqs...)
    86  }()
    87  
// updateDSFunc is the callback type accepted by updateDaemonSetWithRetries; it
// receives a freshly fetched DaemonSet and modifies it in place before the
// object is sent back to the apiserver.
type updateDSFunc func(*appsv1.DaemonSet)
    89  
    90  // updateDaemonSetWithRetries updates daemonsets with the given applyUpdate func
    91  // until it succeeds or a timeout expires.
    92  func updateDaemonSetWithRetries(ctx context.Context, c clientset.Interface, namespace, name string, applyUpdate updateDSFunc) (ds *appsv1.DaemonSet, err error) {
    93  	daemonsets := c.AppsV1().DaemonSets(namespace)
    94  	var updateErr error
    95  	pollErr := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 1*time.Minute, true, func(ctx context.Context) (bool, error) {
    96  		if ds, err = daemonsets.Get(ctx, name, metav1.GetOptions{}); err != nil {
    97  			return false, err
    98  		}
    99  		// Apply the update, then attempt to push it to the apiserver.
   100  		applyUpdate(ds)
   101  		if ds, err = daemonsets.Update(ctx, ds, metav1.UpdateOptions{}); err == nil {
   102  			framework.Logf("Updating DaemonSet %s", name)
   103  			return true, nil
   104  		}
   105  		updateErr = err
   106  		return false, nil
   107  	})
   108  	if wait.Interrupted(pollErr) {
   109  		pollErr = fmt.Errorf("couldn't apply the provided updated to DaemonSet %q: %v", name, updateErr)
   110  	}
   111  	return ds, pollErr
   112  }
   113  
   114  // This test must be run in serial because it assumes the Daemon Set pods will
   115  // always get scheduled.  If we run other tests in parallel, this may not
   116  // happen.  In the future, running in parallel may work if we have an eviction
   117  // model which lets the DS controller kick out other pods to make room.
   118  // See https://issues.k8s.io/21767 for more details
   119  var _ = SIGDescribe("Daemon set", framework.WithSerial(), func() {
   120  	var f *framework.Framework
   121  
   122  	ginkgo.AfterEach(func(ctx context.Context) {
   123  		// Clean up
   124  		daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(ctx, metav1.ListOptions{})
   125  		framework.ExpectNoError(err, "unable to dump DaemonSets")
   126  		if daemonsets != nil && len(daemonsets.Items) > 0 {
   127  			for _, ds := range daemonsets.Items {
   128  				ginkgo.By(fmt.Sprintf("Deleting DaemonSet %q", ds.Name))
   129  				framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, f.ClientSet, extensionsinternal.Kind("DaemonSet"), f.Namespace.Name, ds.Name))
   130  				err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, &ds))
   131  				framework.ExpectNoError(err, "error waiting for daemon pod to be reaped")
   132  			}
   133  		}
   134  		if daemonsets, err := f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).List(ctx, metav1.ListOptions{}); err == nil {
   135  			framework.Logf("daemonset: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), daemonsets))
   136  		} else {
   137  			framework.Logf("unable to dump daemonsets: %v", err)
   138  		}
   139  		if pods, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{}); err == nil {
   140  			framework.Logf("pods: %s", runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(scheme.Scheme.PrioritizedVersionsAllGroups()...), pods))
   141  		} else {
   142  			framework.Logf("unable to dump pods: %v", err)
   143  		}
   144  		err = clearDaemonSetNodeLabels(ctx, f.ClientSet)
   145  		framework.ExpectNoError(err)
   146  	})
   147  
	// Framework shared by every spec in this container; the test namespace
	// is configured with the baseline pod security level.
	f = framework.NewDefaultFramework("daemonsets")
	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline

	// Container image and DaemonSet name used by the specs below.
	image := WebserverImage
	dsName := "daemon-set"

	// ns and c are assigned in BeforeEach before each spec runs.
	var ns string
	var c clientset.Interface
   156  
   157  	ginkgo.BeforeEach(func(ctx context.Context) {
   158  		ns = f.Namespace.Name
   159  
   160  		c = f.ClientSet
   161  
   162  		updatedNS, err := patchNamespaceAnnotations(ctx, c, ns)
   163  		framework.ExpectNoError(err)
   164  
   165  		ns = updatedNS.Name
   166  
   167  		err = clearDaemonSetNodeLabels(ctx, c)
   168  		framework.ExpectNoError(err)
   169  	})
   170  
   171  	/*
   172  	  Release: v1.10
   173  	  Testname: DaemonSet-Creation
   174  	  Description: A conformant Kubernetes distribution MUST support the creation of DaemonSets. When a DaemonSet
   175  	  Pod is deleted, the DaemonSet controller MUST create a replacement Pod.
   176  	*/
   177  	framework.ConformanceIt("should run and stop simple daemon", func(ctx context.Context) {
   178  		label := map[string]string{daemonsetNameLabel: dsName}
   179  
   180  		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
   181  		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSet(dsName, image, label), metav1.CreateOptions{})
   182  		framework.ExpectNoError(err)
   183  
   184  		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
   185  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   186  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   187  		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
   188  		framework.ExpectNoError(err)
   189  
   190  		ginkgo.By("Stop a daemon pod, check that the daemon pod is revived.")
   191  		podList := listDaemonPods(ctx, c, ns, label)
   192  		pod := podList.Items[0]
   193  		err = c.CoreV1().Pods(ns).Delete(ctx, pod.Name, metav1.DeleteOptions{})
   194  		framework.ExpectNoError(err)
   195  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   196  		framework.ExpectNoError(err, "error waiting for daemon pod to revive")
   197  	})
   198  
   199  	/*
   200  	  Release: v1.10
   201  	  Testname: DaemonSet-NodeSelection
   202  	  Description: A conformant Kubernetes distribution MUST support DaemonSet Pod node selection via label
   203  	  selectors.
   204  	*/
   205  	framework.ConformanceIt("should run and stop complex daemon", func(ctx context.Context) {
   206  		complexLabel := map[string]string{daemonsetNameLabel: dsName}
   207  		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
   208  		framework.Logf("Creating daemon %q with a node selector", dsName)
   209  		ds := newDaemonSet(dsName, image, complexLabel)
   210  		ds.Spec.Template.Spec.NodeSelector = nodeSelector
   211  		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
   212  		framework.ExpectNoError(err)
   213  
   214  		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
   215  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
   216  		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")
   217  
   218  		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
   219  		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
   220  		framework.ExpectNoError(err)
   221  		newNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
   222  		framework.ExpectNoError(err, "error setting labels on node")
   223  		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
   224  		gomega.Expect(daemonSetLabels).To(gomega.HaveLen(1))
   225  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{newNode.Name}))
   226  		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
   227  		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
   228  		framework.ExpectNoError(err)
   229  
   230  		ginkgo.By("Update the node label to green, and wait for daemons to be unscheduled")
   231  		nodeSelector[daemonsetColorLabel] = "green"
   232  		greenNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
   233  		framework.ExpectNoError(err, "error removing labels on node")
   234  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
   235  		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")
   236  
   237  		ginkgo.By("Update DaemonSet node selector to green, and change its update strategy to RollingUpdate")
   238  		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"nodeSelector":{"%s":"%s"}}},"updateStrategy":{"type":"RollingUpdate"}}}`,
   239  			daemonsetColorLabel, greenNode.Labels[daemonsetColorLabel])
   240  		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
   241  		framework.ExpectNoError(err, "error patching daemon set")
   242  		daemonSetLabels, _ = separateDaemonSetNodeLabels(greenNode.Labels)
   243  		gomega.Expect(daemonSetLabels).To(gomega.HaveLen(1))
   244  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{greenNode.Name}))
   245  		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
   246  		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
   247  		framework.ExpectNoError(err)
   248  	})
   249  
   250  	// We defer adding this test to conformance pending the disposition of moving DaemonSet scheduling logic to the
   251  	// default scheduler.
   252  	ginkgo.It("should run and stop complex daemon with node affinity", func(ctx context.Context) {
   253  		complexLabel := map[string]string{daemonsetNameLabel: dsName}
   254  		nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
   255  		framework.Logf("Creating daemon %q with a node affinity", dsName)
   256  		ds := newDaemonSet(dsName, image, complexLabel)
   257  		ds.Spec.Template.Spec.Affinity = &v1.Affinity{
   258  			NodeAffinity: &v1.NodeAffinity{
   259  				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
   260  					NodeSelectorTerms: []v1.NodeSelectorTerm{
   261  						{
   262  							MatchExpressions: []v1.NodeSelectorRequirement{
   263  								{
   264  									Key:      daemonsetColorLabel,
   265  									Operator: v1.NodeSelectorOpIn,
   266  									Values:   []string{nodeSelector[daemonsetColorLabel]},
   267  								},
   268  							},
   269  						},
   270  					},
   271  				},
   272  			},
   273  		}
   274  		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
   275  		framework.ExpectNoError(err)
   276  
   277  		ginkgo.By("Initially, daemon pods should not be running on any nodes.")
   278  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
   279  		framework.ExpectNoError(err, "error waiting for daemon pods to be running on no nodes")
   280  
   281  		ginkgo.By("Change node label to blue, check that daemon pod is launched.")
   282  		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
   283  		framework.ExpectNoError(err)
   284  		newNode, err := setDaemonSetNodeLabels(ctx, c, node.Name, nodeSelector)
   285  		framework.ExpectNoError(err, "error setting labels on node")
   286  		daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
   287  		gomega.Expect(daemonSetLabels).To(gomega.HaveLen(1))
   288  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, e2edaemonset.CheckDaemonPodOnNodes(f, ds, []string{newNode.Name}))
   289  		framework.ExpectNoError(err, "error waiting for daemon pods to be running on new nodes")
   290  		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
   291  		framework.ExpectNoError(err)
   292  
   293  		ginkgo.By("Remove the node label and wait for daemons to be unscheduled")
   294  		_, err = setDaemonSetNodeLabels(ctx, c, node.Name, map[string]string{})
   295  		framework.ExpectNoError(err, "error removing labels on node")
   296  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnNoNodes(f, ds))
   297  		framework.ExpectNoError(err, "error waiting for daemon pod to not be running on nodes")
   298  	})
   299  
   300  	/*
   301  	  Release: v1.10
   302  	  Testname: DaemonSet-FailedPodCreation
   303  	  Description: A conformant Kubernetes distribution MUST create new DaemonSet Pods when they fail.
   304  	*/
   305  	framework.ConformanceIt("should retry creating failed daemon pods", func(ctx context.Context) {
   306  		label := map[string]string{daemonsetNameLabel: dsName}
   307  
   308  		ginkgo.By(fmt.Sprintf("Creating a simple DaemonSet %q", dsName))
   309  		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSet(dsName, image, label), metav1.CreateOptions{})
   310  		framework.ExpectNoError(err)
   311  
   312  		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
   313  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   314  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   315  		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
   316  		framework.ExpectNoError(err)
   317  
   318  		ginkgo.By("Set a daemon pod's phase to 'Failed', check that the daemon pod is revived.")
   319  		podList := listDaemonPods(ctx, c, ns, label)
   320  		pod := podList.Items[0]
   321  		pod.ResourceVersion = ""
   322  		pod.Status.Phase = v1.PodFailed
   323  		_, err = c.CoreV1().Pods(ns).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
   324  		framework.ExpectNoError(err, "error failing a daemon pod")
   325  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   326  		framework.ExpectNoError(err, "error waiting for daemon pod to revive")
   327  
   328  		ginkgo.By("Wait for the failed daemon pod to be completely deleted.")
   329  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, waitFailedDaemonPodDeleted(c, &pod))
   330  		framework.ExpectNoError(err, "error waiting for the failed daemon pod to be completely deleted")
   331  	})
   332  
   333  	// This test should not be added to conformance. We will consider deprecating OnDelete when the
   334  	// extensions/v1beta1 and apps/v1beta1 are removed.
   335  	ginkgo.It("should not update pod when spec was updated and update strategy is OnDelete", func(ctx context.Context) {
   336  		label := map[string]string{daemonsetNameLabel: dsName}
   337  
   338  		framework.Logf("Creating simple daemon set %s", dsName)
   339  		ds := newDaemonSet(dsName, image, label)
   340  		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType}
   341  		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
   342  		framework.ExpectNoError(err)
   343  
   344  		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
   345  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   346  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   347  
   348  		// Check history and labels
   349  		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
   350  		framework.ExpectNoError(err)
   351  		waitForHistoryCreated(ctx, c, ns, label, 1)
   352  		first := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
   353  		firstHash := first.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
   354  		gomega.Expect(first.Revision).To(gomega.Equal(int64(1)))
   355  		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), firstHash)
   356  
   357  		ginkgo.By("Update daemon pods image.")
   358  		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
   359  		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
   360  		framework.ExpectNoError(err)
   361  
   362  		ginkgo.By("Check that daemon pods images aren't updated.")
   363  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkDaemonPodsImageAndAvailability(c, ds, image, 0))
   364  		framework.ExpectNoError(err)
   365  
   366  		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
   367  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   368  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   369  
   370  		// Check history and labels
   371  		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
   372  		framework.ExpectNoError(err)
   373  		waitForHistoryCreated(ctx, c, ns, label, 2)
   374  		cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
   375  		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
   376  		gomega.Expect(cur.Labels).NotTo(gomega.HaveKeyWithValue(appsv1.DefaultDaemonSetUniqueLabelKey, firstHash))
   377  		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), firstHash)
   378  	})
   379  
   380  	/*
   381  	  Release: v1.10
   382  	  Testname: DaemonSet-RollingUpdate
   383  	  Description: A conformant Kubernetes distribution MUST support DaemonSet RollingUpdates.
   384  	*/
   385  	framework.ConformanceIt("should update pod when spec was updated and update strategy is RollingUpdate", func(ctx context.Context) {
   386  		label := map[string]string{daemonsetNameLabel: dsName}
   387  
   388  		framework.Logf("Creating simple daemon set %s", dsName)
   389  		ds := newDaemonSet(dsName, image, label)
   390  		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
   391  		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
   392  		framework.ExpectNoError(err)
   393  
   394  		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
   395  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   396  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   397  
   398  		// Check history and labels
   399  		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
   400  		framework.ExpectNoError(err)
   401  		waitForHistoryCreated(ctx, c, ns, label, 1)
   402  		cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
   403  		hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
   404  		gomega.Expect(cur.Revision).To(gomega.Equal(int64(1)))
   405  		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
   406  
   407  		ginkgo.By("Update daemon pods image.")
   408  		patch := getDaemonSetImagePatch(ds.Spec.Template.Spec.Containers[0].Name, AgnhostImage)
   409  		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
   410  		framework.ExpectNoError(err)
   411  
   412  		// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
   413  		// Get the number of nodes, and set the timeout appropriately.
   414  		nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
   415  		framework.ExpectNoError(err)
   416  		nodeCount := len(nodes.Items)
   417  		retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second
   418  
   419  		ginkgo.By("Check that daemon pods images are updated.")
   420  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, retryTimeout, true, checkDaemonPodsImageAndAvailability(c, ds, AgnhostImage, 1))
   421  		framework.ExpectNoError(err)
   422  
   423  		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
   424  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   425  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   426  
   427  		// Check history and labels
   428  		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
   429  		framework.ExpectNoError(err)
   430  		waitForHistoryCreated(ctx, c, ns, label, 2)
   431  		cur = curHistory(listDaemonHistories(ctx, c, ns, label), ds)
   432  		hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
   433  		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
   434  		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
   435  	})
   436  
   437  	/*
   438  	  Release: v1.10
   439  	  Testname: DaemonSet-Rollback
   440  	  Description: A conformant Kubernetes distribution MUST support automated, minimally disruptive
   441  	  rollback of updates to a DaemonSet.
   442  	*/
   443  	framework.ConformanceIt("should rollback without unnecessary restarts", func(ctx context.Context) {
   444  		schedulableNodes, err := e2enode.GetReadySchedulableNodes(ctx, c)
   445  		framework.ExpectNoError(err)
   446  		gomega.Expect(len(schedulableNodes.Items)).To(gomega.BeNumerically(">", 1), "Conformance test suite needs a cluster with at least 2 nodes.")
   447  		framework.Logf("Create a RollingUpdate DaemonSet")
   448  		label := map[string]string{daemonsetNameLabel: dsName}
   449  		ds := newDaemonSet(dsName, image, label)
   450  		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{Type: appsv1.RollingUpdateDaemonSetStrategyType}
   451  		ds, err = c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
   452  		framework.ExpectNoError(err)
   453  
   454  		framework.Logf("Check that daemon pods launch on every node of the cluster")
   455  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   456  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   457  
   458  		framework.Logf("Update the DaemonSet to trigger a rollout")
   459  		// We use a nonexistent image here, so that we make sure it won't finish
   460  		newImage := "foo:non-existent"
   461  		newDS, err := updateDaemonSetWithRetries(ctx, c, ns, ds.Name, func(update *appsv1.DaemonSet) {
   462  			update.Spec.Template.Spec.Containers[0].Image = newImage
   463  		})
   464  		framework.ExpectNoError(err)
   465  
   466  		// Make sure we're in the middle of a rollout
   467  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkAtLeastOneNewPod(c, ns, label, newImage))
   468  		framework.ExpectNoError(err)
   469  
   470  		pods := listDaemonPods(ctx, c, ns, label)
   471  		var existingPods, newPods []*v1.Pod
   472  		for i := range pods.Items {
   473  			pod := pods.Items[i]
   474  			image := pod.Spec.Containers[0].Image
   475  			switch image {
   476  			case ds.Spec.Template.Spec.Containers[0].Image:
   477  				existingPods = append(existingPods, &pod)
   478  			case newDS.Spec.Template.Spec.Containers[0].Image:
   479  				newPods = append(newPods, &pod)
   480  			default:
   481  				framework.Failf("unexpected pod found, image = %s", image)
   482  			}
   483  		}
   484  		schedulableNodes, err = e2enode.GetReadySchedulableNodes(ctx, c)
   485  		framework.ExpectNoError(err)
   486  		if len(schedulableNodes.Items) < 2 {
   487  			gomega.Expect(existingPods).To(gomega.BeEmpty())
   488  		} else {
   489  			gomega.Expect(existingPods).NotTo(gomega.BeEmpty())
   490  		}
   491  		gomega.Expect(newPods).NotTo(gomega.BeEmpty())
   492  
   493  		framework.Logf("Roll back the DaemonSet before rollout is complete")
   494  		rollbackDS, err := updateDaemonSetWithRetries(ctx, c, ns, ds.Name, func(update *appsv1.DaemonSet) {
   495  			update.Spec.Template.Spec.Containers[0].Image = image
   496  		})
   497  		framework.ExpectNoError(err)
   498  
   499  		framework.Logf("Make sure DaemonSet rollback is complete")
   500  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkDaemonPodsImageAndAvailability(c, rollbackDS, image, 1))
   501  		framework.ExpectNoError(err)
   502  
   503  		// After rollback is done, compare current pods with previous old pods during rollout, to make sure they're not restarted
   504  		pods = listDaemonPods(ctx, c, ns, label)
   505  		rollbackPods := map[string]bool{}
   506  		for _, pod := range pods.Items {
   507  			rollbackPods[pod.Name] = true
   508  		}
   509  		for _, pod := range existingPods {
   510  			if !rollbackPods[pod.Name] {
   511  				framework.Failf("unexpected pod %s be restarted", pod.Name)
   512  			}
   513  		}
   514  	})
   515  
   516  	// TODO: This test is expected to be promoted to conformance after the feature is promoted
   517  	ginkgo.It("should surge pods onto nodes when spec was updated and update strategy is RollingUpdate", func(ctx context.Context) {
   518  		label := map[string]string{daemonsetNameLabel: dsName}
   519  
   520  		framework.Logf("Creating surge daemon set %s", dsName)
   521  		maxSurgeOverlap := 60 * time.Second
   522  		maxSurge := 1
   523  		surgePercent := intstr.FromString("20%")
   524  		zero := intstr.FromInt32(0)
   525  		oldVersion := "1"
   526  		ds := newDaemonSet(dsName, image, label)
   527  		ds.Spec.Template.Spec.Containers[0].Env = []v1.EnvVar{
   528  			{Name: "VERSION", Value: oldVersion},
   529  		}
   530  		// delay shutdown by 15s to allow containers to overlap in time
   531  		ds.Spec.Template.Spec.Containers[0].Lifecycle = &v1.Lifecycle{
   532  			PreStop: &v1.LifecycleHandler{
   533  				Exec: &v1.ExecAction{
   534  					Command: []string{"/bin/sh", "-c", "sleep 15"},
   535  				},
   536  			},
   537  		}
   538  		// use a readiness probe that can be forced to fail (by changing the contents of /var/tmp/ready)
   539  		ds.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{
   540  			ProbeHandler: v1.ProbeHandler{
   541  				Exec: &v1.ExecAction{
   542  					Command: []string{"/bin/sh", "-ec", `touch /var/tmp/ready; [[ "$( cat /var/tmp/ready )" == "" ]]`},
   543  				},
   544  			},
   545  			InitialDelaySeconds: 7,
   546  			PeriodSeconds:       3,
   547  			SuccessThreshold:    1,
   548  			FailureThreshold:    1,
   549  		}
   550  		// use a simple surge strategy
   551  		ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
   552  			Type: appsv1.RollingUpdateDaemonSetStrategyType,
   553  			RollingUpdate: &appsv1.RollingUpdateDaemonSet{
   554  				MaxUnavailable: &zero,
   555  				MaxSurge:       &surgePercent,
   556  			},
   557  		}
   558  		// The pod must be ready for at least 10s before we delete the old pod
   559  		ds.Spec.MinReadySeconds = 10
   560  
   561  		ds, err := c.AppsV1().DaemonSets(ns).Create(ctx, ds, metav1.CreateOptions{})
   562  		framework.ExpectNoError(err)
   563  
   564  		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
   565  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   566  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   567  
   568  		// Check history and labels
   569  		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
   570  		framework.ExpectNoError(err)
   571  		waitForHistoryCreated(ctx, c, ns, label, 1)
   572  		cur := curHistory(listDaemonHistories(ctx, c, ns, label), ds)
   573  		hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
   574  		gomega.Expect(cur.Revision).To(gomega.Equal(int64(1)))
   575  		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
   576  
   577  		newVersion := "2"
   578  		ginkgo.By("Update daemon pods environment var")
   579  		patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%s"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, newVersion)
   580  		ds, err = c.AppsV1().DaemonSets(ns).Patch(ctx, dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
   581  		framework.ExpectNoError(err)
   582  
   583  		// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
   584  		// Get the number of nodes, and set the timeout appropriately.
   585  		nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
   586  		framework.ExpectNoError(err)
   587  		nodeCount := len(nodes.Items)
   588  		retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second
   589  
   590  		ginkgo.By("Check that daemon pods surge and invariants are preserved during that rollout")
   591  		ageOfOldPod := make(map[string]time.Time)
   592  		deliberatelyDeletedPods := sets.NewString()
   593  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, retryTimeout, true, func(ctx context.Context) (bool, error) {
   594  			podList, err := c.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{})
   595  			if err != nil {
   596  				return false, err
   597  			}
   598  			pods := podList.Items
   599  
   600  			var buf bytes.Buffer
   601  			pw := tabwriter.NewWriter(&buf, 1, 1, 1, ' ', 0)
   602  			fmt.Fprint(pw, "Node\tVersion\tName\tUID\tDeleted\tReady\n")
   603  
   604  			now := time.Now()
   605  			podUIDs := sets.NewString()
   606  			deletedPodUIDs := sets.NewString()
   607  			nodes := sets.NewString()
   608  			versions := sets.NewString()
   609  			nodesToVersions := make(map[string]map[string]int)
   610  			nodesToDeletedVersions := make(map[string]map[string]int)
   611  			var surgeCount, newUnavailableCount, newDeliberatelyDeletedCount, oldUnavailableCount, nodesWithoutOldVersion int
   612  			for _, pod := range pods {
   613  				if !metav1.IsControlledBy(&pod, ds) {
   614  					continue
   615  				}
   616  				nodeName := pod.Spec.NodeName
   617  				nodes.Insert(nodeName)
   618  				podVersion := pod.Spec.Containers[0].Env[0].Value
   619  				if pod.DeletionTimestamp != nil {
   620  					if !deliberatelyDeletedPods.Has(string(pod.UID)) {
   621  						versions := nodesToDeletedVersions[nodeName]
   622  						if versions == nil {
   623  							versions = make(map[string]int)
   624  							nodesToDeletedVersions[nodeName] = versions
   625  						}
   626  						versions[podVersion]++
   627  					}
   628  				} else {
   629  					versions := nodesToVersions[nodeName]
   630  					if versions == nil {
   631  						versions = make(map[string]int)
   632  						nodesToVersions[nodeName] = versions
   633  					}
   634  					versions[podVersion]++
   635  				}
   636  
   637  				ready := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now())
   638  				if podVersion == newVersion {
   639  					surgeCount++
   640  					if !ready || pod.DeletionTimestamp != nil {
   641  						if deliberatelyDeletedPods.Has(string(pod.UID)) {
   642  							newDeliberatelyDeletedCount++
   643  						}
   644  						newUnavailableCount++
   645  					}
   646  				} else {
   647  					if !ready || pod.DeletionTimestamp != nil {
   648  						oldUnavailableCount++
   649  					}
   650  				}
   651  				fmt.Fprintf(pw, "%s\t%s\t%s\t%s\t%t\t%t\n", pod.Spec.NodeName, podVersion, pod.Name, pod.UID, pod.DeletionTimestamp != nil, ready)
   652  			}
   653  
   654  			// print a stable sorted list of pods by node for debugging
   655  			pw.Flush()
   656  			lines := strings.Split(buf.String(), "\n")
   657  			lines = lines[:len(lines)-1]
   658  			sort.Strings(lines[1:])
   659  			for _, line := range lines {
   660  				framework.Logf("%s", line)
   661  			}
   662  
   663  			// if there is an old and new pod at the same time, record a timestamp
   664  			deletedPerNode := make(map[string]int)
   665  			for _, pod := range pods {
   666  				if !metav1.IsControlledBy(&pod, ds) {
   667  					continue
   668  				}
   669  				// ignore deleted pods
   670  				if pod.DeletionTimestamp != nil {
   671  					deletedPodUIDs.Insert(string(pod.UID))
   672  					if !deliberatelyDeletedPods.Has(string(pod.UID)) {
   673  						deletedPerNode[pod.Spec.NodeName]++
   674  					}
   675  					continue
   676  				}
   677  				podUIDs.Insert(string(pod.UID))
   678  				podVersion := pod.Spec.Containers[0].Env[0].Value
   679  				if podVersion == newVersion {
   680  					continue
   681  				}
   682  				// if this is a pod in an older version AND there is a new version of this pod, record when
   683  				// we started seeing this, otherwise delete the record (perhaps the node was drained)
   684  				if nodesToVersions[pod.Spec.NodeName][newVersion] > 0 {
   685  					if _, ok := ageOfOldPod[string(pod.UID)]; !ok {
   686  						ageOfOldPod[string(pod.UID)] = now
   687  					}
   688  				} else {
   689  					delete(ageOfOldPod, string(pod.UID))
   690  				}
   691  			}
   692  			// purge the old pods list of any deleted pods
   693  			for uid := range ageOfOldPod {
   694  				if !podUIDs.Has(uid) {
   695  					delete(ageOfOldPod, uid)
   696  				}
   697  			}
   698  			deliberatelyDeletedPods = deliberatelyDeletedPods.Intersection(deletedPodUIDs)
   699  
   700  			for _, versions := range nodesToVersions {
   701  				if versions[oldVersion] == 0 {
   702  					nodesWithoutOldVersion++
   703  				}
   704  			}
   705  
   706  			var errs []string
   707  
   708  			// invariant: we should not see more than 1 deleted pod per node unless a severe node problem is occurring or the controller is misbehaving
   709  			for node, count := range deletedPerNode {
   710  				if count > 1 {
   711  					errs = append(errs, fmt.Sprintf("Node %s has %d deleted pods, which may indicate a problem on the node or a controller race condition", node, count))
   712  				}
   713  			}
   714  
   715  			// invariant: the controller must react to the new pod becoming ready within a reasonable timeframe (2x grace period)
   716  			for uid, firstSeen := range ageOfOldPod {
   717  				if now.Sub(firstSeen) > maxSurgeOverlap {
   718  					errs = append(errs, fmt.Sprintf("An old pod with UID %s has been running alongside a newer version for longer than %s", uid, maxSurgeOverlap))
   719  				}
   720  			}
   721  
   722  			// invariant: we should never have more than maxSurge + oldUnavailableCount instances of the new version unready unless a flake in the infrastructure happens, or
   723  			//            if we deliberately deleted one of the new pods
   724  			if newUnavailableCount > (maxSurge + oldUnavailableCount + newDeliberatelyDeletedCount + nodesWithoutOldVersion) {
   725  				errs = append(errs, fmt.Sprintf("observed %d new unavailable pods greater than (surge count %d + old unavailable count %d + deliberately deleted new count %d + nodes without old version %d), may be infrastructure flake", newUnavailableCount, maxSurge, oldUnavailableCount, newDeliberatelyDeletedCount, nodesWithoutOldVersion))
   726  			}
   727  			// invariant: the total number of versions created should be 2
   728  			if versions.Len() > 2 {
   729  				errs = append(errs, fmt.Sprintf("observed %d versions running simultaneously, must have max 2", versions.Len()))
   730  			}
   731  			for _, node := range nodes.List() {
   732  				// ignore pods that haven't been scheduled yet
   733  				if len(node) == 0 {
   734  					continue
   735  				}
   736  				versionCount := make(map[string]int)
   737  				// invariant: surge should never have more than one instance of a pod per node running
   738  				for version, count := range nodesToVersions[node] {
   739  					if count > 1 {
   740  						errs = append(errs, fmt.Sprintf("node %s has %d instances of version %s running simultaneously, must have max 1", node, count, version))
   741  					}
   742  					versionCount[version] += count
   743  				}
   744  				// invariant: when surging, the most number of pods we should allow to be deleted is 2 (if we are getting evicted)
   745  				for version, count := range nodesToDeletedVersions[node] {
   746  					if count > 2 {
   747  						errs = append(errs, fmt.Sprintf("node %s has %d deleted instances of version %s running simultaneously, must have max 1", node, count, version))
   748  					}
   749  					versionCount[version] += count
   750  				}
   751  				// invariant: on any node, we should never have more than two instances of a version (if we are getting evicted)
   752  				for version, count := range versionCount {
   753  					if count > 2 {
   754  						errs = append(errs, fmt.Sprintf("node %s has %d total instances of version %s running simultaneously, must have max 2 (one deleted and one running)", node, count, version))
   755  					}
   756  				}
   757  			}
   758  
   759  			if len(errs) > 0 {
   760  				sort.Strings(errs)
   761  				return false, fmt.Errorf("invariants were violated during daemonset update:\n%s", strings.Join(errs, "\n"))
   762  			}
   763  
   764  			// Make sure every daemon pod on the node has been updated
   765  			nodeNames := e2edaemonset.SchedulableNodes(ctx, c, ds)
   766  			for _, node := range nodeNames {
   767  				switch {
   768  				case
   769  					// if we don't have the new version yet
   770  					nodesToVersions[node][newVersion] == 0,
   771  					// if there are more than one version on a node
   772  					len(nodesToVersions[node]) > 1,
   773  					// if there are still any deleted pods
   774  					len(nodesToDeletedVersions[node]) > 0,
   775  					// if any of the new pods are unavailable
   776  					newUnavailableCount > 0:
   777  
   778  					// inject a failure randomly to ensure the controller recovers
   779  					switch rand.Intn(25) {
   780  					// cause a random old pod to go unready
   781  					case 0:
   782  						// select a not-deleted pod of the old version
   783  						if pod := randomPod(pods, func(pod *v1.Pod) bool {
   784  							return pod.DeletionTimestamp == nil && oldVersion == pod.Spec.Containers[0].Env[0].Value
   785  						}); pod != nil {
   786  							// make the /tmp/ready file read only, which will cause readiness to fail
   787  							if _, err := e2ekubectl.RunKubectl(pod.Namespace, "exec", "-c", pod.Spec.Containers[0].Name, pod.Name, "--", "/bin/sh", "-ec", "echo 0 > /var/tmp/ready"); err != nil {
   788  								framework.Logf("Failed to mark pod %s as unready via exec: %v", pod.Name, err)
   789  							} else {
   790  								framework.Logf("Marked old pod %s as unready", pod.Name)
   791  							}
   792  						}
   793  					case 1:
   794  						// delete a random pod
   795  						if pod := randomPod(pods, func(pod *v1.Pod) bool {
   796  							return pod.DeletionTimestamp == nil
   797  						}); pod != nil {
   798  							if err := c.CoreV1().Pods(ds.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
   799  								framework.Logf("Failed to delete pod %s early: %v", pod.Name, err)
   800  							} else {
   801  								framework.Logf("Deleted pod %s prematurely", pod.Name)
   802  								deliberatelyDeletedPods.Insert(string(pod.UID))
   803  							}
   804  						}
   805  					}
   806  
   807  					// then wait
   808  					return false, nil
   809  				}
   810  			}
   811  			return true, nil
   812  		})
   813  		framework.ExpectNoError(err)
   814  
   815  		ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
   816  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, ds))
   817  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   818  
   819  		// Check history and labels
   820  		ds, err = c.AppsV1().DaemonSets(ns).Get(ctx, ds.Name, metav1.GetOptions{})
   821  		framework.ExpectNoError(err)
   822  		waitForHistoryCreated(ctx, c, ns, label, 2)
   823  		cur = curHistory(listDaemonHistories(ctx, c, ns, label), ds)
   824  		hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
   825  		gomega.Expect(cur.Revision).To(gomega.Equal(int64(2)))
   826  		checkDaemonSetPodsLabels(listDaemonPods(ctx, c, ns, label), hash)
   827  	})
   828  
   829  	/*
   830  		Release: v1.22
   831  		Testname: DaemonSet, list and delete a collection of DaemonSets
   832  		Description: When a DaemonSet is created it MUST succeed. It
   833  		MUST succeed when listing DaemonSets via a label selector. It
   834  		MUST succeed when deleting the DaemonSet via deleteCollection.
   835  	*/
   836  	framework.ConformanceIt("should list and delete a collection of DaemonSets", func(ctx context.Context) {
   837  		label := map[string]string{daemonsetNameLabel: dsName}
   838  		labelSelector := labels.SelectorFromSet(label).String()
   839  
   840  		dsClient := f.ClientSet.AppsV1().DaemonSets(ns)
   841  		cs := f.ClientSet
   842  		one := int64(1)
   843  
   844  		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
   845  		testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{})
   846  		framework.ExpectNoError(err)
   847  
   848  		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
   849  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, testDaemonset))
   850  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   851  		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
   852  		framework.ExpectNoError(err)
   853  
   854  		ginkgo.By("listing all DaemonSets")
   855  		dsList, err := cs.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
   856  		framework.ExpectNoError(err, "failed to list Daemon Sets")
   857  		gomega.Expect(dsList.Items).To(gomega.HaveLen(1), "filtered list wasn't found")
   858  
   859  		ginkgo.By("DeleteCollection of the DaemonSets")
   860  		err = dsClient.DeleteCollection(ctx, metav1.DeleteOptions{GracePeriodSeconds: &one}, metav1.ListOptions{LabelSelector: labelSelector})
   861  		framework.ExpectNoError(err, "failed to delete DaemonSets")
   862  
   863  		ginkgo.By("Verify that ReplicaSets have been deleted")
   864  		dsList, err = c.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
   865  		framework.ExpectNoError(err, "failed to list DaemonSets")
   866  		gomega.Expect(dsList.Items).To(gomega.BeEmpty(), "filtered list should have no daemonset")
   867  	})
   868  
   869  	/*	Release: v1.22
   870  		Testname: DaemonSet, status sub-resource
   871  		Description: When a DaemonSet is created it MUST succeed.
   872  		Attempt to read, update and patch its status sub-resource; all
   873  		mutating sub-resource operations MUST be visible to subsequent reads.
   874  	*/
   875  	framework.ConformanceIt("should verify changes to a daemon set status", func(ctx context.Context) {
   876  		label := map[string]string{daemonsetNameLabel: dsName}
   877  		labelSelector := labels.SelectorFromSet(label).String()
   878  
   879  		dsClient := f.ClientSet.AppsV1().DaemonSets(ns)
   880  		cs := f.ClientSet
   881  
   882  		w := &cache.ListWatch{
   883  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   884  				options.LabelSelector = labelSelector
   885  				return dsClient.Watch(ctx, options)
   886  			},
   887  		}
   888  
   889  		dsList, err := cs.AppsV1().DaemonSets("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
   890  		framework.ExpectNoError(err, "failed to list Daemon Sets")
   891  
   892  		ginkgo.By(fmt.Sprintf("Creating simple DaemonSet %q", dsName))
   893  		testDaemonset, err := c.AppsV1().DaemonSets(ns).Create(ctx, newDaemonSetWithLabel(dsName, image, label), metav1.CreateOptions{})
   894  		framework.ExpectNoError(err)
   895  
   896  		ginkgo.By("Check that daemon pods launch on every node of the cluster.")
   897  		err = wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, checkRunningOnAllNodes(f, testDaemonset))
   898  		framework.ExpectNoError(err, "error waiting for daemon pod to start")
   899  		err = e2edaemonset.CheckDaemonStatus(ctx, f, dsName)
   900  		framework.ExpectNoError(err)
   901  
   902  		ginkgo.By("Getting /status")
   903  		dsResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}
   904  		dsStatusUnstructured, err := f.DynamicClient.Resource(dsResource).Namespace(ns).Get(ctx, dsName, metav1.GetOptions{}, "status")
   905  		framework.ExpectNoError(err, "Failed to fetch the status of daemon set %s in namespace %s", dsName, ns)
   906  		dsStatusBytes, err := json.Marshal(dsStatusUnstructured)
   907  		framework.ExpectNoError(err, "Failed to marshal unstructured response. %v", err)
   908  
   909  		var dsStatus appsv1.DaemonSet
   910  		err = json.Unmarshal(dsStatusBytes, &dsStatus)
   911  		framework.ExpectNoError(err, "Failed to unmarshal JSON bytes to a daemon set object type")
   912  		framework.Logf("Daemon Set %s has Conditions: %v", dsName, dsStatus.Status.Conditions)
   913  
   914  		ginkgo.By("updating the DaemonSet Status")
   915  		var statusToUpdate, updatedStatus *appsv1.DaemonSet
   916  
   917  		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
   918  			statusToUpdate, err = dsClient.Get(ctx, dsName, metav1.GetOptions{})
   919  			framework.ExpectNoError(err, "Unable to retrieve daemon set %s", dsName)
   920  
   921  			statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, appsv1.DaemonSetCondition{
   922  				Type:    "StatusUpdate",
   923  				Status:  "True",
   924  				Reason:  "E2E",
   925  				Message: "Set from e2e test",
   926  			})
   927  
   928  			updatedStatus, err = dsClient.UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{})
   929  			return err
   930  		})
   931  		framework.ExpectNoError(err, "Failed to update status. %v", err)
   932  		framework.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions)
   933  
   934  		ginkgo.By("watching for the daemon set status to be updated")
   935  		ctxUntil, cancel := context.WithTimeout(ctx, dsRetryTimeout)
   936  		defer cancel()
   937  		_, err = watchtools.Until(ctxUntil, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
   938  			if ds, ok := event.Object.(*appsv1.DaemonSet); ok {
   939  				found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name &&
   940  					ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace &&
   941  					ds.Labels[daemonsetNameLabel] == dsName
   942  				if !found {
   943  					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
   944  					return false, nil
   945  				}
   946  				for _, cond := range ds.Status.Conditions {
   947  					if cond.Type == "StatusUpdate" &&
   948  						cond.Reason == "E2E" &&
   949  						cond.Message == "Set from e2e test" {
   950  						framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, ds.Status.Conditions)
   951  						return found, nil
   952  					}
   953  					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
   954  				}
   955  			}
   956  			object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0]
   957  			framework.Logf("Observed %v event: %+v", object, event.Type)
   958  			return false, nil
   959  		})
   960  		framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns)
   961  		framework.Logf("Daemon set %s has an updated status", dsName)
   962  
   963  		ginkgo.By("patching the DaemonSet Status")
   964  		daemonSetStatusPatch := appsv1.DaemonSet{
   965  			Status: appsv1.DaemonSetStatus{
   966  				Conditions: []appsv1.DaemonSetCondition{
   967  					{
   968  						Type:   "StatusPatched",
   969  						Status: "True",
   970  					},
   971  				},
   972  			},
   973  		}
   974  
   975  		payload, err := json.Marshal(daemonSetStatusPatch)
   976  		framework.ExpectNoError(err, "Failed to marshal JSON. %v", err)
   977  		_, err = dsClient.Patch(ctx, dsName, types.MergePatchType, payload, metav1.PatchOptions{}, "status")
   978  		framework.ExpectNoError(err, "Failed to patch daemon set status", err)
   979  
   980  		ginkgo.By("watching for the daemon set status to be patched")
   981  		ctxUntil, cancel = context.WithTimeout(ctx, dsRetryTimeout)
   982  		defer cancel()
   983  		_, err = watchtools.Until(ctxUntil, dsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
   984  			if ds, ok := event.Object.(*appsv1.DaemonSet); ok {
   985  				found := ds.ObjectMeta.Name == testDaemonset.ObjectMeta.Name &&
   986  					ds.ObjectMeta.Namespace == testDaemonset.ObjectMeta.Namespace &&
   987  					ds.Labels[daemonsetNameLabel] == dsName
   988  				if !found {
   989  					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
   990  					return false, nil
   991  				}
   992  				for _, cond := range ds.Status.Conditions {
   993  					if cond.Type == "StatusPatched" {
   994  						framework.Logf("Found daemon set %v in namespace %v with labels: %v annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.ObjectMeta.Labels, ds.Annotations, ds.Status.Conditions)
   995  						return found, nil
   996  					}
   997  					framework.Logf("Observed daemon set %v in namespace %v with annotations: %v & Conditions: %v", ds.ObjectMeta.Name, ds.ObjectMeta.Namespace, ds.Annotations, ds.Status.Conditions)
   998  				}
   999  			}
  1000  			object := strings.Split(fmt.Sprintf("%v", event.Object), "{")[0]
  1001  			framework.Logf("Observed %v event: %v", object, event.Type)
  1002  			return false, nil
  1003  		})
  1004  		framework.ExpectNoError(err, "failed to locate daemon set %v in namespace %v", testDaemonset.ObjectMeta.Name, ns)
  1005  		framework.Logf("Daemon set %s has a patched status", dsName)
  1006  	})
  1007  })
  1008  
  1009  // randomPod selects a random pod within pods that causes fn to return true, or nil
  1010  // if no pod can be found matching the criteria.
  1011  func randomPod(pods []v1.Pod, fn func(p *v1.Pod) bool) *v1.Pod {
  1012  	podCount := len(pods)
  1013  	for offset, i := rand.Intn(podCount), 0; i < (podCount - 1); i++ {
  1014  		pod := &pods[(offset+i)%podCount]
  1015  		if fn(pod) {
  1016  			return pod
  1017  		}
  1018  	}
  1019  	return nil
  1020  }
  1021  
  1022  // getDaemonSetImagePatch generates a patch for updating a DaemonSet's container image
  1023  func getDaemonSetImagePatch(containerName, containerImage string) string {
  1024  	return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage)
  1025  }
  1026  
  1027  func newDaemonSet(dsName, image string, label map[string]string) *appsv1.DaemonSet {
  1028  	ds := newDaemonSetWithLabel(dsName, image, label)
  1029  	ds.ObjectMeta.Labels = nil
  1030  	return ds
  1031  }
  1032  
  1033  func newDaemonSetWithLabel(dsName, image string, label map[string]string) *appsv1.DaemonSet {
  1034  	return e2edaemonset.NewDaemonSet(dsName, image, label, nil, nil, []v1.ContainerPort{{ContainerPort: 9376}})
  1035  }
  1036  
  1037  func listDaemonPods(ctx context.Context, c clientset.Interface, ns string, label map[string]string) *v1.PodList {
  1038  	selector := labels.Set(label).AsSelector()
  1039  	options := metav1.ListOptions{
  1040  		LabelSelector: selector.String(),
  1041  		FieldSelector: nonTerminalPhaseSelector.String(),
  1042  	}
  1043  	podList, err := c.CoreV1().Pods(ns).List(ctx, options)
  1044  	framework.ExpectNoError(err)
  1045  	gomega.Expect(podList.Items).ToNot(gomega.BeEmpty())
  1046  	return podList
  1047  }
  1048  
  1049  func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
  1050  	daemonSetLabels := map[string]string{}
  1051  	otherLabels := map[string]string{}
  1052  	for k, v := range labels {
  1053  		if strings.HasPrefix(k, daemonsetLabelPrefix) {
  1054  			daemonSetLabels[k] = v
  1055  		} else {
  1056  			otherLabels[k] = v
  1057  		}
  1058  	}
  1059  	return daemonSetLabels, otherLabels
  1060  }
  1061  
  1062  func clearDaemonSetNodeLabels(ctx context.Context, c clientset.Interface) error {
  1063  	nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
  1064  	if err != nil {
  1065  		return err
  1066  	}
  1067  	for _, node := range nodeList.Items {
  1068  		_, err := setDaemonSetNodeLabels(ctx, c, node.Name, map[string]string{})
  1069  		if err != nil {
  1070  			return err
  1071  		}
  1072  	}
  1073  	return nil
  1074  }
  1075  
  1076  // patchNamespaceAnnotations sets node selectors related annotations on tests namespaces to empty
  1077  func patchNamespaceAnnotations(ctx context.Context, c clientset.Interface, nsName string) (*v1.Namespace, error) {
  1078  	nsClient := c.CoreV1().Namespaces()
  1079  
  1080  	annotations := make(map[string]string)
  1081  	for _, n := range NamespaceNodeSelectors {
  1082  		annotations[n] = ""
  1083  	}
  1084  	nsPatch, err := json.Marshal(map[string]interface{}{
  1085  		"metadata": map[string]interface{}{
  1086  			"annotations": annotations,
  1087  		},
  1088  	})
  1089  	if err != nil {
  1090  		return nil, err
  1091  	}
  1092  
  1093  	return nsClient.Patch(ctx, nsName, types.StrategicMergePatchType, nsPatch, metav1.PatchOptions{})
  1094  }
  1095  
  1096  func setDaemonSetNodeLabels(ctx context.Context, c clientset.Interface, nodeName string, labels map[string]string) (*v1.Node, error) {
  1097  	nodeClient := c.CoreV1().Nodes()
  1098  	var newNode *v1.Node
  1099  	var newLabels map[string]string
  1100  	err := wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, func(ctx context.Context) (bool, error) {
  1101  		node, err := nodeClient.Get(ctx, nodeName, metav1.GetOptions{})
  1102  		if err != nil {
  1103  			return false, err
  1104  		}
  1105  
  1106  		// remove all labels this test is creating
  1107  		daemonSetLabels, otherLabels := separateDaemonSetNodeLabels(node.Labels)
  1108  		if reflect.DeepEqual(daemonSetLabels, labels) {
  1109  			newNode = node
  1110  			return true, nil
  1111  		}
  1112  		node.Labels = otherLabels
  1113  		for k, v := range labels {
  1114  			node.Labels[k] = v
  1115  		}
  1116  		newNode, err = nodeClient.Update(ctx, node, metav1.UpdateOptions{})
  1117  		if err == nil {
  1118  			newLabels, _ = separateDaemonSetNodeLabels(newNode.Labels)
  1119  			return true, err
  1120  		}
  1121  		if se, ok := err.(*apierrors.StatusError); ok && se.ErrStatus.Reason == metav1.StatusReasonConflict {
  1122  			framework.Logf("failed to update node due to resource version conflict")
  1123  			return false, nil
  1124  		}
  1125  		return false, err
  1126  	})
  1127  	if err != nil {
  1128  		return nil, err
  1129  	} else if len(newLabels) != len(labels) {
  1130  		return nil, fmt.Errorf("could not set daemon set test labels as expected")
  1131  	}
  1132  
  1133  	return newNode, nil
  1134  }
  1135  
  1136  func checkRunningOnAllNodes(f *framework.Framework, ds *appsv1.DaemonSet) func(ctx context.Context) (bool, error) {
  1137  	return func(ctx context.Context) (bool, error) {
  1138  		return e2edaemonset.CheckRunningOnAllNodes(ctx, f, ds)
  1139  	}
  1140  }
  1141  
  1142  func checkAtLeastOneNewPod(c clientset.Interface, ns string, label map[string]string, newImage string) func(ctx context.Context) (bool, error) {
  1143  	return func(ctx context.Context) (bool, error) {
  1144  		pods := listDaemonPods(ctx, c, ns, label)
  1145  		for _, pod := range pods.Items {
  1146  			if pod.Spec.Containers[0].Image == newImage {
  1147  				return true, nil
  1148  			}
  1149  		}
  1150  		return false, nil
  1151  	}
  1152  }
  1153  
  1154  func checkRunningOnNoNodes(f *framework.Framework, ds *appsv1.DaemonSet) func(ctx context.Context) (bool, error) {
  1155  	return e2edaemonset.CheckDaemonPodOnNodes(f, ds, make([]string, 0))
  1156  }
  1157  
  1158  func checkDaemonPodsImageAndAvailability(c clientset.Interface, ds *appsv1.DaemonSet, image string, maxUnavailable int) func(ctx context.Context) (bool, error) {
  1159  	return func(ctx context.Context) (bool, error) {
  1160  		podList, err := c.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{})
  1161  		if err != nil {
  1162  			return false, err
  1163  		}
  1164  		pods := podList.Items
  1165  
  1166  		unavailablePods := 0
  1167  		nodesToUpdatedPodCount := make(map[string]int)
  1168  		for _, pod := range pods {
  1169  			// Ignore the pod on the node that is supposed to be deleted
  1170  			if pod.DeletionTimestamp != nil {
  1171  				continue
  1172  			}
  1173  			if !metav1.IsControlledBy(&pod, ds) {
  1174  				continue
  1175  			}
  1176  			podImage := pod.Spec.Containers[0].Image
  1177  			if podImage != image {
  1178  				framework.Logf("Wrong image for pod: %s. Expected: %s, got: %s.", pod.Name, image, podImage)
  1179  			} else {
  1180  				nodesToUpdatedPodCount[pod.Spec.NodeName]++
  1181  			}
  1182  			if !podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now()) {
  1183  				framework.Logf("Pod %s is not available", pod.Name)
  1184  				unavailablePods++
  1185  			}
  1186  		}
  1187  		if unavailablePods > maxUnavailable {
  1188  			return false, fmt.Errorf("number of unavailable pods: %d is greater than maxUnavailable: %d", unavailablePods, maxUnavailable)
  1189  		}
  1190  		// Make sure every daemon pod on the node has been updated
  1191  		nodeNames := e2edaemonset.SchedulableNodes(ctx, c, ds)
  1192  		for _, node := range nodeNames {
  1193  			if nodesToUpdatedPodCount[node] == 0 {
  1194  				return false, nil
  1195  			}
  1196  		}
  1197  		return true, nil
  1198  	}
  1199  }
  1200  
  1201  func checkDaemonSetPodsLabels(podList *v1.PodList, hash string) {
  1202  	for _, pod := range podList.Items {
  1203  		// Ignore all the DS pods that will be deleted
  1204  		if pod.DeletionTimestamp != nil {
  1205  			continue
  1206  		}
  1207  		podHash := pod.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
  1208  		gomega.Expect(podHash).ToNot(gomega.BeEmpty())
  1209  		if len(hash) > 0 {
  1210  			gomega.Expect(podHash).To(gomega.Equal(hash), "unexpected hash for pod %s", pod.Name)
  1211  		}
  1212  	}
  1213  }
  1214  
  1215  func waitForHistoryCreated(ctx context.Context, c clientset.Interface, ns string, label map[string]string, numHistory int) {
  1216  	listHistoryFn := func(ctx context.Context) (bool, error) {
  1217  		selector := labels.Set(label).AsSelector()
  1218  		options := metav1.ListOptions{LabelSelector: selector.String()}
  1219  		historyList, err := c.AppsV1().ControllerRevisions(ns).List(ctx, options)
  1220  		if err != nil {
  1221  			return false, err
  1222  		}
  1223  		if len(historyList.Items) == numHistory {
  1224  			return true, nil
  1225  		}
  1226  		framework.Logf("%d/%d controllerrevisions created.", len(historyList.Items), numHistory)
  1227  		return false, nil
  1228  	}
  1229  	err := wait.PollUntilContextTimeout(ctx, dsRetryPeriod, dsRetryTimeout, true, listHistoryFn)
  1230  	framework.ExpectNoError(err, "error waiting for controllerrevisions to be created")
  1231  }
  1232  
  1233  func listDaemonHistories(ctx context.Context, c clientset.Interface, ns string, label map[string]string) *appsv1.ControllerRevisionList {
  1234  	selector := labels.Set(label).AsSelector()
  1235  	options := metav1.ListOptions{LabelSelector: selector.String()}
  1236  	historyList, err := c.AppsV1().ControllerRevisions(ns).List(ctx, options)
  1237  	framework.ExpectNoError(err)
  1238  	gomega.Expect(historyList.Items).ToNot(gomega.BeEmpty())
  1239  	return historyList
  1240  }
  1241  
  1242  func curHistory(historyList *appsv1.ControllerRevisionList, ds *appsv1.DaemonSet) *appsv1.ControllerRevision {
  1243  	var curHistory *appsv1.ControllerRevision
  1244  	foundCurHistories := 0
  1245  	for i := range historyList.Items {
  1246  		history := &historyList.Items[i]
  1247  		// Every history should have the hash label
  1248  		gomega.Expect(history.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]).ToNot(gomega.BeEmpty())
  1249  		match, err := daemon.Match(ds, history)
  1250  		framework.ExpectNoError(err)
  1251  		if match {
  1252  			curHistory = history
  1253  			foundCurHistories++
  1254  		}
  1255  	}
  1256  	gomega.Expect(foundCurHistories).To(gomega.Equal(1))
  1257  	gomega.Expect(curHistory).NotTo(gomega.BeNil())
  1258  	return curHistory
  1259  }
  1260  
  1261  func waitFailedDaemonPodDeleted(c clientset.Interface, pod *v1.Pod) func(ctx context.Context) (bool, error) {
  1262  	return func(ctx context.Context) (bool, error) {
  1263  		if _, err := c.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err != nil {
  1264  			if apierrors.IsNotFound(err) {
  1265  				return true, nil
  1266  			}
  1267  			return false, fmt.Errorf("failed to get failed daemon pod %q: %w", pod.Name, err)
  1268  		}
  1269  		return false, nil
  1270  	}
  1271  }
  1272  

View as plain text