
Source file src/k8s.io/kubernetes/test/e2e/apps/disruption.go

Documentation: k8s.io/kubernetes/test/e2e/apps

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apps

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/onsi/gomega"

	jsonpatch "github.com/evanphx/json-patch"
	"github.com/onsi/ginkgo/v2"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/json"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	clientscheme "k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/util/retry"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/test/e2e/framework"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	imageutils "k8s.io/kubernetes/test/utils/image"
	admissionapi "k8s.io/pod-security-admission/api"
)

// schedulingTimeout is deliberately long because we sometimes need to wait a
// while for something ordinary to happen: a pod getting scheduled and moving
// into Ready.
const (
	bigClusterSize    = 7
	schedulingTimeout = 10 * time.Minute
	timeout           = 60 * time.Second
	defaultName       = "foo"
)

var defaultLabels = map[string]string{"foo": "bar"}

var _ = SIGDescribe("DisruptionController", func() {
	f := framework.NewDefaultFramework("disruption")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
	var ns string
	var cs kubernetes.Interface
	var dc dynamic.Interface

	ginkgo.BeforeEach(func() {
		cs = f.ClientSet
		ns = f.Namespace.Name
		dc = f.DynamicClient
	})

	ginkgo.Context("Listing PodDisruptionBudgets for all namespaces", func() {
		anotherFramework := framework.NewDefaultFramework("disruption-2")
		anotherFramework.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

		/*
		   Release : v1.21
		   Testname: PodDisruptionBudget: list and delete collection
		   Description: PodDisruptionBudget API must support list and deletecollection operations.
		*/
		framework.ConformanceIt("should list and delete a collection of PodDisruptionBudgets", func(ctx context.Context) {
			specialLabels := map[string]string{"foo_pdb": "bar_pdb"}
			labelSelector := labels.SelectorFromSet(specialLabels).String()
			createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromInt32(2), specialLabels)
			createPDBMinAvailableOrDie(ctx, cs, ns, "foo2", intstr.FromString("1%"), specialLabels)
			createPDBMinAvailableOrDie(ctx, anotherFramework.ClientSet, anotherFramework.Namespace.Name, "foo3", intstr.FromInt32(2), specialLabels)

			ginkgo.By("listing a collection of PDBs across all namespaces")
			listPDBs(ctx, cs, metav1.NamespaceAll, labelSelector, 3, []string{defaultName, "foo2", "foo3"})

			ginkgo.By("listing a collection of PDBs in namespace " + ns)
			listPDBs(ctx, cs, ns, labelSelector, 2, []string{defaultName, "foo2"})
			deletePDBCollection(ctx, cs, ns)
		})
	})

	/*
		Release : v1.21
		Testname: PodDisruptionBudget: create, update, patch, and delete object
		Description: PodDisruptionBudget API must support create, update, patch, and delete operations.
	*/
	framework.ConformanceIt("should create a PodDisruptionBudget", func(ctx context.Context) {
		ginkgo.By("creating the pdb")
		createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromString("1%"), defaultLabels)

		ginkgo.By("updating the pdb")
		updatedPDB := updatePDBOrDie(ctx, cs, ns, defaultName, func(pdb *policyv1.PodDisruptionBudget) *policyv1.PodDisruptionBudget {
			newMinAvailable := intstr.FromString("2%")
			pdb.Spec.MinAvailable = &newMinAvailable
			return pdb
		}, cs.PolicyV1().PodDisruptionBudgets(ns).Update)
		gomega.Expect(updatedPDB.Spec.MinAvailable.String()).To(gomega.Equal("2%"))

		ginkgo.By("patching the pdb")
		patchedPDB := patchPDBOrDie(ctx, cs, dc, ns, defaultName, func(old *policyv1.PodDisruptionBudget) (bytes []byte, err error) {
			newBytes, err := json.Marshal(map[string]interface{}{
				"spec": map[string]interface{}{
					"minAvailable": "3%",
				},
			})
			framework.ExpectNoError(err, "failed to marshal JSON for new data")
			return newBytes, nil
		})
		gomega.Expect(patchedPDB.Spec.MinAvailable.String()).To(gomega.Equal("3%"))

		deletePDBOrDie(ctx, cs, ns, defaultName)
	})

	/*
	   Release : v1.21
	   Testname: PodDisruptionBudget: Status updates
	   Description: Disruption controller MUST update the PDB status with
	   how many disruptions are allowed.
	*/
	framework.ConformanceIt("should observe PodDisruptionBudget status updated", func(ctx context.Context) {
		createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromInt32(1), defaultLabels)

		createPodsOrDie(ctx, cs, ns, 3)
		waitForPodsOrDie(ctx, cs, ns, 3)

		// Since DisruptionsAllowed starts out 0, if we ever see it become positive,
		// that means the controller is working.
		err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
			pdb, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, defaultName, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			return pdb.Status.DisruptionsAllowed > 0, nil
		})
		framework.ExpectNoError(err)
	})

	/*
		Release : v1.21
		Testname: PodDisruptionBudget: update and patch status
		Description: PodDisruptionBudget API must support update and patch operations on status subresource.
	*/
	framework.ConformanceIt("should update/patch PodDisruptionBudget status", func(ctx context.Context) {
		createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromInt32(1), defaultLabels)

		ginkgo.By("Updating PodDisruptionBudget status")
		// PDB status can be updated by both the PDB controller and the status API. The test uses the `DisruptedPods` field to show an immediate update via the API.
		// The pod has to exist; otherwise it will be removed by the controller. Other fields may not reflect the change from the API.
		createPodsOrDie(ctx, cs, ns, 1)
		waitForPodsOrDie(ctx, cs, ns, 1)
		pod, _ := locateRunningPod(ctx, cs, ns)
		updatePDBOrDie(ctx, cs, ns, defaultName, func(old *policyv1.PodDisruptionBudget) *policyv1.PodDisruptionBudget {
			old.Status.DisruptedPods = make(map[string]metav1.Time)
			old.Status.DisruptedPods[pod.Name] = metav1.NewTime(time.Now())
			return old
		}, cs.PolicyV1().PodDisruptionBudgets(ns).UpdateStatus)
		// fetch again to make sure the update from the API was effective
		updated := getPDBStatusOrDie(ctx, dc, ns, defaultName)
		gomega.Expect(updated.Status.DisruptedPods).To(gomega.HaveKey(pod.Name), "Expecting DisruptedPods to have %s", pod.Name)

		ginkgo.By("Patching PodDisruptionBudget status")
		patched := patchPDBOrDie(ctx, cs, dc, ns, defaultName, func(old *policyv1.PodDisruptionBudget) (bytes []byte, err error) {
			oldBytes, err := json.Marshal(old)
			framework.ExpectNoError(err, "failed to marshal JSON for old data")
			old.Status.DisruptedPods = make(map[string]metav1.Time)
			newBytes, err := json.Marshal(old)
			framework.ExpectNoError(err, "failed to marshal JSON for new data")
			return jsonpatch.CreateMergePatch(oldBytes, newBytes)
		}, "status")
		gomega.Expect(patched.Status.DisruptedPods).To(gomega.BeEmpty(), "Expecting the PodDisruptionBudget's DisruptedPods to be empty")
	})

	// PDB shouldn't error out when there are unmanaged pods
	ginkgo.It("should observe that the PodDisruptionBudget status is not updated for unmanaged pods",
		func(ctx context.Context) {
			createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromInt32(1), defaultLabels)

			createPodsOrDie(ctx, cs, ns, 3)
			waitForPodsOrDie(ctx, cs, ns, 3)

			// Since we allow unmanaged pods to be associated with a PDB, we should not see any error
			gomega.Consistently(ctx, func(ctx context.Context) (bool, error) {
				pdb, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, defaultName, metav1.GetOptions{})
				if err != nil {
					return false, err
				}
				return isPDBErroring(pdb), nil
			}, 1*time.Minute, 1*time.Second).ShouldNot(gomega.BeTrue(), "PDB shouldn't report an error for unmanaged pods")
		})

	evictionCases := []struct {
		description        string
		minAvailable       intstr.IntOrString
		maxUnavailable     intstr.IntOrString
		podCount           int
		replicaSetSize     int32
		shouldDeny         bool
		exclusive          bool
		skipForBigClusters bool
	}{
		{
			description:    "no PDB",
			minAvailable:   intstr.FromString(""),
			maxUnavailable: intstr.FromString(""),
			podCount:       1,
			shouldDeny:     false,
		}, {
			description:    "too few pods, absolute",
			minAvailable:   intstr.FromInt32(2),
			maxUnavailable: intstr.FromString(""),
			podCount:       2,
			shouldDeny:     true,
		}, {
			description:    "enough pods, absolute",
			minAvailable:   intstr.FromInt32(2),
			maxUnavailable: intstr.FromString(""),
			podCount:       3,
			shouldDeny:     false,
		}, {
			description:    "enough pods, replicaSet, percentage",
			minAvailable:   intstr.FromString("90%"),
			maxUnavailable: intstr.FromString(""),
			replicaSetSize: 10,
			exclusive:      false,
			shouldDeny:     false,
		}, {
			description:    "too few pods, replicaSet, percentage",
			minAvailable:   intstr.FromString("90%"),
			maxUnavailable: intstr.FromString(""),
			replicaSetSize: 10,
			exclusive:      true,
			shouldDeny:     true,
			// This test assumes that there are fewer than replicaSetSize nodes in the cluster.
			skipForBigClusters: true,
		},
		{
			description:    "maxUnavailable allow single eviction, percentage",
			minAvailable:   intstr.FromString(""),
			maxUnavailable: intstr.FromString("10%"),
			replicaSetSize: 10,
			exclusive:      false,
			shouldDeny:     false,
		},
		{
			description:    "maxUnavailable deny evictions, integer",
			minAvailable:   intstr.FromString(""),
			maxUnavailable: intstr.FromInt32(1),
			replicaSetSize: 10,
			exclusive:      true,
			shouldDeny:     true,
			// This test assumes that there are fewer than replicaSetSize nodes in the cluster.
			skipForBigClusters: true,
		},
	}
	for i := range evictionCases {
		c := evictionCases[i]
		expectation := "should allow an eviction"
		if c.shouldDeny {
			expectation = "should not allow an eviction"
		}
		// Tests with exclusive set to true rely on HostPort to make sure
		// only one pod from the ReplicaSet is assigned to each node. This
		// requires these tests to be run serially.
		args := []interface{}{fmt.Sprintf("evictions: %s => %s", c.description, expectation)}
		if c.exclusive {
			args = append(args, framework.WithSerial())
		}
		f.It(append(args, func(ctx context.Context) {
			if c.skipForBigClusters {
				e2eskipper.SkipUnlessNodeCountIsAtMost(bigClusterSize - 1)
			}
			createPodsOrDie(ctx, cs, ns, c.podCount)
			if c.replicaSetSize > 0 {
				createReplicaSetOrDie(ctx, cs, ns, c.replicaSetSize, c.exclusive)
			}

			if c.minAvailable.String() != "" {
				createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, c.minAvailable, defaultLabels)
			}

			if c.maxUnavailable.String() != "" {
				createPDBMaxUnavailableOrDie(ctx, cs, ns, defaultName, c.maxUnavailable)
			}

			// Locate a running pod.
			pod, err := locateRunningPod(ctx, cs, ns)
			framework.ExpectNoError(err)

			e := &policyv1.Eviction{
				ObjectMeta: metav1.ObjectMeta{
					Name:      pod.Name,
					Namespace: ns,
				},
			}

			if c.shouldDeny {
				err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
				gomega.Expect(err).To(gomega.MatchError(func(err error) bool {
					return apierrors.HasStatusCause(err, policyv1.DisruptionBudgetCause)
				}, "pod eviction should fail with DisruptionBudget cause"))
			} else {
				// Only wait for running pods in the "allow" case
				// because one of the shouldDeny cases relies on the
				// ReplicaSet not fitting on the cluster.
				waitForPodsOrDie(ctx, cs, ns, c.podCount+int(c.replicaSetSize))

				// Since DisruptionsAllowed starts out 0, if an eviction is ever allowed,
				// that means the controller is working.
				err = wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
					err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
					if err != nil {
						return false, nil
					}
					return true, nil
				})
				framework.ExpectNoError(err)
			}
		})...)
	}

	/*
		Release : v1.22
		Testname: PodDisruptionBudget: block an eviction until the PDB is updated to allow it
		Description: Eviction API must block an eviction until the PDB is updated to allow it
	*/
	framework.ConformanceIt("should block an eviction until the PDB is updated to allow it", func(ctx context.Context) {
		ginkgo.By("Creating a pdb that targets all three pods in a test replica set")
		createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromInt32(3), defaultLabels)
		createReplicaSetOrDie(ctx, cs, ns, 3, false)

		ginkgo.By("First trying to evict a pod which shouldn't be evictable")
		waitForPodsOrDie(ctx, cs, ns, 3) // make sure that they are running and so would be evictable with a different pdb

		pod, err := locateRunningPod(ctx, cs, ns)
		framework.ExpectNoError(err)
		e := &policyv1.Eviction{
			ObjectMeta: metav1.ObjectMeta{
				Name:      pod.Name,
				Namespace: ns,
			},
		}
		err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
		gomega.Expect(err).To(gomega.MatchError(func(err error) bool {
			return apierrors.HasStatusCause(err, policyv1.DisruptionBudgetCause)
		}, fmt.Sprintf("pod eviction should fail with DisruptionBudget cause. The error was \"%v\"\n", err)))

		ginkgo.By("Updating the pdb to allow a pod to be evicted")
		updatePDBOrDie(ctx, cs, ns, defaultName, func(pdb *policyv1.PodDisruptionBudget) *policyv1.PodDisruptionBudget {
			newMinAvailable := intstr.FromInt32(2)
			pdb.Spec.MinAvailable = &newMinAvailable
			return pdb
		}, cs.PolicyV1().PodDisruptionBudgets(ns).Update)

		ginkgo.By("Trying to evict the same pod we tried earlier which should now be evictable")
		waitForPodsOrDie(ctx, cs, ns, 3)
		waitForPdbToObserveHealthyPods(ctx, cs, ns, 3)
		err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
		framework.ExpectNoError(err) // the eviction is now allowed

		ginkgo.By("Patching the pdb to disallow a pod to be evicted")
		patchPDBOrDie(ctx, cs, dc, ns, defaultName, func(old *policyv1.PodDisruptionBudget) (bytes []byte, err error) {
			oldData, err := json.Marshal(old)
			framework.ExpectNoError(err, "failed to marshal JSON for old data")
			old.Spec.MinAvailable = nil
			maxUnavailable := intstr.FromInt32(0)
			old.Spec.MaxUnavailable = &maxUnavailable
			newData, err := json.Marshal(old)
			framework.ExpectNoError(err, "failed to marshal JSON for new data")
			return jsonpatch.CreateMergePatch(oldData, newData)
		})

		waitForPodsOrDie(ctx, cs, ns, 3)
		pod, err = locateRunningPod(ctx, cs, ns) // locate a new running pod
		framework.ExpectNoError(err)
		e = &policyv1.Eviction{
			ObjectMeta: metav1.ObjectMeta{
				Name:      pod.Name,
				Namespace: ns,
			},
		}
		err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
		gomega.Expect(err).To(gomega.MatchError(func(err error) bool {
			return apierrors.HasStatusCause(err, policyv1.DisruptionBudgetCause)
		}, fmt.Sprintf("pod eviction should fail with DisruptionBudget cause. The error was \"%v\"\n", err)))

		ginkgo.By("Deleting the pdb to allow a pod to be evicted")
		deletePDBOrDie(ctx, cs, ns, defaultName)

		ginkgo.By("Trying to evict the same pod we tried earlier which should now be evictable")
		waitForPodsOrDie(ctx, cs, ns, 3)
		err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
		framework.ExpectNoError(err) // the eviction is now allowed
	})

})

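// createPDBMinAvailableOrDie creates a PDB with the given minAvailable that
// selects pods labeled foo=bar, then waits for the disruption controller to
// process it.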
func createPDBMinAvailableOrDie(ctx context.Context, cs kubernetes.Interface, ns string, name string, minAvailable intstr.IntOrString, labels map[string]string) {
	pdb := policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: ns,
			Labels:    labels,
		},
		Spec: policyv1.PodDisruptionBudgetSpec{
			Selector:     &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
			MinAvailable: &minAvailable,
		},
	}
	_, err := cs.PolicyV1().PodDisruptionBudgets(ns).Create(ctx, &pdb, metav1.CreateOptions{})
	framework.ExpectNoError(err, "Waiting for the pdb to be created with minAvailable %d in namespace %s", minAvailable.IntVal, ns)
	waitForPdbToBeProcessed(ctx, cs, ns, name)
}

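// createPDBMaxUnavailableOrDie creates a PDB with the given maxUnavailable
// that selects pods labeled foo=bar, then waits for the disruption controller
// to process it.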
func createPDBMaxUnavailableOrDie(ctx context.Context, cs kubernetes.Interface, ns string, name string, maxUnavailable intstr.IntOrString) {
	pdb := policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: ns,
		},
		Spec: policyv1.PodDisruptionBudgetSpec{
			Selector:       &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
			MaxUnavailable: &maxUnavailable,
		},
	}
	_, err := cs.PolicyV1().PodDisruptionBudgets(ns).Create(ctx, &pdb, metav1.CreateOptions{})
	framework.ExpectNoError(err, "Waiting for the pdb to be created with maxUnavailable %d in namespace %s", maxUnavailable.IntVal, ns)
	waitForPdbToBeProcessed(ctx, cs, ns, name)
}

type updateFunc func(pdb *policyv1.PodDisruptionBudget) *policyv1.PodDisruptionBudget
type updateRestAPI func(ctx context.Context, podDisruptionBudget *policyv1.PodDisruptionBudget, opts metav1.UpdateOptions) (*policyv1.PodDisruptionBudget, error)
type patchFunc func(pdb *policyv1.PodDisruptionBudget) ([]byte, error)

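// updatePDBOrDie fetches the named PDB, applies f to it, and writes it back
// through the given update API, retrying on conflicts, then waits for the
// update to be processed.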
func updatePDBOrDie(ctx context.Context, cs kubernetes.Interface, ns string, name string, f updateFunc, api updateRestAPI) (updated *policyv1.PodDisruptionBudget) {
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		old, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		old = f(old)
		if updated, err = api(ctx, old, metav1.UpdateOptions{}); err != nil {
			return err
		}
		return nil
	})

	framework.ExpectNoError(err, "Waiting for the PDB update to be processed in namespace %s", ns)
	waitForPdbToBeProcessed(ctx, cs, ns, name)
	return updated
}

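// patchPDBOrDie merge-patches the named PDB (optionally a subresource such as
// "status") with the patch bytes produced by f, retrying on conflicts, then
// waits for the patch to be processed.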
func patchPDBOrDie(ctx context.Context, cs kubernetes.Interface, dc dynamic.Interface, ns string, name string, f patchFunc, subresources ...string) (updated *policyv1.PodDisruptionBudget) {
	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		old := getPDBStatusOrDie(ctx, dc, ns, name)
		patchBytes, err := f(old)
		framework.ExpectNoError(err)
		if updated, err = cs.PolicyV1().PodDisruptionBudgets(ns).Patch(ctx, old.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, subresources...); err != nil {
			return err
		}
		framework.ExpectNoError(err)
		return nil
	})

	framework.ExpectNoError(err, "Waiting for the pdb update to be processed in namespace %s", ns)
	waitForPdbToBeProcessed(ctx, cs, ns, name)
	return updated
}

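// deletePDBOrDie deletes the named PDB and waits until it is gone.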
func deletePDBOrDie(ctx context.Context, cs kubernetes.Interface, ns string, name string) {
	err := cs.PolicyV1().PodDisruptionBudgets(ns).Delete(ctx, name, metav1.DeleteOptions{})
	framework.ExpectNoError(err, "Deleting pdb in namespace %s", ns)
	waitForPdbToBeDeleted(ctx, cs, ns, name)
}

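// listPDBs lists the PDBs in ns that match labelSelector and verifies that
// exactly the expected names come back.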
func listPDBs(ctx context.Context, cs kubernetes.Interface, ns string, labelSelector string, count int, expectedPDBNames []string) {
	pdbList, err := cs.PolicyV1().PodDisruptionBudgets(ns).List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
	framework.ExpectNoError(err, "Listing PDB set in namespace %s", ns)
	gomega.Expect(pdbList.Items).To(gomega.HaveLen(count), "Expecting %d PDBs returned in namespace %s", count, ns)

	pdbNames := make([]string, 0)
	for _, item := range pdbList.Items {
		pdbNames = append(pdbNames, item.Name)
	}
	gomega.Expect(pdbNames).To(gomega.ConsistOf(expectedPDBNames), "Expecting returned PDBs '%s' in namespace %s", expectedPDBNames, ns)
}

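// deletePDBCollection deletes all PDBs in ns and waits for the collection to
// be gone.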
func deletePDBCollection(ctx context.Context, cs kubernetes.Interface, ns string) {
	ginkgo.By("deleting a collection of PDBs")
	err := cs.PolicyV1().PodDisruptionBudgets(ns).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{})
	framework.ExpectNoError(err, "Deleting PDB set in namespace %s", ns)

	waitForPDBCollectionToBeDeleted(ctx, cs, ns)
}

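// waitForPDBCollectionToBeDeleted polls until no PDBs remain in ns.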
func waitForPDBCollectionToBeDeleted(ctx context.Context, cs kubernetes.Interface, ns string) {
	ginkgo.By("Waiting for the PDB collection to be deleted")
	err := wait.PollUntilContextTimeout(ctx, framework.Poll, schedulingTimeout, true, func(ctx context.Context) (bool, error) {
		pdbList, err := cs.PolicyV1().PodDisruptionBudgets(ns).List(ctx, metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		if len(pdbList.Items) != 0 {
			return false, nil
		}
		return true, nil
	})
	framework.ExpectNoError(err, "Waiting for the PDB collection to be deleted in namespace %s", ns)
}

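// createPodsOrDie creates n pause-container pods labeled foo=bar in ns.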
func createPodsOrDie(ctx context.Context, cs kubernetes.Interface, ns string, n int) {
	for i := 0; i < n; i++ {
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("pod-%d", i),
				Namespace: ns,
				Labels:    map[string]string{"foo": "bar"},
			},
			Spec: v1.PodSpec{
				Containers: []v1.Container{
					{
						Name:  "donothing",
						Image: imageutils.GetPauseImageName(),
					},
				},
				RestartPolicy: v1.RestartPolicyAlways,
			},
		}

		_, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
		framework.ExpectNoError(err, "Creating pod %q in namespace %q", pod.Name, ns)
	}
}

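// waitForPodsOrDie polls until at least n pods labeled foo=bar in ns are
// ready and not terminating.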
func waitForPodsOrDie(ctx context.Context, cs kubernetes.Interface, ns string, n int) {
	ginkgo.By("Waiting for all pods to be running")
	err := wait.PollUntilContextTimeout(ctx, framework.Poll, schedulingTimeout, true, func(ctx context.Context) (bool, error) {
		pods, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{LabelSelector: "foo=bar"})
		if err != nil {
			return false, err
		}
		if pods == nil {
			return false, fmt.Errorf("pods is nil")
		}
		if len(pods.Items) < n {
			framework.Logf("pods: %v < %v", len(pods.Items), n)
			return false, nil
		}
		ready := 0
		for i := range pods.Items {
			pod := pods.Items[i]
			if podutil.IsPodReady(&pod) && pod.ObjectMeta.DeletionTimestamp.IsZero() {
				ready++
			}
		}
		if ready < n {
			framework.Logf("running pods: %v < %v", ready, n)
			return false, nil
		}
		return true, nil
	})
	framework.ExpectNoError(err, "Waiting for pods in namespace %q to be ready", ns)
}

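// createReplicaSetOrDie creates a ReplicaSet of pause-container pods labeled
// foo=bar. With exclusive set, every pod requests HostPort 5555 so that at
// most one pod of the set can be scheduled per node.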
func createReplicaSetOrDie(ctx context.Context, cs kubernetes.Interface, ns string, size int32, exclusive bool) {
	container := v1.Container{
		Name:  "donothing",
		Image: imageutils.GetPauseImageName(),
	}
	if exclusive {
		container.Ports = []v1.ContainerPort{
			{HostPort: 5555, ContainerPort: 5555},
		}
	}

	rs := &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rs",
			Namespace: ns,
		},
		Spec: appsv1.ReplicaSetSpec{
			Replicas: &size,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"foo": "bar"},
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"foo": "bar"},
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{container},
				},
			},
		},
	}

	_, err := cs.AppsV1().ReplicaSets(ns).Create(ctx, rs, metav1.CreateOptions{})
	framework.ExpectNoError(err, "Creating replica set %q in namespace %q", rs.Name, ns)
}

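// locateRunningPod polls until it finds a ready, non-terminating pod in ns
// and returns it.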
func locateRunningPod(ctx context.Context, cs kubernetes.Interface, ns string) (pod *v1.Pod, err error) {
	ginkgo.By("locating a running pod")
	err = wait.PollUntilContextTimeout(ctx, framework.Poll, schedulingTimeout, true, func(ctx context.Context) (bool, error) {
		podList, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
		if err != nil {
			return false, err
		}

		for i := range podList.Items {
			p := podList.Items[i]
			if podutil.IsPodReady(&p) && p.ObjectMeta.DeletionTimestamp.IsZero() {
				pod = &p
				return true, nil
			}
		}

		return false, nil
	})
	return pod, err
}

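// waitForPdbToBeProcessed polls until the PDB's ObservedGeneration has caught
// up with its Generation.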
func waitForPdbToBeProcessed(ctx context.Context, cs kubernetes.Interface, ns string, name string) {
	ginkgo.By("Waiting for the pdb to be processed")
	err := wait.PollUntilContextTimeout(ctx, framework.Poll, schedulingTimeout, true, func(ctx context.Context) (bool, error) {
		pdb, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if pdb.Status.ObservedGeneration < pdb.Generation {
			return false, nil
		}
		return true, nil
	})
	framework.ExpectNoError(err, "Waiting for the pdb to be processed in namespace %s", ns)
}

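// waitForPdbToBeDeleted polls until a Get of the named PDB returns NotFound.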
func waitForPdbToBeDeleted(ctx context.Context, cs kubernetes.Interface, ns string, name string) {
	ginkgo.By("Waiting for the pdb to be deleted")
	err := wait.PollUntilContextTimeout(ctx, framework.Poll, schedulingTimeout, true, func(ctx context.Context) (bool, error) {
		_, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, name, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			return true, nil // done
		}
		if err != nil {
			return false, err
		}
		return false, nil
	})
	framework.ExpectNoError(err, "Waiting for the pdb to be deleted in namespace %s", ns)
}

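// waitForPdbToObserveHealthyPods polls until the PDB named "foo" reports
// exactly healthyCount currently healthy pods.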
func waitForPdbToObserveHealthyPods(ctx context.Context, cs kubernetes.Interface, ns string, healthyCount int32) {
	ginkgo.By("Waiting for the pdb to observe all healthy pods")
	err := wait.PollUntilContextTimeout(ctx, framework.Poll, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
		pdb, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, "foo", metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if pdb.Status.CurrentHealthy != healthyCount {
			return false, nil
		}
		return true, nil
	})
	framework.ExpectNoError(err, "Waiting for the pdb in namespace %s to observe %d healthy pods", ns, healthyCount)
}

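// getPDBStatusOrDie fetches the status subresource of the named PDB via the
// dynamic client and converts it to a typed PodDisruptionBudget.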
func getPDBStatusOrDie(ctx context.Context, dc dynamic.Interface, ns string, name string) *policyv1.PodDisruptionBudget {
	pdbStatusResource := policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets")
	unstruct, err := dc.Resource(pdbStatusResource).Namespace(ns).Get(ctx, name, metav1.GetOptions{}, "status")
	framework.ExpectNoError(err)
	pdb, err := unstructuredToPDB(unstruct)
	framework.ExpectNoError(err, "Getting the status of the pdb %s in namespace %s", name, ns)
	return pdb
}

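// unstructuredToPDB round-trips an unstructured object through JSON into a
// typed PodDisruptionBudget, clearing TypeMeta so the result matches objects
// returned by the typed client.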
func unstructuredToPDB(obj *unstructured.Unstructured) (*policyv1.PodDisruptionBudget, error) {
	json, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
	if err != nil {
		return nil, err
	}
	pdb := &policyv1.PodDisruptionBudget{}
	err = runtime.DecodeInto(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), json, pdb)
	pdb.Kind = ""
	pdb.APIVersion = ""
	return pdb, err
}

// isPDBErroring checks if the PDB is erroring when there are unmanaged pods.
func isPDBErroring(pdb *policyv1.PodDisruptionBudget) bool {
	hasFailed := false
	for _, condition := range pdb.Status.Conditions {
		if strings.Contains(condition.Reason, "SyncFailed") &&
			strings.Contains(condition.Message, "found no controller ref for pod") {
			hasFailed = true
		}
	}
	return hasFailed
}

