...

Source file src/k8s.io/kubernetes/test/integration/deployment/util.go

Documentation: k8s.io/kubernetes/test/integration/deployment

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package deployment
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  	"testing"
    24  	"time"
    25  
    26  	apps "k8s.io/api/apps/v1"
    27  	v1 "k8s.io/api/core/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/util/wait"
    30  	"k8s.io/client-go/informers"
    31  	clientset "k8s.io/client-go/kubernetes"
    32  	restclient "k8s.io/client-go/rest"
    33  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    34  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    35  	"k8s.io/kubernetes/pkg/controller/deployment"
    36  	deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
    37  	"k8s.io/kubernetes/pkg/controller/replicaset"
    38  	"k8s.io/kubernetes/test/integration/framework"
    39  	testutil "k8s.io/kubernetes/test/utils"
    40  )
    41  
const (
	// pollInterval is the interval passed to the polling helpers used by this file
	// (wait.PollImmediate and the testutil wait/update functions).
	pollInterval = 100 * time.Millisecond
	// pollTimeout is the overall timeout passed to those same polling helpers.
	pollTimeout  = 60 * time.Second

	// fakeContainerName is the container name used in the pod template built by newDeployment.
	fakeContainerName = "fake-name"
	// fakeImage is the container image used in the pod template built by newDeployment.
	fakeImage         = "fakeimage"
)
    49  
    50  var pauseFn = func(update *apps.Deployment) {
    51  	update.Spec.Paused = true
    52  }
    53  
    54  var resumeFn = func(update *apps.Deployment) {
    55  	update.Spec.Paused = false
    56  }
    57  
// deploymentTester bundles the state the helper methods in this file operate on.
type deploymentTester struct {
	// t is used by the helpers for logging and for reporting errors.
	t          *testing.T
	// c is the clientset the helpers use to talk to the API server.
	c          clientset.Interface
	// deployment is the Deployment under test; its namespace and name identify
	// the object fetched or updated by the helpers.
	deployment *apps.Deployment
}
    63  
    64  func testLabels() map[string]string {
    65  	return map[string]string{"name": "test"}
    66  }
    67  
    68  // newDeployment returns a RollingUpdate Deployment with a fake container image
    69  func newDeployment(name, ns string, replicas int32) *apps.Deployment {
    70  	return &apps.Deployment{
    71  		TypeMeta: metav1.TypeMeta{
    72  			Kind:       "Deployment",
    73  			APIVersion: "apps/v1",
    74  		},
    75  		ObjectMeta: metav1.ObjectMeta{
    76  			Namespace: ns,
    77  			Name:      name,
    78  		},
    79  		Spec: apps.DeploymentSpec{
    80  			Replicas: &replicas,
    81  			Selector: &metav1.LabelSelector{MatchLabels: testLabels()},
    82  			Strategy: apps.DeploymentStrategy{
    83  				Type:          apps.RollingUpdateDeploymentStrategyType,
    84  				RollingUpdate: new(apps.RollingUpdateDeployment),
    85  			},
    86  			Template: v1.PodTemplateSpec{
    87  				ObjectMeta: metav1.ObjectMeta{
    88  					Labels: testLabels(),
    89  				},
    90  				Spec: v1.PodSpec{
    91  					Containers: []v1.Container{
    92  						{
    93  							Name:  fakeContainerName,
    94  							Image: fakeImage,
    95  						},
    96  					},
    97  				},
    98  			},
    99  		},
   100  	}
   101  }
   102  
   103  // dcSetup sets up necessities for Deployment integration test, including control plane, apiserver, informers, and clientset
   104  func dcSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *replicaset.ReplicaSetController, *deployment.DeploymentController, informers.SharedInformerFactory, clientset.Interface) {
   105  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   106  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
   107  
   108  	config := restclient.CopyConfig(server.ClientConfig)
   109  	clientSet, err := clientset.NewForConfig(config)
   110  	if err != nil {
   111  		t.Fatalf("error in create clientset: %v", err)
   112  	}
   113  	resyncPeriod := 12 * time.Hour
   114  	informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "deployment-informers")), resyncPeriod)
   115  
   116  	dc, err := deployment.NewDeploymentController(
   117  		ctx,
   118  		informers.Apps().V1().Deployments(),
   119  		informers.Apps().V1().ReplicaSets(),
   120  		informers.Core().V1().Pods(),
   121  		clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "deployment-controller")),
   122  	)
   123  	if err != nil {
   124  		t.Fatalf("error creating Deployment controller: %v", err)
   125  	}
   126  	rm := replicaset.NewReplicaSetController(
   127  		ctx,
   128  		informers.Apps().V1().ReplicaSets(),
   129  		informers.Core().V1().Pods(),
   130  		clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "replicaset-controller")),
   131  		replicaset.BurstReplicas,
   132  	)
   133  	return server.TearDownFn, rm, dc, informers, clientSet
   134  }
   135  
   136  // dcSimpleSetup sets up necessities for Deployment integration test, including control plane, apiserver,
   137  // and clientset, but not controllers and informers
   138  func dcSimpleSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, clientset.Interface) {
   139  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   140  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
   141  
   142  	config := restclient.CopyConfig(server.ClientConfig)
   143  	clientSet, err := clientset.NewForConfig(config)
   144  	if err != nil {
   145  		t.Fatalf("error in create clientset: %v", err)
   146  	}
   147  	return server.TearDownFn, clientSet
   148  }
   149  
   150  // runControllersAndInformers runs RS and deployment controllers and informers
   151  func runControllersAndInformers(t *testing.T, rm *replicaset.ReplicaSetController, dc *deployment.DeploymentController, informers informers.SharedInformerFactory) func() {
   152  	ctx, cancelFn := context.WithCancel(context.Background())
   153  	informers.Start(ctx.Done())
   154  	go rm.Run(ctx, 5)
   155  	go dc.Run(ctx, 5)
   156  	return cancelFn
   157  }
   158  
   159  // addPodConditionReady sets given pod status to ready at given time
   160  func addPodConditionReady(pod *v1.Pod, time metav1.Time) {
   161  	pod.Status = v1.PodStatus{
   162  		Phase: v1.PodRunning,
   163  		Conditions: []v1.PodCondition{
   164  			{
   165  				Type:               v1.PodReady,
   166  				Status:             v1.ConditionTrue,
   167  				LastTransitionTime: time,
   168  			},
   169  		},
   170  	}
   171  }
   172  
   173  func (d *deploymentTester) waitForDeploymentRevisionAndImage(revision, image string) error {
   174  	if err := testutil.WaitForDeploymentRevisionAndImage(d.c, d.deployment.Namespace, d.deployment.Name, revision, image, d.t.Logf, pollInterval, pollTimeout); err != nil {
   175  		return fmt.Errorf("failed to wait for Deployment revision %s: %v", d.deployment.Name, err)
   176  	}
   177  	return nil
   178  }
   179  
   180  func markPodReady(c clientset.Interface, ns string, pod *v1.Pod) error {
   181  	addPodConditionReady(pod, metav1.Now())
   182  	_, err := c.CoreV1().Pods(ns).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{})
   183  	return err
   184  }
   185  
   186  // markUpdatedPodsReady manually marks updated Deployment pods status to ready,
   187  // until the deployment is complete
   188  func (d *deploymentTester) markUpdatedPodsReady(wg *sync.WaitGroup) {
   189  	defer wg.Done()
   190  
   191  	ns := d.deployment.Namespace
   192  	err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
   193  		// We're done when the deployment is complete
   194  		if completed, err := d.deploymentComplete(); err != nil {
   195  			return false, err
   196  		} else if completed {
   197  			return true, nil
   198  		}
   199  		// Otherwise, mark remaining pods as ready
   200  		pods, err := d.listUpdatedPods()
   201  		if err != nil {
   202  			d.t.Log(err)
   203  			return false, nil
   204  		}
   205  		d.t.Logf("%d/%d of deployment pods are created", len(pods), *d.deployment.Spec.Replicas)
   206  		for i := range pods {
   207  			pod := pods[i]
   208  			if podutil.IsPodReady(&pod) {
   209  				continue
   210  			}
   211  			if err = markPodReady(d.c, ns, &pod); err != nil {
   212  				d.t.Logf("failed to update Deployment pod %s, will retry later: %v", pod.Name, err)
   213  			}
   214  		}
   215  		return false, nil
   216  	})
   217  	if err != nil {
   218  		d.t.Errorf("failed to mark updated Deployment pods to ready: %v", err)
   219  	}
   220  }
   221  
   222  func (d *deploymentTester) deploymentComplete() (bool, error) {
   223  	latest, err := d.c.AppsV1().Deployments(d.deployment.Namespace).Get(context.TODO(), d.deployment.Name, metav1.GetOptions{})
   224  	if err != nil {
   225  		return false, err
   226  	}
   227  	return deploymentutil.DeploymentComplete(d.deployment, &latest.Status), nil
   228  }
   229  
   230  // Waits for the deployment to complete, and check rolling update strategy isn't broken at any times.
   231  // Rolling update strategy should not be broken during a rolling update.
   232  func (d *deploymentTester) waitForDeploymentCompleteAndCheckRolling() error {
   233  	return testutil.WaitForDeploymentCompleteAndCheckRolling(d.c, d.deployment, d.t.Logf, pollInterval, pollTimeout)
   234  }
   235  
   236  // Waits for the deployment to complete, and don't check if rolling update strategy is broken.
   237  // Rolling update strategy is used only during a rolling update, and can be violated in other situations,
   238  // such as shortly after a scaling event or the deployment is just created.
   239  func (d *deploymentTester) waitForDeploymentComplete() error {
   240  	return testutil.WaitForDeploymentComplete(d.c, d.deployment, d.t.Logf, pollInterval, pollTimeout)
   241  }
   242  
   243  // waitForDeploymentCompleteAndCheckRollingAndMarkPodsReady waits for the Deployment to complete
   244  // while marking updated Deployment pods as ready at the same time.
   245  // Uses hard check to make sure rolling update strategy is not violated at any times.
   246  func (d *deploymentTester) waitForDeploymentCompleteAndCheckRollingAndMarkPodsReady() error {
   247  	var wg sync.WaitGroup
   248  
   249  	// Manually mark updated Deployment pods as ready in a separate goroutine
   250  	wg.Add(1)
   251  	go d.markUpdatedPodsReady(&wg)
   252  	// Wait for goroutine to finish, for all return paths.
   253  	defer wg.Wait()
   254  
   255  	// Wait for the Deployment status to complete while Deployment pods are becoming ready
   256  	err := d.waitForDeploymentCompleteAndCheckRolling()
   257  	if err != nil {
   258  		return fmt.Errorf("failed to wait for Deployment %s to complete: %v", d.deployment.Name, err)
   259  	}
   260  
   261  	return nil
   262  }
   263  
   264  // waitForDeploymentCompleteAndMarkPodsReady waits for the Deployment to complete
   265  // while marking updated Deployment pods as ready at the same time.
   266  func (d *deploymentTester) waitForDeploymentCompleteAndMarkPodsReady() error {
   267  	var wg sync.WaitGroup
   268  
   269  	// Manually mark updated Deployment pods as ready in a separate goroutine
   270  	wg.Add(1)
   271  	go d.markUpdatedPodsReady(&wg)
   272  
   273  	// Wait for the Deployment status to complete using soft check, while Deployment pods are becoming ready
   274  	err := d.waitForDeploymentComplete()
   275  	if err != nil {
   276  		return fmt.Errorf("failed to wait for Deployment status %s: %v", d.deployment.Name, err)
   277  	}
   278  
   279  	// Wait for goroutine to finish
   280  	wg.Wait()
   281  
   282  	return nil
   283  }
   284  
   285  func (d *deploymentTester) updateDeployment(applyUpdate testutil.UpdateDeploymentFunc) (*apps.Deployment, error) {
   286  	return testutil.UpdateDeploymentWithRetries(d.c, d.deployment.Namespace, d.deployment.Name, applyUpdate, d.t.Logf, pollInterval, pollTimeout)
   287  }
   288  
   289  func (d *deploymentTester) waitForObservedDeployment(desiredGeneration int64) error {
   290  	if err := testutil.WaitForObservedDeployment(d.c, d.deployment.Namespace, d.deployment.Name, desiredGeneration); err != nil {
   291  		return fmt.Errorf("failed waiting for ObservedGeneration of deployment %s to become %d: %v", d.deployment.Name, desiredGeneration, err)
   292  	}
   293  	return nil
   294  }
   295  
   296  func (d *deploymentTester) getNewReplicaSet() (*apps.ReplicaSet, error) {
   297  	deployment, err := d.c.AppsV1().Deployments(d.deployment.Namespace).Get(context.TODO(), d.deployment.Name, metav1.GetOptions{})
   298  	if err != nil {
   299  		return nil, fmt.Errorf("failed retrieving deployment %s: %v", d.deployment.Name, err)
   300  	}
   301  	rs, err := testutil.GetNewReplicaSet(deployment, d.c)
   302  	if err != nil {
   303  		return nil, fmt.Errorf("failed retrieving new replicaset of deployment %s: %v", d.deployment.Name, err)
   304  	}
   305  	return rs, nil
   306  }
   307  
   308  func (d *deploymentTester) expectNoNewReplicaSet() error {
   309  	rs, err := d.getNewReplicaSet()
   310  	if err != nil {
   311  		return err
   312  	}
   313  	if rs != nil {
   314  		return fmt.Errorf("expected deployment %s not to create a new replicaset, got %v", d.deployment.Name, rs)
   315  	}
   316  	return nil
   317  }
   318  
   319  func (d *deploymentTester) expectNewReplicaSet() (*apps.ReplicaSet, error) {
   320  	rs, err := d.getNewReplicaSet()
   321  	if err != nil {
   322  		return nil, err
   323  	}
   324  	if rs == nil {
   325  		return nil, fmt.Errorf("expected deployment %s to create a new replicaset, got nil", d.deployment.Name)
   326  	}
   327  	return rs, nil
   328  }
   329  
   330  func (d *deploymentTester) updateReplicaSet(name string, applyUpdate testutil.UpdateReplicaSetFunc) (*apps.ReplicaSet, error) {
   331  	return testutil.UpdateReplicaSetWithRetries(d.c, d.deployment.Namespace, name, applyUpdate, d.t.Logf, pollInterval, pollTimeout)
   332  }
   333  
   334  func (d *deploymentTester) waitForDeploymentUpdatedReplicasGTE(minUpdatedReplicas int32) error {
   335  	return testutil.WaitForDeploymentUpdatedReplicasGTE(d.c, d.deployment.Namespace, d.deployment.Name, minUpdatedReplicas, d.deployment.Generation, pollInterval, pollTimeout)
   336  }
   337  
   338  func (d *deploymentTester) waitForDeploymentWithCondition(reason string, condType apps.DeploymentConditionType) error {
   339  	return testutil.WaitForDeploymentWithCondition(d.c, d.deployment.Namespace, d.deployment.Name, reason, condType, d.t.Logf, pollInterval, pollTimeout)
   340  }
   341  
   342  func (d *deploymentTester) listUpdatedPods() ([]v1.Pod, error) {
   343  	selector, err := metav1.LabelSelectorAsSelector(d.deployment.Spec.Selector)
   344  	if err != nil {
   345  		return nil, fmt.Errorf("failed to parse deployment selector: %v", err)
   346  	}
   347  	pods, err := d.c.CoreV1().Pods(d.deployment.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String()})
   348  	if err != nil {
   349  		return nil, fmt.Errorf("failed to list deployment pods, will retry later: %v", err)
   350  	}
   351  	newRS, err := d.getNewReplicaSet()
   352  	if err != nil {
   353  		return nil, fmt.Errorf("failed to get new replicaset of deployment %q: %v", d.deployment.Name, err)
   354  	}
   355  	if newRS == nil {
   356  		return nil, fmt.Errorf("unable to find new replicaset of deployment %q", d.deployment.Name)
   357  	}
   358  
   359  	var ownedPods []v1.Pod
   360  	for _, pod := range pods.Items {
   361  		rs := metav1.GetControllerOf(&pod)
   362  		if rs.UID == newRS.UID {
   363  			ownedPods = append(ownedPods, pod)
   364  		}
   365  	}
   366  	return ownedPods, nil
   367  }
   368  
   369  func (d *deploymentTester) waitRSStable(replicaset *apps.ReplicaSet) error {
   370  	return testutil.WaitRSStable(d.t, d.c, replicaset, pollInterval, pollTimeout)
   371  }
   372  
   373  func (d *deploymentTester) scaleDeployment(newReplicas int32) error {
   374  	var err error
   375  	d.deployment, err = d.updateDeployment(func(update *apps.Deployment) {
   376  		update.Spec.Replicas = &newReplicas
   377  	})
   378  	if err != nil {
   379  		return fmt.Errorf("failed updating deployment %q: %v", d.deployment.Name, err)
   380  	}
   381  
   382  	if err := d.waitForDeploymentCompleteAndMarkPodsReady(); err != nil {
   383  		return err
   384  	}
   385  
   386  	rs, err := d.expectNewReplicaSet()
   387  	if err != nil {
   388  		return err
   389  	}
   390  	if *rs.Spec.Replicas != newReplicas {
   391  		return fmt.Errorf("expected new replicaset replicas = %d, got %d", newReplicas, *rs.Spec.Replicas)
   392  	}
   393  	return nil
   394  }
   395  
   396  // waitForReadyReplicas waits for number of ready replicas to equal number of replicas.
   397  func (d *deploymentTester) waitForReadyReplicas() error {
   398  	if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
   399  		deployment, err := d.c.AppsV1().Deployments(d.deployment.Namespace).Get(context.TODO(), d.deployment.Name, metav1.GetOptions{})
   400  		if err != nil {
   401  			return false, fmt.Errorf("failed to get deployment %q: %v", d.deployment.Name, err)
   402  		}
   403  		return deployment.Status.ReadyReplicas == *deployment.Spec.Replicas, nil
   404  	}); err != nil {
   405  		return fmt.Errorf("failed to wait for .readyReplicas to equal .replicas: %v", err)
   406  	}
   407  	return nil
   408  }
   409  
   410  // markUpdatedPodsReadyWithoutComplete marks updated Deployment pods as ready without waiting for deployment to complete.
   411  func (d *deploymentTester) markUpdatedPodsReadyWithoutComplete() error {
   412  	if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
   413  		pods, err := d.listUpdatedPods()
   414  		if err != nil {
   415  			return false, err
   416  		}
   417  		for i := range pods {
   418  			pod := pods[i]
   419  			if podutil.IsPodReady(&pod) {
   420  				continue
   421  			}
   422  			if err = markPodReady(d.c, d.deployment.Namespace, &pod); err != nil {
   423  				d.t.Logf("failed to update Deployment pod %q, will retry later: %v", pod.Name, err)
   424  				return false, nil
   425  			}
   426  		}
   427  		return true, nil
   428  	}); err != nil {
   429  		return fmt.Errorf("failed to mark all updated pods as ready: %v", err)
   430  	}
   431  	return nil
   432  }
   433  
   434  // Verify all replicas fields of DeploymentStatus have desired count.
   435  // Immediately return an error when found a non-matching replicas field.
   436  func (d *deploymentTester) checkDeploymentStatusReplicasFields(replicas, updatedReplicas, readyReplicas, availableReplicas, unavailableReplicas int32) error {
   437  	deployment, err := d.c.AppsV1().Deployments(d.deployment.Namespace).Get(context.TODO(), d.deployment.Name, metav1.GetOptions{})
   438  	if err != nil {
   439  		return fmt.Errorf("failed to get deployment %q: %v", d.deployment.Name, err)
   440  	}
   441  	if deployment.Status.Replicas != replicas {
   442  		return fmt.Errorf("unexpected .replicas: expect %d, got %d", replicas, deployment.Status.Replicas)
   443  	}
   444  	if deployment.Status.UpdatedReplicas != updatedReplicas {
   445  		return fmt.Errorf("unexpected .updatedReplicas: expect %d, got %d", updatedReplicas, deployment.Status.UpdatedReplicas)
   446  	}
   447  	if deployment.Status.ReadyReplicas != readyReplicas {
   448  		return fmt.Errorf("unexpected .readyReplicas: expect %d, got %d", readyReplicas, deployment.Status.ReadyReplicas)
   449  	}
   450  	if deployment.Status.AvailableReplicas != availableReplicas {
   451  		return fmt.Errorf("unexpected .replicas: expect %d, got %d", availableReplicas, deployment.Status.AvailableReplicas)
   452  	}
   453  	if deployment.Status.UnavailableReplicas != unavailableReplicas {
   454  		return fmt.Errorf("unexpected .replicas: expect %d, got %d", unavailableReplicas, deployment.Status.UnavailableReplicas)
   455  	}
   456  	return nil
   457  }
   458  

View as plain text