
Source file src/k8s.io/kubernetes/test/e2e/storage/regional_pd.go

Documentation: k8s.io/kubernetes/test/e2e/storage

     1  //go:build !providerless
     2  // +build !providerless
     3  
     4  /*
     5  Copyright 2016 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package storage
    21  
    22  import (
    23  	"context"
    24  
    25  	"github.com/onsi/ginkgo/v2"
    26  	"github.com/onsi/gomega"
    27  
    28  	"fmt"
    29  	"strings"
    30  	"time"
    31  
    32  	"encoding/json"
    33  
    34  	appsv1 "k8s.io/api/apps/v1"
    35  	v1 "k8s.io/api/core/v1"
    36  	storagev1 "k8s.io/api/storage/v1"
    37  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    38  	"k8s.io/apimachinery/pkg/labels"
    39  	"k8s.io/apimachinery/pkg/types"
    40  	"k8s.io/apimachinery/pkg/util/sets"
    41  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    42  	"k8s.io/apimachinery/pkg/util/wait"
    43  	clientset "k8s.io/client-go/kubernetes"
    44  	volumehelpers "k8s.io/cloud-provider/volume/helpers"
    45  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    46  	"k8s.io/kubernetes/test/e2e/framework"
    47  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    48  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    49  	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
    50  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    51  	"k8s.io/kubernetes/test/e2e/storage/testsuites"
    52  	"k8s.io/kubernetes/test/e2e/storage/utils"
    53  	imageutils "k8s.io/kubernetes/test/utils/image"
    54  	admissionapi "k8s.io/pod-security-admission/api"
    55  )
    56  
    57  const (
    58  	pvDeletionTimeout       = 3 * time.Minute
    59  	statefulSetReadyTimeout = 3 * time.Minute
    60  	taintKeyPrefix          = "zoneTaint_"
    61  	repdMinSize             = "200Gi"
    62  	pvcName                 = "regional-pd-vol"
    63  )
    64  
    65  var _ = utils.SIGDescribe("Regional PD", func() {
    66  	f := framework.NewDefaultFramework("regional-pd")
    67  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    68  
    69  	// filled in BeforeEach
    70  	var c clientset.Interface
    71  	var ns string
    72  
    73  	ginkgo.BeforeEach(func(ctx context.Context) {
    74  		c = f.ClientSet
    75  		ns = f.Namespace.Name
    76  
    77  		e2eskipper.SkipUnlessProviderIs("gce", "gke")
    78  		e2eskipper.SkipUnlessMultizone(ctx, c)
    79  	})
    80  
    81  	ginkgo.Describe("RegionalPD", func() {
    82  		f.It("should provision storage", f.WithSlow(), func(ctx context.Context) {
    83  			testVolumeProvisioning(ctx, c, f.Timeouts, ns)
    84  		})
    85  
    86  		f.It("should provision storage with delayed binding", f.WithSlow(), func(ctx context.Context) {
    87  			testRegionalDelayedBinding(ctx, c, ns, 1 /* pvcCount */)
    88  			testRegionalDelayedBinding(ctx, c, ns, 3 /* pvcCount */)
    89  		})
    90  
    91  		f.It("should provision storage in the allowedTopologies", f.WithSlow(), func(ctx context.Context) {
    92  			testRegionalAllowedTopologies(ctx, c, ns)
    93  		})
    94  
    95  		f.It("should provision storage in the allowedTopologies with delayed binding", f.WithSlow(), func(ctx context.Context) {
    96  			testRegionalAllowedTopologiesWithDelayedBinding(ctx, c, ns, 1 /* pvcCount */)
    97  			testRegionalAllowedTopologiesWithDelayedBinding(ctx, c, ns, 3 /* pvcCount */)
    98  		})
    99  
   100  		f.It("should failover to a different zone when all nodes in one zone become unreachable", f.WithSlow(), f.WithDisruptive(), func(ctx context.Context) {
   101  			testZonalFailover(ctx, c, ns)
   102  		})
   103  	})
   104  })
   105  
   106  func testVolumeProvisioning(ctx context.Context, c clientset.Interface, t *framework.TimeoutContext, ns string) {
   107  	cloudZones := getTwoRandomZones(ctx, c)
   108  
   109  	// This test checks that dynamic provisioning can provision a volume
   110  	// that can be used to persist data among pods.
   111  	tests := []testsuites.StorageClassTest{
   112  		{
   113  			Name:           "HDD Regional PD on GCE/GKE",
   114  			CloudProviders: []string{"gce", "gke"},
   115  			Provisioner:    "kubernetes.io/gce-pd",
   116  			Timeouts:       framework.NewTimeoutContext(),
   117  			Parameters: map[string]string{
   118  				"type":             "pd-standard",
   119  				"zones":            strings.Join(cloudZones, ","),
   120  				"replication-type": "regional-pd",
   121  			},
   122  			ClaimSize:    repdMinSize,
   123  			ExpectedSize: repdMinSize,
   124  			PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
   125  				volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, t, claim, e2epod.NodeSelection{})
   126  				gomega.Expect(volume).NotTo(gomega.BeNil())
   127  
   128  				err := checkGCEPD(volume, "pd-standard")
   129  				framework.ExpectNoError(err, "checkGCEPD")
   130  				err = verifyZonesInPV(volume, sets.NewString(cloudZones...), true /* match */)
   131  				framework.ExpectNoError(err, "verifyZonesInPV")
   132  
   133  			},
   134  		},
   135  		{
   136  			Name:           "HDD Regional PD with auto zone selection on GCE/GKE",
   137  			CloudProviders: []string{"gce", "gke"},
   138  			Provisioner:    "kubernetes.io/gce-pd",
   139  			Timeouts:       framework.NewTimeoutContext(),
   140  			Parameters: map[string]string{
   141  				"type":             "pd-standard",
   142  				"replication-type": "regional-pd",
   143  			},
   144  			ClaimSize:    repdMinSize,
   145  			ExpectedSize: repdMinSize,
   146  			PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
   147  				volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, t, claim, e2epod.NodeSelection{})
   148  				gomega.Expect(volume).NotTo(gomega.BeNil())
   149  
   150  				err := checkGCEPD(volume, "pd-standard")
   151  				framework.ExpectNoError(err, "checkGCEPD")
   152  				zones, err := e2enode.GetClusterZones(ctx, c)
   153  				framework.ExpectNoError(err, "GetClusterZones")
   154  				err = verifyZonesInPV(volume, zones, false /* match */)
   155  				framework.ExpectNoError(err, "verifyZonesInPV")
   156  			},
   157  		},
   158  	}
   159  
   160  	for _, test := range tests {
   161  		test.Client = c
   162  		computedStorageClass := testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, "" /* suffix */))
   163  		test.Class = computedStorageClass
   164  		test.Claim = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   165  			ClaimSize:        test.ClaimSize,
   166  			StorageClassName: &(test.Class.Name),
   167  			VolumeMode:       &test.VolumeMode,
   168  		}, ns)
   169  
   170  		test.TestDynamicProvisioning(ctx)
   171  	}
   172  }
   173  
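        // testZonalFailover provisions a Regional PD via a StatefulSet, taints every node in the zone
        // where the pod is running, deletes the pod, and verifies that the replacement pod starts in the
        // other zone while reusing the same PVC.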
   174  func testZonalFailover(ctx context.Context, c clientset.Interface, ns string) {
   175  	cloudZones := getTwoRandomZones(ctx, c)
   176  	testSpec := testsuites.StorageClassTest{
   177  		Name:           "Regional PD Failover on GCE/GKE",
   178  		CloudProviders: []string{"gce", "gke"},
   179  		Timeouts:       framework.NewTimeoutContext(),
   180  		Provisioner:    "kubernetes.io/gce-pd",
   181  		Parameters: map[string]string{
   182  			"type":             "pd-standard",
   183  			"zones":            strings.Join(cloudZones, ","),
   184  			"replication-type": "regional-pd",
   185  		},
   186  		ClaimSize:    repdMinSize,
   187  		ExpectedSize: repdMinSize,
   188  	}
   189  	class := newStorageClass(testSpec, ns, "" /* suffix */)
   190  	claimTemplate := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   191  		NamePrefix:       pvcName,
   192  		ClaimSize:        testSpec.ClaimSize,
   193  		StorageClassName: &(class.Name),
   194  		VolumeMode:       &testSpec.VolumeMode,
   195  	}, ns)
   196  	statefulSet, service, regionalPDLabels := newStatefulSet(claimTemplate, ns)
   197  
   198  	ginkgo.By("creating a StorageClass " + class.Name)
   199  	_, err := c.StorageV1().StorageClasses().Create(ctx, class, metav1.CreateOptions{})
   200  	framework.ExpectNoError(err)
   201  	defer func() {
   202  		framework.Logf("deleting storage class %s", class.Name)
   203  		framework.ExpectNoError(c.StorageV1().StorageClasses().Delete(ctx, class.Name, metav1.DeleteOptions{}),
   204  			"Error deleting StorageClass %s", class.Name)
   205  	}()
   206  
   207  	ginkgo.By("creating a StatefulSet")
   208  	_, err = c.CoreV1().Services(ns).Create(ctx, service, metav1.CreateOptions{})
   209  	framework.ExpectNoError(err)
   210  	_, err = c.AppsV1().StatefulSets(ns).Create(ctx, statefulSet, metav1.CreateOptions{})
   211  	framework.ExpectNoError(err)
   212  
   213  	ginkgo.DeferCleanup(func(ctx context.Context) {
   214  		framework.Logf("deleting StatefulSet %q/%q", statefulSet.Namespace, statefulSet.Name)
   215  		// Clean up the StatefulSet first, then its claim and the bound PV.
   216  		framework.ExpectNoError(c.AppsV1().StatefulSets(ns).Delete(ctx, statefulSet.Name, metav1.DeleteOptions{}),
   217  			"Error deleting StatefulSet %s", statefulSet.Name)
   218  
   219  		framework.Logf("deleting claims in namespace %s", ns)
   220  		pvc := getPVC(ctx, c, ns, regionalPDLabels)
   221  		framework.ExpectNoError(c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(ctx, pvc.Name, metav1.DeleteOptions{}),
   222  			"Error deleting claim %s.", pvc.Name)
   223  		if pvc.Spec.VolumeName != "" {
   224  			err = e2epv.WaitForPersistentVolumeDeleted(ctx, c, pvc.Spec.VolumeName, framework.Poll, pvDeletionTimeout)
   225  			if err != nil {
   226  				framework.Logf("WARNING: PV %s is not yet deleted, and subsequent tests may be affected.", pvc.Spec.VolumeName)
   227  			}
   228  		}
   229  	})
   230  
   231  	err = waitForStatefulSetReplicasReady(ctx, statefulSet.Name, ns, c, framework.Poll, statefulSetReadyTimeout)
   232  	if err != nil {
   233  		pod := getPod(ctx, c, ns, regionalPDLabels)
   234  		if !podutil.IsPodReadyConditionTrue(pod.Status) {
   235  			framework.Failf("The StatefulSet pod %s was expected to be ready; instead it has the following conditions: %v", pod.Name, pod.Status.Conditions)
   236  		}
   237  		framework.ExpectNoError(err)
   238  	}
   239  
   240  	pvc := getPVC(ctx, c, ns, regionalPDLabels)
   241  
   242  	ginkgo.By("getting zone information from pod")
   243  	pod := getPod(ctx, c, ns, regionalPDLabels)
   244  	nodeName := pod.Spec.NodeName
   245  	node, err := c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
   246  	framework.ExpectNoError(err)
   247  	podZone := node.Labels[v1.LabelTopologyZone]
   248  
   249  	ginkgo.By("tainting nodes in the zone the pod is scheduled in")
   250  	selector := labels.SelectorFromSet(labels.Set(map[string]string{v1.LabelTopologyZone: podZone}))
   251  	nodesInZone, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
   252  	framework.ExpectNoError(err)
   253  	addTaint(ctx, c, ns, nodesInZone.Items, podZone)
   254  
   255  	ginkgo.By("deleting StatefulSet pod")
   256  	framework.ExpectNoError(c.CoreV1().Pods(ns).Delete(ctx, pod.Name, metav1.DeleteOptions{}), "Error deleting StatefulSet pod %q", pod.Name)
   257  
   258  	// Verify the pod is scheduled in the other zone.
   259  	ginkgo.By("verifying the pod is scheduled in a different zone.")
   260  	var otherZone string
   261  	if cloudZones[0] == podZone {
   262  		otherZone = cloudZones[1]
   263  	} else {
   264  		otherZone = cloudZones[0]
   265  	}
   266  	waitErr := wait.PollUntilContextTimeout(ctx, framework.Poll, statefulSetReadyTimeout, true, func(ctx context.Context) (bool, error) {
   267  		framework.Logf("Checking whether new pod is scheduled in zone %q", otherZone)
   268  		pod := getPod(ctx, c, ns, regionalPDLabels)
   269  		node, err := c.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
   270  		if err != nil {
   271  			return false, nil
   272  		}
   273  		newPodZone := node.Labels[v1.LabelTopologyZone]
   274  		return newPodZone == otherZone, nil
   275  	})
   276  	framework.ExpectNoError(waitErr, "Error waiting for pod to be scheduled in a different zone (%q)", otherZone)
   277  
   278  	err = waitForStatefulSetReplicasReady(ctx, statefulSet.Name, ns, c, 3*time.Second, framework.RestartPodReadyAgainTimeout)
   279  	if err != nil {
   280  		pod := getPod(ctx, c, ns, regionalPDLabels)
   281  		if !podutil.IsPodReadyConditionTrue(pod.Status) {
   282  			framework.Failf("The StatefulSet pod %s was expected to be ready; instead it has the following conditions: %v", pod.Name, pod.Status.Conditions)
   283  		}
   284  		framework.ExpectNoError(err)
   285  	}
   286  
   287  	ginkgo.By("verifying the same PVC is used by the new pod")
   288  	gomega.Expect(getPVC(ctx, c, ns, regionalPDLabels).Name).To(gomega.Equal(pvc.Name), "The same PVC should be used after failover.")
   289  
   290  	ginkgo.By("verifying the container output has 2 lines, indicating the pod has been created twice using the same regional PD.")
   291  	logs, err := e2epod.GetPodLogs(ctx, c, ns, pod.Name, "")
   292  	framework.ExpectNoError(err,
   293  		"Error getting logs from pod %s in namespace %s", pod.Name, ns)
   294  	lineCount := len(strings.Split(strings.TrimSpace(logs), "\n"))
   295  	expectedLineCount := 2
   296  	gomega.Expect(lineCount).To(gomega.Equal(expectedLineCount), "Line count of the written file should be %d.", expectedLineCount)
   297  
   298  }
   299  
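        // addTaint adds a NoSchedule taint, keyed by the test namespace, to each of the given nodes and
        // registers cleanup that patches the taint away again.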
   300  func addTaint(ctx context.Context, c clientset.Interface, ns string, nodes []v1.Node, podZone string) {
   301  	for _, node := range nodes {
   302  		oldData, err := json.Marshal(node)
   303  		framework.ExpectNoError(err)
   304  
   305  		node.Spec.Taints = append(node.Spec.Taints, v1.Taint{
   306  			Key:    taintKeyPrefix + ns,
   307  			Value:  podZone,
   308  			Effect: v1.TaintEffectNoSchedule,
   309  		})
   310  
   311  		newData, err := json.Marshal(node)
   312  		framework.ExpectNoError(err)
   313  
   314  		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
   315  		framework.ExpectNoError(err)
   316  
   317  		reversePatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{})
   318  		framework.ExpectNoError(err)
   319  
   320  		_, err = c.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
   321  		framework.ExpectNoError(err)
   322  
   323  		nodeName := node.Name
   324  		ginkgo.DeferCleanup(func(ctx context.Context) {
   325  			framework.Logf("removing taint for node %q", nodeName)
   326  			_, err := c.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, reversePatchBytes, metav1.PatchOptions{})
   327  			framework.ExpectNoError(err)
   328  		})
   329  	}
   330  }
   331  
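        // testRegionalDelayedBinding creates pvcCount claims against a WaitForFirstConsumer Regional PD
        // class and verifies that the provisioned PVs cover the zone of the node chosen for the pod.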
   332  func testRegionalDelayedBinding(ctx context.Context, c clientset.Interface, ns string, pvcCount int) {
   333  	test := testsuites.StorageClassTest{
   334  		Client:      c,
   335  		Name:        "Regional PD storage class with waitForFirstConsumer test on GCE",
   336  		Provisioner: "kubernetes.io/gce-pd",
   337  		Timeouts:    framework.NewTimeoutContext(),
   338  		Parameters: map[string]string{
   339  			"type":             "pd-standard",
   340  			"replication-type": "regional-pd",
   341  		},
   342  		ClaimSize:    repdMinSize,
   343  		DelayBinding: true,
   344  	}
   345  
   346  	suffix := "delayed-regional"
   347  
   348  	test.Class = testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, suffix))
   349  	var claims []*v1.PersistentVolumeClaim
   350  	for i := 0; i < pvcCount; i++ {
   351  		claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   352  			ClaimSize:        test.ClaimSize,
   353  			StorageClassName: &(test.Class.Name),
   354  			VolumeMode:       &test.VolumeMode,
   355  		}, ns)
   356  		claims = append(claims, claim)
   357  	}
   358  	pvs, node := test.TestBindingWaitForFirstConsumerMultiPVC(ctx, claims, nil /* node selector */, false /* expect unschedulable */)
   359  	if node == nil {
   360  		framework.Failf("unexpected nil node found")
   361  	}
   362  	zone, ok := node.Labels[v1.LabelTopologyZone]
   363  	if !ok {
   364  		framework.Failf("label %s not found on Node", v1.LabelTopologyZone)
   365  	}
   366  	for _, pv := range pvs {
   367  		checkZoneFromLabelAndAffinity(pv, zone, false)
   368  	}
   369  }
   370  
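        // testRegionalAllowedTopologies provisions a Regional PD restricted by allowedTopologies and
        // verifies that the PV's zones exactly match the two allowed zones.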
   371  func testRegionalAllowedTopologies(ctx context.Context, c clientset.Interface, ns string) {
   372  	test := testsuites.StorageClassTest{
   373  		Name:        "Regional PD storage class with allowedTopologies test on GCE",
   374  		Provisioner: "kubernetes.io/gce-pd",
   375  		Timeouts:    framework.NewTimeoutContext(),
   376  		Parameters: map[string]string{
   377  			"type":             "pd-standard",
   378  			"replication-type": "regional-pd",
   379  		},
   380  		ClaimSize:    repdMinSize,
   381  		ExpectedSize: repdMinSize,
   382  	}
   383  
   384  	suffix := "topo-regional"
   385  	test.Client = c
   386  	test.Class = testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, suffix))
   387  	zones := getTwoRandomZones(ctx, c)
   388  	addAllowedTopologiesToStorageClass(c, test.Class, zones)
   389  	test.Claim = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   390  		NamePrefix:       pvcName,
   391  		ClaimSize:        test.ClaimSize,
   392  		StorageClassName: &(test.Class.Name),
   393  		VolumeMode:       &test.VolumeMode,
   394  	}, ns)
   395  
   396  	pv := test.TestDynamicProvisioning(ctx)
   397  	checkZonesFromLabelAndAffinity(pv, sets.NewString(zones...), true)
   398  }
   399  
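        // testRegionalAllowedTopologiesWithDelayedBinding combines allowedTopologies with
        // WaitForFirstConsumer and verifies that the selected node lies in one of the allowed zones and
        // that the PVs span exactly those zones.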
   400  func testRegionalAllowedTopologiesWithDelayedBinding(ctx context.Context, c clientset.Interface, ns string, pvcCount int) {
   401  	test := testsuites.StorageClassTest{
   402  		Client:      c,
   403  		Timeouts:    framework.NewTimeoutContext(),
   404  		Name:        "Regional PD storage class with allowedTopologies and waitForFirstConsumer test on GCE",
   405  		Provisioner: "kubernetes.io/gce-pd",
   406  		Parameters: map[string]string{
   407  			"type":             "pd-standard",
   408  			"replication-type": "regional-pd",
   409  		},
   410  		ClaimSize:    repdMinSize,
   411  		DelayBinding: true,
   412  	}
   413  
   414  	suffix := "topo-delayed-regional"
   415  	test.Class = testsuites.SetupStorageClass(ctx, test.Client, newStorageClass(test, ns, suffix))
   416  	topoZones := getTwoRandomZones(ctx, c)
   417  	addAllowedTopologiesToStorageClass(c, test.Class, topoZones)
   418  	var claims []*v1.PersistentVolumeClaim
   419  	for i := 0; i < pvcCount; i++ {
   420  		claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   421  			ClaimSize:        test.ClaimSize,
   422  			StorageClassName: &(test.Class.Name),
   423  			VolumeMode:       &test.VolumeMode,
   424  		}, ns)
   425  		claims = append(claims, claim)
   426  	}
   427  	pvs, node := test.TestBindingWaitForFirstConsumerMultiPVC(ctx, claims, nil /* node selector */, false /* expect unschedulable */)
   428  	if node == nil {
   429  		framework.Failf("unexpected nil node found")
   430  	}
   431  	nodeZone, ok := node.Labels[v1.LabelTopologyZone]
   432  	if !ok {
   433  		framework.Failf("label %s not found on Node", v1.LabelTopologyZone)
   434  	}
   435  	zoneFound := false
   436  	for _, zone := range topoZones {
   437  		if zone == nodeZone {
   438  			zoneFound = true
   439  			break
   440  		}
   441  	}
   442  	if !zoneFound {
   443  		framework.Failf("zones specified in AllowedTopologies (%v) do not contain the zone of the node where the PV was provisioned (%s)", topoZones, nodeZone)
   444  	}
   445  	for _, pv := range pvs {
   446  		checkZonesFromLabelAndAffinity(pv, sets.NewString(topoZones...), true)
   447  	}
   448  }
   449  
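        // getPVC returns the PVC in namespace ns matching pvcLabels, failing the test unless exactly one matches.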
   450  func getPVC(ctx context.Context, c clientset.Interface, ns string, pvcLabels map[string]string) *v1.PersistentVolumeClaim {
   451  	selector := labels.Set(pvcLabels).AsSelector()
   452  	options := metav1.ListOptions{LabelSelector: selector.String()}
   453  	pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(ctx, options)
   454  	framework.ExpectNoError(err)
   455  	gomega.Expect(pvcList.Items).To(gomega.HaveLen(1), "There should be exactly 1 PVC matched.")
   456  
   457  	return &pvcList.Items[0]
   458  }
   459  
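        // getPod returns the pod in namespace ns matching podLabels, failing the test unless exactly one matches.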
   460  func getPod(ctx context.Context, c clientset.Interface, ns string, podLabels map[string]string) *v1.Pod {
   461  	selector := labels.Set(podLabels).AsSelector()
   462  	options := metav1.ListOptions{LabelSelector: selector.String()}
   463  	podList, err := c.CoreV1().Pods(ns).List(ctx, options)
   464  	framework.ExpectNoError(err)
   465  	gomega.Expect(podList.Items).To(gomega.HaveLen(1), "There should be exactly 1 pod matched.")
   466  
   467  	return &podList.Items[0]
   468  }
   469  
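        // addAllowedTopologiesToStorageClass appends an allowedTopologies term that restricts the
        // StorageClass to the given zones (keyed by the topology.kubernetes.io/zone label).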
   470  func addAllowedTopologiesToStorageClass(c clientset.Interface, sc *storagev1.StorageClass, zones []string) {
   471  	term := v1.TopologySelectorTerm{
   472  		MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
   473  			{
   474  				Key:    v1.LabelTopologyZone,
   475  				Values: zones,
   476  			},
   477  		},
   478  	}
   479  	sc.AllowedTopologies = append(sc.AllowedTopologies, term)
   480  }
   481  
   482  // newStatefulSet generates the spec of a StatefulSet with one replica that mounts a Regional PD, along with its headless Service and the labels shared by both.
   483  func newStatefulSet(claimTemplate *v1.PersistentVolumeClaim, ns string) (sts *appsv1.StatefulSet, svc *v1.Service, labels map[string]string) {
   484  	var replicas int32 = 1
   485  	labels = map[string]string{"app": "regional-pd-workload"}
   486  
   487  	svc = &v1.Service{
   488  		ObjectMeta: metav1.ObjectMeta{
   489  			Name:      "regional-pd-service",
   490  			Namespace: ns,
   491  			Labels:    labels,
   492  		},
   493  		Spec: v1.ServiceSpec{
   494  			Ports: []v1.ServicePort{{
   495  				Port: 80,
   496  				Name: "web",
   497  			}},
   498  			ClusterIP: v1.ClusterIPNone,
   499  			Selector:  labels,
   500  		},
   501  	}
   502  
   503  	sts = &appsv1.StatefulSet{
   504  		ObjectMeta: metav1.ObjectMeta{
   505  			Name:      "regional-pd-sts",
   506  			Namespace: ns,
   507  		},
   508  		Spec: appsv1.StatefulSetSpec{
   509  			Selector: &metav1.LabelSelector{
   510  				MatchLabels: labels,
   511  			},
   512  			ServiceName:          svc.Name,
   513  			Replicas:             &replicas,
   514  			Template:             *newPodTemplate(labels),
   515  			VolumeClaimTemplates: []v1.PersistentVolumeClaim{*claimTemplate},
   516  		},
   517  	}
   518  
   519  	return
   520  }
   521  
   522  func newPodTemplate(labels map[string]string) *v1.PodTemplateSpec {
   523  	return &v1.PodTemplateSpec{
   524  		ObjectMeta: metav1.ObjectMeta{
   525  			Labels: labels,
   526  		},
   527  		Spec: v1.PodSpec{
   528  			Containers: []v1.Container{
   529  				// This container writes its pod name to a file in the Regional PD
   530  				// and prints the entire file to stdout.
   531  				{
   532  					Name:    "busybox",
   533  					Image:   imageutils.GetE2EImage(imageutils.BusyBox),
   534  					Command: []string{"sh", "-c"},
   535  					Args: []string{
   536  						"echo ${POD_NAME} >> /mnt/data/regional-pd/pods.txt;" +
   537  							"cat /mnt/data/regional-pd/pods.txt;" +
   538  							"sleep 3600;",
   539  					},
   540  					Env: []v1.EnvVar{{
   541  						Name: "POD_NAME",
   542  						ValueFrom: &v1.EnvVarSource{
   543  							FieldRef: &v1.ObjectFieldSelector{
   544  								FieldPath: "metadata.name",
   545  							},
   546  						},
   547  					}},
   548  					Ports: []v1.ContainerPort{{
   549  						ContainerPort: 80,
   550  						Name:          "web",
   551  					}},
   552  					VolumeMounts: []v1.VolumeMount{{
   553  						Name:      pvcName,
   554  						MountPath: "/mnt/data/regional-pd",
   555  					}},
   556  				},
   557  			},
   558  		},
   559  	}
   560  }
   561  
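        // getTwoRandomZones returns two arbitrary distinct zones from the cluster's nodes, failing the
        // test if the cluster spans fewer than two zones.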
   562  func getTwoRandomZones(ctx context.Context, c clientset.Interface) []string {
   563  	zones, err := e2enode.GetClusterZones(ctx, c)
   564  	framework.ExpectNoError(err)
   565  	gomega.Expect(zones.Len()).To(gomega.BeNumerically(">=", 2),
   566  		"The test should only be run in multizone clusters.")
   567  
   568  	zone1, _ := zones.PopAny()
   569  	zone2, _ := zones.PopAny()
   570  	return []string{zone1, zone2}
   571  }
   572  
   573  // If match is true, check that the zones in the PV exactly match the given zones.
   574  // Otherwise, check that the zones in the PV are a subset of the given zones.
   575  func verifyZonesInPV(volume *v1.PersistentVolume, zones sets.String, match bool) error {
   576  	pvZones, err := volumehelpers.LabelZonesToSet(volume.Labels[v1.LabelTopologyZone])
   577  	if err != nil {
   578  		return err
   579  	}
   580  
   581  	if match && zones.Equal(pvZones) || !match && zones.IsSuperset(pvZones) {
   582  		return nil
   583  	}
   584  
   585  	return fmt.Errorf("given zones are %v, but zones in PV are %v", zones, pvZones)
   586  
   587  }
   588  
   589  func checkZoneFromLabelAndAffinity(pv *v1.PersistentVolume, zone string, matchZone bool) {
   590  	checkZonesFromLabelAndAffinity(pv, sets.NewString(zone), matchZone)
   591  }
   592  
   593  // checkZonesFromLabelAndAffinity checks that the LabelTopologyZone label of the PV and the
   594  // terms with key LabelTopologyZone in the PV's node affinity contain the given zones.
   595  // matchZones indicates whether the zones must match exactly.
   596  func checkZonesFromLabelAndAffinity(pv *v1.PersistentVolume, zones sets.String, matchZones bool) {
   597  	ginkgo.By("checking PV's zone label and node affinity terms match expected zone")
   598  	if pv == nil {
   599  		framework.Failf("nil pv passed")
   600  	}
   601  	pvLabel, ok := pv.Labels[v1.LabelTopologyZone]
   602  	if !ok {
   603  		framework.Failf("label %s not found on PV", v1.LabelTopologyZone)
   604  	}
   605  
   606  	zonesFromLabel, err := volumehelpers.LabelZonesToSet(pvLabel)
   607  	if err != nil {
   608  		framework.Failf("unable to parse zone labels %s: %v", pvLabel, err)
   609  	}
   610  	if matchZones && !zonesFromLabel.Equal(zones) {
   611  		framework.Failf("value[s] of %s label for PV: %v do not match expected zone[s]: %v", v1.LabelTopologyZone, zonesFromLabel, zones)
   612  	}
   613  	if !matchZones && !zonesFromLabel.IsSuperset(zones) {
   614  		framework.Failf("value[s] of %s label for PV: %v do not contain expected zone[s]: %v", v1.LabelTopologyZone, zonesFromLabel, zones)
   615  	}
   616  	if pv.Spec.NodeAffinity == nil {
   617  		framework.Failf("node affinity not found in PV spec %v", pv.Spec)
   618  	}
   619  	if len(pv.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
   620  		framework.Failf("node selector terms not found in PV spec %v", pv.Spec)
   621  	}
   622  
   623  	for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
   624  		keyFound := false
   625  		for _, r := range term.MatchExpressions {
   626  			if r.Key != v1.LabelTopologyZone {
   627  				continue
   628  			}
   629  			keyFound = true
   630  			zonesFromNodeAffinity := sets.NewString(r.Values...)
   631  			if matchZones && !zonesFromNodeAffinity.Equal(zones) {
   632  				framework.Failf("zones from NodeAffinity of PV: %v do not equal expected zone[s]: %v", zonesFromNodeAffinity, zones)
   633  			}
   634  			if !matchZones && !zonesFromNodeAffinity.IsSuperset(zones) {
   635  				framework.Failf("zones from NodeAffinity of PV: %v do not contain expected zone[s]: %v", zonesFromNodeAffinity, zones)
   636  			}
   637  			break
   638  		}
   639  		if !keyFound {
   640  			framework.Failf("label %s not found in term %v", v1.LabelTopologyZone, term)
   641  		}
   642  	}
   643  }
   644  
   645  // waitForStatefulSetReplicasReady waits for all replicas of a StatefulSet to become ready or until timeout occurs, whichever comes first.
   646  func waitForStatefulSetReplicasReady(ctx context.Context, statefulSetName, ns string, c clientset.Interface, Poll, timeout time.Duration) error {
   647  	framework.Logf("Waiting up to %v for StatefulSet %s to have all replicas ready", timeout, statefulSetName)
   648  	for start := time.Now(); time.Since(start) < timeout; time.Sleep(Poll) {
   649  		sts, err := c.AppsV1().StatefulSets(ns).Get(ctx, statefulSetName, metav1.GetOptions{})
   650  		if err != nil {
   651  			framework.Logf("Get StatefulSet %s failed, ignoring for %v: %v", statefulSetName, Poll, err)
   652  			continue
   653  		}
   654  		if sts.Status.ReadyReplicas == *sts.Spec.Replicas {
   655  			framework.Logf("All %d replicas of StatefulSet %s are ready. (%v)", sts.Status.ReadyReplicas, statefulSetName, time.Since(start))
   656  			return nil
   657  		}
   658  		framework.Logf("StatefulSet %s found but there are %d ready replicas and %d total replicas.", statefulSetName, sts.Status.ReadyReplicas, *sts.Spec.Replicas)
   659  	}
   660  	return fmt.Errorf("StatefulSet %s still has unready pods after %v", statefulSetName, timeout)
   661  }
   662  
