1
16
17 package resourcequota
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "sync"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 apiequality "k8s.io/apimachinery/pkg/api/equality"
28 "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/labels"
31 "k8s.io/apimachinery/pkg/runtime/schema"
32 utilerrors "k8s.io/apimachinery/pkg/util/errors"
33 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
34 "k8s.io/apimachinery/pkg/util/sets"
35 "k8s.io/apimachinery/pkg/util/wait"
36 quota "k8s.io/apiserver/pkg/quota/v1"
37 "k8s.io/client-go/discovery"
38 coreinformers "k8s.io/client-go/informers/core/v1"
39 corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
40 corelisters "k8s.io/client-go/listers/core/v1"
41 "k8s.io/client-go/tools/cache"
42 "k8s.io/client-go/util/workqueue"
43 "k8s.io/controller-manager/pkg/informerfactory"
44 "k8s.io/klog/v2"
45 "k8s.io/kubernetes/pkg/controller"
46 )
47
48
// NamespacedResourcesFunc returns a set of API resource lists together with an
// error. GetQuotableResources treats a non-nil error that is accompanied by a
// non-empty result as a partial discovery result rather than a total failure.
type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)
50
51
52
// ReplenishmentFunc is a callback invoked for a group resource in a namespace.
// The Controller supplies its replenishQuota method as this callback when it
// builds a quota monitor, which re-enqueues the quotas in that namespace that
// track the given resource.
type ReplenishmentFunc func(ctx context.Context, groupResource schema.GroupResource, namespace string)
54
55
// ControllerOptions holds the inputs consumed by NewController.
type ControllerOptions struct {
	// QuotaClient is the client used to write ResourceQuota status updates.
	QuotaClient corev1client.ResourceQuotasGetter
	// ResourceQuotaInformer provides the lister, the HasSynced check, and the
	// event notifications for ResourceQuota objects.
	ResourceQuotaInformer coreinformers.ResourceQuotaInformer
	// ResyncPeriod yields the resync period for the ResourceQuota event
	// handler and for the periodic full re-enqueue started by Run.
	ResyncPeriod controller.ResyncPeriodFunc
	// Registry holds the quota evaluators used to calculate usage.
	Registry quota.Registry
	// DiscoveryFunc lists candidate resources. When it is nil, NewController
	// does not create a quota monitor.
	DiscoveryFunc NamespacedResourcesFunc
	// IgnoredResourcesFunc returns the set of group resources handed to the
	// quota monitor as ignored. It is only called when DiscoveryFunc is set.
	IgnoredResourcesFunc func() map[schema.GroupResource]struct{}
	// InformersStarted is passed through to the quota monitor.
	InformersStarted <-chan struct{}
	// InformerFactory is passed through to the quota monitor.
	InformerFactory informerfactory.InformerFactory
	// ReplenishmentResyncPeriod is passed through to the quota monitor.
	ReplenishmentResyncPeriod controller.ResyncPeriodFunc
	// UpdateFilter is passed through to the quota monitor.
	UpdateFilter UpdateFilter
}
78
79
// Controller recalculates ResourceQuota usage and writes it back to the
// status of each quota object.
type Controller struct {
	// rqClient is used to update ResourceQuota status.
	rqClient corev1client.ResourceQuotasGetter
	// rqLister reads ResourceQuota objects from the informer's store.
	rqLister corelisters.ResourceQuotaLister
	// informerSyncedFuncs must all report true before Run starts workers.
	informerSyncedFuncs []cache.InformerSynced
	// queue holds keys of quotas that need a regular recalculation.
	queue workqueue.RateLimitingInterface
	// missingUsageQueue holds keys of quotas whose status hard limits differ
	// from the spec, or that lack usage for a tracked resource (see addQuota).
	missingUsageQueue workqueue.RateLimitingInterface
	// syncHandler processes a single key taken from either queue; it is a
	// field rather than a direct call so it can be replaced.
	syncHandler func(ctx context.Context, key string) error
	// resyncPeriod yields the period between full re-enqueues of all quotas.
	resyncPeriod controller.ResyncPeriodFunc
	// registry provides the evaluators used to compute usage.
	registry quota.Registry
	// quotaMonitor is nil when the controller was built without a
	// DiscoveryFunc.
	quotaMonitor *QuotaMonitor

	// workerLock is read-locked by every worker while it syncs a key and
	// write-locked by Sync while monitors are being resynced, so the two never
	// run concurrently.
	workerLock sync.RWMutex
}
104
105
106 func NewController(ctx context.Context, options *ControllerOptions) (*Controller, error) {
107
108 rq := &Controller{
109 rqClient: options.QuotaClient,
110 rqLister: options.ResourceQuotaInformer.Lister(),
111 informerSyncedFuncs: []cache.InformerSynced{options.ResourceQuotaInformer.Informer().HasSynced},
112 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
113 missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
114 resyncPeriod: options.ResyncPeriod,
115 registry: options.Registry,
116 }
117
118 rq.syncHandler = rq.syncResourceQuotaFromKey
119
120 logger := klog.FromContext(ctx)
121
122 options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
123 cache.ResourceEventHandlerFuncs{
124 AddFunc: func(obj interface{}) {
125 rq.addQuota(logger, obj)
126 },
127 UpdateFunc: func(old, cur interface{}) {
128
129
130
131
132
133
134
135
136 oldResourceQuota := old.(*v1.ResourceQuota)
137 curResourceQuota := cur.(*v1.ResourceQuota)
138 if quota.Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
139 return
140 }
141 rq.addQuota(logger, curResourceQuota)
142 },
143
144
145
146 DeleteFunc: func(obj interface{}) {
147 rq.enqueueResourceQuota(logger, obj)
148 },
149 },
150 rq.resyncPeriod(),
151 )
152
153 if options.DiscoveryFunc != nil {
154 qm := NewMonitor(
155 options.InformersStarted,
156 options.InformerFactory,
157 options.IgnoredResourcesFunc(),
158 options.ReplenishmentResyncPeriod,
159 rq.replenishQuota,
160 rq.registry,
161 options.UpdateFilter,
162 )
163
164 rq.quotaMonitor = qm
165
166
167 resources, err := GetQuotableResources(options.DiscoveryFunc)
168 if discovery.IsGroupDiscoveryFailedError(err) {
169 utilruntime.HandleError(fmt.Errorf("initial discovery check failure, continuing and counting on future sync update: %v", err))
170 } else if err != nil {
171 return nil, err
172 }
173
174 if err = qm.SyncMonitors(ctx, resources); err != nil {
175 utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
176 }
177
178
179 rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, func() bool {
180 return qm.IsSynced(ctx)
181 })
182 }
183
184 return rq, nil
185 }
186
187
188 func (rq *Controller) enqueueAll(ctx context.Context) {
189 logger := klog.FromContext(ctx)
190 defer logger.V(4).Info("Resource quota controller queued all resource quota for full calculation of usage")
191 rqs, err := rq.rqLister.List(labels.Everything())
192 if err != nil {
193 utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err))
194 return
195 }
196 for i := range rqs {
197 key, err := controller.KeyFunc(rqs[i])
198 if err != nil {
199 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", rqs[i], err))
200 continue
201 }
202 rq.queue.Add(key)
203 }
204 }
205
206
207 func (rq *Controller) enqueueResourceQuota(logger klog.Logger, obj interface{}) {
208 key, err := controller.KeyFunc(obj)
209 if err != nil {
210 logger.Error(err, "Couldn't get key", "object", obj)
211 return
212 }
213 rq.queue.Add(key)
214 }
215
216 func (rq *Controller) addQuota(logger klog.Logger, obj interface{}) {
217 key, err := controller.KeyFunc(obj)
218 if err != nil {
219 logger.Error(err, "Couldn't get key", "object", obj)
220 return
221 }
222
223 resourceQuota := obj.(*v1.ResourceQuota)
224
225
226 if !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) {
227 rq.missingUsageQueue.Add(key)
228 return
229 }
230
231
232 for constraint := range resourceQuota.Status.Hard {
233 if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
234 matchedResources := []v1.ResourceName{constraint}
235 for _, evaluator := range rq.registry.List() {
236 if intersection := evaluator.MatchingResources(matchedResources); len(intersection) > 0 {
237 rq.missingUsageQueue.Add(key)
238 return
239 }
240 }
241 }
242 }
243
244
245 rq.queue.Add(key)
246 }
247
248
249 func (rq *Controller) worker(queue workqueue.RateLimitingInterface) func(context.Context) {
250 workFunc := func(ctx context.Context) bool {
251 key, quit := queue.Get()
252 if quit {
253 return true
254 }
255 defer queue.Done(key)
256
257 rq.workerLock.RLock()
258 defer rq.workerLock.RUnlock()
259
260 logger := klog.FromContext(ctx)
261 logger = klog.LoggerWithValues(logger, "queueKey", key)
262 ctx = klog.NewContext(ctx, logger)
263
264 err := rq.syncHandler(ctx, key.(string))
265 if err == nil {
266 queue.Forget(key)
267 return false
268 }
269
270 utilruntime.HandleError(err)
271 queue.AddRateLimited(key)
272
273 return false
274 }
275
276 return func(ctx context.Context) {
277 for {
278 if quit := workFunc(ctx); quit {
279 klog.FromContext(ctx).Info("resource quota controller worker shutting down")
280 return
281 }
282 }
283 }
284 }
285
286
287 func (rq *Controller) Run(ctx context.Context, workers int) {
288 defer utilruntime.HandleCrash()
289 defer rq.queue.ShutDown()
290 defer rq.missingUsageQueue.ShutDown()
291
292 logger := klog.FromContext(ctx)
293
294 logger.Info("Starting resource quota controller")
295 defer logger.Info("Shutting down resource quota controller")
296
297 if rq.quotaMonitor != nil {
298 go rq.quotaMonitor.Run(ctx)
299 }
300
301 if !cache.WaitForNamedCacheSync("resource quota", ctx.Done(), rq.informerSyncedFuncs...) {
302 return
303 }
304
305
306 for i := 0; i < workers; i++ {
307 go wait.UntilWithContext(ctx, rq.worker(rq.queue), time.Second)
308 go wait.UntilWithContext(ctx, rq.worker(rq.missingUsageQueue), time.Second)
309 }
310
311 if rq.resyncPeriod() > 0 {
312 go wait.UntilWithContext(ctx, rq.enqueueAll, rq.resyncPeriod())
313 } else {
314 logger.Info("periodic quota controller resync disabled")
315 }
316 <-ctx.Done()
317 }
318
319
320 func (rq *Controller) syncResourceQuotaFromKey(ctx context.Context, key string) (err error) {
321 startTime := time.Now()
322
323 logger := klog.FromContext(ctx)
324 logger = klog.LoggerWithValues(logger, "key", key)
325
326 defer func() {
327 logger.V(4).Info("Finished syncing resource quota", "key", key, "duration", time.Since(startTime))
328 }()
329
330 namespace, name, err := cache.SplitMetaNamespaceKey(key)
331 if err != nil {
332 return err
333 }
334 resourceQuota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
335 if errors.IsNotFound(err) {
336 logger.Info("Resource quota has been deleted", "key", key)
337 return nil
338 }
339 if err != nil {
340 logger.Error(err, "Unable to retrieve resource quota from store", "key", key)
341 return err
342 }
343 return rq.syncResourceQuota(ctx, resourceQuota)
344 }
345
346
347 func (rq *Controller) syncResourceQuota(ctx context.Context, resourceQuota *v1.ResourceQuota) (err error) {
348
349 statusLimitsDirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)
350
351
352
353
354 dirty := statusLimitsDirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil
355
356 used := v1.ResourceList{}
357 if resourceQuota.Status.Used != nil {
358 used = quota.Add(v1.ResourceList{}, resourceQuota.Status.Used)
359 }
360 hardLimits := quota.Add(v1.ResourceList{}, resourceQuota.Spec.Hard)
361
362 var errs []error
363
364 newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry, resourceQuota.Spec.ScopeSelector)
365 if err != nil {
366
367 errs = append(errs, err)
368 }
369 for key, value := range newUsage {
370 used[key] = value
371 }
372
373
374 hardResources := quota.ResourceNames(hardLimits)
375 used = quota.Mask(used, hardResources)
376
377
378
379 usage := resourceQuota.DeepCopy()
380 usage.Status = v1.ResourceQuotaStatus{
381 Hard: hardLimits,
382 Used: used,
383 }
384
385 dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)
386
387
388 if dirty {
389 _, err = rq.rqClient.ResourceQuotas(usage.Namespace).UpdateStatus(ctx, usage, metav1.UpdateOptions{})
390 if err != nil {
391 errs = append(errs, err)
392 }
393 }
394 return utilerrors.NewAggregate(errs)
395 }
396
397
398 func (rq *Controller) replenishQuota(ctx context.Context, groupResource schema.GroupResource, namespace string) {
399
400 evaluator := rq.registry.Get(groupResource)
401 if evaluator == nil {
402 return
403 }
404
405
406 resourceQuotas, err := rq.rqLister.ResourceQuotas(namespace).List(labels.Everything())
407 if errors.IsNotFound(err) {
408 utilruntime.HandleError(fmt.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod()))
409 return
410 }
411 if err != nil {
412 utilruntime.HandleError(fmt.Errorf("error checking to see if namespace %s has any ResourceQuota associated with it: %v", namespace, err))
413 return
414 }
415 if len(resourceQuotas) == 0 {
416 return
417 }
418
419 logger := klog.FromContext(ctx)
420
421
422 for i := range resourceQuotas {
423 resourceQuota := resourceQuotas[i]
424 resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard)
425 if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
426
427 rq.enqueueResourceQuota(logger, resourceQuota)
428 }
429 }
430 }
431
432
433 func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResourcesFunc, period time.Duration) {
434
435 oldResources := make(map[schema.GroupVersionResource]struct{})
436 wait.UntilWithContext(ctx, func(ctx context.Context) {
437
438 newResources, err := GetQuotableResources(discoveryFunc)
439 if err != nil {
440 utilruntime.HandleError(err)
441
442 if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure && len(newResources) > 0 {
443
444 for k, v := range oldResources {
445 if _, failed := groupLookupFailures[k.GroupVersion()]; failed {
446 newResources[k] = v
447 }
448 }
449 } else {
450
451 return
452 }
453 }
454
455 logger := klog.FromContext(ctx)
456
457
458 if reflect.DeepEqual(oldResources, newResources) {
459 logger.V(4).Info("no resource updates from discovery, skipping resource quota sync")
460 return
461 }
462
463
464
465 rq.workerLock.Lock()
466 defer rq.workerLock.Unlock()
467
468
469 if loggerV := logger.V(2); loggerV.Enabled() {
470 loggerV.Info("syncing resource quota controller with updated resources from discovery", "diff", printDiff(oldResources, newResources))
471 }
472
473
474 if err := rq.resyncMonitors(ctx, newResources); err != nil {
475 utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
476 return
477 }
478
479
480 oldResources = newResources
481
482
483
484
485
486 if rq.quotaMonitor != nil &&
487 !cache.WaitForNamedCacheSync(
488 "resource quota",
489 waitForStopOrTimeout(ctx.Done(), period),
490 func() bool { return rq.quotaMonitor.IsSynced(ctx) },
491 ) {
492 utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
493 return
494 }
495
496 logger.V(2).Info("synced quota controller")
497 }, period)
498 }
499
500
501 func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
502 removed := sets.NewString()
503 for oldResource := range oldResources {
504 if _, ok := newResources[oldResource]; !ok {
505 removed.Insert(fmt.Sprintf("%+v", oldResource))
506 }
507 }
508 added := sets.NewString()
509 for newResource := range newResources {
510 if _, ok := oldResources[newResource]; !ok {
511 added.Insert(fmt.Sprintf("%+v", newResource))
512 }
513 }
514 return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
515 }
516
517
// waitForStopOrTimeout returns a channel that is closed as soon as stopCh is
// closed (or delivers a value) or timeout has elapsed, whichever comes first.
//
// The helper goroutine always exits within timeout. An explicit timer is used
// instead of time.After so that it is stopped promptly when stopCh fires
// first, rather than staying pending for the remainder of the timeout.
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		defer close(stopChWithTimeout)

		timer := time.NewTimer(timeout)
		defer timer.Stop()

		select {
		case <-stopCh:
		case <-timer.C:
		}
	}()
	return stopChWithTimeout
}
529
530
531
532 func (rq *Controller) resyncMonitors(ctx context.Context, resources map[schema.GroupVersionResource]struct{}) error {
533 if rq.quotaMonitor == nil {
534 return nil
535 }
536
537 if err := rq.quotaMonitor.SyncMonitors(ctx, resources); err != nil {
538 return err
539 }
540 rq.quotaMonitor.StartMonitors(ctx)
541 return nil
542 }
543
544
545
546
547
548 func GetQuotableResources(discoveryFunc NamespacedResourcesFunc) (map[schema.GroupVersionResource]struct{}, error) {
549 possibleResources, discoveryErr := discoveryFunc()
550 if discoveryErr != nil && len(possibleResources) == 0 {
551 return nil, fmt.Errorf("failed to discover resources: %v", discoveryErr)
552 }
553 quotableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"create", "list", "watch", "delete"}}, possibleResources)
554 quotableGroupVersionResources, err := discovery.GroupVersionResources(quotableResources)
555 if err != nil {
556 return nil, fmt.Errorf("failed to parse resources: %v", err)
557 }
558
559 return quotableGroupVersionResources, discoveryErr
560 }
561
View as plain text