1
16
17 package quota
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "os"
24 "testing"
25 "time"
26
27 v1 "k8s.io/api/core/v1"
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 "k8s.io/apimachinery/pkg/api/resource"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/fields"
32 "k8s.io/apimachinery/pkg/labels"
33 "k8s.io/apimachinery/pkg/util/intstr"
34 "k8s.io/apimachinery/pkg/util/wait"
35 "k8s.io/apimachinery/pkg/watch"
36 "k8s.io/apiserver/pkg/quota/v1/generic"
37 "k8s.io/client-go/informers"
38 clientset "k8s.io/client-go/kubernetes"
39 watchtools "k8s.io/client-go/tools/watch"
40 "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
41 "k8s.io/kubernetes/pkg/controller"
42 replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
43 resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
44 quotainstall "k8s.io/kubernetes/pkg/quota/v1/install"
45 "k8s.io/kubernetes/test/integration/framework"
46 "k8s.io/kubernetes/test/utils/ktesting"
47 )
48
// resourceQuotaTimeout is the overall time budget waitForUsedResourceQuota
// spends polling a ResourceQuota for the expected usage.
const resourceQuotaTimeout = 10 * time.Second
52
53
54
55
56
57
58
59
60
61
62 func TestQuota(t *testing.T) {
63 ctx := ktesting.Init(t)
64
65
66 _, kubeConfig, tearDownFn := framework.StartTestServer(ctx, t, framework.TestServerSetup{
67 ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
68
69 opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
70 },
71 })
72 defer tearDownFn()
73
74 clientset := clientset.NewForConfigOrDie(kubeConfig)
75
76 ns := framework.CreateNamespaceOrDie(clientset, "quotaed", t)
77 defer framework.DeleteNamespaceOrDie(clientset, ns, t)
78 ns2 := framework.CreateNamespaceOrDie(clientset, "non-quotaed", t)
79 defer framework.DeleteNamespaceOrDie(clientset, ns2, t)
80
81 informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
82 rm := replicationcontroller.NewReplicationManager(
83 ctx,
84 informers.Core().V1().Pods(),
85 informers.Core().V1().ReplicationControllers(),
86 clientset,
87 replicationcontroller.BurstReplicas,
88 )
89 go rm.Run(ctx, 3)
90
91 discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
92 listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource)
93 qc := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource)
94 informersStarted := make(chan struct{})
95 resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
96 QuotaClient: clientset.CoreV1(),
97 ResourceQuotaInformer: informers.Core().V1().ResourceQuotas(),
98 ResyncPeriod: controller.NoResyncPeriodFunc,
99 InformerFactory: informers,
100 ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
101 DiscoveryFunc: discoveryFunc,
102 IgnoredResourcesFunc: qc.IgnoredResources,
103 InformersStarted: informersStarted,
104 Registry: generic.NewRegistry(qc.Evaluators()),
105 }
106 resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions)
107 if err != nil {
108 t.Fatalf("unexpected err: %v", err)
109 }
110 go resourceQuotaController.Run(ctx, 2)
111
112
113 go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second)
114
115 informers.Start(ctx.Done())
116 close(informersStarted)
117
118 startTime := time.Now()
119 scale(t, ns2.Name, clientset)
120 endTime := time.Now()
121 t.Logf("Took %v to scale up without quota", endTime.Sub(startTime))
122
123 quota := &v1.ResourceQuota{
124 ObjectMeta: metav1.ObjectMeta{
125 Name: "quota",
126 Namespace: ns.Name,
127 },
128 Spec: v1.ResourceQuotaSpec{
129 Hard: v1.ResourceList{
130 v1.ResourcePods: resource.MustParse("1000"),
131 },
132 },
133 }
134 waitForQuota(t, quota, clientset)
135
136 startTime = time.Now()
137 scale(t, "quotaed", clientset)
138 endTime = time.Now()
139 t.Logf("Took %v to scale up with quota", endTime.Sub(startTime))
140 }
141
142 func waitForQuota(t *testing.T, quota *v1.ResourceQuota, clientset *clientset.Clientset) {
143 w, err := clientset.CoreV1().ResourceQuotas(quota.Namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: quota.Name}))
144 if err != nil {
145 t.Fatalf("unexpected error: %v", err)
146 }
147
148 if _, err := clientset.CoreV1().ResourceQuotas(quota.Namespace).Create(context.TODO(), quota, metav1.CreateOptions{}); err != nil {
149 t.Fatalf("unexpected error: %v", err)
150 }
151
152 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
153 defer cancel()
154 _, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
155 switch event.Type {
156 case watch.Modified:
157 default:
158 return false, nil
159 }
160 switch cast := event.Object.(type) {
161 case *v1.ResourceQuota:
162 if len(cast.Status.Hard) > 0 {
163 return true, nil
164 }
165 }
166
167 return false, nil
168 })
169 if err != nil {
170 t.Fatalf("unexpected error: %v", err)
171 }
172 }
173
174
175 func waitForUsedResourceQuota(t *testing.T, c clientset.Interface, ns, quotaName string, used v1.ResourceList) {
176 err := wait.Poll(1*time.Second, resourceQuotaTimeout, func() (bool, error) {
177 resourceQuota, err := c.CoreV1().ResourceQuotas(ns).Get(context.TODO(), quotaName, metav1.GetOptions{})
178 if err != nil {
179 return false, err
180 }
181
182
183 if resourceQuota.Status.Used == nil {
184 return false, nil
185 }
186
187
188 for k, v := range used {
189 actualValue, found := resourceQuota.Status.Used[k]
190 if !found {
191 t.Logf("resource %s was not found in ResourceQuota status", k)
192 return false, nil
193 }
194
195 if !actualValue.Equal(v) {
196 t.Logf("resource %s, expected %s, actual %s", k, v.String(), actualValue.String())
197 return false, nil
198 }
199 }
200 return true, nil
201 })
202 if err != nil {
203 t.Errorf("error waiting or ResourceQuota status: %v", err)
204 }
205 }
206
207 func scale(t *testing.T, namespace string, clientset *clientset.Clientset) {
208 target := int32(100)
209 rc := &v1.ReplicationController{
210 ObjectMeta: metav1.ObjectMeta{
211 Name: "foo",
212 Namespace: namespace,
213 },
214 Spec: v1.ReplicationControllerSpec{
215 Replicas: &target,
216 Selector: map[string]string{"foo": "bar"},
217 Template: &v1.PodTemplateSpec{
218 ObjectMeta: metav1.ObjectMeta{
219 Labels: map[string]string{
220 "foo": "bar",
221 },
222 },
223 Spec: v1.PodSpec{
224 Containers: []v1.Container{
225 {
226 Name: "container",
227 Image: "busybox",
228 },
229 },
230 },
231 },
232 },
233 }
234
235 w, err := clientset.CoreV1().ReplicationControllers(namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: rc.Name}))
236 if err != nil {
237 t.Fatalf("unexpected error: %v", err)
238 }
239
240 if _, err := clientset.CoreV1().ReplicationControllers(namespace).Create(context.TODO(), rc, metav1.CreateOptions{}); err != nil {
241 t.Fatalf("unexpected error: %v", err)
242 }
243
244 ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
245 defer cancel()
246 _, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
247 switch event.Type {
248 case watch.Modified:
249 default:
250 return false, nil
251 }
252
253 switch cast := event.Object.(type) {
254 case *v1.ReplicationController:
255 fmt.Printf("Found %v of %v replicas\n", int(cast.Status.Replicas), target)
256 if cast.Status.Replicas == target {
257 return true, nil
258 }
259 }
260
261 return false, nil
262 })
263 if err != nil {
264 pods, _ := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: labels.Everything().String(), FieldSelector: fields.Everything().String()})
265 t.Fatalf("unexpected error: %v, ended with %v pods", err, len(pods.Items))
266 }
267 }
268
269 func TestQuotaLimitedResourceDenial(t *testing.T) {
270
271 admissionConfigFile, err := os.CreateTemp("", "admission-config.yaml")
272 if err != nil {
273 t.Fatal(err)
274 }
275 defer os.Remove(admissionConfigFile.Name())
276 if err := os.WriteFile(admissionConfigFile.Name(), []byte(`
277 apiVersion: apiserver.k8s.io/v1alpha1
278 kind: AdmissionConfiguration
279 plugins:
280 - name: ResourceQuota
281 configuration:
282 apiVersion: apiserver.config.k8s.io/v1
283 kind: ResourceQuotaConfiguration
284 limitedResources:
285 - resource: pods
286 matchContains:
287 - pods
288 `), os.FileMode(0644)); err != nil {
289 t.Fatal(err)
290 }
291
292 tCtx := ktesting.Init(t)
293
294
295 _, kubeConfig, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
296 ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
297
298 opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
299 opts.Admission.GenericAdmission.ConfigFile = admissionConfigFile.Name()
300
301 },
302 })
303 defer tearDownFn()
304
305 clientset := clientset.NewForConfigOrDie(kubeConfig)
306
307 ns := framework.CreateNamespaceOrDie(clientset, "quota", t)
308 defer framework.DeleteNamespaceOrDie(clientset, ns, t)
309
310 informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
311 rm := replicationcontroller.NewReplicationManager(
312 tCtx,
313 informers.Core().V1().Pods(),
314 informers.Core().V1().ReplicationControllers(),
315 clientset,
316 replicationcontroller.BurstReplicas,
317 )
318 go rm.Run(tCtx, 3)
319
320 discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
321 listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource)
322 qc := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource)
323 informersStarted := make(chan struct{})
324 resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
325 QuotaClient: clientset.CoreV1(),
326 ResourceQuotaInformer: informers.Core().V1().ResourceQuotas(),
327 ResyncPeriod: controller.NoResyncPeriodFunc,
328 InformerFactory: informers,
329 ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
330 DiscoveryFunc: discoveryFunc,
331 IgnoredResourcesFunc: qc.IgnoredResources,
332 InformersStarted: informersStarted,
333 Registry: generic.NewRegistry(qc.Evaluators()),
334 }
335 resourceQuotaController, err := resourcequotacontroller.NewController(tCtx, resourceQuotaControllerOptions)
336 if err != nil {
337 t.Fatalf("unexpected err: %v", err)
338 }
339 go resourceQuotaController.Run(tCtx, 2)
340
341
342 go resourceQuotaController.Sync(tCtx, discoveryFunc, 30*time.Second)
343
344 informers.Start(tCtx.Done())
345 close(informersStarted)
346
347
348 pod := &v1.Pod{
349 ObjectMeta: metav1.ObjectMeta{
350 Name: "foo",
351 Namespace: ns.Name,
352 },
353 Spec: v1.PodSpec{
354 Containers: []v1.Container{
355 {
356 Name: "container",
357 Image: "busybox",
358 },
359 },
360 },
361 }
362 if _, err := clientset.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}); err == nil {
363 t.Fatalf("expected error for insufficient quota")
364 }
365
366
367
368 quota := &v1.ResourceQuota{
369 ObjectMeta: metav1.ObjectMeta{
370 Name: "quota",
371 Namespace: ns.Name,
372 },
373 Spec: v1.ResourceQuotaSpec{
374 Hard: v1.ResourceList{
375 v1.ResourcePods: resource.MustParse("1000"),
376 v1.ResourceName("count/pods"): resource.MustParse("1000"),
377 },
378 },
379 }
380 waitForQuota(t, quota, clientset)
381
382
383 err = wait.PollImmediate(5*time.Second, time.Minute, func() (bool, error) {
384
385 if _, err := clientset.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}); err == nil {
386 return true, nil
387 }
388 return false, nil
389 })
390 if err != nil {
391 t.Fatalf("unexpected error: %v", err)
392 }
393 }
394
395 func TestQuotaLimitService(t *testing.T) {
396
397 admissionConfigFile, err := os.CreateTemp("", "admission-config.yaml")
398 if err != nil {
399 t.Fatal(err)
400 }
401 defer os.Remove(admissionConfigFile.Name())
402 if err := os.WriteFile(admissionConfigFile.Name(), []byte(`
403 apiVersion: apiserver.k8s.io/v1alpha1
404 kind: AdmissionConfiguration
405 plugins:
406 - name: ResourceQuota
407 configuration:
408 apiVersion: apiserver.config.k8s.io/v1
409 kind: ResourceQuotaConfiguration
410 limitedResources:
411 - resource: pods
412 matchContains:
413 - pods
414 `), os.FileMode(0644)); err != nil {
415 t.Fatal(err)
416 }
417
418 tCtx := ktesting.Init(t)
419
420
421 _, kubeConfig, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
422 ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
423
424 opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
425 opts.Admission.GenericAdmission.ConfigFile = admissionConfigFile.Name()
426
427 },
428 })
429 defer tearDownFn()
430
431 clientset := clientset.NewForConfigOrDie(kubeConfig)
432
433 ns := framework.CreateNamespaceOrDie(clientset, "quota", t)
434 defer framework.DeleteNamespaceOrDie(clientset, ns, t)
435
436 informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
437 rm := replicationcontroller.NewReplicationManager(
438 tCtx,
439 informers.Core().V1().Pods(),
440 informers.Core().V1().ReplicationControllers(),
441 clientset,
442 replicationcontroller.BurstReplicas,
443 )
444 go rm.Run(tCtx, 3)
445
446 discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
447 listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource)
448 qc := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource)
449 informersStarted := make(chan struct{})
450 resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
451 QuotaClient: clientset.CoreV1(),
452 ResourceQuotaInformer: informers.Core().V1().ResourceQuotas(),
453 ResyncPeriod: controller.NoResyncPeriodFunc,
454 InformerFactory: informers,
455 ReplenishmentResyncPeriod: controller.NoResyncPeriodFunc,
456 DiscoveryFunc: discoveryFunc,
457 IgnoredResourcesFunc: qc.IgnoredResources,
458 InformersStarted: informersStarted,
459 Registry: generic.NewRegistry(qc.Evaluators()),
460 }
461 resourceQuotaController, err := resourcequotacontroller.NewController(tCtx, resourceQuotaControllerOptions)
462 if err != nil {
463 t.Fatalf("unexpected err: %v", err)
464 }
465 go resourceQuotaController.Run(tCtx, 2)
466
467
468 go resourceQuotaController.Sync(tCtx, discoveryFunc, 30*time.Second)
469
470 informers.Start(tCtx.Done())
471 close(informersStarted)
472
473
474
475 quota := &v1.ResourceQuota{
476 ObjectMeta: metav1.ObjectMeta{
477 Name: "quota",
478 Namespace: ns.Name,
479 },
480 Spec: v1.ResourceQuotaSpec{
481 Hard: v1.ResourceList{
482 v1.ResourceServices: resource.MustParse("4"),
483 v1.ResourceServicesNodePorts: resource.MustParse("2"),
484 v1.ResourceServicesLoadBalancers: resource.MustParse("2"),
485 },
486 },
487 }
488
489 waitForQuota(t, quota, clientset)
490
491
492 nodePortService := newService("np-svc", v1.ServiceTypeNodePort, true)
493 _, err = clientset.CoreV1().Services(ns.Name).Create(tCtx, nodePortService, metav1.CreateOptions{})
494 if err != nil {
495 t.Errorf("creating first node port Service should not have returned error: %v", err)
496 }
497
498
499 lbServiceWithNodePort1 := newService("lb-svc-withnp1", v1.ServiceTypeLoadBalancer, true)
500 _, err = clientset.CoreV1().Services(ns.Name).Create(tCtx, lbServiceWithNodePort1, metav1.CreateOptions{})
501 if err != nil {
502 t.Errorf("creating first loadbalancer Service should not have returned error: %v", err)
503 }
504
505
506 expectedQuotaUsed := v1.ResourceList{
507 v1.ResourceServices: resource.MustParse("2"),
508 v1.ResourceServicesNodePorts: resource.MustParse("2"),
509 v1.ResourceServicesLoadBalancers: resource.MustParse("1"),
510 }
511 waitForUsedResourceQuota(t, clientset, quota.Namespace, quota.Name, expectedQuotaUsed)
512
513
514 lbServiceWithNodePort2 := newService("lb-svc-withnp2", v1.ServiceTypeLoadBalancer, true)
515 testServiceForbidden(clientset, ns.Name, lbServiceWithNodePort2, t)
516
517
518 lbServiceWithoutNodePort1 := newService("lb-svc-wonp1", v1.ServiceTypeLoadBalancer, false)
519 _, err = clientset.CoreV1().Services(ns.Name).Create(tCtx, lbServiceWithoutNodePort1, metav1.CreateOptions{})
520 if err != nil {
521 t.Errorf("creating another loadbalancer Service without node ports should not have returned error: %v", err)
522 }
523
524
525 expectedQuotaUsed = v1.ResourceList{
526 v1.ResourceServices: resource.MustParse("3"),
527 v1.ResourceServicesNodePorts: resource.MustParse("2"),
528 v1.ResourceServicesLoadBalancers: resource.MustParse("2"),
529 }
530 waitForUsedResourceQuota(t, clientset, quota.Namespace, quota.Name, expectedQuotaUsed)
531
532
533 lbServiceWithoutNodePort2 := newService("lb-svc-wonp2", v1.ServiceTypeLoadBalancer, false)
534 testServiceForbidden(clientset, ns.Name, lbServiceWithoutNodePort2, t)
535
536
537 clusterIPService1 := newService("clusterip-svc1", v1.ServiceTypeClusterIP, false)
538 _, err = clientset.CoreV1().Services(ns.Name).Create(tCtx, clusterIPService1, metav1.CreateOptions{})
539 if err != nil {
540 t.Errorf("creating a cluster IP Service should not have returned error: %v", err)
541 }
542
543
544 expectedQuotaUsed = v1.ResourceList{
545 v1.ResourceServices: resource.MustParse("4"),
546 v1.ResourceServicesNodePorts: resource.MustParse("2"),
547 v1.ResourceServicesLoadBalancers: resource.MustParse("2"),
548 }
549 waitForUsedResourceQuota(t, clientset, quota.Namespace, quota.Name, expectedQuotaUsed)
550
551
552 clusterIPService2 := newService("clusterip-svc2", v1.ServiceTypeClusterIP, false)
553 testServiceForbidden(clientset, ns.Name, clusterIPService2, t)
554 }
555
556
557 func testServiceForbidden(clientset clientset.Interface, namespace string, service *v1.Service, t *testing.T) {
558 pollErr := wait.PollImmediate(2*time.Second, 30*time.Second, func() (bool, error) {
559 _, err := clientset.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
560 if apierrors.IsForbidden(err) {
561 return true, nil
562 }
563
564 if err == nil {
565 return false, errors.New("creating Service should have returned error but got nil")
566 }
567
568 return false, nil
569
570 })
571 if pollErr != nil {
572 t.Errorf("creating Service should return Forbidden due to resource quota limits but got: %v", pollErr)
573 }
574 }
575
576 func newService(name string, svcType v1.ServiceType, allocateNodePort bool) *v1.Service {
577 var allocateNPs *bool
578
579 if svcType == v1.ServiceTypeLoadBalancer {
580 allocateNPs = &allocateNodePort
581 }
582 return &v1.Service{
583 ObjectMeta: metav1.ObjectMeta{
584 Name: name,
585 },
586 Spec: v1.ServiceSpec{
587 Type: svcType,
588 AllocateLoadBalancerNodePorts: allocateNPs,
589 Ports: []v1.ServicePort{{
590 Port: int32(80),
591 TargetPort: intstr.FromInt32(80),
592 }},
593 },
594 }
595 }
596
View as plain text