
Source file src/k8s.io/kubernetes/pkg/kubelet/util/manager/watch_based_manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/util/manager

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package manager
    19  import (
    20  	"fmt"
    21  	"sync"
    22  	"time"
    24  	v1 "k8s.io/api/core/v1"
    25  	"k8s.io/client-go/tools/cache"
    27  	"k8s.io/klog/v2"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/fields"
    32  	"k8s.io/apimachinery/pkg/runtime"
    33  	"k8s.io/apimachinery/pkg/runtime/schema"
    34  	"k8s.io/apimachinery/pkg/types"
    35  	"k8s.io/apimachinery/pkg/util/sets"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	"k8s.io/apimachinery/pkg/watch"
    38  	"k8s.io/utils/clock"
    39  )
    41  type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
    42  type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
    43  type newObjectFunc func() runtime.Object
    44  type isImmutableFunc func(runtime.Object) bool
    46  // objectCacheItem is a single item stored in objectCache.
    47  type objectCacheItem struct {
    48  	refMap    map[types.UID]int
    49  	store     *cacheStore
    50  	reflector *cache.Reflector
    52  	hasSynced func() (bool, error)
    54  	// waitGroup is used to ensure that there won't be two concurrent calls to reflector.Run
    55  	waitGroup sync.WaitGroup
    57  	// lock is to ensure the access and modify of lastAccessTime, stopped, and immutable are thread safety,
    58  	// and protecting from closing stopCh multiple times.
    59  	lock           sync.Mutex
    60  	lastAccessTime time.Time
    61  	stopped        bool
    62  	immutable      bool
    63  	stopCh         chan struct{}
    64  }
    66  func (i *objectCacheItem) stop() bool {
    67  	i.lock.Lock()
    68  	defer i.lock.Unlock()
    69  	return i.stopThreadUnsafe()
    70  }
    72  func (i *objectCacheItem) stopThreadUnsafe() bool {
    73  	if i.stopped {
    74  		return false
    75  	}
    76  	i.stopped = true
    77  	close(i.stopCh)
    78  	if !i.immutable {
    79  		i.store.unsetInitialized()
    80  	}
    81  	return true
    82  }
    84  func (i *objectCacheItem) setLastAccessTime(time time.Time) {
    85  	i.lock.Lock()
    86  	defer i.lock.Unlock()
    87  	i.lastAccessTime = time
    88  }
    90  func (i *objectCacheItem) setImmutable() {
    91  	i.lock.Lock()
    92  	defer i.lock.Unlock()
    93  	i.immutable = true
    94  }
    96  func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
    97  	i.lock.Lock()
    98  	defer i.lock.Unlock()
    99  	// Ensure that we don't try to stop not yet initialized reflector.
   100  	// In case of overloaded kube-apiserver, if the list request is
   101  	// already being processed, all the work would lost and would have
   102  	// to be retried.
   103  	if !i.stopped && i.store.hasSynced() && now.After(i.lastAccessTime.Add(maxIdleTime)) {
   104  		return i.stopThreadUnsafe()
   105  	}
   106  	return false
   107  }
   109  func (i *objectCacheItem) restartReflectorIfNeeded() {
   110  	i.lock.Lock()
   111  	defer i.lock.Unlock()
   112  	if i.immutable || !i.stopped {
   113  		return
   114  	}
   115  	i.stopCh = make(chan struct{})
   116  	i.stopped = false
   117  	go i.startReflector()
   118  }
   120  func (i *objectCacheItem) startReflector() {
   121  	i.waitGroup.Wait()
   122  	i.waitGroup.Add(1)
   123  	defer i.waitGroup.Done()
   124  	i.reflector.Run(i.stopCh)
   125  }
   127  // cacheStore is in order to rewrite Replace function to mark initialized flag
   128  type cacheStore struct {
   129  	cache.Store
   130  	lock        sync.Mutex
   131  	initialized bool
   132  }
   134  func (c *cacheStore) Replace(list []interface{}, resourceVersion string) error {
   135  	c.lock.Lock()
   136  	defer c.lock.Unlock()
   137  	err := c.Store.Replace(list, resourceVersion)
   138  	if err != nil {
   139  		return err
   140  	}
   141  	c.initialized = true
   142  	return nil
   143  }
   145  func (c *cacheStore) hasSynced() bool {
   146  	c.lock.Lock()
   147  	defer c.lock.Unlock()
   148  	return c.initialized
   149  }
   151  func (c *cacheStore) unsetInitialized() {
   152  	c.lock.Lock()
   153  	defer c.lock.Unlock()
   154  	c.initialized = false
   155  }
   157  // objectCache is a local cache of objects propagated via
   158  // individual watches.
   159  type objectCache struct {
   160  	listObject    listObjectFunc
   161  	watchObject   watchObjectFunc
   162  	newObject     newObjectFunc
   163  	isImmutable   isImmutableFunc
   164  	groupResource schema.GroupResource
   165  	clock         clock.Clock
   166  	maxIdleTime   time.Duration
   168  	lock    sync.RWMutex
   169  	items   map[objectKey]*objectCacheItem
   170  	stopped bool
   171  }
   173  const minIdleTime = 1 * time.Minute
   175  // NewObjectCache returns a new watch-based instance of Store interface.
   176  func NewObjectCache(
   177  	listObject listObjectFunc,
   178  	watchObject watchObjectFunc,
   179  	newObject newObjectFunc,
   180  	isImmutable isImmutableFunc,
   181  	groupResource schema.GroupResource,
   182  	clock clock.Clock,
   183  	maxIdleTime time.Duration,
   184  	stopCh <-chan struct{}) Store {
   186  	if maxIdleTime < minIdleTime {
   187  		maxIdleTime = minIdleTime
   188  	}
   190  	store := &objectCache{
   191  		listObject:    listObject,
   192  		watchObject:   watchObject,
   193  		newObject:     newObject,
   194  		isImmutable:   isImmutable,
   195  		groupResource: groupResource,
   196  		clock:         clock,
   197  		maxIdleTime:   maxIdleTime,
   198  		items:         make(map[objectKey]*objectCacheItem),
   199  	}
   201  	go wait.Until(store.startRecycleIdleWatch, time.Minute, stopCh)
   202  	go store.shutdownWhenStopped(stopCh)
   203  	return store
   204  }
   206  func (c *objectCache) newStore() *cacheStore {
   207  	// TODO: We may consider created a dedicated store keeping just a single
   208  	// item, instead of using a generic store implementation for this purpose.
   209  	// However, simple benchmarks show that memory overhead in that case is
   210  	// decrease from ~600B to ~300B per object. So we are not optimizing it
   211  	// until we will see a good reason for that.
   212  	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
   213  	return &cacheStore{store, sync.Mutex{}, false}
   214  }
   216  func (c *objectCache) newReflectorLocked(namespace, name string) *objectCacheItem {
   217  	fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
   218  	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
   219  		options.FieldSelector = fieldSelector
   220  		return c.listObject(namespace, options)
   221  	}
   222  	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
   223  		options.FieldSelector = fieldSelector
   224  		return c.watchObject(namespace, options)
   225  	}
   226  	store := c.newStore()
   227  	reflector := cache.NewNamedReflector(
   228  		fmt.Sprintf("object-%q/%q", namespace, name),
   229  		&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
   230  		c.newObject(),
   231  		store,
   232  		0,
   233  	)
   234  	item := &objectCacheItem{
   235  		refMap:    make(map[types.UID]int),
   236  		store:     store,
   237  		reflector: reflector,
   238  		hasSynced: func() (bool, error) { return store.hasSynced(), nil },
   239  		stopCh:    make(chan struct{}),
   240  	}
   242  	// Don't start reflector if Kubelet is already shutting down.
   243  	if !c.stopped {
   244  		go item.startReflector()
   245  	}
   246  	return item
   247  }
   249  func (c *objectCache) AddReference(namespace, name string, referencedFrom types.UID) {
   250  	key := objectKey{namespace: namespace, name: name}
   252  	// AddReference is called from RegisterPod thus it needs to be efficient.
   253  	// Thus, it is only increasing refCount and in case of first registration
   254  	// of a given object it starts corresponding reflector.
   255  	// It's responsibility of the first Get operation to wait until the
   256  	// reflector propagated the store.
   257  	c.lock.Lock()
   258  	defer c.lock.Unlock()
   259  	item, exists := c.items[key]
   260  	if !exists {
   261  		item = c.newReflectorLocked(namespace, name)
   262  		c.items[key] = item
   263  	}
   264  	item.refMap[referencedFrom]++
   265  }
   267  func (c *objectCache) DeleteReference(namespace, name string, referencedFrom types.UID) {
   268  	key := objectKey{namespace: namespace, name: name}
   270  	c.lock.Lock()
   271  	defer c.lock.Unlock()
   272  	if item, ok := c.items[key]; ok {
   273  		item.refMap[referencedFrom]--
   274  		if item.refMap[referencedFrom] == 0 {
   275  			delete(item.refMap, referencedFrom)
   276  		}
   277  		if len(item.refMap) == 0 {
   278  			// Stop the underlying reflector.
   279  			item.stop()
   280  			delete(c.items, key)
   281  		}
   282  	}
   283  }
   285  // key returns key of an object with a given name and namespace.
   286  // This has to be in-sync with cache.MetaNamespaceKeyFunc.
   287  func (c *objectCache) key(namespace, name string) string {
   288  	if len(namespace) > 0 {
   289  		return namespace + "/" + name
   290  	}
   291  	return name
   292  }
   294  func (c *objectCache) isStopped() bool {
   295  	c.lock.RLock()
   296  	defer c.lock.RUnlock()
   297  	return c.stopped
   298  }
   300  func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
   301  	key := objectKey{namespace: namespace, name: name}
   303  	c.lock.RLock()
   304  	item, exists := c.items[key]
   305  	c.lock.RUnlock()
   307  	if !exists {
   308  		return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
   309  	}
   310  	// Record last access time independently if it succeeded or not.
   311  	// This protects from premature (racy) reflector closure.
   312  	item.setLastAccessTime(c.clock.Now())
   314  	// Don't restart reflector if Kubelet is already shutting down.
   315  	if !c.isStopped() {
   316  		item.restartReflectorIfNeeded()
   317  	}
   318  	if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
   319  		return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
   320  	}
   321  	obj, exists, err := item.store.GetByKey(c.key(namespace, name))
   322  	if err != nil {
   323  		return nil, err
   324  	}
   325  	if !exists {
   326  		return nil, apierrors.NewNotFound(c.groupResource, name)
   327  	}
   328  	if object, ok := obj.(runtime.Object); ok {
   329  		// If the returned object is immutable, stop the reflector.
   330  		//
   331  		// NOTE: we may potentially not even start the reflector if the object is
   332  		// already immutable. However, given that:
   333  		// - we want to also handle the case when object is marked as immutable later
   334  		// - Secrets and ConfigMaps are periodically fetched by volumemanager anyway
   335  		// - doing that wouldn't provide visible scalability/performance gain - we
   336  		//   already have it from here
   337  		// - doing that would require significant refactoring to reflector
   338  		// we limit ourselves to just quickly stop the reflector here.
   339  		if c.isImmutable(object) {
   340  			item.setImmutable()
   341  			if item.stop() {
   342  				klog.V(4).InfoS("Stopped watching for changes - object is immutable", "obj", klog.KRef(namespace, name))
   343  			}
   344  		}
   345  		return object, nil
   346  	}
   347  	return nil, fmt.Errorf("unexpected object type: %v", obj)
   348  }
   350  func (c *objectCache) startRecycleIdleWatch() {
   351  	c.lock.Lock()
   352  	defer c.lock.Unlock()
   354  	for key, item := range c.items {
   355  		if item.stopIfIdle(c.clock.Now(), c.maxIdleTime) {
   356  			klog.V(4).InfoS("Not acquired for long time, Stopped watching for changes", "objectKey", key, "maxIdleTime", c.maxIdleTime)
   357  		}
   358  	}
   359  }
   361  func (c *objectCache) shutdownWhenStopped(stopCh <-chan struct{}) {
   362  	<-stopCh
   364  	c.lock.Lock()
   365  	defer c.lock.Unlock()
   367  	c.stopped = true
   368  	for _, item := range c.items {
   369  		item.stop()
   370  	}
   371  }
   373  // NewWatchBasedManager creates a manager that keeps a cache of all objects
   374  // necessary for registered pods.
   375  // It implements the following logic:
   376  //   - whenever a pod is created or updated, we start individual watches for all
   377  //     referenced objects that aren't referenced from other registered pods
   378  //   - every GetObject() returns a value from local cache propagated via watches
   379  func NewWatchBasedManager(
   380  	listObject listObjectFunc,
   381  	watchObject watchObjectFunc,
   382  	newObject newObjectFunc,
   383  	isImmutable isImmutableFunc,
   384  	groupResource schema.GroupResource,
   385  	resyncInterval time.Duration,
   386  	getReferencedObjects func(*v1.Pod) sets.String) Manager {
   388  	// If a configmap/secret is used as a volume, the volumeManager will visit the objectCacheItem every resyncInterval cycle,
   389  	// We just want to stop the objectCacheItem referenced by environment variables,
   390  	// So, maxIdleTime is set to an integer multiple of resyncInterval,
   391  	// We currently set it to 5 times.
   392  	maxIdleTime := resyncInterval * 5
   394  	// TODO propagate stopCh from the higher level.
   395  	objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop)
   396  	return NewCacheBasedManager(objectStore, getReferencedObjects)
   397  }

View as plain text