
Source file src/k8s.io/kubernetes/pkg/controller/resourcequota/resource_quota_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/resourcequota

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package resourcequota
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"sync"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    28  	"k8s.io/apimachinery/pkg/api/errors"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/labels"
    31  	"k8s.io/apimachinery/pkg/runtime/schema"
    32  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    33  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    34  	"k8s.io/apimachinery/pkg/util/sets"
    35  	"k8s.io/apimachinery/pkg/util/wait"
    36  	quota "k8s.io/apiserver/pkg/quota/v1"
    37  	"k8s.io/client-go/discovery"
    38  	coreinformers "k8s.io/client-go/informers/core/v1"
    39  	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    40  	corelisters "k8s.io/client-go/listers/core/v1"
    41  	"k8s.io/client-go/tools/cache"
    42  	"k8s.io/client-go/util/workqueue"
    43  	"k8s.io/controller-manager/pkg/informerfactory"
    44  	"k8s.io/klog/v2"
    45  	"k8s.io/kubernetes/pkg/controller"
    46  )
    47  
    48  // NamespacedResourcesFunc knows how to discover namespaced resources.
    49  type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)
    50  
    51  // ReplenishmentFunc is a signal that a resource changed in a specified namespace,
    52  // which may require quota to be recalculated.
    53  type ReplenishmentFunc func(ctx context.Context, groupResource schema.GroupResource, namespace string)
    54  
    55  // ControllerOptions holds options for creating a quota controller
    56  type ControllerOptions struct {
    57  	// Must have authority to list all quotas, and update quota status
    58  	QuotaClient corev1client.ResourceQuotasGetter
    59  	// Shared informer for resource quotas
    60  	ResourceQuotaInformer coreinformers.ResourceQuotaInformer
    61  	// Controls full recalculation of quota usage
    62  	ResyncPeriod controller.ResyncPeriodFunc
    63  	// Maintains evaluators that know how to calculate usage for a group resource
    64  	Registry quota.Registry
    65  	// Discover list of supported resources on the server.
    66  	DiscoveryFunc NamespacedResourcesFunc
    67  	// A function that returns the list of resources to ignore
    68  	IgnoredResourcesFunc func() map[schema.GroupResource]struct{}
    69  	// InformersStarted knows if informers were started.
    70  	InformersStarted <-chan struct{}
    71  	// InformerFactory interfaces with informers.
    72  	InformerFactory informerfactory.InformerFactory
    73  	// Controls full resync of objects monitored for replenishment.
    74  	ReplenishmentResyncPeriod controller.ResyncPeriodFunc
    75  	// Filters update events so we only enqueue the ones where we know quota will change
    76  	UpdateFilter UpdateFilter
    77  }
    78  
    79  // Controller is responsible for tracking quota usage status in the system
    80  type Controller struct {
    81  	// Must have authority to list all resources in the system, and update quota status
    82  	rqClient corev1client.ResourceQuotasGetter
    83  	// A lister/getter of resource quota objects
    84  	rqLister corelisters.ResourceQuotaLister
    85  	// A list of functions that return true when their caches have synced
    86  	informerSyncedFuncs []cache.InformerSynced
    87  	// ResourceQuota objects that need to be synchronized
    88  	queue workqueue.RateLimitingInterface
    89  	// missingUsageQueue holds objects that are missing the initial usage information
    90  	missingUsageQueue workqueue.RateLimitingInterface
    91  	// To allow injection of the sync handler for testing.
    92  	syncHandler func(ctx context.Context, key string) error
    93  	// function that controls full recalculation of quota usage
    94  	resyncPeriod controller.ResyncPeriodFunc
    95  	// knows how to calculate usage
    96  	registry quota.Registry
    97  	// knows how to monitor all the resources tracked by quota and trigger replenishment
    98  	quotaMonitor *QuotaMonitor
    99  	// controls the workers that process quotas
   100  	// this lock is acquired to control write access to the monitors and ensures that all
   101  	// monitors are synced before the controller can process quotas.
   102  	workerLock sync.RWMutex
   103  }
   104  
   105  // NewController creates a quota controller with specified options
   106  func NewController(ctx context.Context, options *ControllerOptions) (*Controller, error) {
   107  	// build the resource quota controller
   108  	rq := &Controller{
   109  		rqClient:            options.QuotaClient,
   110  		rqLister:            options.ResourceQuotaInformer.Lister(),
   111  		informerSyncedFuncs: []cache.InformerSynced{options.ResourceQuotaInformer.Informer().HasSynced},
   112  		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
   113  		missingUsageQueue:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
   114  		resyncPeriod:        options.ResyncPeriod,
   115  		registry:            options.Registry,
   116  	}
   117  	// set the synchronization handler
   118  	rq.syncHandler = rq.syncResourceQuotaFromKey
   119  
   120  	logger := klog.FromContext(ctx)
   121  
   122  	options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
   123  		cache.ResourceEventHandlerFuncs{
   124  			AddFunc: func(obj interface{}) {
   125  				rq.addQuota(logger, obj)
   126  			},
   127  			UpdateFunc: func(old, cur interface{}) {
   128  				// We are only interested in observing updates to quota.spec to drive updates to quota.status.
   129  				// We ignore all updates to quota.Status because they are all driven by this controller.
   130  				// IMPORTANT:
   131  				// We do not use this function to queue up a full quota recalculation.  To do so would require
   132  				// us to enqueue all quota.Status updates, and since quota.Status updates involve additional queries
   133  				// that cannot be backed by a cache and result in a full query of a namespace's content, we do not
   134  				// want to pay the price on spurious status updates.  As a result, we have a separate routine that is
   135  				// responsible for enqueue of all resource quotas when doing a full resync (enqueueAll)
   136  				oldResourceQuota := old.(*v1.ResourceQuota)
   137  				curResourceQuota := cur.(*v1.ResourceQuota)
   138  				if quota.Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
   139  					return
   140  				}
   141  				rq.addQuota(logger, curResourceQuota)
   142  			},
   143  			// This will enter the sync loop and no-op, because the resource quota has been deleted from the store.
   144  			// The sync handler gets a NotFound error from the lister for the deleted quota and returns nil,
   145  			// so the worker simply forgets the key.
   146  			DeleteFunc: func(obj interface{}) {
   147  				rq.enqueueResourceQuota(logger, obj)
   148  			},
   149  		},
   150  		rq.resyncPeriod(),
   151  	)
   152  
   153  	if options.DiscoveryFunc != nil {
   154  		qm := NewMonitor(
   155  			options.InformersStarted,
   156  			options.InformerFactory,
   157  			options.IgnoredResourcesFunc(),
   158  			options.ReplenishmentResyncPeriod,
   159  			rq.replenishQuota,
   160  			rq.registry,
   161  			options.UpdateFilter,
   162  		)
   163  
   164  		rq.quotaMonitor = qm
   165  
   166  		// do initial quota monitor setup.  If we have a discovery failure here, it's ok. We'll discover more resources when a later sync happens.
   167  		resources, err := GetQuotableResources(options.DiscoveryFunc)
   168  		if discovery.IsGroupDiscoveryFailedError(err) {
   169  			utilruntime.HandleError(fmt.Errorf("initial discovery check failure, continuing and counting on future sync update: %v", err))
   170  		} else if err != nil {
   171  			return nil, err
   172  		}
   173  
   174  		if err = qm.SyncMonitors(ctx, resources); err != nil {
   175  			utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
   176  		}
   177  
   178  		// only start processing quotas once all informers have synced
   179  		rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, func() bool {
   180  			return qm.IsSynced(ctx)
   181  		})
   182  	}
   183  
   184  	return rq, nil
   185  }
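        // Note: when ControllerOptions.DiscoveryFunc is nil, no QuotaMonitor is built above, so the
        // controller receives no replenishment signals for watched resources; usage is then refreshed
        // only by quota add/update events and the periodic enqueueAll resync (Run skips starting a
        // monitor when quotaMonitor is nil).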
   186  
   187  // enqueueAll is called at the full resync interval to force a full recalculation of quota usage statistics
   188  func (rq *Controller) enqueueAll(ctx context.Context) {
   189  	logger := klog.FromContext(ctx)
   190  	defer logger.V(4).Info("Resource quota controller queued all resource quota for full calculation of usage")
   191  	rqs, err := rq.rqLister.List(labels.Everything())
   192  	if err != nil {
   193  		utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err))
   194  		return
   195  	}
   196  	for i := range rqs {
   197  		key, err := controller.KeyFunc(rqs[i])
   198  		if err != nil {
   199  			utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", rqs[i], err))
   200  			continue
   201  		}
   202  		rq.queue.Add(key)
   203  	}
   204  }
   205  
   206  // obj could be a *v1.ResourceQuota, or a cache.DeletedFinalStateUnknown marker item.
   207  func (rq *Controller) enqueueResourceQuota(logger klog.Logger, obj interface{}) {
   208  	key, err := controller.KeyFunc(obj)
   209  	if err != nil {
   210  		logger.Error(err, "Couldn't get key", "object", obj)
   211  		return
   212  	}
   213  	rq.queue.Add(key)
   214  }
   215  
   216  func (rq *Controller) addQuota(logger klog.Logger, obj interface{}) {
   217  	key, err := controller.KeyFunc(obj)
   218  	if err != nil {
   219  		logger.Error(err, "Couldn't get key", "object", obj)
   220  		return
   221  	}
   222  
   223  	resourceQuota := obj.(*v1.ResourceQuota)
   224  
   225  	// if we declared an intent that is not yet captured in status, prioritize it
   226  	if !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) {
   227  		rq.missingUsageQueue.Add(key)
   228  		return
   229  	}
   230  
   231  	// if we declared a constraint that has no usage (which this controller can calculate), prioritize it
   232  	for constraint := range resourceQuota.Status.Hard {
   233  		if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
   234  			matchedResources := []v1.ResourceName{constraint}
   235  			for _, evaluator := range rq.registry.List() {
   236  				if intersection := evaluator.MatchingResources(matchedResources); len(intersection) > 0 {
   237  					rq.missingUsageQueue.Add(key)
   238  					return
   239  				}
   240  			}
   241  		}
   242  	}
   243  
   244  	// no special priority, go in normal recalc queue
   245  	rq.queue.Add(key)
   246  }
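        // For example, a freshly created quota with spec.hard = {"pods": "10"} and an empty status has
        // Spec.Hard != Status.Hard, so addQuota routes it to missingUsageQueue, which is drained by its
        // own workers and is not blocked behind the primary queue's full-recalculation backlog.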
   247  
   248  // worker runs a worker thread that just dequeues items, processes them, and marks them done.
   249  func (rq *Controller) worker(queue workqueue.RateLimitingInterface) func(context.Context) {
   250  	workFunc := func(ctx context.Context) bool {
   251  		key, quit := queue.Get()
   252  		if quit {
   253  			return true
   254  		}
   255  		defer queue.Done(key)
   256  
   257  		rq.workerLock.RLock()
   258  		defer rq.workerLock.RUnlock()
   259  
   260  		logger := klog.FromContext(ctx)
   261  		logger = klog.LoggerWithValues(logger, "queueKey", key)
   262  		ctx = klog.NewContext(ctx, logger)
   263  
   264  		err := rq.syncHandler(ctx, key.(string))
   265  		if err == nil {
   266  			queue.Forget(key)
   267  			return false
   268  		}
   269  
   270  		utilruntime.HandleError(err)
   271  		queue.AddRateLimited(key)
   272  
   273  		return false
   274  	}
   275  
   276  	return func(ctx context.Context) {
   277  		for {
   278  			if quit := workFunc(ctx); quit {
   279  				klog.FromContext(ctx).Info("resource quota controller worker shutting down")
   280  				return
   281  			}
   282  		}
   283  	}
   284  }
   285  
   286  // Run begins quota controller using the specified number of workers
   287  func (rq *Controller) Run(ctx context.Context, workers int) {
   288  	defer utilruntime.HandleCrash()
   289  	defer rq.queue.ShutDown()
   290  	defer rq.missingUsageQueue.ShutDown()
   291  
   292  	logger := klog.FromContext(ctx)
   293  
   294  	logger.Info("Starting resource quota controller")
   295  	defer logger.Info("Shutting down resource quota controller")
   296  
   297  	if rq.quotaMonitor != nil {
   298  		go rq.quotaMonitor.Run(ctx)
   299  	}
   300  
   301  	if !cache.WaitForNamedCacheSync("resource quota", ctx.Done(), rq.informerSyncedFuncs...) {
   302  		return
   303  	}
   304  
   305  	// the workers that chug through the quota calculation backlog
   306  	for i := 0; i < workers; i++ {
   307  		go wait.UntilWithContext(ctx, rq.worker(rq.queue), time.Second)
   308  		go wait.UntilWithContext(ctx, rq.worker(rq.missingUsageQueue), time.Second)
   309  	}
   310  	// the timer for how often we do a full recalculation across all quotas
   311  	if rq.resyncPeriod() > 0 {
   312  		go wait.UntilWithContext(ctx, rq.enqueueAll, rq.resyncPeriod())
   313  	} else {
   314  		logger.Info("periodic quota controller resync disabled")
   315  	}
   316  	<-ctx.Done()
   317  }
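        // A minimal wiring sketch (illustrative only; the options literal and discoveryFunc are assumed
        // to be provided by the caller and are not part of this file):
        //
        //	ctrl, err := NewController(ctx, &ControllerOptions{ /* ... */ })
        //	if err != nil {
        //		// handle error
        //	}
        //	go ctrl.Run(ctx, 5)
        //	// keep the monitored resource set in step with discovery; 30s is an arbitrary example period
        //	go ctrl.Sync(ctx, discoveryFunc, 30*time.Second)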
   318  
   319  // syncResourceQuotaFromKey syncs a quota key
   320  func (rq *Controller) syncResourceQuotaFromKey(ctx context.Context, key string) (err error) {
   321  	startTime := time.Now()
   322  
   323  	logger := klog.FromContext(ctx)
   324  	logger = klog.LoggerWithValues(logger, "key", key)
   325  
   326  	defer func() {
   327  		logger.V(4).Info("Finished syncing resource quota", "key", key, "duration", time.Since(startTime))
   328  	}()
   329  
   330  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   331  	if err != nil {
   332  		return err
   333  	}
   334  	resourceQuota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
   335  	if errors.IsNotFound(err) {
   336  		logger.Info("Resource quota has been deleted", "key", key)
   337  		return nil
   338  	}
   339  	if err != nil {
   340  		logger.Error(err, "Unable to retrieve resource quota from store", "key", key)
   341  		return err
   342  	}
   343  	return rq.syncResourceQuota(ctx, resourceQuota)
   344  }
   345  
   346  // syncResourceQuota runs a complete sync of resource quota status across all known kinds
   347  func (rq *Controller) syncResourceQuota(ctx context.Context, resourceQuota *v1.ResourceQuota) (err error) {
   348  	// quota is dirty if any part of spec hard limits differs from the status hard limits
   349  	statusLimitsDirty := !apiequality.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)
   350  
   351  	// dirty tracks if the usage status differs from the previous sync,
   352  	// if so, we send a new usage with latest status
   353  	// if this is our first sync, it will be dirty by default, since we need to track usage
   354  	dirty := statusLimitsDirty || resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil
   355  
   356  	used := v1.ResourceList{}
   357  	if resourceQuota.Status.Used != nil {
   358  		used = quota.Add(v1.ResourceList{}, resourceQuota.Status.Used)
   359  	}
   360  	hardLimits := quota.Add(v1.ResourceList{}, resourceQuota.Spec.Hard)
   361  
   362  	var errs []error
   363  
   364  	newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry, resourceQuota.Spec.ScopeSelector)
   365  	if err != nil {
   366  		// if err is non-nil, remember it to return, but continue updating status with any resources in newUsage
   367  		errs = append(errs, err)
   368  	}
   369  	for key, value := range newUsage {
   370  		used[key] = value
   371  	}
   372  
   373  	// ensure set of used values match those that have hard constraints
   374  	hardResources := quota.ResourceNames(hardLimits)
   375  	used = quota.Mask(used, hardResources)
   376  
   377  	// Create a usage object that is based on the quota resource version that will handle updates
   378  	// by default, we preserve the past usage observation, and set hard to the current spec
   379  	usage := resourceQuota.DeepCopy()
   380  	usage.Status = v1.ResourceQuotaStatus{
   381  		Hard: hardLimits,
   382  		Used: used,
   383  	}
   384  
   385  	dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)
   386  
   387  	// there was a change observed by this controller that requires we update quota
   388  	if dirty {
   389  		_, err = rq.rqClient.ResourceQuotas(usage.Namespace).UpdateStatus(ctx, usage, metav1.UpdateOptions{})
   390  		if err != nil {
   391  			errs = append(errs, err)
   392  		}
   393  	}
   394  	return utilerrors.NewAggregate(errs)
   395  }
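        // If the UpdateStatus call fails (for example, on a resource version conflict), the error is
        // aggregated into the sync result and the worker re-adds the key with rate limiting, so the
        // status write is retried on a later pass.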
   396  
   397  // replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
   398  func (rq *Controller) replenishQuota(ctx context.Context, groupResource schema.GroupResource, namespace string) {
   399  	// check if the quota controller can evaluate this groupResource, if not, ignore it altogether...
   400  	evaluator := rq.registry.Get(groupResource)
   401  	if evaluator == nil {
   402  		return
   403  	}
   404  
   405  	// check if this namespace even has a quota...
   406  	resourceQuotas, err := rq.rqLister.ResourceQuotas(namespace).List(labels.Everything())
   407  	if errors.IsNotFound(err) {
   408  		utilruntime.HandleError(fmt.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod()))
   409  		return
   410  	}
   411  	if err != nil {
   412  		utilruntime.HandleError(fmt.Errorf("error checking to see if namespace %s has any ResourceQuota associated with it: %v", namespace, err))
   413  		return
   414  	}
   415  	if len(resourceQuotas) == 0 {
   416  		return
   417  	}
   418  
   419  	logger := klog.FromContext(ctx)
   420  
   421  	// only queue those quotas that are tracking a resource associated with this kind.
   422  	for i := range resourceQuotas {
   423  		resourceQuota := resourceQuotas[i]
   424  		resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard)
   425  		if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
   426  			// TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
   427  			rq.enqueueResourceQuota(logger, resourceQuota)
   428  		}
   429  	}
   430  }
   431  
   432  // Sync periodically resyncs the controller when new resources are observed from discovery.
   433  func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResourcesFunc, period time.Duration) {
   434  	// oldResources holds the set of quotable resources observed in the previous discovery poll.
   435  	oldResources := make(map[schema.GroupVersionResource]struct{})
   436  	wait.UntilWithContext(ctx, func(ctx context.Context) {
   437  		// Get the current resource list from discovery.
   438  		newResources, err := GetQuotableResources(discoveryFunc)
   439  		if err != nil {
   440  			utilruntime.HandleError(err)
   441  
   442  			if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure && len(newResources) > 0 {
   443  				// In partial discovery cases, preserve existing informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
   444  				for k, v := range oldResources {
   445  					if _, failed := groupLookupFailures[k.GroupVersion()]; failed {
   446  						newResources[k] = v
   447  					}
   448  				}
   449  			} else {
   450  				// short circuit in non-discovery error cases or if discovery returned zero resources
   451  				return
   452  			}
   453  		}
   454  
   455  		logger := klog.FromContext(ctx)
   456  
   457  		// Decide whether discovery has reported a change.
   458  		if reflect.DeepEqual(oldResources, newResources) {
   459  			logger.V(4).Info("no resource updates from discovery, skipping resource quota sync")
   460  			return
   461  		}
   462  
   463  		// Ensure workers are paused to avoid processing events before informers
   464  		// have resynced.
   465  		rq.workerLock.Lock()
   466  		defer rq.workerLock.Unlock()
   467  
   468  		// Something has changed, so track the new state and perform a sync.
   469  		if loggerV := logger.V(2); loggerV.Enabled() {
   470  			loggerV.Info("syncing resource quota controller with updated resources from discovery", "diff", printDiff(oldResources, newResources))
   471  		}
   472  
   473  		// Perform the monitor resync and wait for controllers to report cache sync.
   474  		if err := rq.resyncMonitors(ctx, newResources); err != nil {
   475  			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
   476  			return
   477  		}
   478  
   479  		// at this point, we've synced the new resources to our monitors, so record that fact.
   480  		oldResources = newResources
   481  
   482  		// wait for caches to fill for a while (our sync period).
   483  		// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
   484  		// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
   485  		// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
   486  		if rq.quotaMonitor != nil &&
   487  			!cache.WaitForNamedCacheSync(
   488  				"resource quota",
   489  				waitForStopOrTimeout(ctx.Done(), period),
   490  				func() bool { return rq.quotaMonitor.IsSynced(ctx) },
   491  			) {
   492  			utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
   493  			return
   494  		}
   495  
   496  		logger.V(2).Info("synced quota controller")
   497  	}, period)
   498  }
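        // Sync holds workerLock for writing while monitors are resynced and their caches fill, which
        // blocks every worker (each takes a read lock per item) from processing quotas until the
        // updated monitors are ready.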
   499  
   500  // printDiff returns a human-readable summary of what resources were added and removed
   501  func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
   502  	removed := sets.NewString()
   503  	for oldResource := range oldResources {
   504  		if _, ok := newResources[oldResource]; !ok {
   505  			removed.Insert(fmt.Sprintf("%+v", oldResource))
   506  		}
   507  	}
   508  	added := sets.NewString()
   509  	for newResource := range newResources {
   510  		if _, ok := oldResources[newResource]; !ok {
   511  			added.Insert(fmt.Sprintf("%+v", newResource))
   512  		}
   513  	}
   514  	return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
   515  }
   516  
   517  // waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
   518  func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
   519  	stopChWithTimeout := make(chan struct{})
   520  	go func() {
   521  		defer close(stopChWithTimeout)
   522  		select {
   523  		case <-stopCh:
   524  		case <-time.After(timeout):
   525  		}
   526  	}()
   527  	return stopChWithTimeout
   528  }
   529  
   530  // resyncMonitors starts or stops quota monitors as needed to ensure that all
   531  // (and only) those resources present in the map are monitored.
   532  func (rq *Controller) resyncMonitors(ctx context.Context, resources map[schema.GroupVersionResource]struct{}) error {
   533  	if rq.quotaMonitor == nil {
   534  		return nil
   535  	}
   536  
   537  	if err := rq.quotaMonitor.SyncMonitors(ctx, resources); err != nil {
   538  		return err
   539  	}
   540  	rq.quotaMonitor.StartMonitors(ctx)
   541  	return nil
   542  }
   543  
   544  // GetQuotableResources returns all resources that the quota system should recognize.
   545  // It requires that a resource support the following verbs: 'create', 'list', 'watch', 'delete'
   546  // This function may return both results and an error.  If that happens, it means that the discovery calls were only
   547  // partially successful.  A decision about whether to proceed or not is left to the caller.
   548  func GetQuotableResources(discoveryFunc NamespacedResourcesFunc) (map[schema.GroupVersionResource]struct{}, error) {
   549  	possibleResources, discoveryErr := discoveryFunc()
   550  	if discoveryErr != nil && len(possibleResources) == 0 {
   551  		return nil, fmt.Errorf("failed to discover resources: %v", discoveryErr)
   552  	}
   553  	quotableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"create", "list", "watch", "delete"}}, possibleResources)
   554  	quotableGroupVersionResources, err := discovery.GroupVersionResources(quotableResources)
   555  	if err != nil {
   556  		return nil, fmt.Errorf("failed to parse resources: %v", err)
   557  	}
   558  	// return the original discovery error (if any) in addition to the list
   559  	return quotableGroupVersionResources, discoveryErr
   560  }
   561  
