...

Source file src/k8s.io/kubernetes/test/e2e/framework/statefulset/rest.go

Documentation: k8s.io/kubernetes/test/e2e/framework/statefulset

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package statefulset
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"path/filepath"
    23  	"strings"
    24  	"time"
    25  
    26  	appsv1 "k8s.io/api/apps/v1"
    27  	v1 "k8s.io/api/core/v1"
    28  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/labels"
    31  	"k8s.io/apimachinery/pkg/util/sets"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	clientset "k8s.io/client-go/kubernetes"
    34  	"k8s.io/kubectl/pkg/util/podutils"
    35  	"k8s.io/kubernetes/test/e2e/framework"
    36  	e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest"
    37  	e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
    38  )
    39  
    40  // CreateStatefulSet creates a StatefulSet from the manifest at manifestPath in the Namespace ns using kubectl create.
    41  func CreateStatefulSet(ctx context.Context, c clientset.Interface, manifestPath, ns string) *appsv1.StatefulSet {
    42  	mkpath := func(file string) string {
    43  		return filepath.Join(manifestPath, file)
    44  	}
    45  
    46  	framework.Logf("Parsing statefulset from %v", mkpath("statefulset.yaml"))
    47  	ss, err := e2emanifest.StatefulSetFromManifest(mkpath("statefulset.yaml"), ns)
    48  	framework.ExpectNoError(err)
    49  	framework.Logf("Parsing service from %v", mkpath("service.yaml"))
    50  	svc, err := e2emanifest.SvcFromManifest(mkpath("service.yaml"))
    51  	framework.ExpectNoError(err)
    52  
    53  	framework.Logf(fmt.Sprintf("creating " + ss.Name + " service"))
    54  	_, err = c.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
    55  	framework.ExpectNoError(err)
    56  
    57  	framework.Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ss.Namespace, ss.Name, *(ss.Spec.Replicas), ss.Spec.Selector))
    58  	_, err = c.AppsV1().StatefulSets(ns).Create(ctx, ss, metav1.CreateOptions{})
    59  	framework.ExpectNoError(err)
    60  	WaitForRunningAndReady(ctx, c, *ss.Spec.Replicas, ss)
    61  	return ss
    62  }
    63  
    64  // GetPodList gets the current Pods in ss.
    65  func GetPodList(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) *v1.PodList {
    66  	selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
    67  	framework.ExpectNoError(err)
    68  	podList, err := c.CoreV1().Pods(ss.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
    69  	framework.ExpectNoError(err)
    70  	return podList
    71  }
    72  
    73  // DeleteAllStatefulSets deletes all StatefulSet API Objects in Namespace ns.
    74  func DeleteAllStatefulSets(ctx context.Context, c clientset.Interface, ns string) {
    75  	ssList, err := c.AppsV1().StatefulSets(ns).List(ctx, metav1.ListOptions{LabelSelector: labels.Everything().String()})
    76  	framework.ExpectNoError(err)
    77  
    78  	// Scale down each statefulset, then delete it completely.
    79  	// Deleting a pvc without doing this will leak volumes, #25101.
    80  	errList := []string{}
    81  	for i := range ssList.Items {
    82  		ss := &ssList.Items[i]
    83  		var err error
    84  		if ss, err = Scale(ctx, c, ss, 0); err != nil {
    85  			errList = append(errList, fmt.Sprintf("%v", err))
    86  		}
    87  		WaitForStatusReplicas(ctx, c, ss, 0)
    88  		framework.Logf("Deleting statefulset %v", ss.Name)
    89  		// Use OrphanDependents=false so it's deleted synchronously.
    90  		// We already made sure the Pods are gone inside Scale().
    91  		if err := c.AppsV1().StatefulSets(ss.Namespace).Delete(ctx, ss.Name, metav1.DeleteOptions{OrphanDependents: new(bool)}); err != nil {
    92  			errList = append(errList, fmt.Sprintf("%v", err))
    93  		}
    94  	}
    95  
    96  	// pvs are global, so we need to wait for the exact ones bound to the statefulset pvcs.
    97  	pvNames := sets.NewString()
    98  	// TODO: Don't assume all pvcs in the ns belong to a statefulset
    99  	pvcPollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true, func(ctx context.Context) (bool, error) {
   100  		pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(ctx, metav1.ListOptions{LabelSelector: labels.Everything().String()})
   101  		if err != nil {
   102  			framework.Logf("WARNING: Failed to list pvcs, retrying %v", err)
   103  			return false, nil
   104  		}
   105  		for _, pvc := range pvcList.Items {
   106  			pvNames.Insert(pvc.Spec.VolumeName)
   107  			// TODO: Double check that there are no pods referencing the pvc
   108  			framework.Logf("Deleting pvc: %v with volume %v", pvc.Name, pvc.Spec.VolumeName)
   109  			if err := c.CoreV1().PersistentVolumeClaims(ns).Delete(ctx, pvc.Name, metav1.DeleteOptions{}); err != nil {
   110  				return false, nil
   111  			}
   112  		}
   113  		return true, nil
   114  	})
   115  	if pvcPollErr != nil {
   116  		errList = append(errList, fmt.Sprintf("Timeout waiting for pvc deletion."))
   117  	}
   118  
   119  	pollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true, func(ctx context.Context) (bool, error) {
   120  		pvList, err := c.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{LabelSelector: labels.Everything().String()})
   121  		if err != nil {
   122  			framework.Logf("WARNING: Failed to list pvs, retrying %v", err)
   123  			return false, nil
   124  		}
   125  		waitingFor := []string{}
   126  		for _, pv := range pvList.Items {
   127  			if pvNames.Has(pv.Name) {
   128  				waitingFor = append(waitingFor, fmt.Sprintf("%v: %+v", pv.Name, pv.Status))
   129  			}
   130  		}
   131  		if len(waitingFor) == 0 {
   132  			return true, nil
   133  		}
   134  		framework.Logf("Still waiting for pvs of statefulset to disappear:\n%v", strings.Join(waitingFor, "\n"))
   135  		return false, nil
   136  	})
   137  	if pollErr != nil {
   138  		errList = append(errList, fmt.Sprintf("Timeout waiting for pv provisioner to delete pvs, this might mean the test leaked pvs."))
   139  	}
   140  	if len(errList) != 0 {
   141  		framework.ExpectNoError(fmt.Errorf("%v", strings.Join(errList, "\n")))
   142  	}
   143  }
   144  
   145  // Scale scales ss to count replicas.
   146  func Scale(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, count int32) (*appsv1.StatefulSet, error) {
   147  	name := ss.Name
   148  	ns := ss.Namespace
   149  
   150  	framework.Logf("Scaling statefulset %s to %d", name, count)
   151  	ss = update(ctx, c, ns, name, count)
   152  
   153  	var statefulPodList *v1.PodList
   154  	pollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true, func(ctx context.Context) (bool, error) {
   155  		statefulPodList = GetPodList(ctx, c, ss)
   156  		if int32(len(statefulPodList.Items)) == count {
   157  			return true, nil
   158  		}
   159  		return false, nil
   160  	})
   161  	if pollErr != nil {
   162  		unhealthy := []string{}
   163  		for _, statefulPod := range statefulPodList.Items {
   164  			delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, podutils.IsPodReady(&statefulPod)
   165  			if delTs != nil || phase != v1.PodRunning || !readiness {
   166  				unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness))
   167  			}
   168  		}
   169  		return ss, fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, StatefulSetTimeout, unhealthy)
   170  	}
   171  	return ss, nil
   172  }
   173  
   174  // UpdateReplicas updates the replicas of ss to count.
   175  func UpdateReplicas(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, count int32) {
   176  	update(ctx, c, ss.Namespace, ss.Name, count)
   177  }
   178  
   179  // Restart scales ss to 0 and then back to its previous number of replicas.
   180  func Restart(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) {
   181  	oldReplicas := *(ss.Spec.Replicas)
   182  	ss, err := Scale(ctx, c, ss, 0)
   183  	framework.ExpectNoError(err)
   184  	// Wait for controller to report the desired number of Pods.
   185  	// This way we know the controller has observed all Pod deletions
   186  	// before we scale it back up.
   187  	WaitForStatusReplicas(ctx, c, ss, 0)
   188  	update(ctx, c, ss.Namespace, ss.Name, oldReplicas)
   189  }
   190  
   191  // CheckHostname verifies that all Pods in ss have the correct Hostname. If the returned error is not nil than verification failed.
   192  func CheckHostname(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) error {
   193  	cmd := "printf $(hostname)"
   194  	podList := GetPodList(ctx, c, ss)
   195  	for _, statefulPod := range podList.Items {
   196  		hostname, err := e2epodoutput.RunHostCmdWithRetries(statefulPod.Namespace, statefulPod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
   197  		if err != nil {
   198  			return err
   199  		}
   200  		if hostname != statefulPod.Name {
   201  			return fmt.Errorf("unexpected hostname (%s) and stateful pod name (%s) not equal", hostname, statefulPod.Name)
   202  		}
   203  	}
   204  	return nil
   205  }
   206  
   207  // CheckMount checks that the mount at mountPath is valid for all Pods in ss.
   208  func CheckMount(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, mountPath string) error {
   209  	for _, cmd := range []string{
   210  		// Print inode, size etc
   211  		fmt.Sprintf("ls -idlh %v", mountPath),
   212  		// Print subdirs
   213  		fmt.Sprintf("find %v", mountPath),
   214  		// Try writing
   215  		fmt.Sprintf("touch %v", filepath.Join(mountPath, fmt.Sprintf("%v", time.Now().UnixNano()))),
   216  	} {
   217  		if err := ExecInStatefulPods(ctx, c, ss, cmd); err != nil {
   218  			return fmt.Errorf("failed to execute %v, error: %w", cmd, err)
   219  		}
   220  	}
   221  	return nil
   222  }
   223  
   224  // CheckServiceName asserts that the ServiceName for ss is equivalent to expectedServiceName.
   225  func CheckServiceName(ss *appsv1.StatefulSet, expectedServiceName string) error {
   226  	framework.Logf("Checking if statefulset spec.serviceName is %s", expectedServiceName)
   227  
   228  	if expectedServiceName != ss.Spec.ServiceName {
   229  		return fmt.Errorf("wrong service name governing statefulset. Expected %s got %s",
   230  			expectedServiceName, ss.Spec.ServiceName)
   231  	}
   232  
   233  	return nil
   234  }
   235  
   236  // ExecInStatefulPods executes cmd in all Pods in ss. If a error occurs it is returned and cmd is not execute in any subsequent Pods.
   237  func ExecInStatefulPods(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, cmd string) error {
   238  	podList := GetPodList(ctx, c, ss)
   239  	for _, statefulPod := range podList.Items {
   240  		stdout, err := e2epodoutput.RunHostCmdWithRetries(statefulPod.Namespace, statefulPod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
   241  		framework.Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout)
   242  		if err != nil {
   243  			return err
   244  		}
   245  	}
   246  	return nil
   247  }
   248  
   249  // update updates a statefulset, and it is only used within rest.go
   250  func update(ctx context.Context, c clientset.Interface, ns, name string, replicas int32) *appsv1.StatefulSet {
   251  	for i := 0; i < 3; i++ {
   252  		ss, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
   253  		if err != nil {
   254  			framework.Failf("failed to get statefulset %q: %v", name, err)
   255  		}
   256  		*(ss.Spec.Replicas) = replicas
   257  		ss, err = c.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
   258  		if err == nil {
   259  			return ss
   260  		}
   261  		if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
   262  			framework.Failf("failed to update statefulset %q: %v", name, err)
   263  		}
   264  	}
   265  	framework.Failf("too many retries draining statefulset %q", name)
   266  	return nil
   267  }
   268  

View as plain text