...

Source file src/k8s.io/kubernetes/test/integration/statefulset/util.go

Documentation: k8s.io/kubernetes/test/integration/statefulset

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package statefulset
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"k8s.io/kubernetes/test/utils/ktesting"
    23  	"sync"
    24  	"testing"
    25  	"time"
    26  
    27  	appsv1 "k8s.io/api/apps/v1"
    28  	v1 "k8s.io/api/core/v1"
    29  	"k8s.io/apimachinery/pkg/api/resource"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/labels"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	"k8s.io/apiserver/pkg/admission"
    34  	"k8s.io/client-go/informers"
    35  	clientset "k8s.io/client-go/kubernetes"
    36  	typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
    37  	typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
    38  	restclient "k8s.io/client-go/rest"
    39  	"k8s.io/client-go/util/retry"
    40  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    41  	api "k8s.io/kubernetes/pkg/apis/core"
    42  
    43  	//svc "k8s.io/kubernetes/pkg/api/v1/service"
    44  	"k8s.io/kubernetes/pkg/controller/statefulset"
    45  	"k8s.io/kubernetes/test/integration/framework"
    46  )
    47  
    48  const (
    49  	pollInterval = 100 * time.Millisecond
    50  	pollTimeout  = 60 * time.Second
    51  )
    52  
    53  func labelMap() map[string]string {
    54  	return map[string]string{"foo": "bar"}
    55  }
    56  
    57  // newService returns a service with a fake name for StatefulSet to be created soon
    58  func newHeadlessService(namespace string) *v1.Service {
    59  	return &v1.Service{
    60  		TypeMeta: metav1.TypeMeta{
    61  			Kind:       "Service",
    62  			APIVersion: "v1",
    63  		},
    64  		ObjectMeta: metav1.ObjectMeta{
    65  			Namespace: namespace,
    66  			Name:      "fake-service-name",
    67  		},
    68  		Spec: v1.ServiceSpec{
    69  			ClusterIP: "None",
    70  			Ports: []v1.ServicePort{
    71  				{Port: 80, Name: "http", Protocol: "TCP"},
    72  			},
    73  			Selector: labelMap(),
    74  		},
    75  	}
    76  }
    77  
    78  // newSTS returns a StatefulSet with a fake container image
    79  func newSTS(name, namespace string, replicas int) *appsv1.StatefulSet {
    80  	replicasCopy := int32(replicas)
    81  	return &appsv1.StatefulSet{
    82  		TypeMeta: metav1.TypeMeta{
    83  			Kind:       "StatefulSet",
    84  			APIVersion: "apps/v1",
    85  		},
    86  		ObjectMeta: metav1.ObjectMeta{
    87  			Namespace: namespace,
    88  			Name:      name,
    89  		},
    90  		Spec: appsv1.StatefulSetSpec{
    91  			PodManagementPolicy: appsv1.ParallelPodManagement,
    92  			Replicas:            &replicasCopy,
    93  			Selector: &metav1.LabelSelector{
    94  				MatchLabels: labelMap(),
    95  			},
    96  			Template: v1.PodTemplateSpec{
    97  				ObjectMeta: metav1.ObjectMeta{
    98  					Labels: labelMap(),
    99  				},
   100  				Spec: v1.PodSpec{
   101  					Containers: []v1.Container{
   102  						{
   103  							Name:  "fake-name",
   104  							Image: "fakeimage",
   105  							VolumeMounts: []v1.VolumeMount{
   106  								{Name: "datadir", MountPath: "/data/"},
   107  								{Name: "home", MountPath: "/home"},
   108  							},
   109  						},
   110  					},
   111  					Volumes: []v1.Volume{
   112  						{
   113  							Name: "datadir",
   114  							VolumeSource: v1.VolumeSource{
   115  								PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
   116  									ClaimName: "fake-pvc-name",
   117  								},
   118  							},
   119  						},
   120  						{
   121  							Name: "home",
   122  							VolumeSource: v1.VolumeSource{
   123  								HostPath: &v1.HostPathVolumeSource{
   124  									Path: fmt.Sprintf("/tmp/%v", "home"),
   125  								},
   126  							},
   127  						},
   128  					},
   129  				},
   130  			},
   131  			ServiceName: "fake-service-name",
   132  			UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
   133  				Type: appsv1.RollingUpdateStatefulSetStrategyType,
   134  			},
   135  			VolumeClaimTemplates: []v1.PersistentVolumeClaim{
   136  				// for volume mount "datadir"
   137  				newStatefulSetPVC("fake-pvc-name"),
   138  			},
   139  		},
   140  	}
   141  }
   142  
   143  func newStatefulSetPVC(name string) v1.PersistentVolumeClaim {
   144  	return v1.PersistentVolumeClaim{
   145  		ObjectMeta: metav1.ObjectMeta{
   146  			Name: name,
   147  			Annotations: map[string]string{
   148  				"volume.alpha.kubernetes.io/storage-class": "anything",
   149  			},
   150  		},
   151  		Spec: v1.PersistentVolumeClaimSpec{
   152  			AccessModes: []v1.PersistentVolumeAccessMode{
   153  				v1.ReadWriteOnce,
   154  			},
   155  			Resources: v1.VolumeResourceRequirements{
   156  				Requests: v1.ResourceList{
   157  					v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
   158  				},
   159  			},
   160  		},
   161  	}
   162  }
   163  
   164  // scSetup sets up necessities for Statefulset integration test, including control plane, apiserver, informers, and clientset
   165  func scSetup(t *testing.T) (context.Context, kubeapiservertesting.TearDownFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
   166  	tCtx := ktesting.Init(t)
   167  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   168  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
   169  
   170  	config := restclient.CopyConfig(server.ClientConfig)
   171  	clientSet, err := clientset.NewForConfig(config)
   172  	if err != nil {
   173  		t.Fatalf("error in create clientset: %v", err)
   174  	}
   175  	resyncPeriod := 12 * time.Hour
   176  	informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod)
   177  
   178  	sc := statefulset.NewStatefulSetController(
   179  		tCtx,
   180  		informers.Core().V1().Pods(),
   181  		informers.Apps().V1().StatefulSets(),
   182  		informers.Core().V1().PersistentVolumeClaims(),
   183  		informers.Apps().V1().ControllerRevisions(),
   184  		clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-controller")),
   185  	)
   186  
   187  	teardown := func() {
   188  		tCtx.Cancel("tearing down controller")
   189  		server.TearDownFn()
   190  	}
   191  	return tCtx, teardown, sc, informers, clientSet
   192  }
   193  
   194  // Run STS controller and informers
   195  func runControllerAndInformers(ctx context.Context, sc *statefulset.StatefulSetController, informers informers.SharedInformerFactory) context.CancelFunc {
   196  	ctx, cancel := context.WithCancel(ctx)
   197  	informers.Start(ctx.Done())
   198  	go sc.Run(ctx, 5)
   199  	return cancel
   200  }
   201  
   202  func createHeadlessService(t *testing.T, clientSet clientset.Interface, headlessService *v1.Service) {
   203  	_, err := clientSet.CoreV1().Services(headlessService.Namespace).Create(context.TODO(), headlessService, metav1.CreateOptions{})
   204  	if err != nil {
   205  		t.Fatalf("failed creating headless service: %v", err)
   206  	}
   207  }
   208  
   209  func createSTSs(t *testing.T, clientSet clientset.Interface, stss []*appsv1.StatefulSet) []*appsv1.StatefulSet {
   210  	var createdSTSs []*appsv1.StatefulSet
   211  	for _, sts := range stss {
   212  		createdSTS, err := clientSet.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{})
   213  		if err != nil {
   214  			t.Fatalf("failed to create sts %s: %v", sts.Name, err)
   215  		}
   216  		createdSTSs = append(createdSTSs, createdSTS)
   217  	}
   218  	return createdSTSs
   219  }
   220  
   221  func createPods(t *testing.T, clientSet clientset.Interface, pods []*v1.Pod) []*v1.Pod {
   222  	var createdPods []*v1.Pod
   223  	for _, pod := range pods {
   224  		createdPod, err := clientSet.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
   225  		if err != nil {
   226  			t.Fatalf("failed to create pod %s: %v", pod.Name, err)
   227  		}
   228  		createdPods = append(createdPods, createdPod)
   229  	}
   230  
   231  	return createdPods
   232  }
   233  
   234  func createSTSsPods(t *testing.T, clientSet clientset.Interface, stss []*appsv1.StatefulSet, pods []*v1.Pod) ([]*appsv1.StatefulSet, []*v1.Pod) {
   235  	return createSTSs(t, clientSet, stss), createPods(t, clientSet, pods)
   236  }
   237  
   238  // Verify .Status.Replicas is equal to .Spec.Replicas
   239  func waitSTSStable(t *testing.T, clientSet clientset.Interface, sts *appsv1.StatefulSet) {
   240  	stsClient := clientSet.AppsV1().StatefulSets(sts.Namespace)
   241  	desiredGeneration := sts.Generation
   242  	if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
   243  		newSTS, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{})
   244  		if err != nil {
   245  			return false, err
   246  		}
   247  		return newSTS.Status.Replicas == *newSTS.Spec.Replicas && newSTS.Status.ObservedGeneration >= desiredGeneration, nil
   248  	}); err != nil {
   249  		t.Fatalf("failed to verify .Status.Replicas is equal to .Spec.Replicas for sts %s: %v", sts.Name, err)
   250  	}
   251  }
   252  
   253  func updatePod(t *testing.T, podClient typedv1.PodInterface, podName string, updateFunc func(*v1.Pod)) *v1.Pod {
   254  	var pod *v1.Pod
   255  	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   256  		newPod, err := podClient.Get(context.TODO(), podName, metav1.GetOptions{})
   257  		if err != nil {
   258  			return err
   259  		}
   260  		updateFunc(newPod)
   261  		pod, err = podClient.Update(context.TODO(), newPod, metav1.UpdateOptions{})
   262  		return err
   263  	}); err != nil {
   264  		t.Fatalf("failed to update pod %s: %v", podName, err)
   265  	}
   266  	return pod
   267  }
   268  
   269  func updatePodStatus(t *testing.T, podClient typedv1.PodInterface, podName string, updateStatusFunc func(*v1.Pod)) *v1.Pod {
   270  	var pod *v1.Pod
   271  	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   272  		newPod, err := podClient.Get(context.TODO(), podName, metav1.GetOptions{})
   273  		if err != nil {
   274  			return err
   275  		}
   276  		updateStatusFunc(newPod)
   277  		pod, err = podClient.UpdateStatus(context.TODO(), newPod, metav1.UpdateOptions{})
   278  		return err
   279  	}); err != nil {
   280  		t.Fatalf("failed to update status of pod %s: %v", podName, err)
   281  	}
   282  	return pod
   283  }
   284  
   285  func getPods(t *testing.T, podClient typedv1.PodInterface, labelMap map[string]string) *v1.PodList {
   286  	podSelector := labels.Set(labelMap).AsSelector()
   287  	options := metav1.ListOptions{LabelSelector: podSelector.String()}
   288  	pods, err := podClient.List(context.TODO(), options)
   289  	if err != nil {
   290  		t.Fatalf("failed obtaining a list of pods that match the pod labels %v: %v", labelMap, err)
   291  	}
   292  	if pods == nil {
   293  		t.Fatalf("obtained a nil list of pods")
   294  	}
   295  	return pods
   296  }
   297  
   298  func getStatefulSetPVCs(t *testing.T, pvcClient typedv1.PersistentVolumeClaimInterface, sts *appsv1.StatefulSet) []*v1.PersistentVolumeClaim {
   299  	pvcs := []*v1.PersistentVolumeClaim{}
   300  	for i := int32(0); i < *sts.Spec.Replicas; i++ {
   301  		pvcName := fmt.Sprintf("%s-%s-%d", sts.Spec.VolumeClaimTemplates[0].Name, sts.Name, i)
   302  		pvc, err := pvcClient.Get(context.TODO(), pvcName, metav1.GetOptions{})
   303  		if err != nil {
   304  			t.Fatalf("failed to get PVC %s: %v", pvcName, err)
   305  		}
   306  		pvcs = append(pvcs, pvc)
   307  	}
   308  	return pvcs
   309  }
   310  
   311  func verifyOwnerRef(t *testing.T, pvc *v1.PersistentVolumeClaim, kind string, expected bool) {
   312  	found := false
   313  	for _, ref := range pvc.GetOwnerReferences() {
   314  		if ref.Kind == kind {
   315  			if expected {
   316  				found = true
   317  			} else {
   318  				t.Fatalf("Found %s ref but expected none for PVC %s", kind, pvc.Name)
   319  			}
   320  		}
   321  	}
   322  	if expected && !found {
   323  		t.Fatalf("Expected %s ref but found none for PVC %s", kind, pvc.Name)
   324  	}
   325  }
   326  
   327  func updateSTS(t *testing.T, stsClient typedappsv1.StatefulSetInterface, stsName string, updateFunc func(*appsv1.StatefulSet)) *appsv1.StatefulSet {
   328  	var sts *appsv1.StatefulSet
   329  	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   330  		newSTS, err := stsClient.Get(context.TODO(), stsName, metav1.GetOptions{})
   331  		if err != nil {
   332  			return err
   333  		}
   334  		updateFunc(newSTS)
   335  		sts, err = stsClient.Update(context.TODO(), newSTS, metav1.UpdateOptions{})
   336  		return err
   337  	}); err != nil {
   338  		t.Fatalf("failed to update sts %s: %v", stsName, err)
   339  	}
   340  	return sts
   341  }
   342  
   343  // Update .Spec.Replicas to replicas and verify .Status.Replicas is changed accordingly
   344  func scaleSTS(t *testing.T, c clientset.Interface, sts *appsv1.StatefulSet, replicas int32) {
   345  	stsClient := c.AppsV1().StatefulSets(sts.Namespace)
   346  	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   347  		newSTS, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{})
   348  		if err != nil {
   349  			return err
   350  		}
   351  		*newSTS.Spec.Replicas = replicas
   352  		sts, err = stsClient.Update(context.TODO(), newSTS, metav1.UpdateOptions{})
   353  		return err
   354  	}); err != nil {
   355  		t.Fatalf("failed to update .Spec.Replicas to %d for sts %s: %v", replicas, sts.Name, err)
   356  	}
   357  	waitSTSStable(t, c, sts)
   358  }
   359  
   360  var _ admission.ValidationInterface = &fakePodFailAdmission{}
   361  
   362  type fakePodFailAdmission struct {
   363  	lock             sync.Mutex
   364  	limitedPodNumber int
   365  	succeedPodsCount int
   366  }
   367  
   368  func (f *fakePodFailAdmission) Handles(operation admission.Operation) bool {
   369  	return operation == admission.Create
   370  }
   371  
   372  func (f *fakePodFailAdmission) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) (err error) {
   373  	if attr.GetKind().GroupKind() != api.Kind("Pod") {
   374  		return nil
   375  	}
   376  
   377  	f.lock.Lock()
   378  	defer f.lock.Unlock()
   379  
   380  	if f.succeedPodsCount >= f.limitedPodNumber {
   381  		return fmt.Errorf("fakePodFailAdmission error")
   382  	}
   383  	f.succeedPodsCount++
   384  	return nil
   385  }
   386  

View as plain text