...

Source file src/k8s.io/kubernetes/test/e2e/framework/resource/resources.go

Documentation: k8s.io/kubernetes/test/e2e/framework/resource

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package resource
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	"github.com/onsi/ginkgo/v2"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/fields"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	"k8s.io/apimachinery/pkg/runtime/schema"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	"k8s.io/client-go/dynamic"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	scaleclient "k8s.io/client-go/scale"
    36  	"k8s.io/kubernetes/test/e2e/framework"
    37  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    38  	testutils "k8s.io/kubernetes/test/utils"
    39  )
    40  
    41  const (
    42  	// Number of objects that gc can delete in a second.
    43  	// GC issues 2 requestes for single delete.
    44  	gcThroughput = 10
    45  )
    46  
    47  // ScaleResource scales resource to the given size.
    48  func ScaleResource(
    49  	ctx context.Context,
    50  	clientset clientset.Interface,
    51  	scalesGetter scaleclient.ScalesGetter,
    52  	ns, name string,
    53  	size uint,
    54  	wait bool,
    55  	kind schema.GroupKind,
    56  	gvr schema.GroupVersionResource,
    57  ) error {
    58  	ginkgo.By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
    59  	if err := testutils.ScaleResourceWithRetries(scalesGetter, ns, name, size, gvr); err != nil {
    60  		return fmt.Errorf("error while scaling RC %s to %d replicas: %w", name, size, err)
    61  	}
    62  	if !wait {
    63  		return nil
    64  	}
    65  	return WaitForControlledPodsRunning(ctx, clientset, ns, name, kind)
    66  }
    67  
    68  // DeleteResourceAndWaitForGC deletes only given resource and waits for GC to delete the pods.
    69  func DeleteResourceAndWaitForGC(ctx context.Context, c clientset.Interface, kind schema.GroupKind, ns, name string) error {
    70  	ginkgo.By(fmt.Sprintf("deleting %v %s in namespace %s, will wait for the garbage collector to delete the pods", kind, name, ns))
    71  
    72  	rtObject, err := GetRuntimeObjectForKind(ctx, c, kind, ns, name)
    73  	if err != nil {
    74  		if apierrors.IsNotFound(err) {
    75  			framework.Logf("%v %s not found: %v", kind, name, err)
    76  			return nil
    77  		}
    78  		return err
    79  	}
    80  	deleteObject := func() error {
    81  		background := metav1.DeletePropagationBackground
    82  		return testutils.DeleteResource(c, kind, ns, name, metav1.DeleteOptions{PropagationPolicy: &background})
    83  	}
    84  	return deleteObjectAndWaitForGC(ctx, c, rtObject, deleteObject, ns, name, kind.String())
    85  }
    86  
    87  // DeleteCustomResourceAndWaitForGC deletes only given resource and waits for GC to delete the pods.
    88  // Enables to provide a custom resourece client, e.g. to fetch a CRD object.
    89  func DeleteCustomResourceAndWaitForGC(ctx context.Context, c clientset.Interface, dynamicClient dynamic.Interface, scaleClient scaleclient.ScalesGetter, gvr schema.GroupVersionResource, ns, name string) error {
    90  	ginkgo.By(fmt.Sprintf("deleting %v %s in namespace %s, will wait for the garbage collector to delete the pods", gvr, name, ns))
    91  	resourceClient := dynamicClient.Resource(gvr).Namespace(ns)
    92  	_, err := resourceClient.Get(ctx, name, metav1.GetOptions{})
    93  	if err != nil {
    94  		if apierrors.IsNotFound(err) {
    95  			framework.Logf("%v %s not found: %v", gvr, name, err)
    96  			return nil
    97  		}
    98  		return err
    99  	}
   100  	scaleObj, err := scaleClient.Scales(ns).Get(ctx, gvr.GroupResource(), name, metav1.GetOptions{})
   101  	if err != nil {
   102  		framework.Logf("error while trying to get scale subresource of kind %v with name %v: %v", gvr, name, err)
   103  		return nil
   104  	}
   105  	deleteObject := func() error {
   106  		background := metav1.DeletePropagationBackground
   107  		return resourceClient.Delete(ctx, name, metav1.DeleteOptions{PropagationPolicy: &background})
   108  	}
   109  	return deleteObjectAndWaitForGC(ctx, c, scaleObj, deleteObject, ns, name, gvr.String())
   110  }
   111  
   112  func deleteObjectAndWaitForGC(ctx context.Context, c clientset.Interface, rtObject runtime.Object, deleteObject func() error, ns, name, description string) error {
   113  	selector, err := GetSelectorFromRuntimeObject(rtObject)
   114  	if err != nil {
   115  		return err
   116  	}
   117  	replicas, err := GetReplicasFromRuntimeObject(rtObject)
   118  	if err != nil {
   119  		return err
   120  	}
   121  
   122  	ps, err := testutils.NewPodStore(c, ns, selector, fields.Everything())
   123  	if err != nil {
   124  		return err
   125  	}
   126  
   127  	defer ps.Stop()
   128  	startTime := time.Now()
   129  	if err := testutils.RetryWithExponentialBackOff(func() (bool, error) {
   130  		err := deleteObject()
   131  		if err == nil || apierrors.IsNotFound(err) {
   132  			return true, nil
   133  		}
   134  		return false, fmt.Errorf("failed to delete object with non-retriable error: %w", err)
   135  	}); err != nil {
   136  		return err
   137  	}
   138  	deleteTime := time.Since(startTime)
   139  	framework.Logf("Deleting %v %s took: %v", description, name, deleteTime)
   140  
   141  	var interval, timeout time.Duration
   142  	switch {
   143  	case replicas < 100:
   144  		interval = 100 * time.Millisecond
   145  	case replicas < 1000:
   146  		interval = 1 * time.Second
   147  	default:
   148  		interval = 10 * time.Second
   149  	}
   150  	if replicas < 5000 {
   151  		timeout = 10 * time.Minute
   152  	} else {
   153  		timeout = time.Duration(replicas/gcThroughput) * time.Second
   154  		// gcThroughput is pretty strict now, add a bit more to it
   155  		timeout = timeout + 3*time.Minute
   156  	}
   157  
   158  	err = waitForPodsInactive(ctx, ps, interval, timeout)
   159  	if err != nil {
   160  		return fmt.Errorf("error while waiting for pods to become inactive %s: %w", name, err)
   161  	}
   162  	terminatePodTime := time.Since(startTime) - deleteTime
   163  	framework.Logf("Terminating %v %s pods took: %v", description, name, terminatePodTime)
   164  
   165  	// In gce, at any point, small percentage of nodes can disappear for
   166  	// ~10 minutes due to hostError. 20 minutes should be long enough to
   167  	// restart VM in that case and delete the pod.
   168  	err = waitForPodsGone(ctx, ps, interval, 20*time.Minute)
   169  	if err != nil {
   170  		return fmt.Errorf("error while waiting for pods gone %s: %w", name, err)
   171  	}
   172  	return nil
   173  }
   174  
   175  // waitForPodsGone waits until there are no pods left in the PodStore.
   176  func waitForPodsGone(ctx context.Context, ps *testutils.PodStore, interval, timeout time.Duration) error {
   177  	var pods []*v1.Pod
   178  	err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
   179  		if pods = ps.List(); len(pods) == 0 {
   180  			return true, nil
   181  		}
   182  		return false, nil
   183  	})
   184  
   185  	if wait.Interrupted(err) {
   186  		for _, pod := range pods {
   187  			framework.Logf("ERROR: Pod %q still exists. Node: %q", pod.Name, pod.Spec.NodeName)
   188  		}
   189  		return fmt.Errorf("there are %d pods left. E.g. %q on node %q", len(pods), pods[0].Name, pods[0].Spec.NodeName)
   190  	}
   191  	return err
   192  }
   193  
   194  // waitForPodsInactive waits until there are no active pods left in the PodStore.
   195  // This is to make a fair comparison of deletion time between DeleteRCAndPods
   196  // and DeleteRCAndWaitForGC, because the RC controller decreases status.replicas
   197  // when the pod is inactvie.
   198  func waitForPodsInactive(ctx context.Context, ps *testutils.PodStore, interval, timeout time.Duration) error {
   199  	var activePods []*v1.Pod
   200  	err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
   201  		pods := ps.List()
   202  		activePods = e2epod.FilterActivePods(pods)
   203  		if len(activePods) != 0 {
   204  			return false, nil
   205  		}
   206  		return true, nil
   207  	})
   208  
   209  	if wait.Interrupted(err) {
   210  		for _, pod := range activePods {
   211  			framework.Logf("ERROR: Pod %q running on %q is still active", pod.Name, pod.Spec.NodeName)
   212  		}
   213  		return fmt.Errorf("there are %d active pods. E.g. %q on node %q", len(activePods), activePods[0].Name, activePods[0].Spec.NodeName)
   214  	}
   215  	return err
   216  }
   217  
   218  // WaitForControlledPodsRunning waits up to 10 minutes for pods to become Running.
   219  func WaitForControlledPodsRunning(ctx context.Context, c clientset.Interface, ns, name string, kind schema.GroupKind) error {
   220  	rtObject, err := GetRuntimeObjectForKind(ctx, c, kind, ns, name)
   221  	if err != nil {
   222  		return err
   223  	}
   224  	selector, err := GetSelectorFromRuntimeObject(rtObject)
   225  	if err != nil {
   226  		return err
   227  	}
   228  	replicas, err := GetReplicasFromRuntimeObject(rtObject)
   229  	if err != nil {
   230  		return err
   231  	}
   232  	err = testutils.WaitForEnoughPodsWithLabelRunning(c, ns, selector, int(replicas))
   233  	if err != nil {
   234  		return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %w", name, err)
   235  	}
   236  	return nil
   237  }
   238  
   239  // WaitForControlledPods waits up to podListTimeout for getting pods of the specified controller name and return them.
   240  func WaitForControlledPods(ctx context.Context, c clientset.Interface, ns, name string, kind schema.GroupKind) (pods *v1.PodList, err error) {
   241  	rtObject, err := GetRuntimeObjectForKind(ctx, c, kind, ns, name)
   242  	if err != nil {
   243  		return nil, err
   244  	}
   245  	selector, err := GetSelectorFromRuntimeObject(rtObject)
   246  	if err != nil {
   247  		return nil, err
   248  	}
   249  	return e2epod.WaitForPodsWithLabel(ctx, c, ns, selector)
   250  }
   251  

View as plain text