package util

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"sync/atomic"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1"
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apiserver/pkg/admission"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/metadata/metadatainformer"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/scale"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/events"
	cliflag "k8s.io/component-base/cli/flag"
	pvutil "k8s.io/component-helpers/storage/volume"
	"k8s.io/controller-manager/pkg/informerfactory"
	"k8s.io/klog/v2"
	kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller/disruption"
	"k8s.io/kubernetes/pkg/controller/garbagecollector"
	"k8s.io/kubernetes/pkg/controller/namespace"
	"k8s.io/kubernetes/pkg/controller/resourceclaim"
	"k8s.io/kubernetes/pkg/controlplane"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler"
	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	"k8s.io/kubernetes/pkg/scheduler/profile"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
	taintutils "k8s.io/kubernetes/pkg/util/taints"
	"k8s.io/kubernetes/test/integration/framework"
	imageutils "k8s.io/kubernetes/test/utils/image"
	"k8s.io/kubernetes/test/utils/ktesting"
	"k8s.io/utils/ptr"
)

// ShutdownFunc represents a function handle to be called, typically in a defer
// handler, to shut down a running module.
type ShutdownFunc func()

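// StartScheduler configures and starts a scheduler given a handle to the
// clientSet interface and an event broadcaster. It returns the running
// scheduler and the shared informer factory. Background goroutines keep
// running until the context is canceled.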
func StartScheduler(ctx context.Context, clientSet clientset.Interface, kubeConfig *restclient.Config, cfg *kubeschedulerconfig.KubeSchedulerConfiguration, outOfTreePluginRegistry frameworkruntime.Registry) (*scheduler.Scheduler, informers.SharedInformerFactory) {
	informerFactory := scheduler.NewInformerFactory(clientSet, 0)
	evtBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
		Interface: clientSet.EventsV1()})
	go func() {
		<-ctx.Done()
		evtBroadcaster.Shutdown()
	}()

	evtBroadcaster.StartRecordingToSink(ctx.Done())

	logger := klog.FromContext(ctx)

	sched, err := scheduler.New(
		ctx,
		clientSet,
		informerFactory,
		nil,
		profile.NewRecorderFactory(evtBroadcaster),
		scheduler.WithKubeConfig(kubeConfig),
		scheduler.WithProfiles(cfg.Profiles...),
		scheduler.WithPercentageOfNodesToScore(cfg.PercentageOfNodesToScore),
		scheduler.WithPodMaxBackoffSeconds(cfg.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cfg.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cfg.Extenders...),
		scheduler.WithParallelism(cfg.Parallelism),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreePluginRegistry),
	)
	if err != nil {
		logger.Error(err, "Error creating scheduler")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}

	informerFactory.Start(ctx.Done())
	informerFactory.WaitForCacheSync(ctx.Done())
	if err = sched.WaitForHandlersSync(ctx); err != nil {
		logger.Error(err, "Failed waiting for handlers to sync")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
	logger.V(3).Info("Handlers synced")
	go sched.Run(ctx)

	return sched, informerFactory
}

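// CreateResourceClaimController creates a ResourceClaim controller and returns
// a function that starts it once invoked.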
func CreateResourceClaimController(ctx context.Context, tb ktesting.TB, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) func() {
	podInformer := informerFactory.Core().V1().Pods()
	schedulingInformer := informerFactory.Resource().V1alpha2().PodSchedulingContexts()
	claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims()
	claimTemplateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates()
	claimController, err := resourceclaim.NewController(klog.FromContext(ctx), clientSet, podInformer, schedulingInformer, claimInformer, claimTemplateInformer)
	if err != nil {
		tb.Fatalf("Error creating claim controller: %v", err)
	}
	return func() {
		go claimController.Run(ctx, 5)
	}
}

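// StartFakePVController is a simplified PV controller for testing volume
// binding: whenever it observes a PersistentVolume with a claim reference, it
// binds the referenced PVC by setting its volume name and the bind-completed
// annotation.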
func StartFakePVController(ctx context.Context, clientSet clientset.Interface, informerFactory informers.SharedInformerFactory) {
	pvInformer := informerFactory.Core().V1().PersistentVolumes()

	syncPV := func(obj *v1.PersistentVolume) {
		if obj.Spec.ClaimRef != nil {
			claimRef := obj.Spec.ClaimRef
			pvc, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Get(ctx, claimRef.Name, metav1.GetOptions{})
			if err != nil {
				// The error can be anything, because components like the
				// apiserver may be shutting down at the same time. This check
				// is conservative and only ignores the "context canceled"
				// error while shutting down.
				if ctx.Err() == nil || !errors.Is(err, context.Canceled) {
					klog.Errorf("error while getting %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
				}
				return
			}

			if pvc.Spec.VolumeName == "" {
				pvc.Spec.VolumeName = obj.Name
				metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes")
				_, err := clientSet.CoreV1().PersistentVolumeClaims(claimRef.Namespace).Update(ctx, pvc, metav1.UpdateOptions{})
				if err != nil {
					if ctx.Err() == nil || !errors.Is(err, context.Canceled) {
						// Same as above: only log errors that are not caused by shutdown.
						klog.Errorf("error while updating %v/%v: %v", claimRef.Namespace, claimRef.Name, err)
					}
					return
				}
			}
		}
	}

	pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			syncPV(obj.(*v1.PersistentVolume))
		},
		UpdateFunc: func(_, obj interface{}) {
			syncPV(obj.(*v1.PersistentVolume))
		},
	})
}

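// CreateGCController creates a garbage collector controller and returns a run
// function for it. The informer factory needs to be started before invoking
// that function.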
func CreateGCController(ctx context.Context, tb ktesting.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
	restclient.AddUserAgent(&restConfig, "gc-controller")
	clientSet := clientset.NewForConfigOrDie(&restConfig)
	metadataClient, err := metadata.NewForConfig(&restConfig)
	if err != nil {
		tb.Fatalf("Failed to create metadataClient: %v", err)
	}
	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(clientSet.Discovery()))
	restMapper.Reset()
	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, 0)
	alwaysStarted := make(chan struct{})
	close(alwaysStarted)
	gc, err := garbagecollector.NewGarbageCollector(
		ctx,
		clientSet,
		metadataClient,
		restMapper,
		garbagecollector.DefaultIgnoredResources(),
		informerfactory.NewInformerFactory(informerSet, metadataInformers),
		alwaysStarted,
	)
	if err != nil {
		tb.Fatalf("Failed creating garbage collector: %v", err)
	}
	startGC := func() {
		syncPeriod := 5 * time.Second
		go wait.Until(func() {
			restMapper.Reset()
		}, syncPeriod, ctx.Done())
		go gc.Run(ctx, 1)
		go gc.Sync(ctx, clientSet.Discovery(), syncPeriod)
	}
	return startGC
}

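// CreateNamespaceController creates a namespace controller and returns a run
// function for it. The informer factory needs to be started before invoking
// that function.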
func CreateNamespaceController(ctx context.Context, tb ktesting.TB, restConfig restclient.Config, informerSet informers.SharedInformerFactory) func() {
	restclient.AddUserAgent(&restConfig, "namespace-controller")
	clientSet := clientset.NewForConfigOrDie(&restConfig)
	metadataClient, err := metadata.NewForConfig(&restConfig)
	if err != nil {
		tb.Fatalf("Failed to create metadataClient: %v", err)
	}
	discoverResourcesFn := clientSet.Discovery().ServerPreferredNamespacedResources
	controller := namespace.NewNamespaceController(
		ctx,
		clientSet,
		metadataClient,
		discoverResourcesFn,
		informerSet.Core().V1().Namespaces(),
		10*time.Hour,
		v1.FinalizerKubernetes)
	return func() {
		go controller.Run(ctx, 5)
	}
}

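// TestContext stores the objects that most scheduler integration tests need:
// the test namespace, client, informer factories, the scheduler under test,
// and the contexts and teardown functions that control their lifetimes.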
type TestContext struct {
	// DisableEventSink, if true, prevents the scheduler's event broadcaster
	// from recording events to the API server.
	DisableEventSink bool

	NS                 *v1.Namespace
	ClientSet          clientset.Interface
	KubeConfig         *restclient.Config
	InformerFactory    informers.SharedInformerFactory
	DynInformerFactory dynamicinformer.DynamicSharedInformerFactory
	Scheduler          *scheduler.Scheduler
	// Ctx is canceled when the test finishes.
	Ctx context.Context
	// CloseFn tears down the whole test environment, including the apiserver.
	CloseFn framework.TearDownFunc
	// SchedulerCtx is canceled when the scheduler is torn down, which may
	// happen before the rest of the test environment.
	SchedulerCtx context.Context
	// SchedulerCloseFn tears down only the scheduler.
	SchedulerCloseFn framework.TearDownFunc

	// RoundTrip, when set, is invoked for every HTTP request issued through
	// the wrapped kube config's transport.
	RoundTrip atomic.Pointer[RoundTripWrapper]
}

type RoundTripWrapper func(http.RoundTripper, *http.Request) (*http.Response, error)

type roundTripWrapper struct {
	tc        *TestContext
	transport http.RoundTripper
}

func (r roundTripWrapper) RoundTrip(req *http.Request) (*http.Response, error) {
	wrapper := r.tc.RoundTrip.Load()
	if wrapper != nil {
		return (*wrapper)(r.transport, req)
	}
	return r.transport.RoundTrip(req)
}

var _ http.RoundTripper = roundTripWrapper{}

// CleanupNodes deletes all nodes which were created during the integration test.
func CleanupNodes(cs clientset.Interface, t *testing.T) {
	err := cs.CoreV1().Nodes().DeleteCollection(context.TODO(), *metav1.NewDeleteOptions(0), metav1.ListOptions{})
	if err != nil {
		t.Errorf("error while deleting all nodes: %v", err)
	}
}

// PodDeleted returns a condition function that reports true once the given pod
// is not found or has a deletion timestamp set.
func PodDeleted(ctx context.Context, c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc {
	return func(context.Context) (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
		if apierrors.IsNotFound(err) {
			return true, nil
		}
		if pod.DeletionTimestamp != nil {
			return true, nil
		}
		return false, nil
	}
}

// PodsCleanedUp returns a condition function that reports true once all pods in
// the given namespace have been deleted.
func PodsCleanedUp(ctx context.Context, c clientset.Interface, namespace string) wait.ConditionWithContextFunc {
	return func(context.Context) (bool, error) {
		list, err := c.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		return len(list.Items) == 0, nil
	}
}

// SyncSchedulerInformerFactory starts the scheduler's informer factories and
// waits for their caches to be synced.
func SyncSchedulerInformerFactory(testCtx *TestContext) {
	testCtx.InformerFactory.Start(testCtx.SchedulerCtx.Done())
	if testCtx.DynInformerFactory != nil {
		testCtx.DynInformerFactory.Start(testCtx.SchedulerCtx.Done())
	}
	testCtx.InformerFactory.WaitForCacheSync(testCtx.SchedulerCtx.Done())
	if testCtx.DynInformerFactory != nil {
		testCtx.DynInformerFactory.WaitForCacheSync(testCtx.SchedulerCtx.Done())
	}
}

// CleanupTest deletes the resources which were created during the integration
// test and tears down the test API server.
func CleanupTest(t *testing.T, testCtx *TestContext) {
	// Clean up the nodes and the test namespace.
	if err := testCtx.ClientSet.CoreV1().Nodes().DeleteCollection(testCtx.Ctx, *metav1.NewDeleteOptions(0), metav1.ListOptions{}); err != nil {
		t.Errorf("error while cleaning up nodes, error: %v", err)
	}
	framework.DeleteNamespaceOrDie(testCtx.ClientSet, testCtx.NS, t)

	testCtx.CloseFn()
}

// RemovePodFinalizersInNamespace removes the finalizers from all pods in the
// given namespace.
func RemovePodFinalizersInNamespace(ctx context.Context, cs clientset.Interface, t *testing.T, ns string) {
	t.Helper()
	pods, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
	if err != nil {
		t.Fatalf("Failed obtaining list of pods: %v", err)
	}
	RemovePodFinalizers(ctx, cs, t, pods.Items...)
}

// RemovePodFinalizers removes the finalizers from the given pods.
func RemovePodFinalizers(ctx context.Context, cs clientset.Interface, t *testing.T, pods ...v1.Pod) {
	t.Helper()
	for _, p := range pods {
		pod, err := cs.CoreV1().Pods(p.Namespace).Get(ctx, p.Name, metav1.GetOptions{})
		if err != nil && !apierrors.IsNotFound(err) {
			t.Errorf("error while removing pod finalizers for %v: %v", klog.KObj(&p), err)
		} else if pod != nil && len(pod.Finalizers) > 0 {
			// Use a strategic merge patch so that removing the finalizers does
			// not conflict with concurrent updates to the pod.
			patchBytes, _ := json.Marshal(map[string]interface{}{
				"metadata": map[string]interface{}{
					"$deleteFromPrimitiveList/finalizers": pod.Finalizers,
				},
			})
			_, err = cs.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
			if err != nil {
				t.Errorf("error while updating pod status for %v: %v", klog.KObj(&p), err)
			}
		}
	}
}

// CleanupPods deletes the given pods and waits for them to actually be removed
// from the API server.
func CleanupPods(ctx context.Context, cs clientset.Interface, t *testing.T, pods []*v1.Pod) {
	for _, p := range pods {
		err := cs.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, *metav1.NewDeleteOptions(0))
		if err != nil && !apierrors.IsNotFound(err) {
			t.Errorf("error while deleting pod %v/%v: %v", p.Namespace, p.Name, err)
		}
	}
	for _, p := range pods {
		if err := wait.PollUntilContextTimeout(ctx, time.Duration(time.Microsecond.Seconds()), wait.ForeverTestTimeout, true,
			PodDeleted(ctx, cs, p.Namespace, p.Name)); err != nil {
			t.Errorf("error while waiting for pod %v/%v to get deleted: %v", p.Namespace, p.Name, err)
		}
	}
}

// AddTaintToNode adds the given taint to the given node.
func AddTaintToNode(cs clientset.Interface, nodeName string, taint v1.Taint) error {
	node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	node.Spec.Taints = append(node.Spec.Taints, taint)
	_, err = cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
	return err
}

// RemoveTaintOffNode removes the given taint from the given node.
func RemoveTaintOffNode(cs clientset.Interface, nodeName string, taint v1.Taint) error {
	node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	var taints []v1.Taint
	for _, t := range node.Spec.Taints {
		if !t.MatchTaint(&taint) {
			taints = append(taints, t)
		}
	}
	node.Spec.Taints = taints
	_, err = cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
	return err
}

// WaitForNodeTaints waits for a node to have the target taints and returns
// an error if it times out after 30 seconds.
func WaitForNodeTaints(cs clientset.Interface, node *v1.Node, taints []v1.Taint) error {
	return wait.Poll(100*time.Millisecond, 30*time.Second, NodeTainted(cs, node.Name, taints))
}

// NodeTainted returns a condition function that reports true if the given node
// has all of the given taints.
func NodeTainted(cs clientset.Interface, nodeName string, taints []v1.Taint) wait.ConditionFunc {
	return func() (bool, error) {
		node, err := cs.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}

		// The node may carry additional taints, but it must have at least as
		// many as we are waiting for.
		if len(taints) > len(node.Spec.Taints) {
			return false, nil
		}

		for _, taint := range taints {
			if !taintutils.TaintExists(node.Spec.Taints, &taint) {
				return false, nil
			}
		}

		return true, nil
	}
}

// NodeReadyStatus returns the status of the first condition with type NodeReady.
func NodeReadyStatus(conditions []v1.NodeCondition) (v1.ConditionStatus, error) {
	for _, c := range conditions {
		if c.Type != v1.NodeReady {
			continue
		}
		// Return the first condition with type NodeReady.
		return c.Status, nil
	}
	return v1.ConditionFalse, errors.New("none of the conditions is of type NodeReady")
}

// GetTolerationSeconds returns the toleration seconds of the Exists/NoExecute
// toleration for the not-ready node taint, or an error if no such toleration exists.
func GetTolerationSeconds(tolerations []v1.Toleration) (int64, error) {
	for _, t := range tolerations {
		if t.Key == v1.TaintNodeNotReady && t.Effect == v1.TaintEffectNoExecute && t.Operator == v1.TolerationOpExists {
			return *t.TolerationSeconds, nil
		}
	}
	return 0, fmt.Errorf("cannot find toleration")
}

// NodeCopyWithConditions duplicates the node object with the given conditions
// and refreshes their heartbeat time.
func NodeCopyWithConditions(node *v1.Node, conditions []v1.NodeCondition) *v1.Node {
	copy := node.DeepCopy()
	copy.ResourceVersion = "0"
	copy.Status.Conditions = conditions
	for i := range copy.Status.Conditions {
		copy.Status.Conditions[i].LastHeartbeatTime = metav1.Now()
	}
	return copy
}

// UpdateNodeStatus updates the status of the given node.
func UpdateNodeStatus(cs clientset.Interface, node *v1.Node) error {
	_, err := cs.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{})
	return err
}

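// InitTestAPIServer initializes a test environment and creates an API server
// with the default configuration. The test namespace is created with the given
// prefix, and the whole environment is cleaned up automatically when the test
// finishes.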
func InitTestAPIServer(t *testing.T, nsPrefix string, admission admission.Interface) *TestContext {
	tCtx := ktesting.Init(t)
	testCtx := &TestContext{Ctx: tCtx}

	testCtx.ClientSet, testCtx.KubeConfig, testCtx.CloseFn = framework.StartTestServer(tCtx, t, framework.TestServerSetup{
		ModifyServerRunOptions: func(options *options.ServerRunOptions) {
			options.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition", "Priority", "StorageObjectInUseProtection"}
			if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
				options.APIEnablement.RuntimeConfig = cliflag.ConfigurationMap{
					resourcev1alpha2.SchemeGroupVersion.String(): "true",
				}
			}
		},
		ModifyServerConfig: func(config *controlplane.Config) {
			if admission != nil {
				config.GenericConfig.AdmissionControl = admission
			}
		},
	})

	// Wrap the HTTP transport so that tests can intercept requests through
	// TestContext.RoundTrip.
	testCtx.KubeConfig.Wrap(func(transport http.RoundTripper) http.RoundTripper {
		return roundTripWrapper{tc: testCtx, transport: transport}
	})
	var err error
	testCtx.ClientSet, err = clientset.NewForConfig(testCtx.KubeConfig)
	if err != nil {
		t.Fatal(err)
	}

	oldCloseFn := testCtx.CloseFn
	testCtx.CloseFn = func() {
		tCtx.Cancel("tearing down apiserver")
		oldCloseFn()
	}

	if nsPrefix != "default" {
		testCtx.NS = framework.CreateNamespaceOrDie(testCtx.ClientSet, nsPrefix+string(uuid.NewUUID()), t)
	} else {
		testCtx.NS = framework.CreateNamespaceOrDie(testCtx.ClientSet, "default", t)
	}

	t.Cleanup(func() {
		CleanupTest(t, testCtx)
	})

	return testCtx
}

// WaitForSchedulerCacheCleanup waits for the scheduler's cache to be emptied
// and reports a test error if that does not happen before the test timeout.
func WaitForSchedulerCacheCleanup(sched *scheduler.Scheduler, t *testing.T) {
	schedulerCacheIsEmpty := func() (bool, error) {
		dump := sched.Cache.Dump()

		return len(dump.Nodes) == 0 && len(dump.AssumedPods) == 0, nil
	}

	if err := wait.Poll(time.Second, wait.ForeverTestTimeout, schedulerCacheIsEmpty); err != nil {
		t.Errorf("Failed to wait for scheduler cache cleanup: %v", err)
	}
}

// InitTestScheduler initializes a test environment and creates a scheduler with
// the default configuration.
func InitTestScheduler(
	t *testing.T,
	testCtx *TestContext,
) *TestContext {
	// Pod preemption is enabled by the default scheduler configuration.
	return InitTestSchedulerWithOptions(t, testCtx, 0)
}

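// InitTestSchedulerWithOptions initializes a test environment and creates a
// scheduler with the default configuration and the given options.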
func InitTestSchedulerWithOptions(
	t *testing.T,
	testCtx *TestContext,
	resyncPeriod time.Duration,
	opts ...scheduler.Option,
) *TestContext {
	ctx, cancel := context.WithCancel(testCtx.Ctx)
	testCtx.SchedulerCtx = ctx

	testCtx.InformerFactory = scheduler.NewInformerFactory(testCtx.ClientSet, resyncPeriod)
	if testCtx.KubeConfig != nil {
		dynClient := dynamic.NewForConfigOrDie(testCtx.KubeConfig)
		testCtx.DynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil)
	}

	var err error
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
		Interface: testCtx.ClientSet.EventsV1(),
	})

	opts = append(opts, scheduler.WithKubeConfig(testCtx.KubeConfig))
	testCtx.Scheduler, err = scheduler.New(
		ctx,
		testCtx.ClientSet,
		testCtx.InformerFactory,
		testCtx.DynInformerFactory,
		profile.NewRecorderFactory(eventBroadcaster),
		opts...,
	)

	if err != nil {
		t.Fatalf("Couldn't create scheduler: %v", err)
	}

	if !testCtx.DisableEventSink {
		eventBroadcaster.StartRecordingToSink(ctx.Done())
	}

	oldCloseFn := testCtx.CloseFn
	testCtx.CloseFn = func() {
		oldCloseFn()
		eventBroadcaster.Shutdown()
	}

	testCtx.SchedulerCloseFn = func() {
		cancel()
		eventBroadcaster.Shutdown()
	}

	return testCtx
}

// WaitForPodToScheduleWithTimeout waits for a pod to get scheduled and returns
// an error if it does not get scheduled within the given timeout.
func WaitForPodToScheduleWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
	return wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, timeout, false, PodScheduled(cs, pod.Namespace, pod.Name))
}

// WaitForPodToSchedule waits for a pod to get scheduled and returns an error if
// it does not get scheduled within the timeout duration (30 seconds).
func WaitForPodToSchedule(cs clientset.Interface, pod *v1.Pod) error {
	return WaitForPodToScheduleWithTimeout(cs, pod, 30*time.Second)
}

// PodScheduled checks whether the pod has been assigned to a node.
func PodScheduled(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc {
	return func(ctx context.Context) (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			// This could be a connection error, so retry.
			return false, nil
		}
		if pod.Spec.NodeName == "" {
			return false, nil
		}
		return true, nil
	}
}

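// InitDisruptionController initializes and runs a Disruption Controller so
// that PodDisruptionBudget objects are kept up to date during the test.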
func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController {
	informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour)

	discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery())
	mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient)

	config := restclient.CopyConfig(testCtx.KubeConfig)
	scaleKindResolver := scale.NewDiscoveryScaleKindResolver(testCtx.ClientSet.Discovery())
	scaleClient, err := scale.NewForConfig(config, mapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
	if err != nil {
		t.Fatalf("Error creating scaleClient: %v", err)
	}

	dc := disruption.NewDisruptionController(
		testCtx.Ctx,
		informers.Core().V1().Pods(),
		informers.Policy().V1().PodDisruptionBudgets(),
		informers.Core().V1().ReplicationControllers(),
		informers.Apps().V1().ReplicaSets(),
		informers.Apps().V1().Deployments(),
		informers.Apps().V1().StatefulSets(),
		testCtx.ClientSet,
		mapper,
		scaleClient,
		testCtx.ClientSet.Discovery())

	informers.Start(testCtx.Scheduler.StopEverything)
	informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)
	go dc.Run(testCtx.Ctx)
	return dc
}

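// InitTestSchedulerWithNS initializes a test environment and creates an API
// server and a scheduler with the default configuration.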
func InitTestSchedulerWithNS(t *testing.T, nsPrefix string, opts ...scheduler.Option) *TestContext {
	testCtx := InitTestSchedulerWithOptions(t, InitTestAPIServer(t, nsPrefix, nil), 0, opts...)
	SyncSchedulerInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
	return testCtx
}

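// InitTestDisablePreemption initializes a test environment and creates an API
// server and a scheduler with the default configuration, but with pod
// preemption disabled.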
func InitTestDisablePreemption(t *testing.T, nsPrefix string) *TestContext {
	cfg := configtesting.V1ToInternalWithDefaults(t, kubeschedulerconfigv1.KubeSchedulerConfiguration{
		Profiles: []kubeschedulerconfigv1.KubeSchedulerProfile{{
			SchedulerName: ptr.To(v1.DefaultSchedulerName),
			Plugins: &kubeschedulerconfigv1.Plugins{
				PostFilter: kubeschedulerconfigv1.PluginSet{
					Disabled: []kubeschedulerconfigv1.Plugin{
						{Name: defaultpreemption.Name},
					},
				},
			},
		}},
	})
	testCtx := InitTestSchedulerWithOptions(
		t, InitTestAPIServer(t, nsPrefix, nil),
		0,
		scheduler.WithProfiles(cfg.Profiles...))
	SyncSchedulerInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.SchedulerCtx)
	return testCtx
}

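// WaitForReflection waits until passFunc confirms that the object it expects to
// see is in the node lister. Used to observe reflected events.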
func WaitForReflection(t *testing.T, nodeLister corelisters.NodeLister, key string,
	passFunc func(n interface{}) bool) error {
	var nodes []*v1.Node
	err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
		n, err := nodeLister.Get(key)

		switch {
		case err == nil && passFunc(n):
			return true, nil
		case apierrors.IsNotFound(err):
			nodes = append(nodes, nil)
		case err != nil:
			t.Errorf("Unexpected error: %v", err)
		default:
			nodes = append(nodes, n)
		}

		return false, nil
	})
	if err != nil {
		t.Logf("Logging consecutive node versions received from store:")
		for i, n := range nodes {
			t.Logf("%d: %#v", i, n)
		}
	}
	return err
}

func UpdateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
	return cs.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
}

func CreateNode(cs clientset.Interface, node *v1.Node) (*v1.Node, error) {
	return cs.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
}

func createNodes(cs clientset.Interface, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
	nodes := make([]*v1.Node, numNodes)
	for i := 0; i < numNodes; i++ {
		nodeName := fmt.Sprintf("%v-%d", prefix, i)
		node, err := CreateNode(cs, wrapper.Name(nodeName).Label("kubernetes.io/hostname", nodeName).Obj())
		if err != nil {
			return nodes[:], err
		}
		nodes[i] = node
	}
	return nodes[:], nil
}

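// CreateAndWaitForNodesInCache calls createNodes(), and waits for the created
// nodes to be present in the scheduler's cache.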
func CreateAndWaitForNodesInCache(testCtx *TestContext, prefix string, wrapper *st.NodeWrapper, numNodes int) ([]*v1.Node, error) {
	existingNodes := testCtx.Scheduler.Cache.NodeCount()
	nodes, err := createNodes(testCtx.ClientSet, prefix, wrapper, numNodes)
	if err != nil {
		return nodes, fmt.Errorf("cannot create nodes: %v", err)
	}
	return nodes, WaitForNodesInCache(testCtx.Scheduler, numNodes+existingNodes)
}

// WaitForNodesInCache ensures at least <nodeCount> nodes are present in the
// scheduler cache before returning, or returns an error if that does not
// happen in time.
func WaitForNodesInCache(sched *scheduler.Scheduler, nodeCount int) error {
	err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
		return sched.Cache.NodeCount() >= nodeCount, nil
	})
	if err != nil {
		return fmt.Errorf("cannot obtain available nodes in scheduler cache: %v", err)
	}
	return nil
}

// PausePodConfig is the config used by InitPausePod to build a pause pod.
type PausePodConfig struct {
	Name                              string
	Namespace                         string
	Affinity                          *v1.Affinity
	Annotations, Labels, NodeSelector map[string]string
	Resources                         *v1.ResourceRequirements
	Tolerations                       []v1.Toleration
	NodeName                          string
	SchedulerName                     string
	Priority                          *int32
	PreemptionPolicy                  *v1.PreemptionPolicy
	PriorityClassName                 string
	Volumes                           []v1.Volume
}

// InitPausePod initializes a pod API object from the given config.
func InitPausePod(conf *PausePodConfig) *v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        conf.Name,
			Namespace:   conf.Namespace,
			Labels:      conf.Labels,
			Annotations: conf.Annotations,
		},
		Spec: v1.PodSpec{
			NodeSelector: conf.NodeSelector,
			Affinity:     conf.Affinity,
			Containers: []v1.Container{
				{
					Name:  conf.Name,
					Image: imageutils.GetPauseImageName(),
				},
			},
			Tolerations:       conf.Tolerations,
			NodeName:          conf.NodeName,
			SchedulerName:     conf.SchedulerName,
			Priority:          conf.Priority,
			PreemptionPolicy:  conf.PreemptionPolicy,
			PriorityClassName: conf.PriorityClassName,
			Volumes:           conf.Volumes,
		},
	}
	if conf.Resources != nil {
		pod.Spec.Containers[0].Resources = *conf.Resources
	}
	return pod
}

// CreatePausePod creates a pod with the "pause" image and the given config and
// returns its pointer and error status.
func CreatePausePod(cs clientset.Interface, p *v1.Pod) (*v1.Pod, error) {
	return cs.CoreV1().Pods(p.Namespace).Create(context.TODO(), p, metav1.CreateOptions{})
}

// CreatePausePodWithResource creates a pod with the "pause" image and the given
// pod name, namespace, and resource request. The resource request may be nil.
func CreatePausePodWithResource(cs clientset.Interface, podName string,
	nsName string, res *v1.ResourceList) (*v1.Pod, error) {
	var conf PausePodConfig
	if res == nil {
		conf = PausePodConfig{
			Name:      podName,
			Namespace: nsName,
		}
	} else {
		conf = PausePodConfig{
			Name:      podName,
			Namespace: nsName,
			Resources: &v1.ResourceRequirements{
				Requests: *res,
			},
		}
	}
	return CreatePausePod(cs, InitPausePod(&conf))
}

// CreatePVC creates a PersistentVolumeClaim with the given config and returns
// its pointer and error status.
func CreatePVC(cs clientset.Interface, pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
	return cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
}

// CreatePV creates a PersistentVolume with the given config and returns its
// pointer and error status.
func CreatePV(cs clientset.Interface, pv *v1.PersistentVolume) (*v1.PersistentVolume, error) {
	return cs.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{})
}

// DeletePVC deletes the given PVC in the given namespace.
func DeletePVC(cs clientset.Interface, pvcName string, nsName string) error {
	return cs.CoreV1().PersistentVolumeClaims(nsName).Delete(context.TODO(), pvcName, *metav1.NewDeleteOptions(0))
}

// DeletePV deletes the given PV.
func DeletePV(cs clientset.Interface, pvName string) error {
	return cs.CoreV1().PersistentVolumes().Delete(context.TODO(), pvName, *metav1.NewDeleteOptions(0))
}

// RunPausePod creates a pod with the "pause" image and the given config, waits
// until it is scheduled, and returns its pointer and error status.
func RunPausePod(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
	pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create pause pod: %v", err)
	}
	if err = WaitForPodToSchedule(cs, pod); err != nil {
		return pod, fmt.Errorf("pod %v/%v didn't schedule successfully. Error: %v", pod.Namespace, pod.Name, err)
	}
	if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
		return pod, fmt.Errorf("failed to get pod %v/%v info: %v", pod.Namespace, pod.Name, err)
	}
	return pod, nil
}

// PodWithContainersConfig is the config used by InitPodWithContainers.
type PodWithContainersConfig struct {
	Name       string
	Namespace  string
	Containers []v1.Container
}

// InitPodWithContainers initializes a pod API object from the given config.
func InitPodWithContainers(cs clientset.Interface, conf *PodWithContainersConfig) *v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      conf.Name,
			Namespace: conf.Namespace,
		},
		Spec: v1.PodSpec{
			Containers: conf.Containers,
		},
	}
	return pod
}

// RunPodWithContainers creates a pod with the given containers, waits until it
// is scheduled, and returns its pointer and error status.
func RunPodWithContainers(cs clientset.Interface, pod *v1.Pod) (*v1.Pod, error) {
	pod, err := cs.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to create pod-with-containers: %v", err)
	}
	if err = WaitForPodToSchedule(cs, pod); err != nil {
		return pod, fmt.Errorf("pod %v didn't schedule successfully. Error: %v", pod.Name, err)
	}
	if pod, err = cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
		return pod, fmt.Errorf("failed to get pod %v info: %v", pod.Name, err)
	}
	return pod, nil
}

// PodIsGettingEvicted returns true if the pod's deletion timestamp is set.
func PodIsGettingEvicted(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc {
	return func(ctx context.Context) (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if pod.DeletionTimestamp != nil {
			return true, nil
		}
		return false, nil
	}
}

// PodScheduledIn returns true if the given pod is placed onto one of the expected nodes.
func PodScheduledIn(c clientset.Interface, podNamespace, podName string, nodeNames []string) wait.ConditionWithContextFunc {
	return func(ctx context.Context) (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			// This could be a connection error, so retry.
			return false, nil
		}
		if pod.Spec.NodeName == "" {
			return false, nil
		}
		for _, nodeName := range nodeNames {
			if pod.Spec.NodeName == nodeName {
				return true, nil
			}
		}
		return false, nil
	}
}

// PodUnschedulable returns a condition function that reports true if the given
// pod gets the unschedulable status with reason 'Unschedulable'.
func PodUnschedulable(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc {
	return func(ctx context.Context) (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			// This could be a connection error, so retry.
			return false, nil
		}
		_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
		return cond != nil && cond.Status == v1.ConditionFalse &&
			cond.Reason == v1.PodReasonUnschedulable && pod.Spec.NodeName == "", nil
	}
}

// PodSchedulingError returns a condition function that reports true if the
// given pod gets a failed scheduling status with a reason other than
// 'Unschedulable', i.e. the scheduler encountered an error while scheduling it.
func PodSchedulingError(c clientset.Interface, podNamespace, podName string) wait.ConditionWithContextFunc {
	return func(ctx context.Context) (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			// This could be a connection error, so retry.
			return false, nil
		}
		_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
		return cond != nil && cond.Status == v1.ConditionFalse &&
			cond.Reason != v1.PodReasonUnschedulable, nil
	}
}

// PodSchedulingGated returns a condition function that reports true if the
// given pod is gated from scheduling.
func PodSchedulingGated(c clientset.Interface, podNamespace, podName string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
		if err != nil {
			// This could be a connection error, so retry.
			return false, nil
		}
		_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
		return cond != nil && cond.Status == v1.ConditionFalse &&
			cond.Reason == v1.PodReasonSchedulingGated && pod.Spec.NodeName == "", nil
	}
}

// WaitForPodUnschedulableWithTimeout waits for a pod to fail scheduling and
// returns an error if it does not become unschedulable within the given timeout.
func WaitForPodUnschedulableWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
	return wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, timeout, false, PodUnschedulable(cs, pod.Namespace, pod.Name))
}

// WaitForPodUnschedulable waits for a pod to fail scheduling and returns an
// error if it does not become unschedulable within the timeout duration (30 seconds).
func WaitForPodUnschedulable(cs clientset.Interface, pod *v1.Pod) error {
	return WaitForPodUnschedulableWithTimeout(cs, pod, 30*time.Second)
}

// WaitForPodSchedulingGated waits for a pod to be in the scheduling gated state
// and returns an error if it does not reach that state within the given timeout.
func WaitForPodSchedulingGated(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
	return wait.Poll(100*time.Millisecond, timeout, PodSchedulingGated(cs, pod.Namespace, pod.Name))
}

// WaitForPDBsStable waits for PDBs to have a "CurrentHealthy" status equal to
// the expected values.
func WaitForPDBsStable(testCtx *TestContext, pdbs []*policy.PodDisruptionBudget, pdbPodNum []int32) error {
	return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
		pdbList, err := testCtx.ClientSet.PolicyV1().PodDisruptionBudgets(testCtx.NS.Name).List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			return false, err
		}
		if len(pdbList.Items) != len(pdbs) {
			return false, nil
		}
		for i, pdb := range pdbs {
			found := false
			for _, cpdb := range pdbList.Items {
				if pdb.Name == cpdb.Name && pdb.Namespace == cpdb.Namespace {
					found = true
					if cpdb.Status.CurrentHealthy != pdbPodNum[i] {
						return false, nil
					}
				}
			}
			if !found {
				return false, nil
			}
		}
		return true, nil
	})
}

// WaitCachedPodsStable waits until the scheduler cache has the given pods.
func WaitCachedPodsStable(testCtx *TestContext, pods []*v1.Pod) error {
	return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
		cachedPods, err := testCtx.Scheduler.Cache.PodCount()
		if err != nil {
			return false, err
		}
		if len(pods) != cachedPods {
			return false, nil
		}
		for _, p := range pods {
			actualPod, err1 := testCtx.ClientSet.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
			if err1 != nil {
				return false, err1
			}
			cachedPod, err2 := testCtx.Scheduler.Cache.GetPod(actualPod)
			if err2 != nil || cachedPod == nil {
				return false, err2
			}
		}
		return true, nil
	})
}

// DeletePod deletes the given pod in the given namespace.
func DeletePod(cs clientset.Interface, podName string, nsName string) error {
	return cs.CoreV1().Pods(nsName).Delete(context.TODO(), podName, *metav1.NewDeleteOptions(0))
}

func GetPod(cs clientset.Interface, podName string, podNamespace string) (*v1.Pod, error) {
	return cs.CoreV1().Pods(podNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
}

func CreateNamespacesWithLabels(cs clientset.Interface, namespaces []string, labels map[string]string) error {
	for _, n := range namespaces {
		ns := v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: n, Labels: labels}}
		if _, err := cs.CoreV1().Namespaces().Create(context.TODO(), &ns, metav1.CreateOptions{}); err != nil {
			return err
		}
	}
	return nil
}

// timeout runs f and returns nil if f completes within duration d, otherwise it
// returns the context's error. Note that f keeps running in its goroutine even
// after timeout returns.
func timeout(ctx context.Context, d time.Duration, f func()) error {
	ctx, cancel := context.WithTimeout(ctx, d)
	defer cancel()

	done := make(chan struct{})
	go func() {
		f()
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

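// NextPodOrDie returns the next pod in the scheduler queue and fails the test
// if no pod is popped within 5 seconds.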
func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo {
	t.Helper()

	var podInfo *schedulerframework.QueuedPodInfo
	logger := klog.FromContext(testCtx.Ctx)
	// NextPod() is a blocking operation, so wrap it in timeout() to avoid
	// hanging forever when no pod shows up.
	if err := timeout(testCtx.Ctx, time.Second*5, func() {
		podInfo, _ = testCtx.Scheduler.NextPod(logger)
	}); err != nil {
		t.Fatalf("Timed out waiting for the Pod to be popped: %v", err)
	}
	return podInfo
}

// NextPod returns the next pod in the scheduler queue, or nil if no pod is
// popped within 5 seconds.
func NextPod(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo {
	t.Helper()

	var podInfo *schedulerframework.QueuedPodInfo
	logger := klog.FromContext(testCtx.Ctx)
	// NextPod() is a blocking operation, so wrap it in timeout() to avoid
	// hanging forever when no pod shows up.
	if err := timeout(testCtx.Ctx, time.Second*5, func() {
		podInfo, _ = testCtx.Scheduler.NextPod(logger)
	}); err != nil {
		return nil
	}
	return podInfo
}