
Source file src/k8s.io/kubernetes/test/integration/util/util.go

Documentation: k8s.io/kubernetes/test/integration/util

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package util
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"errors"
    23  	"fmt"
    24  	"net/http"
    25  	"sync/atomic"
    26  	"testing"
    27  	"time"
    28  
    29  	v1 "k8s.io/api/core/v1"
    30  	policy "k8s.io/api/policy/v1"
    31  	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    32  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/types"
    35  	"k8s.io/apimachinery/pkg/util/uuid"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	"k8s.io/apiserver/pkg/admission"
    38  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    39  	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
    40  	"k8s.io/client-go/dynamic"
    41  	"k8s.io/client-go/dynamic/dynamicinformer"
    42  	"k8s.io/client-go/informers"
    43  	clientset "k8s.io/client-go/kubernetes"
    44  	corelisters "k8s.io/client-go/listers/core/v1"
    45  	"k8s.io/client-go/metadata"
    46  	"k8s.io/client-go/metadata/metadatainformer"
    47  	restclient "k8s.io/client-go/rest"
    48  	"k8s.io/client-go/restmapper"
    49  	"k8s.io/client-go/scale"
    50  	"k8s.io/client-go/tools/cache"
    51  	"k8s.io/client-go/tools/events"
    52  	cliflag "k8s.io/component-base/cli/flag"
    53  	pvutil "k8s.io/component-helpers/storage/volume"
    54  	"k8s.io/controller-manager/pkg/informerfactory"
    55  	"k8s.io/klog/v2"
    56  	kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
    57  	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    58  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    59  	"k8s.io/kubernetes/pkg/controller/disruption"
    60  	"k8s.io/kubernetes/pkg/controller/garbagecollector"
    61  	"k8s.io/kubernetes/pkg/controller/namespace"
    62  	"k8s.io/kubernetes/pkg/controller/resourceclaim"
    63  	"k8s.io/kubernetes/pkg/controlplane"
    64  	"k8s.io/kubernetes/pkg/features"
    65  	"k8s.io/kubernetes/pkg/scheduler"
    66  	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
    67  	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
    68  	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
    69  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
    70  	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    71  	"k8s.io/kubernetes/pkg/scheduler/profile"
    72  	st "k8s.io/kubernetes/pkg/scheduler/testing"
    73  	taintutils "k8s.io/kubernetes/pkg/util/taints"
    74  	"k8s.io/kubernetes/test/integration/framework"
    75  	imageutils "k8s.io/kubernetes/test/utils/image"
    76  	"k8s.io/kubernetes/test/utils/ktesting"
    77  	"k8s.io/utils/ptr"
    78  )
    79  
    80  // ShutdownFunc represents the function handle to be called, typically in a defer handler, to shut down a running module.
    81  type ShutdownFunc func()
    82  
    83  // StartScheduler configures and starts a scheduler given a handle to the clientSet interface
    84  // and event broadcaster. It returns the running scheduler and the shared informer factory.
    85  // Background goroutines will keep running until the context is canceled.
    86  func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConfig *restclient.Config, cfg *kubeschedulerconfig.KubeSchedulerConfiguration, outOfTreePluginRegistry frameworkruntime.Registry) (*scheduler.Scheduler, informers.SharedInformerFactory) {
    87  	informerFactory := scheduler.NewInformerFactory(clientSet, 0)
    88  	evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
    89  		Interface: clientSet.EventsV1()})
    90  	go func() {
    91  		<-ctx.Done()
    92  		evtBroadcaster.Shutdown()
    93  	}()
    94  
    95  	evtBroadcaster.StartRecordingToSink(ctx.Done())
    96  
    97  	logger := klog.FromContext(ctx)
    98  
    99  	sched, err := scheduler.New(
   100  		ctx,
   101  		clientSet,
   102  		informerFactory,
   103  		nil,
   104  		profile.NewRecorderFactory(evtBroadcaster),
   105  		scheduler.WithKubeConfig(kubeConfig),
   106  		scheduler.WithProfiles(cfg.Profiles...),
   107  		scheduler.WithPercentageOfNodesToScore(cfg.PercentageOfNodesToScore),
   108  		scheduler.WithPodMaxBackoffSeconds(cfg.PodMaxBackoffSeconds),
   109  		scheduler.WithPodInitialBackoffSeconds(cfg.PodInitialBackoffSeconds),
   110  		scheduler.WithExtenders(cfg.Extenders...),
   111  		scheduler.WithParallelism(cfg.Parallelism),
   112  		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreePluginRegistry),
   113  	)
   114  	if err != nil {
   115  		logger.Error(err, "Error creating scheduler")
   116  		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   117  	}
   118  
   119  	informerFactory.Start(ctx.Done())
   120  	informerFactory.WaitForCacheSync(ctx.Done())
   121  	if err = sched.WaitForHandlersSync(ctx); err != nil {
   122  		logger.Error(err, "Failed waiting for handlers to sync")
   123  		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   124  	}
   125  	logger.V(3).Info("Handlers synced")
   126  	go sched.Run(ctx)
   127  
   128  	return sched, informerFactory
   129  }
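
// Editor's usage sketch (not part of the upstream file): a test can obtain an
// apiserver via InitTestAPIServer, build a defaulted scheduler configuration,
// and hand both to StartScheduler. The function name and namespace prefix are
// hypothetical.
func exampleStartScheduler(t *testing.T) {
	testCtx := InitTestAPIServer(t, "start-scheduler-example", nil)
	cfg := configtesting.V1ToInternalWithDefaults(t, kubeschedulerconfigv1.KubeSchedulerConfiguration{})
	// nil registry: no out-of-tree plugins in this sketch.
	sched, informerFactory := StartScheduler(testCtx.Ctx, testCtx.ClientSet, testCtx.KubeConfig, cfg, nil)
	_, _ = sched, informerFactory
}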
   130  
   131  func CreateResourceClaimController(ctx context.Context, tb ktesting.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() {
   132  	podInformer := informerFactory.Core().V1().Pods()
   133  	schedulingInformer := informerFactory.Resource().V1alpha2().PodSchedulingContexts()
   134  	claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims()
   135  	claimTemplateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates()
   136  	claimController, err := resourceclaim.NewController(klog.FromContext(ctx), clientSet, podInformer, schedulingInformer, claimInformer, claimTemplateInformer)
   137  	if err != nil {
   138  		tb.Fatalf("Error creating claim controller: %v", err)
   139  	}
   140  	return func() {
   141  		go claimController.Run(ctx, 5 /* workers */)
   142  	}
   143  }
   144  
   145  // StartFakePVController runs simplified PV controller logic that sets the PVC's VolumeName and bind-completed annotation for each PV that has a ClaimRef.
   146  // TODO(mborsz): Use a real PV controller here.
   147  func StartFakePVController(ctx context.Context, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) {
   148  	pvInformer := informerFactory.Core().V1().PersistentVolumes()
   149  
   150  	syncPV := func(obj *v1.PersistentVolume) {
   151  		if obj.Spec.ClaimRef != nil {
   152  			claimRef := obj.Spec.ClaimRef
   153  			pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{})
   154  			if err != nil {
   155  				// Note that the error can be anything, because components like
   156  				// apiserver are also shutting down at the same time, but this
   157  				// check is conservative and only ignores the "context canceled"
   158  				// error while shutting down.
   159  				if ctx.Err() == nil || !errors.Is(err, context.Canceled) {
   160  					klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
   161  				}
   162  				return
   163  			}
   164  
   165  			if pvc.Spec.VolumeName == "" {
   166  				pvc.Spec.VolumeName = obj.Name
   167  				metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes")
   168  				_, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Update(ctx, pvc, metav1.UpdateOptions{})
   169  				if err != nil {
   170  					if ctx.Err() == nil || !errors.Is(err, context.Canceled) {
   171  						// Shutting down, no need to record this.
   172  						klog.Errorf("error while updating %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
   173  					}
   174  					return
   175  				}
   176  			}
   177  		}
   178  	}
   179  
   180  	pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   181  		AddFunc: func(obj interface{}) {
   182  			syncPV(obj.(*v1.PersistentVolume))
   183  		},
   184  		UpdateFunc: func(_, obj interface{}) {
   185  			syncPV(obj.(*v1.PersistentVolume))
   186  		},
   187  	})
   188  }
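
// Editor's usage sketch (not part of the upstream file), assuming testCtx was
// prepared with InitTestSchedulerWithOptions so that SchedulerCtx and
// InformerFactory are set: register the fake PV controller before starting
// the informers so that it observes every PV event.
func exampleStartFakePVController(testCtx *TestContext) {
	StartFakePVController(testCtx.SchedulerCtx, testCtx.ClientSet, testCtx.InformerFactory)
	SyncSchedulerInformerFactory(testCtx)
}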
   189  
   190  // CreateGCController creates a garbage collector controller and returns a run function
   191  // for it. The informer factory needs to be started before invoking that
   192  // function.
   193  func CreateGCController(ctx context.Context, tb ktesting.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
   194  	restclient.AddUserAgent(&restConfig, "gc-controller")
   195  	clientSet := clientset.NewForConfigOrDie(&restConfig)
   196  	metadataClient, err := metadata.NewForConfig(&restConfig)
   197  	if err != nil {
   198  		tb.Fatalf("Failed to create metadataClient: %v", err)
   199  	}
   200  	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
   201  	restMapper.Reset()
   202  	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
   203  	alwaysStarted := make(chan struct{})
   204  	close(alwaysStarted)
   205  	gc, err := garbagecollector.NewGarbageCollector(
   206  		ctx,
   207  		clientSet,
   208  		metadataClient,
   209  		restMapper,
   210  		garbagecollector.DefaultIgnoredResources(),
   211  		informerfactory.NewInformerFactory(informerSet, metadataInformers),
   212  		alwaysStarted,
   213  	)
   214  	if err != nil {
   215  		tb.Fatalf("Failed creating garbage collector: %v", err)
   216  	}
   217  	startGC := func() {
   218  		syncPeriod := 5 * time.Second
   219  		go wait.Until(func() {
   220  			restMapper.Reset()
   221  		}, syncPeriod, ctx.Done())
   222  		go gc.Run(ctx, 1)
   223  		go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
   224  	}
   225  	return startGC
   226  }
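
// Editor's usage sketch (not part of the upstream file): CreateGCController
// only wires the controller up; the informer factory must be started before
// the returned function is invoked, as the comment above notes. All names are
// hypothetical.
func exampleCreateGCController(t *testing.T) {
	testCtx := InitTestAPIServer(t, "gc-example", nil)
	informerSet := informers.NewSharedInformerFactory(testCtx.ClientSet, 0)
	startGC := CreateGCController(testCtx.Ctx, t, *testCtx.KubeConfig, informerSet)
	informerSet.Start(testCtx.Ctx.Done())
	startGC()
}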
   227  
   228  // CreateNamespaceController creates a namespace controller and returns a run
   229  // function for it. The informer factory needs to be started before invoking
   230  // that function.
   231  func CreateNamespaceController(ctx context.Context, tb ktesting.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
   232  	restclient.AddUserAgent(&restConfig, "namespace-controller")
   233  	clientSet := clientset.NewForConfigOrDie(&restConfig)
   234  	metadataClient, err := metadata.NewForConfig(&restConfig)
   235  	if err != nil {
   236  		tb.Fatalf("Failed to create metadataClient: %v", err)
   237  	}
   238  	discoverResourcesFn := clientSet.Discovery().ServerPreferredNamespacedResources
   239  	controller := namespace.NewNamespaceController(
   240  		ctx,
   241  		clientSet,
   242  		metadataClient,
   243  		discoverResourcesFn,
   244  		informerSet.Core().V1().Namespaces(),
   245  		10*time.Hour,
   246  		v1.FinalizerKubernetes)
   247  	return func() {
   248  		go controller.Run(ctx, 5)
   249  	}
   250  }
   251  
   252  // TestContext stores necessary context info.
   253  // It also contains some optional parameters for InitTestScheduler.
   254  type TestContext struct {
   255  	// DisableEventSink, if set to true before calling InitTestScheduler,
   256  	// will skip eventBroadcaster.StartRecordingToSink and thus avoid
   257  	// some extra goroutines which are tricky to get rid of after
   258  	// a test.
   259  	DisableEventSink bool
   260  
   261  	NS                 *v1.Namespace
   262  	ClientSet          clientset.Interface
   263  	KubeConfig         *restclient.Config
   264  	InformerFactory    informers.SharedInformerFactory
   265  	DynInformerFactory dynamicinformer.DynamicSharedInformerFactory
   266  	Scheduler          *scheduler.Scheduler
   267  	// This is the top context when initializing the test environment.
   268  	Ctx context.Context
   269  	// CloseFn will stop the apiserver and clean up the resources
   270  	// after itself, including shutting down its storage layer.
   271  	CloseFn framework.TearDownFunc
   272  	// This is the context used when initializing the scheduler.
   273  	SchedulerCtx context.Context
   274  	// SchedulerCloseFn will tear down the resources created for the scheduler,
   275  	// including the scheduler itself.
   276  	SchedulerCloseFn framework.TearDownFunc
   277  
   278  	// RoundTrip, if set, will be called for every HTTP request going to the apiserver.
   279  	// It can be used for error injection.
   280  	RoundTrip atomic.Pointer[RoundTripWrapper]
   281  }
   282  
   283  type RoundTripWrapper func(http.RoundTripper, *http.Request) (*http.Response, error)
   284  
   285  type roundTripWrapper struct {
   286  	tc        *TestContext
   287  	transport http.RoundTripper
   288  }
   289  
   290  func (r roundTripWrapper) RoundTrip(req *http.Request) (*http.Response, error) {
   291  	wrapper := r.tc.RoundTrip.Load()
   292  	if wrapper != nil {
   293  		return (*wrapper)(r.transport, req)
   294  	}
   295  	return r.transport.RoundTrip(req)
   296  }
   297  
   298  var _ http.RoundTripper = roundTripWrapper{}
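
// Editor's error-injection sketch (not part of the upstream file): the
// RoundTrip field can be swapped at runtime to make apiserver requests fail.
// This wrapper rejects every request; a real test would usually match on the
// request path or verb before injecting an error.
func exampleInjectRoundTripError(testCtx *TestContext) {
	var failAll RoundTripWrapper = func(transport http.RoundTripper, req *http.Request) (*http.Response, error) {
		return nil, fmt.Errorf("injected failure for %s %s", req.Method, req.URL.Path)
	}
	testCtx.RoundTrip.Store(&failAll)
	// ... exercise the code under test, then remove the injection:
	testCtx.RoundTrip.Store(nil)
}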
   299  
   300  // CleanupNodes deletes all nodes which were created during the integration test.
   301  func CleanupNodes(cs clientset.Interface, t *testing.T) {
   302  	err := cs.CoreV1().Nodes().DeleteCollection(context.TODO(), *metav1.NewDeleteOptions(0), metav1.ListOptions{})
   303  	if err != nil {
   304  		t.Errorf("error while deleting all nodes: %v", err)
   305  	}
   306  }
   307  
   308  // PodDeleted returns a condition function that reports true once the pod is not found in the given namespace or is marked for deletion.
   309  func PodDeleted(ctx context.Context, c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc {
   310  	return func(context.Context) (bool, error) {
   311  		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
   312  		if apierrors.IsNotFound(err) {
   313  			return true, nil
   314  		}
   315  		if pod.DeletionTimestamp != nil {
   316  			return true, nil
   317  		}
   318  		return false, nil
   319  	}
   320  }
   321  
   322  // PodsCleanedUp returns a condition function that reports true once all pods are deleted in the specified namespace.
   323  func PodsCleanedUp(ctx context.Context, c clientset.Interface, namespace string) wait.ConditionWithContextFunc {
   324  	return func(context.Context) (bool, error) {
   325  		list, err := c.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
   326  		if err != nil {
   327  			return false, err
   328  		}
   329  		return len(list.Items) == 0, nil
   330  	}
   331  }
   332  
   333  // SyncSchedulerInformerFactory starts the informer factories and waits for their caches to be synced.
   334  func SyncSchedulerInformerFactory(testCtx *TestContext) {
   335  	testCtx.InformerFactory.Start(testCtx.SchedulerCtx.Done())
   336  	if testCtx.DynInformerFactory != nil {
   337  		testCtx.DynInformerFactory.Start(testCtx.SchedulerCtx.Done())
   338  	}
   339  	testCtx.InformerFactory.WaitForCacheSync(testCtx.SchedulerCtx.Done())
   340  	if testCtx.DynInformerFactory != nil {
   341  		testCtx.DynInformerFactory.WaitForCacheSync(testCtx.SchedulerCtx.Done())
   342  	}
   343  }
   344  
   345  // CleanupTest cleans up the resources which were created during the integration test.
   346  func CleanupTest(t *testing.T, testCtx *TestContext) {
   347  	// Cleanup nodes and namespaces.
   348  	if err := testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(testCtx.Ctx, *metav1.NewDeleteOptions(0), metav1.ListOptions{}); err != nil {
   349  		t.Errorf("error while cleaning up nodes, error: %v", err)
   350  	}
   351  	framework.DeleteNamespaceOrDie(testCtx.ClientSet, testCtx.NS, t)
   352  	// Terminate the scheduler and apiserver.
   353  	testCtx.CloseFn()
   354  }
   355  
   356  func RemovePodFinalizersInNamespace(ctx context.Context, cs clientset.Interface, t *testing.T, ns string) {
   357  	t.Helper()
   358  	pods, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
   359  	if err != nil {
   360  		t.Fatalf("Failed obtaining list of pods: %v", err)
   361  	}
   362  	RemovePodFinalizers(ctx, cs, t, pods.Items...)
   363  }
   364  
   365  // RemovePodFinalizers removes the finalizers from the given pods.
   366  func RemovePodFinalizers(ctx context.Context, cs clientset.Interface, t *testing.T, pods ...v1.Pod) {
   367  	t.Helper()
   368  	for _, p := range pods {
   369  		pod, err := cs.CoreV1().Pods(p.Namespace).Get(ctx, p.Name, metav1.GetOptions{})
   370  		if err != nil && !apierrors.IsNotFound(err) {
   371  			t.Errorf("error while removing pod finalizers for %v: %v", klog.KObj(&p), err)
   372  		} else if pod != nil && len(pod.Finalizers) > 0 {
   373  			// Use Patch to remove finalizer, instead of Update, to avoid transient
   374  			// conflicts.
   375  			patchBytes, _ := json.Marshal(map[string]interface{}{
   376  				"metadata": map[string]interface{}{
   377  					"$deleteFromPrimitiveList/finalizers": pod.Finalizers,
   378  				},
   379  			})
   380  			_, err = cs.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
   381  			if err != nil {
   382  				t.Errorf("error while removing finalizers for %v: %v", klog.KObj(&p), err)
   383  			}
   384  		}
   385  	}
   386  }
   387  
   388  // CleanupPods deletes the given pods and waits for them to be actually deleted.
   389  func CleanupPods(ctx context.Context, cs clientset.Interface, t *testing.T, pods []*v1.Pod) {
   390  	for _, p := range pods {
   391  		err := cs.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, *metav1.NewDeleteOptions(0))
   392  		if err != nil && !apierrors.IsNotFound(err) {
   393  			t.Errorf("error while deleting pod %v/%v: %v", p.Namespace, p.Name, err)
   394  		}
   395  	}
   396  	for _, p := range pods {
   397  		if err := wait.PollUntilContextTimeout(ctx, time.Microsecond, wait.ForeverTestTimeout, true,
   398  			PodDeleted(ctx, cs, p.Namespace, p.Name)); err != nil {
   399  			t.Errorf("error while waiting for pod %v/%v to get deleted: %v", p.Namespace, p.Name, err)
   400  		}
   401  	}
   402  }
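
// Editor's usage sketch (not part of the upstream file): a common teardown
// pattern is to strip finalizers first so that CleanupPods does not block
// waiting for pods that still carry test finalizers.
func exampleCleanupPodsWithFinalizers(ctx context.Context, cs clientset.Interface, t *testing.T, ns string, pods []*v1.Pod) {
	RemovePodFinalizersInNamespace(ctx, cs, t, ns)
	CleanupPods(ctx, cs, t, pods)
}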
   403  
   404  // AddTaintToNode adds a taint to the specified node.
   405  func AddTaintToNode(cs clientset.Interface, nodeName string, taint v1.Taint) error {
   406  	node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
   407  	if err != nil {
   408  		return err
   409  	}
   410  	node.Spec.Taints = append(node.Spec.Taints, taint)
   411  	_, err = cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
   412  	return err
   413  }
   414  
   415  // RemoveTaintOffNode removes a specific taint from a node
   416  func RemoveTaintOffNode(cs clientset.Interface, nodeName string, taint v1.Taint) error {
   417  	node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
   418  	if err != nil {
   419  		return err
   420  	}
   421  	var taints []v1.Taint
   422  	for _, t := range node.Spec.Taints {
   423  		if !t.MatchTaint(&taint) {
   424  			taints = append(taints, t)
   425  		}
   426  	}
   427  	node.Spec.Taints = taints
   428  	_, err = cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
   429  	return err
   430  }
   431  
   432  // WaitForNodeTaints waits for a node to have the target taints and returns
   433  // an error if the node does not have the taints within 30 seconds.
   434  func WaitForNodeTaints(cs clientset.Interface, node *v1.Node, taints []v1.Taint) error {
   435  	return wait.Poll(100*time.Millisecond, 30*time.Second, NodeTainted(cs, node.Name, taints))
   436  }
   437  
   438  // NodeTainted returns a condition function that returns true if the given node contains
   439  // the taints.
   440  func NodeTainted(cs clientset.Interface, nodeName string, taints []v1.Taint) wait.ConditionFunc {
   441  	return func() (bool, error) {
   442  		node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
   443  		if err != nil {
   444  			return false, err
   445  		}
   446  
   447  		// node.Spec.Taints may have more taints
   448  		if len(taints) > len(node.Spec.Taints) {
   449  			return false, nil
   450  		}
   451  
   452  		for _, taint := range taints {
   453  			if !taintutils.TaintExists(node.Spec.Taints, &taint) {
   454  				return false, nil
   455  			}
   456  		}
   457  
   458  		return true, nil
   459  	}
   460  }
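
// Editor's usage sketch (not part of the upstream file): taint a node and
// wait until the taint is observed through the API. The taint key and value
// are hypothetical.
func exampleTaintNodeAndWait(t *testing.T, cs clientset.Interface, node *v1.Node) {
	taint := v1.Taint{Key: "example.com/special", Value: "true", Effect: v1.TaintEffectNoSchedule}
	if err := AddTaintToNode(cs, node.Name, taint); err != nil {
		t.Fatalf("Failed to add taint: %v", err)
	}
	if err := WaitForNodeTaints(cs, node, []v1.Taint{taint}); err != nil {
		t.Fatalf("Taint was not observed: %v", err)
	}
}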
   461  
   462  // NodeReadyStatus returns the status of the first condition with type NodeReady.
   463  // If none of the conditions is of type NodeReady, it returns an error.
   464  func NodeReadyStatus(conditions []v1.NodeCondition) (v1.ConditionStatus, error) {
   465  	for _, c := range conditions {
   466  		if c.Type != v1.NodeReady {
   467  			continue
   468  		}
   469  		// Just return the first condition with type NodeReady
   470  		return c.Status, nil
   471  	}
   472  	return v1.ConditionFalse, errors.New("None of the conditions is of type NodeReady")
   473  }
   474  
   475  // GetTolerationSeconds returns the TolerationSeconds of the pod's NoExecute toleration for the not-ready taint.
   476  func GetTolerationSeconds(tolerations []v1.Toleration) (int64, error) {
   477  	for _, t := range tolerations {
   478  		if t.Key == v1.TaintNodeNotReady && t.Effect == v1.TaintEffectNoExecute && t.Operator == v1.TolerationOpExists {
   479  			return *t.TolerationSeconds, nil
   480  		}
   481  	}
   482  	return 0, fmt.Errorf("cannot find toleration")
   483  }
   484  
   485  // NodeCopyWithConditions duplicates the node object with the given conditions.
   486  func NodeCopyWithConditions(node *v1.Node, conditions []v1.NodeCondition) *v1.Node {
   487  	copy := node.DeepCopy()
   488  	copy.ResourceVersion = "0"
   489  	copy.Status.Conditions = conditions
   490  	for i := range copy.Status.Conditions {
   491  		copy.Status.Conditions[i].LastHeartbeatTime = metav1.Now()
   492  	}
   493  	return copy
   494  }
   495  
   496  // UpdateNodeStatus updates the status of node.
   497  func UpdateNodeStatus(cs clientset.Interface, node *v1.Node) error {
   498  	_, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{})
   499  	return err
   500  }
   501  
   502  // InitTestAPIServer initializes a test environment and creates an API server with default
   503  // configuration.
   504  // It registers cleanup functions via t.Cleanup(); they are called when the test
   505  // completes, so callers do not need to register their own cleanup.
   506  func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interface) *TestContext {
   507  	tCtx := ktesting.Init(t)
   508  	testCtx := &TestContext{Ctx: tCtx}
   509  
   510  	testCtx.ClientSet, testCtx.KubeConfig, testCtx.CloseFn = framework.StartTestServer(tCtx, t, framework.TestServerSetup{
   511  		ModifyServerRunOptions: func(options *options.ServerRunOptions) {
   512  			options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority", "StorageObjectInUseProtection"}
   513  			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
   514  				options.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{
   515  					resourcev1alpha2.SchemeGroupVersion.String(): "true",
   516  				}
   517  			}
   518  		},
   519  		ModifyServerConfig: func(config *controlplane.Config) {
   520  			if admission != nil {
   521  				config.GenericConfig.AdmissionControl = admission
   522  			}
   523  		},
   524  	})
   525  
   526  	// Support wrapping HTTP requests.
   527  	testCtx.KubeConfig.Wrap(func(transport http.RoundTripper) http.RoundTripper {
   528  		return roundTripWrapper{tc: testCtx, transport: transport}
   529  	})
   530  	var err error
   531  	testCtx.ClientSet, err = clientset.NewForConfig(testCtx.KubeConfig)
   532  	if err != nil {
   533  		t.Fatal(err)
   534  	}
   535  
   536  	oldCloseFn := testCtx.CloseFn
   537  	testCtx.CloseFn = func() {
   538  		tCtx.Cancel("tearing down apiserver")
   539  		oldCloseFn()
   540  	}
   541  
   542  	if nsPrefix != "default" {
   543  		testCtx.NS = framework.CreateNamespaceOrDie(testCtx.ClientSet, nsPrefix+string(uuid.NewUUID()), t)
   544  	} else {
   545  		testCtx.NS = framework.CreateNamespaceOrDie(testCtx.ClientSet, "default", t)
   546  	}
   547  
   548  	t.Cleanup(func() {
   549  		CleanupTest(t, testCtx)
   550  	})
   551  
   552  	return testCtx
   553  }
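
// Editor's usage sketch (not part of the upstream file): a test that only
// needs an apiserver and a namespace. Cleanup is registered via t.Cleanup by
// InitTestAPIServer, so the test body does not tear anything down itself.
// The namespace prefix and pod name are hypothetical.
func exampleInitTestAPIServer(t *testing.T) {
	testCtx := InitTestAPIServer(t, "apiserver-example", nil)
	pod := InitPausePod(&PausePodConfig{Name: "example-pod", Namespace: testCtx.NS.Name})
	if _, err := CreatePausePod(testCtx.ClientSet, pod); err != nil {
		t.Fatalf("Failed to create pod: %v", err)
	}
}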
   554  
   555  // WaitForSchedulerCacheCleanup waits for cleanup of scheduler's cache to complete
   556  func WaitForSchedulerCacheCleanup(sched *scheduler.Scheduler, t *testing.T) {
   557  	schedulerCacheIsEmpty := func() (bool, error) {
   558  		dump := sched.Cache.Dump()
   559  
   560  		return len(dump.Nodes) == 0 && len(dump.AssumedPods) == 0, nil
   561  	}
   562  
   563  	if err := wait.Poll(time.Second, wait.ForeverTestTimeout, schedulerCacheIsEmpty); err != nil {
   564  		t.Errorf("Failed to wait for scheduler cache cleanup: %v", err)
   565  	}
   566  }
   567  
   568  // InitTestScheduler initializes a test environment and creates a scheduler with default
   569  // configuration.
   570  func InitTestScheduler(
   571  	t *testing.T,
   572  	testCtx *TestContext,
   573  ) *TestContext {
   574  	// Pod preemption is enabled by the default scheduler configuration.
   575  	return InitTestSchedulerWithOptions(t, testCtx, 0)
   576  }
   577  
   578  // InitTestSchedulerWithOptions initializes a test environment and creates a scheduler with default
   579  // configuration and other options.
   580  func InitTestSchedulerWithOptions(
   581  	t *testing.T,
   582  	testCtx *TestContext,
   583  	resyncPeriod time.Duration,
   584  	opts ...scheduler.Option,
   585  ) *TestContext {
   586  	ctx, cancel := context.WithCancel(testCtx.Ctx)
   587  	testCtx.SchedulerCtx = ctx
   588  
   589  	// 1. Create scheduler
   590  	testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, resyncPeriod)
   591  	if testCtx.KubeConfig != nil {
   592  		dynClient := dynamic.NewForConfigOrDie(testCtx.KubeConfig)
   593  		testCtx.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil)
   594  	}
   595  
   596  	var err error
   597  	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
   598  		Interface: testCtx.ClientSet.EventsV1(),
   599  	})
   600  
   601  	opts = append(opts, scheduler.WithKubeConfig(testCtx.KubeConfig))
   602  	testCtx.Scheduler, err = scheduler.New(
   603  		ctx,
   604  		testCtx.ClientSet,
   605  		testCtx.InformerFactory,
   606  		testCtx.DynInformerFactory,
   607  		profile.NewRecorderFactory(eventBroadcaster),
   608  		opts...,
   609  	)
   610  
   611  	if err != nil {
   612  		t.Fatalf("Couldn't create scheduler: %v", err)
   613  	}
   614  
   615  	if !testCtx.DisableEventSink {
   616  		eventBroadcaster.StartRecordingToSink(ctx.Done())
   617  	}
   618  
   619  	oldCloseFn := testCtx.CloseFn
   620  	testCtx.CloseFn = func() {
   621  		oldCloseFn()
   622  		eventBroadcaster.Shutdown()
   623  	}
   624  
   625  	testCtx.SchedulerCloseFn = func() {
   626  		cancel()
   627  		eventBroadcaster.Shutdown()
   628  	}
   629  
   630  	return testCtx
   631  }
   632  
   633  // WaitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns
   634  // an error if it does not get scheduled within the given timeout.
   635  func WaitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
   636  	return wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, timeout, false, PodScheduled(cs, pod.Namespace, pod.Name))
   637  }
   638  
   639  // WaitForPodToSchedule waits for a pod to get scheduled and returns an error if
   640  // it does not get scheduled within the timeout duration (30 seconds).
   641  func WaitForPodToSchedule(cs clientset.Interface, pod *v1.Pod) error {
   642  	return WaitForPodToScheduleWithTimeout(cs, pod, 30*time.Second)
   643  }
   644  
   645  // PodScheduled returns a condition function that checks whether the pod has been scheduled.
   646  func PodScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc {
   647  	return func(ctx context.Context) (bool, error) {
   648  		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
   649  		if err != nil {
   650  			// This could be a connection error so we want to retry.
   651  			return false, nil
   652  		}
   653  		if pod.Spec.NodeName == "" {
   654  			return false, nil
   655  		}
   656  		return true, nil
   657  	}
   658  }
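
// Editor's usage sketch (not part of the upstream file): PodScheduled is a
// wait.ConditionWithContextFunc, so it plugs directly into the wait helpers,
// exactly as WaitForPodToScheduleWithTimeout above does.
func examplePodScheduled(cs clientset.Interface, pod *v1.Pod) error {
	return wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 30*time.Second, false,
		PodScheduled(cs, pod.Namespace, pod.Name))
}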
   659  
   660  // InitDisruptionController initializes and runs a Disruption Controller to properly
   661  // update PodDisruptionBudget objects.
   662  func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController {
   663  	informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour)
   664  
   665  	discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery())
   666  	mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)
   667  
   668  	config := restclient.CopyConfig(testCtx.KubeConfig)
   669  	scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.ClientSet.Discovery())
   670  	scaleClient, err := scale.NewForConfig(config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
   671  	if err != nil {
   672  		t.Fatalf("Error creating scaleClient: %v", err)
   673  	}
   674  
   675  	dc := disruption.NewDisruptionController(
   676  		testCtx.Ctx,
   677  		informers.Core().V1().Pods(),
   678  		informers.Policy().V1().PodDisruptionBudgets(),
   679  		informers.Core().V1().ReplicationControllers(),
   680  		informers.Apps().V1().ReplicaSets(),
   681  		informers.Apps().V1().Deployments(),
   682  		informers.Apps().V1().StatefulSets(),
   683  		testCtx.ClientSet,
   684  		mapper,
   685  		scaleClient,
   686  		testCtx.ClientSet.Discovery())
   687  
   688  	informers.Start(testCtx.Scheduler.StopEverything)
   689  	informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)
   690  	go dc.Run(testCtx.Ctx)
   691  	return dc
   692  }
   693  
   694  // InitTestSchedulerWithNS initializes a test environment and creates an API server and a scheduler with default
   695  // configuration.
   696  func InitTestSchedulerWithNS(t *testing.T, nsPrefix string, opts ...scheduler.Option) *TestContext {
   697  	testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), 0, opts...)
   698  	SyncSchedulerInformerFactory(testCtx)
   699  	go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
   700  	return testCtx
   701  }
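
// Editor's usage sketch (not part of the upstream file): InitTestSchedulerWithNS
// is the usual one-call entry point for scheduling tests; it returns a running
// apiserver, scheduler, and test namespace. Names below are hypothetical.
func exampleInitTestSchedulerWithNS(t *testing.T) {
	testCtx := InitTestSchedulerWithNS(t, "sched-example")
	pod, err := RunPausePod(testCtx.ClientSet, InitPausePod(&PausePodConfig{
		Name:      "example-pod",
		Namespace: testCtx.NS.Name,
	}))
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("Pod scheduled onto node %q", pod.Spec.NodeName)
}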
   702  
   703  // InitTestDisablePreemption initializes a test environment and creates an API server and a scheduler with default
   704  // configuration but with pod preemption disabled.
   705  func InitTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
   706  	cfg := configtesting.V1ToInternalWithDefaults(t, kubeschedulerconfigv1.KubeSchedulerConfiguration{
   707  		Profiles: []kubeschedulerconfigv1.KubeSchedulerProfile{{
   708  			SchedulerName: ptr.To(v1.DefaultSchedulerName),
   709  			Plugins: &kubeschedulerconfigv1.Plugins{
   710  				PostFilter: kubeschedulerconfigv1.PluginSet{
   711  					Disabled: []kubeschedulerconfigv1.Plugin{
   712  						{Name: defaultpreemption.Name},
   713  					},
   714  				},
   715  			},
   716  		}},
   717  	})
   718  	testCtx := InitTestSchedulerWithOptions(
   719  		t, InitTestAPIServer(t, nsPrefix, nil),
   720  		0,
   721  		scheduler.WithProfiles(cfg.Profiles...))
   722  	SyncSchedulerInformerFactory(testCtx)
   723  	go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
   724  	return testCtx
   725  }
   726  
   727  // WaitForReflection waits until the passFunc confirms that the object it expects
   728  // to see is in the store. Used to observe reflected events.
   729  func WaitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
   730  	passFunc func(n interface{}) bool) error {
   731  	var nodes []*v1.Node
   732  	err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
   733  		n, err := nodeLister.Get(key)
   734  
   735  		switch {
   736  		case err == nil && passFunc(n):
   737  			return true, nil
   738  		case apierrors.IsNotFound(err):
   739  			nodes = append(nodes, nil)
   740  		case err != nil:
   741  			t.Errorf("Unexpected error: %v", err)
   742  		default:
   743  			nodes = append(nodes, n)
   744  		}
   745  
   746  		return false, nil
   747  	})
   748  	if err != nil {
   749  		t.Logf("Logging consecutive node versions received from store:")
   750  		for i, n := range nodes {
   751  			t.Logf("%d: %#v", i, n)
   752  		}
   753  	}
   754  	return err
   755  }
   756  
   757  func UpdateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
   758  	return cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
   759  }
   760  
   761  func CreateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
   762  	return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
   763  }
   764  
   765  func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
   766  	nodes := make([]*v1.Node, numNodes)
   767  	for i := 0; i < numNodes; i++ {
   768  		nodeName := fmt.Sprintf("%v-%d", prefix, i)
   769  		node, err := CreateNode(cs, wrapper.Name(nodeName).Label("kubernetes.io/hostname", nodeName).Obj())
   770  		if err != nil {
   771  			return nodes[:], err
   772  		}
   773  		nodes[i] = node
   774  	}
   775  	return nodes[:], nil
   776  }
   777  
   778  // CreateAndWaitForNodesInCache calls createNodes(), and waits for the created
   779  // nodes to be present in the scheduler cache.
   780  func CreateAndWaitForNodesInCache(testCtx *TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
   781  	existingNodes := testCtx.Scheduler.Cache.NodeCount()
   782  	nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes)
   783  	if err != nil {
   784  		return nodes, fmt.Errorf("cannot create nodes: %v", err)
   785  	}
   786  	return nodes, WaitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes)
   787  }
   788  
   789  // WaitForNodesInCache ensures at least <nodeCount> nodes are present in scheduler cache
   790  // within 30 seconds; otherwise it returns an error.
   791  func WaitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error {
   792  	err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
   793  		return sched.Cache.NodeCount() >= nodeCount, nil
   794  	})
   795  	if err != nil {
   796  		return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err)
   797  	}
   798  	return nil
   799  }
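
// Editor's usage sketch (not part of the upstream file), assuming testCtx came
// from InitTestSchedulerWithNS so that the scheduler cache is available: create
// a few identical nodes and wait for the cache to see them. The node prefix
// and capacity are hypothetical.
func exampleCreateNodes(t *testing.T, testCtx *TestContext) {
	nodeWrapper := st.MakeNode().Capacity(map[v1.ResourceName]string{v1.ResourcePods: "32"})
	if _, err := CreateAndWaitForNodesInCache(testCtx, "example-node", nodeWrapper, 3); err != nil {
		t.Fatal(err)
	}
}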
   800  
   801  type PausePodConfig struct {
   802  	Name                              string
   803  	Namespace                         string
   804  	Affinity                          *v1.Affinity
   805  	Annotations, Labels, NodeSelector map[string]string
   806  	Resources                         *v1.ResourceRequirements
   807  	Tolerations                       []v1.Toleration
   808  	NodeName                          string
   809  	SchedulerName                     string
   810  	Priority                          *int32
   811  	PreemptionPolicy                  *v1.PreemptionPolicy
   812  	PriorityClassName                 string
   813  	Volumes                           []v1.Volume
   814  }
   815  
   816  // InitPausePod initializes a pod API object from the given config. It is used
   817  // mainly in the pod creation process.
   818  func InitPausePod(conf *PausePodConfig) *v1.Pod {
   819  	pod := &v1.Pod{
   820  		ObjectMeta: metav1.ObjectMeta{
   821  			Name:        conf.Name,
   822  			Namespace:   conf.Namespace,
   823  			Labels:      conf.Labels,
   824  			Annotations: conf.Annotations,
   825  		},
   826  		Spec: v1.PodSpec{
   827  			NodeSelector: conf.NodeSelector,
   828  			Affinity:     conf.Affinity,
   829  			Containers: []v1.Container{
   830  				{
   831  					Name:  conf.Name,
   832  					Image: imageutils.GetPauseImageName(),
   833  				},
   834  			},
   835  			Tolerations:       conf.Tolerations,
   836  			NodeName:          conf.NodeName,
   837  			SchedulerName:     conf.SchedulerName,
   838  			Priority:          conf.Priority,
   839  			PreemptionPolicy:  conf.PreemptionPolicy,
   840  			PriorityClassName: conf.PriorityClassName,
   841  			Volumes:           conf.Volumes,
   842  		},
   843  	}
   844  	if conf.Resources != nil {
   845  		pod.Spec.Containers[0].Resources = *conf.Resources
   846  	}
   847  	return pod
   848  }
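
// Editor's usage sketch (not part of the upstream file): PausePodConfig fields
// map directly onto the pod spec; only Name and Namespace are typically needed.
// The values below are hypothetical.
func examplePausePodConfig(ns string) *v1.Pod {
	return InitPausePod(&PausePodConfig{
		Name:      "example-pod",
		Namespace: ns,
		Labels:    map[string]string{"app": "example"},
		Priority:  ptr.To[int32](100),
	})
}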
   849  
   850  // CreatePausePod creates a pod with "Pause" image and the given config and
   851  // returns its pointer and error status.
   852  func CreatePausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
   853  	return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{})
   854  }
   855  
   856  // CreatePausePodWithResource creates a pod with "Pause" image and the given
   857  // resources and returns its pointer and error status. The resource list can be
   858  // nil.
   859  func CreatePausePodWithResource(cs clientset.Interface, podName string,
   860  	nsName string, res *v1.ResourceList) (*v1.Pod, error) {
   861  	var conf PausePodConfig
   862  	if res == nil {
   863  		conf = PausePodConfig{
   864  			Name:      podName,
   865  			Namespace: nsName,
   866  		}
   867  	} else {
   868  		conf = PausePodConfig{
   869  			Name:      podName,
   870  			Namespace: nsName,
   871  			Resources: &v1.ResourceRequirements{
   872  				Requests: *res,
   873  			},
   874  		}
   875  	}
   876  	return CreatePausePod(cs, InitPausePod(&conf))
   877  }
   878  
   879  // CreatePVC creates a PersistentVolumeClaim with the given config and returns
   880  // its pointer and error status.
   881  func CreatePVC(cs clientset.Interface, pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
   882  	return cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
   883  }
   884  
   885  // CreatePV creates a PersistentVolume with the given config and returns its
   886  // pointer and error status.
   887  func CreatePV(cs clientset.Interface, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
   888  	return cs.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{})
   889  }
   890  
   891  // DeletePVC deletes the given PVC in the given namespace.
   892  func DeletePVC(cs clientset.Interface, pvcName string, nsName string) error {
   893  	return cs.CoreV1().PersistentVolumeClaims(nsName).Delete(context.TODO(), pvcName, *metav1.NewDeleteOptions(0))
   894  }
   895  
   896  // DeletePV deletes the given PV.
   897  func DeletePV(cs clientset.Interface, pvName string) error {
   898  	return cs.CoreV1().PersistentVolumes().Delete(context.TODO(), pvName, *metav1.NewDeleteOptions(0))
   899  }
   900  
   901  // RunPausePod creates the given pause pod (typically built with InitPausePod) and waits
   902  // until it is scheduled. It returns its pointer and error status.
   903  func RunPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
   904  	pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
   905  	if err != nil {
   906  		return nil, fmt.Errorf("failed to create pause pod: %v", err)
   907  	}
   908  	if err = WaitForPodToSchedule(cs, pod); err != nil {
   909  		return pod, fmt.Errorf("Pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err)
   910  	}
   911  	if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
   912  		return pod, fmt.Errorf("failed to get pod %v/%v info: %v", pod.Namespace, pod.Name, err)
   913  	}
   914  	return pod, nil
   915  }
   916  
   917  type PodWithContainersConfig struct {
   918  	Name       string
   919  	Namespace  string
   920  	Containers []v1.Container
   921  }
   922  
   923  // InitPodWithContainers initializes a pod API object from the given config. This is used primarily for generating
   924  // pods with containers each having a specific image.
   925  func InitPodWithContainers(cs clientset.Interface, conf *PodWithContainersConfig) *v1.Pod {
   926  	pod := &v1.Pod{
   927  		ObjectMeta: metav1.ObjectMeta{
   928  			Name:      conf.Name,
   929  			Namespace: conf.Namespace,
   930  		},
   931  		Spec: v1.PodSpec{
   932  			Containers: conf.Containers,
   933  		},
   934  	}
   935  	return pod
   936  }
   937  
   938  // RunPodWithContainers creates a pod with given config and containers and waits
   939  // until it is scheduled. It returns its pointer and error status.
   940  func RunPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
   941  	pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
   942  	if err != nil {
   943  		return nil, fmt.Errorf("failed to create pod-with-containers: %v", err)
   944  	}
   945  	if err = WaitForPodToSchedule(cs, pod); err != nil {
   946  		return pod, fmt.Errorf("Pod %v didn't schedule successfully. Error: %v", pod.Name, err)
   947  	}
   948  	if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
   949  		return pod, fmt.Errorf("failed to get pod %v info: %v", pod.Name, err)
   950  	}
   951  	return pod, nil
   952  }
   953  
   954  // PodIsGettingEvicted returns a condition function that returns true if the pod's deletion timestamp is set.
   955  func PodIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc {
   956  	return func(ctx context.Context) (bool, error) {
   957  		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
   958  		if err != nil {
   959  			return false, err
   960  		}
   961  		if pod.DeletionTimestamp != nil {
   962  			return true, nil
   963  		}
   964  		return false, nil
   965  	}
   966  }
   967  
   968  // PodScheduledIn returns a condition function that returns true if the given pod is placed onto one of the expected nodes.
   969  func PodScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionWithContextFunc {
   970  	return func(ctx context.Context) (bool, error) {
   971  		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
   972  		if err != nil {
   973  			// This could be a connection error so we want to retry.
   974  			return false, nil
   975  		}
   976  		if pod.Spec.NodeName == "" {
   977  			return false, nil
   978  		}
   979  		for _, nodeName := range nodeNames {
   980  			if pod.Spec.NodeName == nodeName {
   981  				return true, nil
   982  			}
   983  		}
   984  		return false, nil
   985  	}
   986  }
   987  
   988  // PodUnschedulable returns a condition function that returns true if the given pod
   989  // gets unschedulable status of reason 'Unschedulable'.
   990  func PodUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc {
   991  	return func(ctx context.Context) (bool, error) {
   992  		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
   993  		if err != nil {
   994  			// This could be a connection error so we want to retry.
   995  			return false, nil
   996  		}
   997  		_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
   998  		return cond != nil && cond.Status == v1.ConditionFalse &&
   999  			cond.Reason == v1.PodReasonUnschedulable && pod.Spec.NodeName == "", nil
  1000  	}
  1001  }
  1002  
  1003  // PodSchedulingError returns a condition function that returns true if the given pod
  1004  // gets unschedulable status for reasons other than "Unschedulable". The scheduler
  1005  // records such reasons in case of error.
  1006  func PodSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc {
  1007  	return func(ctx context.Context) (bool, error) {
  1008  		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
  1009  		if err != nil {
  1010  			// This could be a connection error so we want to retry.
  1011  			return false, nil
  1012  		}
  1013  		_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
  1014  		return cond != nil && cond.Status == v1.ConditionFalse &&
  1015  			cond.Reason != v1.PodReasonUnschedulable, nil
  1016  	}
  1017  }
  1018  
  1019  // PodSchedulingGated returns a condition function that returns true if the given pod
  1020  // gets unschedulable status of reason 'SchedulingGated'.
  1021  func PodSchedulingGated(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
  1022  	return func() (bool, error) {
  1023  		pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
  1024  		if err != nil {
  1025  			// This could be a connection error so we want to retry.
  1026  			return false, nil
  1027  		}
  1028  		_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
  1029  		return cond != nil && cond.Status == v1.ConditionFalse &&
  1030  			cond.Reason == v1.PodReasonSchedulingGated && pod.Spec.NodeName == "", nil
  1031  	}
  1032  }
  1033  
  1034  // WaitForPodUnschedulableWithTimeout waits for a pod to fail scheduling and returns
  1035  // an error if it does not become unschedulable within the given timeout.
  1036  func WaitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
  1037  	return wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, timeout, false, PodUnschedulable(cs, pod.Namespace, pod.Name))
  1038  }
  1039  
  1040  // WaitForPodUnschedulable waits for a pod to fail scheduling and returns
  1041  // an error if it does not become unschedulable within the timeout duration (30 seconds).
  1042  func WaitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
  1043  	return WaitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
  1044  }
  1045  
  1046  // WaitForPodSchedulingGated waits for a pod to be in scheduling gated state
  1047  // and returns an error if it does not fall into this state within the given timeout.
  1048  func WaitForPodSchedulingGated(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
  1049  	return wait.Poll(100*time.Millisecond, timeout, PodSchedulingGated(cs, pod.Namespace, pod.Name))
  1050  }
  1051  
  1052  // WaitForPDBsStable waits for PDBs to have "CurrentHealthy" status equal to
  1053  // the expected values.
  1054  func WaitForPDBsStable(testCtx *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
  1055  	return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
  1056  		pdbList, err := testCtx.ClientSet.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).List(context.TODO(), metav1.ListOptions{})
  1057  		if err != nil {
  1058  			return false, err
  1059  		}
  1060  		if len(pdbList.Items) != len(pdbs) {
  1061  			return false, nil
  1062  		}
  1063  		for i, pdb := range pdbs {
  1064  			found := false
  1065  			for _, cpdb := range pdbList.Items {
  1066  				if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
  1067  					found = true
  1068  					if cpdb.Status.CurrentHealthy != pdbPodNum[i] {
  1069  						return false, nil
  1070  					}
  1071  				}
  1072  			}
  1073  			if !found {
  1074  				return false, nil
  1075  			}
  1076  		}
  1077  		return true, nil
  1078  	})
  1079  }
  1080  
  1081  // WaitCachedPodsStable waits until scheduler cache has the given pods.
  1082  func WaitCachedPodsStable(testCtx *TestContext, pods []*v1.Pod) error {
  1083  	return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
  1084  		cachedPods, err := testCtx.Scheduler.Cache.PodCount()
  1085  		if err != nil {
  1086  			return false, err
  1087  		}
  1088  		if len(pods) != cachedPods {
  1089  			return false, nil
  1090  		}
  1091  		for _, p := range pods {
  1092  			actualPod, err1 := testCtx.ClientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
  1093  			if err1 != nil {
  1094  				return false, err1
  1095  			}
  1096  			cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod)
  1097  			if err2 != nil || cachedPod == nil {
  1098  				return false, err2
  1099  			}
  1100  		}
  1101  		return true, nil
  1102  	})
  1103  }
  1104  
  1105  // DeletePod deletes the given pod in the given namespace.
  1106  func DeletePod(cs clientset.Interface, podName string, nsName string) error {
  1107  	return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, *metav1.NewDeleteOptions(0))
  1108  }
  1109  
  1110  func GetPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) {
  1111  	return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
  1112  }
  1113  
  1114  func CreateNamespacesWithLabels(cs clientset.Interface, namespaces []string, labels map[string]string) error {
  1115  	for _, n := range namespaces {
  1116  		ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n, Labels: labels}}
  1117  		if _, err := cs.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}); err != nil {
  1118  			return err
  1119  		}
  1120  	}
  1121  	return nil
  1122  }
  1123  
  1124  // timeout returns a timeout error if the given `f` function doesn't
  1125  // complete within `d` duration; otherwise it returns nil.
  1126  func timeout(ctx context.Context, d time.Duration, f func()) error {
  1127  	ctx, cancel := context.WithTimeout(ctx, d)
  1128  	defer cancel()
  1129  
  1130  	done := make(chan struct{})
  1131  	go func() {
  1132  		f()
  1133  		close(done)
  1134  	}()
  1135  
  1136  	select {
  1137  	case <-done:
  1138  		return nil
  1139  	case <-ctx.Done():
  1140  		return ctx.Err()
  1141  	}
  1142  }
  1143  
  1144  // NextPodOrDie returns the next Pod in the scheduler queue.
  1145  // The operation needs to be completed within 5 seconds; otherwise the test gets aborted.
  1146  func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo {
  1147  	t.Helper()
  1148  
  1149  	var podInfo *schedulerframework.QueuedPodInfo
  1150  	logger := klog.FromContext(testCtx.Ctx)
  1151  	// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
  1152  	// default go testing timeout (10m) to abort.
  1153  	if err := timeout(testCtx.Ctx, time.Second*5, func() {
  1154  		podInfo, _ = testCtx.Scheduler.NextPod(logger)
  1155  	}); err != nil {
  1156  		t.Fatalf("Timed out waiting for the Pod to be popped: %v", err)
  1157  	}
  1158  	return podInfo
  1159  }
  1160  
  1161  // NextPod returns the next Pod in the scheduler queue, with a 5 second timeout.
  1162  // Note that this function leaks a goroutine in the case of a timeout; even after this function returns,
  1163  // the goroutine it created keeps waiting to pop a pod from the queue.
  1164  func NextPod(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo {
  1165  	t.Helper()
  1166  
  1167  	var podInfo *schedulerframework.QueuedPodInfo
  1168  	logger := klog.FromContext(testCtx.Ctx)
  1169  	// NextPod() is a blocking operation. Wrap it in timeout() to avoid relying on
  1170  	// default go testing timeout (10m) to abort.
  1171  	if err := timeout(testCtx.Ctx, time.Second*5, func() {
  1172  		podInfo, _ = testCtx.Scheduler.NextPod(logger)
  1173  	}); err != nil {
  1174  		return nil
  1175  	}
  1176  	return podInfo
  1177  }
  1178  
