...

Source file src/k8s.io/kubernetes/test/integration/disruption/disruption_test.go

Documentation: k8s.io/kubernetes/test/integration/disruption

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package disruption
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"path"
    24  	"reflect"
    25  	"testing"
    26  	"time"
    27  
    28  	"github.com/google/go-cmp/cmp"
    29  	"github.com/google/go-cmp/cmp/cmpopts"
    30  	clientv3 "go.etcd.io/etcd/client/v3"
    31  	v1 "k8s.io/api/core/v1"
    32  	policyv1 "k8s.io/api/policy/v1"
    33  	"k8s.io/api/policy/v1beta1"
    34  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    35  	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
    36  	"k8s.io/apiextensions-apiserver/test/integration/fixtures"
    37  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    38  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    39  	"k8s.io/apimachinery/pkg/runtime/schema"
    40  	"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
    41  	"k8s.io/apimachinery/pkg/types"
    42  	"k8s.io/apimachinery/pkg/util/intstr"
    43  	"k8s.io/apimachinery/pkg/util/wait"
    44  	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
    45  	"k8s.io/apiserver/pkg/registry/rest"
    46  	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
    47  	"k8s.io/client-go/dynamic"
    48  	"k8s.io/client-go/informers"
    49  	clientset "k8s.io/client-go/kubernetes"
    50  	restclient "k8s.io/client-go/rest"
    51  	"k8s.io/client-go/restmapper"
    52  	"k8s.io/client-go/scale"
    53  	"k8s.io/client-go/tools/cache"
    54  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    55  	"k8s.io/kubernetes/pkg/api/legacyscheme"
    56  	"k8s.io/kubernetes/pkg/controller/disruption"
    57  	"k8s.io/kubernetes/test/integration/etcd"
    58  	"k8s.io/kubernetes/test/integration/framework"
    59  	"k8s.io/kubernetes/test/integration/util"
    60  	"k8s.io/kubernetes/test/utils/ktesting"
    61  	"k8s.io/utils/clock"
    62  	"k8s.io/utils/ptr"
    63  )
    64  
    65  const stalePodDisruptionTimeout = 3 * time.Second
    66  
    67  func setup(ctx context.Context, t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) {
    68  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd())
    69  
    70  	clientSet, err := clientset.NewForConfig(server.ClientConfig)
    71  	if err != nil {
    72  		t.Fatalf("Error creating clientset: %v", err)
    73  	}
    74  	resyncPeriod := 12 * time.Hour
    75  	informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(server.ClientConfig, "pdb-informers")), resyncPeriod)
    76  
    77  	client := clientset.NewForConfigOrDie(restclient.AddUserAgent(server.ClientConfig, "disruption-controller"))
    78  
    79  	discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery())
    80  	mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
    81  
    82  	scaleKindResolver := scale.NewDiscoveryScaleKindResolver(client.Discovery())
    83  	scaleClient, err := scale.NewForConfig(server.ClientConfig, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
    84  	if err != nil {
    85  		t.Fatalf("Error creating scaleClient: %v", err)
    86  	}
    87  
    88  	apiExtensionClient, err := apiextensionsclientset.NewForConfig(server.ClientConfig)
    89  	if err != nil {
    90  		t.Fatalf("Error creating extension clientset: %v", err)
    91  	}
    92  
    93  	dynamicClient, err := dynamic.NewForConfig(server.ClientConfig)
    94  	if err != nil {
    95  		t.Fatalf("Error creating dynamicClient: %v", err)
    96  	}
    97  
    98  	pdbc := disruption.NewDisruptionControllerInternal(
    99  		ctx,
   100  		informers.Core().V1().Pods(),
   101  		informers.Policy().V1().PodDisruptionBudgets(),
   102  		informers.Core().V1().ReplicationControllers(),
   103  		informers.Apps().V1().ReplicaSets(),
   104  		informers.Apps().V1().Deployments(),
   105  		informers.Apps().V1().StatefulSets(),
   106  		client,
   107  		mapper,
   108  		scaleClient,
   109  		client.Discovery(),
   110  		clock.RealClock{},
   111  		stalePodDisruptionTimeout,
   112  	)
   113  	return server, pdbc, informers, clientSet, apiExtensionClient, dynamicClient
   114  }
   115  
   116  func TestPDBWithScaleSubresource(t *testing.T) {
   117  	tCtx := ktesting.Init(t)
   118  	s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(tCtx, t)
   119  	defer s.TearDownFn()
   120  	defer tCtx.Cancel("test has completed")
   121  
   122  	nsName := "pdb-scale-subresource"
   123  	createNs(tCtx, t, nsName, clientSet)
   124  
   125  	informers.Start(tCtx.Done())
   126  	go pdbc.Run(tCtx)
   127  
   128  	crdDefinition := newCustomResourceDefinition()
   129  	etcd.CreateTestCRDs(t, apiExtensionClient, true, crdDefinition)
   130  	gvr := schema.GroupVersionResource{Group: crdDefinition.Spec.Group, Version: crdDefinition.Spec.Versions[0].Name, Resource: crdDefinition.Spec.Names.Plural}
   131  	resourceClient := dynamicClient.Resource(gvr).Namespace(nsName)
   132  
   133  	replicas := 4
   134  	maxUnavailable := int32(2)
   135  
   136  	resource := &unstructured.Unstructured{
   137  		Object: map[string]interface{}{
   138  			"kind":       crdDefinition.Spec.Names.Kind,
   139  			"apiVersion": crdDefinition.Spec.Group + "/" + crdDefinition.Spec.Versions[0].Name,
   140  			"metadata": map[string]interface{}{
   141  				"name":      "resource",
   142  				"namespace": nsName,
   143  			},
   144  			"spec": map[string]interface{}{
   145  				"replicas": replicas,
   146  			},
   147  		},
   148  	}
   149  	createdResource, err := resourceClient.Create(tCtx, resource, metav1.CreateOptions{})
   150  	if err != nil {
   151  		t.Error(err)
   152  	}
   153  
   154  	trueValue := true
   155  	ownerRefs := []metav1.OwnerReference{
   156  		{
   157  			Name:       resource.GetName(),
   158  			Kind:       crdDefinition.Spec.Names.Kind,
   159  			APIVersion: crdDefinition.Spec.Group + "/" + crdDefinition.Spec.Versions[0].Name,
   160  			UID:        createdResource.GetUID(),
   161  			Controller: &trueValue,
   162  		},
   163  	}
   164  	for i := 0; i < replicas; i++ {
   165  		createPod(tCtx, t, fmt.Sprintf("pod-%d", i), nsName, map[string]string{"app": "test-crd"}, clientSet, ownerRefs)
   166  	}
   167  
   168  	waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4, v1.PodRunning)
   169  
   170  	pdb := &policyv1.PodDisruptionBudget{
   171  		ObjectMeta: metav1.ObjectMeta{
   172  			Name: "test-pdb",
   173  		},
   174  		Spec: policyv1.PodDisruptionBudgetSpec{
   175  			MaxUnavailable: &intstr.IntOrString{
   176  				Type:   intstr.Int,
   177  				IntVal: maxUnavailable,
   178  			},
   179  			Selector: &metav1.LabelSelector{
   180  				MatchLabels: map[string]string{"app": "test-crd"},
   181  			},
   182  		},
   183  	}
   184  	if _, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(tCtx, pdb, metav1.CreateOptions{}); err != nil {
   185  		t.Errorf("Error creating PodDisruptionBudget: %v", err)
   186  	}
   187  
   188  	waitPDBStable(tCtx, t, clientSet, 4, nsName, pdb.Name)
   189  
   190  	newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(tCtx, pdb.Name, metav1.GetOptions{})
   191  	if err != nil {
   192  		t.Errorf("Error getting PodDisruptionBudget: %v", err)
   193  	}
   194  
   195  	if expected, found := int32(replicas), newPdb.Status.ExpectedPods; expected != found {
   196  		t.Errorf("Expected %d, but found %d", expected, found)
   197  	}
   198  	if expected, found := int32(replicas)-maxUnavailable, newPdb.Status.DesiredHealthy; expected != found {
   199  		t.Errorf("Expected %d, but found %d", expected, found)
   200  	}
   201  	if expected, found := maxUnavailable, newPdb.Status.DisruptionsAllowed; expected != found {
   202  		t.Errorf("Expected %d, but found %d", expected, found)
   203  	}
   204  }
   205  
   206  func TestEmptySelector(t *testing.T) {
   207  	testcases := []struct {
   208  		name                   string
   209  		createPDBFunc          func(ctx context.Context, clientSet clientset.Interface, etcdClient *clientv3.Client, etcdStoragePrefix, name, nsName string, minAvailable intstr.IntOrString) error
   210  		expectedCurrentHealthy int32
   211  	}{
   212  		{
   213  			name: "v1beta1 should not target any pods",
   214  			createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, etcdClient *clientv3.Client, etcdStoragePrefix, name, nsName string, minAvailable intstr.IntOrString) error {
   215  				pdb := &v1beta1.PodDisruptionBudget{
   216  					ObjectMeta: metav1.ObjectMeta{
   217  						Name: name,
   218  					},
   219  					Spec: v1beta1.PodDisruptionBudgetSpec{
   220  						MinAvailable: &minAvailable,
   221  						Selector:     &metav1.LabelSelector{},
   222  					},
   223  				}
   224  				return createPDBUsingRemovedAPI(ctx, etcdClient, etcdStoragePrefix, nsName, pdb)
   225  			},
   226  			expectedCurrentHealthy: 0,
   227  		},
   228  		{
   229  			name: "v1 should target all pods",
   230  			createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, etcdClient *clientv3.Client, etcdStoragePrefix, name, nsName string, minAvailable intstr.IntOrString) error {
   231  				pdb := &policyv1.PodDisruptionBudget{
   232  					ObjectMeta: metav1.ObjectMeta{
   233  						Name: name,
   234  					},
   235  					Spec: policyv1.PodDisruptionBudgetSpec{
   236  						MinAvailable: &minAvailable,
   237  						Selector:     &metav1.LabelSelector{},
   238  					},
   239  				}
   240  				_, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{})
   241  				return err
   242  			},
   243  			expectedCurrentHealthy: 4,
   244  		},
   245  	}
   246  
   247  	for i, tc := range testcases {
   248  		t.Run(tc.name, func(t *testing.T) {
   249  			tCtx := ktesting.Init(t)
   250  			s, pdbc, informers, clientSet, _, _ := setup(tCtx, t)
   251  			defer s.TearDownFn()
   252  			defer tCtx.Cancel("test has completed")
   253  
   254  			nsName := fmt.Sprintf("pdb-empty-selector-%d", i)
   255  			createNs(tCtx, t, nsName, clientSet)
   256  
   257  			informers.Start(tCtx.Done())
   258  			go pdbc.Run(tCtx)
   259  
   260  			replicas := 4
   261  			minAvailable := intstr.FromInt32(2)
   262  
   263  			for j := 0; j < replicas; j++ {
   264  				createPod(tCtx, t, fmt.Sprintf("pod-%d", j), nsName, map[string]string{"app": "test-crd"},
   265  					clientSet, []metav1.OwnerReference{})
   266  			}
   267  
   268  			waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4, v1.PodRunning)
   269  
   270  			pdbName := "test-pdb"
   271  			if err := tc.createPDBFunc(tCtx, clientSet, s.EtcdClient, s.EtcdStoragePrefix, pdbName, nsName, minAvailable); err != nil {
   272  				t.Errorf("Error creating PodDisruptionBudget: %v", err)
   273  			}
   274  
   275  			waitPDBStable(tCtx, t, clientSet, tc.expectedCurrentHealthy, nsName, pdbName)
   276  
   277  			newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(tCtx, pdbName, metav1.GetOptions{})
   278  			if err != nil {
   279  				t.Errorf("Error getting PodDisruptionBudget: %v", err)
   280  			}
   281  
   282  			if expected, found := tc.expectedCurrentHealthy, newPdb.Status.CurrentHealthy; expected != found {
   283  				t.Errorf("Expected %d, but found %d", expected, found)
   284  			}
   285  		})
   286  	}
   287  }
   288  
   289  func TestSelectorsForPodsWithoutLabels(t *testing.T) {
   290  	testcases := []struct {
   291  		name                   string
   292  		createPDBFunc          func(ctx context.Context, clientSet clientset.Interface, etcdClient *clientv3.Client, etcdStoragePrefix, name, nsName string, minAvailable intstr.IntOrString) error
   293  		expectedCurrentHealthy int32
   294  	}{
   295  		{
   296  			name: "pods with no labels can be targeted by v1 PDBs with empty selector",
   297  			createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, etcdClient *clientv3.Client, etcdStoragePrefix, name, nsName string, minAvailable intstr.IntOrString) error {
   298  				pdb := &policyv1.PodDisruptionBudget{
   299  					ObjectMeta: metav1.ObjectMeta{
   300  						Name: name,
   301  					},
   302  					Spec: policyv1.PodDisruptionBudgetSpec{
   303  						MinAvailable: &minAvailable,
   304  						Selector:     &metav1.LabelSelector{},
   305  					},
   306  				}
   307  				_, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{})
   308  				return err
   309  			},
   310  			expectedCurrentHealthy: 1,
   311  		},
   312  		{
   313  			name: "pods with no labels can be targeted by v1 PDBs with DoesNotExist selector",
   314  			createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, etcdClient *clientv3.Client, etcdStoragePrefix, name, nsName string, minAvailable intstr.IntOrString) error {
   315  				pdb := &policyv1.PodDisruptionBudget{
   316  					ObjectMeta: metav1.ObjectMeta{
   317  						Name: name,
   318  					},
   319  					Spec: policyv1.PodDisruptionBudgetSpec{
   320  						MinAvailable: &minAvailable,
   321  						Selector: &metav1.LabelSelector{
   322  							MatchExpressions: []metav1.LabelSelectorRequirement{
   323  								{
   324  									Key:      "DoesNotExist",
   325  									Operator: metav1.LabelSelectorOpDoesNotExist,
   326  								},
   327  							},
   328  						},
   329  					},
   330  				}
   331  				_, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{})
   332  				return err
   333  			},
   334  			expectedCurrentHealthy: 1,
   335  		},
   336  		{
   337  			name: "pods with no labels can be targeted by v1beta1 PDBs with DoesNotExist selector",
   338  			createPDBFunc: func(ctx context.Context, clientSet clientset.Interface, etcdClient *clientv3.Client, etcdStoragePrefix, name, nsName string, minAvailable intstr.IntOrString) error {
   339  				pdb := &v1beta1.PodDisruptionBudget{
   340  					ObjectMeta: metav1.ObjectMeta{
   341  						Name: name,
   342  					},
   343  					Spec: v1beta1.PodDisruptionBudgetSpec{
   344  						MinAvailable: &minAvailable,
   345  						Selector: &metav1.LabelSelector{
   346  							MatchExpressions: []metav1.LabelSelectorRequirement{
   347  								{
   348  									Key:      "DoesNotExist",
   349  									Operator: metav1.LabelSelectorOpDoesNotExist,
   350  								},
   351  							},
   352  						},
   353  					},
   354  				}
   355  				return createPDBUsingRemovedAPI(ctx, etcdClient, etcdStoragePrefix, nsName, pdb)
   356  			},
   357  			expectedCurrentHealthy: 1,
   358  		},
   359  	}
   360  
   361  	for i, tc := range testcases {
   362  		t.Run(tc.name, func(t *testing.T) {
   363  			tCtx := ktesting.Init(t)
   364  			s, pdbc, informers, clientSet, _, _ := setup(tCtx, t)
   365  			defer s.TearDownFn()
   366  			defer tCtx.Cancel("test has completed")
   367  
   368  			nsName := fmt.Sprintf("pdb-selectors-%d", i)
   369  			createNs(tCtx, t, nsName, clientSet)
   370  
   371  			informers.Start(tCtx.Done())
   372  			go pdbc.Run(tCtx)
   373  
   374  			minAvailable := intstr.FromInt32(1)
   375  
   376  			// Create the PDB first and wait for it to settle.
   377  			pdbName := "test-pdb"
   378  			if err := tc.createPDBFunc(tCtx, clientSet, s.EtcdClient, s.EtcdStoragePrefix, pdbName, nsName, minAvailable); err != nil {
   379  				t.Errorf("Error creating PodDisruptionBudget: %v", err)
   380  			}
   381  			waitPDBStable(tCtx, t, clientSet, 0, nsName, pdbName)
   382  
   383  			// Create a pod and wait for it be reach the running phase.
   384  			createPod(tCtx, t, "pod", nsName, map[string]string{}, clientSet, []metav1.OwnerReference{})
   385  			waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, v1.PodRunning)
   386  
   387  			// Then verify that the added pod are picked up by the disruption controller.
   388  			waitPDBStable(tCtx, t, clientSet, 1, nsName, pdbName)
   389  
   390  			newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(tCtx, pdbName, metav1.GetOptions{})
   391  			if err != nil {
   392  				t.Errorf("Error getting PodDisruptionBudget: %v", err)
   393  			}
   394  
   395  			if expected, found := tc.expectedCurrentHealthy, newPdb.Status.CurrentHealthy; expected != found {
   396  				t.Errorf("Expected %d, but found %d", expected, found)
   397  			}
   398  		})
   399  	}
   400  }
   401  
   402  func createPod(ctx context.Context, t *testing.T, name, namespace string, labels map[string]string, clientSet clientset.Interface, ownerRefs []metav1.OwnerReference) {
   403  	pod := &v1.Pod{
   404  		ObjectMeta: metav1.ObjectMeta{
   405  			Name:            name,
   406  			Namespace:       namespace,
   407  			Labels:          labels,
   408  			OwnerReferences: ownerRefs,
   409  		},
   410  		Spec: v1.PodSpec{
   411  			Containers: []v1.Container{
   412  				{
   413  					Name:  "fake-name",
   414  					Image: "fakeimage",
   415  				},
   416  			},
   417  		},
   418  	}
   419  	_, err := clientSet.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
   420  	if err != nil {
   421  		t.Error(err)
   422  	}
   423  	addPodConditionReady(pod)
   424  	if _, err := clientSet.CoreV1().Pods(namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
   425  		t.Error(err)
   426  	}
   427  }
   428  
   429  func createNs(ctx context.Context, t *testing.T, name string, clientSet clientset.Interface) {
   430  	_, err := clientSet.CoreV1().Namespaces().Create(ctx, &v1.Namespace{
   431  		ObjectMeta: metav1.ObjectMeta{
   432  			Name: name,
   433  		},
   434  	}, metav1.CreateOptions{})
   435  	if err != nil {
   436  		t.Errorf("Error creating namespace: %v", err)
   437  	}
   438  }
   439  
   440  func addPodConditionReady(pod *v1.Pod) {
   441  	pod.Status = v1.PodStatus{
   442  		Phase: v1.PodRunning,
   443  		Conditions: []v1.PodCondition{
   444  			{
   445  				Type:   v1.PodReady,
   446  				Status: v1.ConditionTrue,
   447  			},
   448  		},
   449  	}
   450  }
   451  
   452  func newCustomResourceDefinition() *apiextensionsv1.CustomResourceDefinition {
   453  	return &apiextensionsv1.CustomResourceDefinition{
   454  		ObjectMeta: metav1.ObjectMeta{Name: "crds.mygroup.example.com"},
   455  		Spec: apiextensionsv1.CustomResourceDefinitionSpec{
   456  			Group: "mygroup.example.com",
   457  			Names: apiextensionsv1.CustomResourceDefinitionNames{
   458  				Plural:   "crds",
   459  				Singular: "crd",
   460  				Kind:     "Crd",
   461  				ListKind: "CrdList",
   462  			},
   463  			Scope: apiextensionsv1.NamespaceScoped,
   464  			Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
   465  				{
   466  					Name:    "v1beta1",
   467  					Served:  true,
   468  					Storage: true,
   469  					Schema:  fixtures.AllowAllSchema(),
   470  					Subresources: &apiextensionsv1.CustomResourceSubresources{
   471  						Scale: &apiextensionsv1.CustomResourceSubresourceScale{
   472  							SpecReplicasPath:   ".spec.replicas",
   473  							StatusReplicasPath: ".status.replicas",
   474  						},
   475  					},
   476  				},
   477  			},
   478  		},
   479  	}
   480  }
   481  
   482  func waitPDBStable(ctx context.Context, t *testing.T, clientSet clientset.Interface, podNum int32, ns, pdbName string) {
   483  	if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
   484  		pdb, err := clientSet.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, pdbName, metav1.GetOptions{})
   485  		if err != nil {
   486  			return false, err
   487  		}
   488  		if pdb.Status.ObservedGeneration == 0 || pdb.Status.CurrentHealthy != podNum {
   489  			return false, nil
   490  		}
   491  		return true, nil
   492  	}); err != nil {
   493  		t.Fatal(err)
   494  	}
   495  }
   496  
   497  func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podNum int, phase v1.PodPhase) {
   498  	if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
   499  		objects := podInformer.GetIndexer().List()
   500  		if len(objects) != podNum {
   501  			return false, nil
   502  		}
   503  		for _, obj := range objects {
   504  			pod := obj.(*v1.Pod)
   505  			if pod.Status.Phase != phase {
   506  				return false, nil
   507  			}
   508  		}
   509  		return true, nil
   510  	}); err != nil {
   511  		t.Fatal(err)
   512  	}
   513  }
   514  
   515  // createPDBUsingRemovedAPI creates a PDB directly using etcd.  This is must *ONLY* be used for checks of compatibility
   516  // with removed data. Do not use this just because you don't want to update your test to use v1.  Only use this
   517  // when it actually matters.
   518  func createPDBUsingRemovedAPI(ctx context.Context, etcdClient *clientv3.Client, etcdStoragePrefix, nsName string, betaPDB *v1beta1.PodDisruptionBudget) error {
   519  	betaPDB.APIVersion = v1beta1.SchemeGroupVersion.Group + "/" + v1beta1.SchemeGroupVersion.Version
   520  	betaPDB.Kind = "PodDisruptionBudget"
   521  	betaPDB.Namespace = nsName
   522  	betaPDB.Generation = 1
   523  	rest.FillObjectMetaSystemFields(betaPDB)
   524  	ctx = genericapirequest.WithNamespace(ctx, nsName)
   525  	key := path.Join("/", etcdStoragePrefix, "poddisruptionbudgets", nsName, betaPDB.Name)
   526  	protoSerializer := protobuf.NewSerializer(legacyscheme.Scheme, legacyscheme.Scheme)
   527  	buffer := bytes.NewBuffer(nil)
   528  	if err := protoSerializer.Encode(betaPDB, buffer); err != nil {
   529  		return err
   530  	}
   531  	_, err := etcdClient.Put(ctx, key, buffer.String())
   532  	return err
   533  }
   534  
   535  func TestPatchCompatibility(t *testing.T) {
   536  	tCtx := ktesting.Init(t)
   537  	s, pdbc, _, clientSet, _, _ := setup(tCtx, t)
   538  	defer s.TearDownFn()
   539  	// Even though pdbc isn't used in this test, its creation is already
   540  	// spawning some goroutines. So we need to run it to ensure they won't leak.
   541  	// We can't cancel immediately but later, because when the context is canceled,
   542  	// the event broadcaster will be shut down .
   543  	defer tCtx.Cancel("cleaning up")
   544  	go pdbc.Run(tCtx)
   545  
   546  	testcases := []struct {
   547  		name             string
   548  		version          string
   549  		startingSelector *metav1.LabelSelector
   550  		patchType        types.PatchType
   551  		patch            string
   552  		force            *bool
   553  		fieldManager     string
   554  		expectSelector   *metav1.LabelSelector
   555  	}{
   556  		{
   557  			name:      "v1-smp",
   558  			version:   "v1",
   559  			patchType: types.StrategicMergePatchType,
   560  			patch:     `{"spec":{"selector":{"matchLabels":{"patchmatch":"true"},"matchExpressions":[{"key":"patchexpression","operator":"In","values":["true"]}]}}}`,
   561  			// matchLabels and matchExpressions are both replaced (because selector patchStrategy=replace in v1)
   562  			expectSelector: &metav1.LabelSelector{
   563  				MatchLabels:      map[string]string{"patchmatch": "true"},
   564  				MatchExpressions: []metav1.LabelSelectorRequirement{{Key: "patchexpression", Operator: "In", Values: []string{"true"}}},
   565  			},
   566  		},
   567  		{
   568  			name:      "v1-mergepatch",
   569  			version:   "v1",
   570  			patchType: types.MergePatchType,
   571  			patch:     `{"spec":{"selector":{"matchLabels":{"patchmatch":"true"},"matchExpressions":[{"key":"patchexpression","operator":"In","values":["true"]}]}}}`,
   572  			// matchLabels portion is merged, matchExpressions portion is replaced (because it's a list)
   573  			expectSelector: &metav1.LabelSelector{
   574  				MatchLabels:      map[string]string{"basematch": "true", "patchmatch": "true"},
   575  				MatchExpressions: []metav1.LabelSelectorRequirement{{Key: "patchexpression", Operator: "In", Values: []string{"true"}}},
   576  			},
   577  		},
   578  		{
   579  			name:         "v1-apply",
   580  			version:      "v1",
   581  			patchType:    types.ApplyPatchType,
   582  			patch:        `{"apiVersion":"policy/v1","kind":"PodDisruptionBudget","spec":{"selector":{"matchLabels":{"patchmatch":"true"},"matchExpressions":[{"key":"patchexpression","operator":"In","values":["true"]}]}}}`,
   583  			force:        ptr.To(true),
   584  			fieldManager: "test",
   585  			// entire selector is replaced (because structType=atomic)
   586  			expectSelector: &metav1.LabelSelector{
   587  				MatchLabels:      map[string]string{"patchmatch": "true"},
   588  				MatchExpressions: []metav1.LabelSelectorRequirement{{Key: "patchexpression", Operator: "In", Values: []string{"true"}}},
   589  			},
   590  		},
   591  	}
   592  
   593  	for _, tc := range testcases {
   594  		t.Run(tc.name, func(t *testing.T) {
   595  			ns := "default"
   596  			maxUnavailable := int32(2)
   597  			pdb := &policyv1.PodDisruptionBudget{
   598  				ObjectMeta: metav1.ObjectMeta{
   599  					Name: "test-pdb",
   600  				},
   601  				Spec: policyv1.PodDisruptionBudgetSpec{
   602  					MaxUnavailable: &intstr.IntOrString{Type: intstr.Int, IntVal: maxUnavailable},
   603  					Selector: &metav1.LabelSelector{
   604  						MatchLabels:      map[string]string{"basematch": "true"},
   605  						MatchExpressions: []metav1.LabelSelectorRequirement{{Key: "baseexpression", Operator: "In", Values: []string{"true"}}},
   606  					},
   607  				},
   608  			}
   609  			if _, err := clientSet.PolicyV1().PodDisruptionBudgets(ns).Create(context.TODO(), pdb, metav1.CreateOptions{}); err != nil {
   610  				t.Fatalf("Error creating PodDisruptionBudget: %v", err)
   611  			}
   612  			defer func() {
   613  				err := clientSet.PolicyV1().PodDisruptionBudgets(ns).Delete(context.TODO(), pdb.Name, metav1.DeleteOptions{})
   614  				if err != nil {
   615  					t.Fatal(err)
   616  				}
   617  			}()
   618  
   619  			var resultSelector *metav1.LabelSelector
   620  			switch tc.version {
   621  			case "v1":
   622  				result, err := clientSet.PolicyV1().PodDisruptionBudgets(ns).Patch(context.TODO(), pdb.Name, tc.patchType, []byte(tc.patch), metav1.PatchOptions{Force: tc.force, FieldManager: tc.fieldManager})
   623  				if err != nil {
   624  					t.Fatal(err)
   625  				}
   626  				resultSelector = result.Spec.Selector
   627  			default:
   628  				t.Error("unknown version")
   629  			}
   630  
   631  			if !reflect.DeepEqual(resultSelector, tc.expectSelector) {
   632  				t.Fatalf("unexpected selector:\n%s", cmp.Diff(tc.expectSelector, resultSelector))
   633  			}
   634  		})
   635  	}
   636  }
   637  
   638  func TestStalePodDisruption(t *testing.T) {
   639  	tCtx := ktesting.Init(t)
   640  	s, pdbc, informers, clientSet, _, _ := setup(tCtx, t)
   641  	defer s.TearDownFn()
   642  	defer tCtx.Cancel("test has completed")
   643  
   644  	nsName := "pdb-stale-pod-disruption"
   645  	createNs(tCtx, t, nsName, clientSet)
   646  
   647  	informers.Start(tCtx.Done())
   648  	informers.WaitForCacheSync(tCtx.Done())
   649  	go pdbc.Run(tCtx)
   650  
   651  	cases := map[string]struct {
   652  		deletePod      bool
   653  		podPhase       v1.PodPhase
   654  		reason         string
   655  		wantConditions []v1.PodCondition
   656  	}{
   657  		"stale-condition": {
   658  			podPhase: v1.PodRunning,
   659  			wantConditions: []v1.PodCondition{
   660  				{
   661  					Type:   v1.DisruptionTarget,
   662  					Status: v1.ConditionFalse,
   663  				},
   664  			},
   665  		},
   666  		"deleted-pod": {
   667  			podPhase:  v1.PodRunning,
   668  			deletePod: true,
   669  			wantConditions: []v1.PodCondition{
   670  				{
   671  					Type:   v1.DisruptionTarget,
   672  					Status: v1.ConditionTrue,
   673  				},
   674  			},
   675  		},
   676  		"disruption-condition-by-kubelet": {
   677  			podPhase: v1.PodFailed,
   678  			reason:   v1.PodReasonTerminationByKubelet,
   679  			wantConditions: []v1.PodCondition{
   680  				{
   681  					Type:   v1.DisruptionTarget,
   682  					Status: v1.ConditionTrue,
   683  					Reason: v1.PodReasonTerminationByKubelet,
   684  				},
   685  			},
   686  		},
   687  		"disruption-condition-on-failed-pod": {
   688  			podPhase: v1.PodFailed,
   689  			wantConditions: []v1.PodCondition{
   690  				{
   691  					Type:   v1.DisruptionTarget,
   692  					Status: v1.ConditionTrue,
   693  				},
   694  			},
   695  		},
   696  	}
   697  
   698  	for name, tc := range cases {
   699  		t.Run(name, func(t *testing.T) {
   700  			pod := util.InitPausePod(&util.PausePodConfig{
   701  				Name:      name,
   702  				Namespace: nsName,
   703  				NodeName:  "foo", // mock pod as scheduled so that it's not immediately deleted when calling Delete.
   704  			})
   705  			var err error
   706  			pod, err = util.CreatePausePod(clientSet, pod)
   707  			if err != nil {
   708  				t.Fatalf("Failed creating pod: %v", err)
   709  			}
   710  
   711  			pod.Status.Phase = tc.podPhase
   712  			pod.Status.Conditions = append(pod.Status.Conditions, v1.PodCondition{
   713  				Type:               v1.DisruptionTarget,
   714  				Status:             v1.ConditionTrue,
   715  				Reason:             tc.reason,
   716  				LastTransitionTime: metav1.Now(),
   717  			})
   718  			pod, err = clientSet.CoreV1().Pods(nsName).UpdateStatus(tCtx, pod, metav1.UpdateOptions{})
   719  			if err != nil {
   720  				t.Fatalf("Failed updating pod: %v", err)
   721  			}
   722  
   723  			if tc.deletePod {
   724  				if err := clientSet.CoreV1().Pods(nsName).Delete(tCtx, name, metav1.DeleteOptions{}); err != nil {
   725  					t.Fatalf("Failed to delete pod: %v", err)
   726  				}
   727  			}
   728  			time.Sleep(stalePodDisruptionTimeout)
   729  			diff := ""
   730  			if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
   731  				pod, err = clientSet.CoreV1().Pods(nsName).Get(tCtx, name, metav1.GetOptions{})
   732  				if err != nil {
   733  					return false, err
   734  				}
   735  				if tc.deletePod && pod.DeletionTimestamp == nil {
   736  					return false, nil
   737  				}
   738  				diff = cmp.Diff(tc.wantConditions, pod.Status.Conditions, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime"))
   739  				return diff == "", nil
   740  			}); err != nil {
   741  				t.Errorf("Failed waiting for status to change: %v", err)
   742  				if diff != "" {
   743  					t.Errorf("Pod has conditions (-want,+got):\n%s", diff)
   744  				}
   745  			}
   746  		})
   747  	}
   748  }
   749  

View as plain text