...

Source file src/k8s.io/kubernetes/pkg/controller/resourcequota/resource_quota_monitor.go

Documentation: k8s.io/kubernetes/pkg/controller/resourcequota

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package resourcequota
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  	"time"
    24  
    25  	"k8s.io/klog/v2"
    26  
    27  	"k8s.io/apimachinery/pkg/api/meta"
    28  	"k8s.io/apimachinery/pkg/runtime/schema"
    29  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	quota "k8s.io/apiserver/pkg/quota/v1"
    33  	"k8s.io/apiserver/pkg/quota/v1/generic"
    34  	"k8s.io/client-go/tools/cache"
    35  	"k8s.io/client-go/util/workqueue"
    36  	"k8s.io/controller-manager/pkg/informerfactory"
    37  	"k8s.io/kubernetes/pkg/controller"
    38  )
    39  
    40  type eventType int
    41  
    42  func (e eventType) String() string {
    43  	switch e {
    44  	case addEvent:
    45  		return "add"
    46  	case updateEvent:
    47  		return "update"
    48  	case deleteEvent:
    49  		return "delete"
    50  	default:
    51  		return fmt.Sprintf("unknown(%d)", int(e))
    52  	}
    53  }
    54  
    55  const (
    56  	addEvent eventType = iota
    57  	updateEvent
    58  	deleteEvent
    59  )
    60  
    61  type event struct {
    62  	eventType eventType
    63  	obj       interface{}
    64  	oldObj    interface{}
    65  	gvr       schema.GroupVersionResource
    66  }
    67  
    68  // QuotaMonitor contains all necessary information to track quotas and trigger replenishments
    69  type QuotaMonitor struct {
    70  	// each monitor list/watches a resource and determines if we should replenish quota
    71  	monitors    monitors
    72  	monitorLock sync.RWMutex
    73  	// informersStarted is closed after all the controllers have been initialized and are running.
    74  	// After that it is safe to start monitors here; before that it is not.
    75  	informersStarted <-chan struct{}
    76  
    77  	// stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
    78  	// This channel is also protected by monitorLock.
    79  	stopCh <-chan struct{}
    80  
    81  	// running tracks whether Run() has been called.
    82  	// it is protected by monitorLock.
    83  	running bool
    84  
    85  	// monitors are the producer of the resourceChanges queue
    86  	resourceChanges workqueue.RateLimitingInterface
    87  
    88  	// interfaces with informers
    89  	informerFactory informerfactory.InformerFactory
    90  
    91  	// list of resources to ignore
    92  	ignoredResources map[schema.GroupResource]struct{}
    93  
    94  	// The period that should be used to re-sync the monitored resource
    95  	resyncPeriod controller.ResyncPeriodFunc
    96  
    97  	// callback to alert that a change may require quota recalculation
    98  	replenishmentFunc ReplenishmentFunc
    99  
   100  	// maintains list of evaluators
   101  	registry quota.Registry
   102  
   103  	updateFilter UpdateFilter
   104  }
   105  
   106  // NewMonitor creates a new instance of a QuotaMonitor
   107  func NewMonitor(informersStarted <-chan struct{}, informerFactory informerfactory.InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry, updateFilter UpdateFilter) *QuotaMonitor {
   108  	return &QuotaMonitor{
   109  		informersStarted:  informersStarted,
   110  		informerFactory:   informerFactory,
   111  		ignoredResources:  ignoredResources,
   112  		resourceChanges:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
   113  		resyncPeriod:      resyncPeriod,
   114  		replenishmentFunc: replenishmentFunc,
   115  		registry:          registry,
   116  		updateFilter:      updateFilter,
   117  	}
   118  }
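
// newExampleQuotaMonitor is an editor's sketch, not part of the original file: it shows one
// plausible way the constructor above gets wired by an owning controller. The
// informersStarted channel and informerFactory are assumed to be supplied by that
// controller, and replenish is its callback for quota recalculation; the resync period,
// ignored resources, and empty registry are illustrative choices, not prescribed values.
func newExampleQuotaMonitor(informersStarted <-chan struct{}, informerFactory informerfactory.InformerFactory, replenish ReplenishmentFunc) *QuotaMonitor {
	return NewMonitor(
		informersStarted,
		informerFactory,
		// resources that quota accounting should never watch, e.g. events
		map[schema.GroupResource]struct{}{{Group: "", Resource: "events"}: {}},
		// re-list each monitored resource every five minutes
		controller.StaticResyncPeriodFunc(5*time.Minute),
		replenish,
		// start with an empty registry; SyncMonitors adds object-count evaluators lazily
		generic.NewRegistry(nil),
		// a nil UpdateFilter means update events are never enqueued (see controllerFor below)
		nil,
	)
}
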
   119  
   120  // monitor runs a Controller with a local stop channel.
   121  type monitor struct {
   122  	controller cache.Controller
   123  
   124  	// stopCh stops Controller. If stopCh is nil, the monitor is considered to be
   125  	// not yet started.
   126  	stopCh chan struct{}
   127  }
   128  
   129  // Run is intended to be called in a goroutine. Calling Run more than once
   130  // is an error.
   131  func (m *monitor) Run() {
   132  	m.controller.Run(m.stopCh)
   133  }
   134  
   135  type monitors map[schema.GroupVersionResource]*monitor
   136  
   137  // UpdateFilter is a function that returns true if the update event should be added to the resourceChanges queue.
   138  type UpdateFilter func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool
   139  
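
// exampleUpdateFilter is an editor's sketch, not part of the original file: a minimal
// UpdateFilter that enqueues an update only when the object actually changed, filtering out
// the no-op updates delivered by informer resyncs. A production filter would typically also
// look at the specific fields that can change quota usage for a given resource.
func exampleUpdateFilter(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool {
	oldMeta, err := meta.Accessor(oldObj)
	if err != nil {
		return false
	}
	newMeta, err := meta.Accessor(newObj)
	if err != nil {
		return false
	}
	// identical resource versions mean this update came from a resync, not a real change
	return oldMeta.GetResourceVersion() != newMeta.GetResourceVersion()
}
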
   140  func (qm *QuotaMonitor) controllerFor(ctx context.Context, resource schema.GroupVersionResource) (cache.Controller, error) {
   141  	logger := klog.FromContext(ctx)
   142  
   143  	handlers := cache.ResourceEventHandlerFuncs{
   144  		UpdateFunc: func(oldObj, newObj interface{}) {
   145  			if qm.updateFilter != nil && qm.updateFilter(resource, oldObj, newObj) {
   146  				event := &event{
   147  					eventType: updateEvent,
   148  					obj:       newObj,
   149  					oldObj:    oldObj,
   150  					gvr:       resource,
   151  				}
   152  				qm.resourceChanges.Add(event)
   153  			}
   154  		},
   155  		DeleteFunc: func(obj interface{}) {
   156  			// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
   157  			if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
   158  				obj = deletedFinalStateUnknown.Obj
   159  			}
   160  			event := &event{
   161  				eventType: deleteEvent,
   162  				obj:       obj,
   163  				gvr:       resource,
   164  			}
   165  			qm.resourceChanges.Add(event)
   166  		},
   167  	}
   168  	shared, err := qm.informerFactory.ForResource(resource)
   169  	if err == nil {
   170  		logger.V(4).Info("QuotaMonitor using a shared informer", "resource", resource.String())
   171  		shared.Informer().AddEventHandlerWithResyncPeriod(handlers, qm.resyncPeriod())
   172  		return shared.Informer().GetController(), nil
   173  	}
   174  	logger.V(4).Error(err, "QuotaMonitor unable to use a shared informer", "resource", resource.String())
   175  
   176  	// TODO: if we can share storage with garbage collector, it may make sense to support other resources
   177  	// until that time, aggregated api servers will have to run their own controller to reconcile their own quota.
   178  	return nil, fmt.Errorf("unable to monitor quota for resource %q", resource.String())
   179  }
   180  
   181  // SyncMonitors rebuilds the monitor set according to the supplied resources,
   182  // creating or deleting monitors as necessary. It will return any error
   183  // encountered, but will make an attempt to create a monitor for each resource
   184  // instead of immediately exiting on an error. It may be called before or after
   185  // Run. Monitors are NOT started as part of the sync. To ensure all existing
   186  // monitors are started, call StartMonitors.
   187  func (qm *QuotaMonitor) SyncMonitors(ctx context.Context, resources map[schema.GroupVersionResource]struct{}) error {
   188  	logger := klog.FromContext(ctx)
   189  
   190  	qm.monitorLock.Lock()
   191  	defer qm.monitorLock.Unlock()
   192  
   193  	toRemove := qm.monitors
   194  	if toRemove == nil {
   195  		toRemove = monitors{}
   196  	}
   197  	current := monitors{}
   198  	var errs []error
   199  	kept := 0
   200  	added := 0
   201  	for resource := range resources {
   202  		if _, ok := qm.ignoredResources[resource.GroupResource()]; ok {
   203  			continue
   204  		}
   205  		if m, ok := toRemove[resource]; ok {
   206  			current[resource] = m
   207  			delete(toRemove, resource)
   208  			kept++
   209  			continue
   210  		}
   211  		c, err := qm.controllerFor(ctx, resource)
   212  		if err != nil {
   213  			errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
   214  			continue
   215  		}
   216  
   217  		// check if we need to create an evaluator for this resource (if none previously registered)
   218  		evaluator := qm.registry.Get(resource.GroupResource())
   219  		if evaluator == nil {
   220  			listerFunc := generic.ListerFuncForResourceFunc(qm.informerFactory.ForResource)
   221  			listResourceFunc := generic.ListResourceUsingListerFunc(listerFunc, resource)
   222  			evaluator = generic.NewObjectCountEvaluator(resource.GroupResource(), listResourceFunc, "")
   223  			qm.registry.Add(evaluator)
   224  			logger.Info("QuotaMonitor created object count evaluator", "resource", resource.GroupResource())
   225  		}
   226  
   227  		// track the monitor
   228  		current[resource] = &monitor{controller: c}
   229  		added++
   230  	}
   231  	qm.monitors = current
   232  
   233  	for _, monitor := range toRemove {
   234  		if monitor.stopCh != nil {
   235  			close(monitor.stopCh)
   236  		}
   237  	}
   238  
   239  	logger.V(4).Info("quota synced monitors", "added", added, "kept", kept, "removed", len(toRemove))
   240  	// NewAggregate returns nil if errs is 0-length
   241  	return utilerrors.NewAggregate(errs)
   242  }
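
// resyncExampleMonitors is an editor's sketch, not part of the original file: it shows the
// call sequence SyncMonitors is designed for. The resources argument is assumed to come
// from API discovery (the quota-relevant, list/watch-able resources in the cluster).
func resyncExampleMonitors(ctx context.Context, qm *QuotaMonitor, resources map[schema.GroupVersionResource]struct{}) {
	// SyncMonitors only creates and removes monitors; it never starts them. It attempts
	// every resource and returns an aggregate error, so a partial failure still leaves the
	// successfully created monitors in place.
	if err := qm.SyncMonitors(ctx, resources); err != nil {
		utilruntime.HandleError(err)
	}
	// StartMonitors starts whatever is not yet running; it is a no-op until Run is called.
	qm.StartMonitors(ctx)
}
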
   243  
   244  // StartMonitors ensures the current set of monitors are running. Any newly
   245  // started monitors will also cause shared informers to be started.
   246  //
   247  // If called before Run, StartMonitors does nothing (as there is no stop channel
   248  // to support monitor/informer execution).
   249  func (qm *QuotaMonitor) StartMonitors(ctx context.Context) {
   250  	qm.monitorLock.Lock()
   251  	defer qm.monitorLock.Unlock()
   252  
   253  	if !qm.running {
   254  		return
   255  	}
   256  
   257  	// we're waiting until after the informer start that happens once all the controllers are initialized.  This ensures
   258  	// that they don't get unexpected events on their work queues.
   259  	<-qm.informersStarted
   260  
   261  	monitors := qm.monitors
   262  	started := 0
   263  	for _, monitor := range monitors {
   264  		if monitor.stopCh == nil {
   265  			monitor.stopCh = make(chan struct{})
   266  			qm.informerFactory.Start(qm.stopCh)
   267  			go monitor.Run()
   268  			started++
   269  		}
   270  	}
   271  	klog.FromContext(ctx).V(4).Info("QuotaMonitor finished starting monitors", "new", started, "total", len(monitors))
   272  }
   273  
   274  // IsSynced returns true if any monitors exist AND all those monitors'
   275  // controllers HasSynced functions return true. This means IsSynced could return
   276  // true at one time, and then later return false if all monitors were
   277  // reconstructed.
   278  func (qm *QuotaMonitor) IsSynced(ctx context.Context) bool {
   279  	logger := klog.FromContext(ctx)
   280  
   281  	qm.monitorLock.RLock()
   282  	defer qm.monitorLock.RUnlock()
   283  
   284  	if len(qm.monitors) == 0 {
   285  		logger.V(4).Info("quota monitor not synced: no monitors")
   286  		return false
   287  	}
   288  
   289  	for resource, monitor := range qm.monitors {
   290  		if !monitor.controller.HasSynced() {
   291  			logger.V(4).Info("quota monitor not synced", "resource", resource)
   292  			return false
   293  		}
   294  	}
   295  	return true
   296  }
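
// waitForExampleQuotaMonitorSync is an editor's sketch, not part of the original file: it
// shows one way a caller can block until every monitor's cache has synced. Because IsSynced
// can later flip back to false when the monitor set is rebuilt, callers generally re-check
// it rather than treating a single true result as permanent.
func waitForExampleQuotaMonitorSync(ctx context.Context, qm *QuotaMonitor) bool {
	return cache.WaitForCacheSync(ctx.Done(), func() bool {
		return qm.IsSynced(ctx)
	})
}
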
   297  
   298  // Run sets the stop channel and starts monitor execution until stopCh is
   299  // closed. Any running monitors will be stopped before Run returns.
   300  func (qm *QuotaMonitor) Run(ctx context.Context) {
   301  	defer utilruntime.HandleCrash()
   302  
   303  	logger := klog.FromContext(ctx)
   304  
   305  	logger.Info("QuotaMonitor running")
   306  	defer logger.Info("QuotaMonitor stopping")
   307  
   308  	// Set up the stop channel.
   309  	qm.monitorLock.Lock()
   310  	qm.stopCh = ctx.Done()
   311  	qm.running = true
   312  	qm.monitorLock.Unlock()
   313  
   314  	// Start monitors and begin change processing until the stop channel is
   315  	// closed.
   316  	qm.StartMonitors(ctx)
   317  
   318  	// The following workers block until the queue is shut down, so we need
   319  	// to shut it down in a separate goroutine.
   320  	go func() {
   321  		defer utilruntime.HandleCrash()
   322  		defer qm.resourceChanges.ShutDown()
   323  
   324  		<-ctx.Done()
   325  	}()
   326  	wait.UntilWithContext(ctx, qm.runProcessResourceChanges, 1*time.Second)
   327  
   328  	// Stop any running monitors.
   329  	qm.monitorLock.Lock()
   330  	defer qm.monitorLock.Unlock()
   331  	monitors := qm.monitors
   332  	stopped := 0
   333  	for _, monitor := range monitors {
   334  		if monitor.stopCh != nil {
   335  			stopped++
   336  			close(monitor.stopCh)
   337  		}
   338  	}
   339  	logger.Info("QuotaMonitor stopped monitors", "stopped", stopped, "total", len(monitors))
   340  }
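
// startAndStopExampleQuotaMonitor is an editor's sketch, not part of the original file:
// because Run drives the whole lifecycle from its context, shutting the monitor down is
// just context cancellation, which shuts down the resourceChanges queue and closes every
// running monitor's stop channel.
func startAndStopExampleQuotaMonitor(qm *QuotaMonitor) (stop func()) {
	ctx, cancel := context.WithCancel(context.Background())
	go qm.Run(ctx)
	return cancel
}
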
   341  
   342  func (qm *QuotaMonitor) runProcessResourceChanges(ctx context.Context) {
   343  	for qm.processResourceChanges(ctx) {
   344  	}
   345  }
   346  
   347  // processResourceChanges dequeues an event from resourceChanges and processes it.
   348  func (qm *QuotaMonitor) processResourceChanges(ctx context.Context) bool {
   349  	item, quit := qm.resourceChanges.Get()
   350  	if quit {
   351  		return false
   352  	}
   353  	defer qm.resourceChanges.Done(item)
   354  	event, ok := item.(*event)
   355  	if !ok {
   356  		utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
   357  		return true
   358  	}
   359  	obj := event.obj
   360  	accessor, err := meta.Accessor(obj)
   361  	if err != nil {
   362  		utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
   363  		return true
   364  	}
   365  	klog.FromContext(ctx).V(4).Info("QuotaMonitor process object",
   366  		"resource", event.gvr.String(),
   367  		"namespace", accessor.GetNamespace(),
   368  		"name", accessor.GetName(),
   369  		"uid", string(accessor.GetUID()),
   370  		"eventType", event.eventType,
   371  	)
   372  	qm.replenishmentFunc(ctx, event.gvr.GroupResource(), accessor.GetNamespace())
   373  	return true
   374  }
   375  
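
// exampleReplenishmentFunc is an editor's sketch, not part of the original file: it has the
// shape of the callback invoked by processResourceChanges above. A real callback, such as
// the resource quota controller's, re-queues the quotas in the affected namespace so usage
// is recalculated; this stand-in only logs the signal.
func exampleReplenishmentFunc(ctx context.Context, groupResource schema.GroupResource, namespace string) {
	klog.FromContext(ctx).V(4).Info("quota replenishment requested",
		"groupResource", groupResource.String(),
		"namespace", namespace,
	)
}
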
