...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/assume_cache.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package volumebinding
    18  
    19  import (
    20  	"fmt"
    21  	"strconv"
    22  	"sync"
    23  
    24  	"k8s.io/klog/v2"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	"k8s.io/apimachinery/pkg/api/meta"
    28  	"k8s.io/client-go/tools/cache"
    29  	storagehelpers "k8s.io/component-helpers/storage/volume"
    30  )
    31  
// AssumeCache is a cache on top of the informer that allows for updating
// objects outside of informer events and also restoring the informer
// cache's version of the object. Objects are expected to be Kubernetes
// API objects whose metadata can be read with meta.Accessor.
//
// Object names used by the methods below are cache keys of the form
// produced by cache.MetaNamespaceKeyFunc.
type AssumeCache interface {
	// Assume replaces the latest (cached) version of the object with obj,
	// in memory only; the version received from the informer is kept so
	// that it can be restored later. The implementation in this file
	// returns an error when the object is not in the cache or when obj
	// has an older resource version than the cached one.
	Assume(obj interface{}) error

	// Restore resets the latest version of the named object to the
	// version last received from the informer. The implementation in
	// this file only logs when the object is not in the cache.
	Restore(objName string)

	// Get returns the latest version of the named object, which may be
	// an assumed one. An error is returned when it is not in the cache.
	Get(objName string) (interface{}, error)

	// GetAPIObj returns the version of the named object that was last
	// received from the informer, ignoring any assumed version.
	GetAPIObj(objName string) (interface{}, error)

	// List returns the latest versions of the objects in the cache.
	// In the implementation in this file, a cache with an index name
	// returns only the objects whose index value matches that of
	// indexObj; otherwise indexObj is ignored and everything is returned.
	List(indexObj interface{}) []interface{}
}
    52  
    53  type errWrongType struct {
    54  	typeName string
    55  	object   interface{}
    56  }
    57  
    58  func (e *errWrongType) Error() string {
    59  	return fmt.Sprintf("could not convert object to type %v: %+v", e.typeName, e.object)
    60  }
    61  
    62  type errNotFound struct {
    63  	typeName   string
    64  	objectName string
    65  }
    66  
    67  func (e *errNotFound) Error() string {
    68  	return fmt.Sprintf("could not find %v %q", e.typeName, e.objectName)
    69  }
    70  
    71  type errObjectName struct {
    72  	detailedErr error
    73  }
    74  
    75  func (e *errObjectName) Error() string {
    76  	return fmt.Sprintf("failed to get object name: %v", e.detailedErr)
    77  }
    78  
// assumeCache stores two pointers to represent a single object:
//   - The pointer to the informer object.
//   - The pointer to the latest object, which could be the same as
//     the informer object, or an in-memory object.
//
// An informer event replaces both pointers, but only when the incoming
// object's resource version is newer than that of the stored latest object
// (see add); this keeps informer resyncs from overriding assumed objects.
//
// Assume() only updates the latest object pointer.
// Restore() sets the latest object pointer back to the informer object.
// Get/List() always returns the latest object pointer.
type assumeCache struct {
	// The logger that was chosen when setting up the cache.
	// Will be used for all operations.
	logger klog.Logger

	// Synchronizes access to store and to the objInfo entries in it:
	// add, delete, Assume and Restore take the write lock, while
	// Get, GetAPIObj and List take the read lock.
	rwMutex sync.RWMutex

	// Describes the kind of object stored; used in log and error messages.
	description string

	// Stores objInfo pointers, keyed by objInfo.name (see objInfoKeyFunc).
	store cache.Indexer

	// Index function for object. It is applied to the latest version of
	// an entry by objInfoIndexFunc and may be nil when no index is used.
	indexFunc cache.IndexFunc
	// Name of the index consulted by List; empty when no index is used.
	indexName string
}
   107  
// objInfo is the entry type kept in assumeCache.store. It pairs the version
// of an object received from the informer with the latest version, which
// may have been replaced in memory by Assume.
type objInfo struct {
	// name of the object; this is the key under which the entry is stored
	name string

	// Latest version of object could be cached-only or from informer
	latestObj interface{}

	// Latest object from informer
	apiObj interface{}
}
   118  
   119  func objInfoKeyFunc(obj interface{}) (string, error) {
   120  	objInfo, ok := obj.(*objInfo)
   121  	if !ok {
   122  		return "", &errWrongType{"objInfo", obj}
   123  	}
   124  	return objInfo.name, nil
   125  }
   126  
   127  func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
   128  	objInfo, ok := obj.(*objInfo)
   129  	if !ok {
   130  		return []string{""}, &errWrongType{"objInfo", obj}
   131  	}
   132  	return c.indexFunc(objInfo.latestObj)
   133  }
   134  
   135  // NewAssumeCache creates an assume cache for general objects.
   136  func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
   137  	c := &assumeCache{
   138  		logger:      logger,
   139  		description: description,
   140  		indexFunc:   indexFunc,
   141  		indexName:   indexName,
   142  	}
   143  	indexers := cache.Indexers{}
   144  	if indexName != "" && indexFunc != nil {
   145  		indexers[indexName] = c.objInfoIndexFunc
   146  	}
   147  	c.store = cache.NewIndexer(objInfoKeyFunc, indexers)
   148  
   149  	// Unit tests don't use informers
   150  	if informer != nil {
   151  		informer.AddEventHandler(
   152  			cache.ResourceEventHandlerFuncs{
   153  				AddFunc:    c.add,
   154  				UpdateFunc: c.update,
   155  				DeleteFunc: c.delete,
   156  			},
   157  		)
   158  	}
   159  	return c
   160  }
   161  
   162  func (c *assumeCache) add(obj interface{}) {
   163  	if obj == nil {
   164  		return
   165  	}
   166  
   167  	name, err := cache.MetaNamespaceKeyFunc(obj)
   168  	if err != nil {
   169  		c.logger.Error(&errObjectName{err}, "Add failed")
   170  		return
   171  	}
   172  
   173  	c.rwMutex.Lock()
   174  	defer c.rwMutex.Unlock()
   175  
   176  	if objInfo, _ := c.getObjInfo(name); objInfo != nil {
   177  		newVersion, err := c.getObjVersion(name, obj)
   178  		if err != nil {
   179  			c.logger.Error(err, "Add failed: couldn't get object version")
   180  			return
   181  		}
   182  
   183  		storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
   184  		if err != nil {
   185  			c.logger.Error(err, "Add failed: couldn't get stored object version")
   186  			return
   187  		}
   188  
   189  		// Only update object if version is newer.
   190  		// This is so we don't override assumed objects due to informer resync.
   191  		if newVersion <= storedVersion {
   192  			c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion)
   193  			return
   194  		}
   195  	}
   196  
   197  	objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
   198  	if err = c.store.Update(objInfo); err != nil {
   199  		c.logger.Info("Error occurred while updating stored object", "err", err)
   200  	} else {
   201  		c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
   202  	}
   203  }
   204  
   205  func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
   206  	c.add(newObj)
   207  }
   208  
   209  func (c *assumeCache) delete(obj interface{}) {
   210  	if obj == nil {
   211  		return
   212  	}
   213  
   214  	name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   215  	if err != nil {
   216  		c.logger.Error(&errObjectName{err}, "Failed to delete")
   217  		return
   218  	}
   219  
   220  	c.rwMutex.Lock()
   221  	defer c.rwMutex.Unlock()
   222  
   223  	objInfo := &objInfo{name: name}
   224  	err = c.store.Delete(objInfo)
   225  	if err != nil {
   226  		c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name)
   227  	}
   228  }
   229  
   230  func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
   231  	objAccessor, err := meta.Accessor(obj)
   232  	if err != nil {
   233  		return -1, err
   234  	}
   235  
   236  	objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
   237  	if err != nil {
   238  		return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
   239  	}
   240  	return objResourceVersion, nil
   241  }
   242  
   243  func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
   244  	obj, ok, err := c.store.GetByKey(name)
   245  	if err != nil {
   246  		return nil, err
   247  	}
   248  	if !ok {
   249  		return nil, &errNotFound{c.description, name}
   250  	}
   251  
   252  	objInfo, ok := obj.(*objInfo)
   253  	if !ok {
   254  		return nil, &errWrongType{"objInfo", obj}
   255  	}
   256  	return objInfo, nil
   257  }
   258  
   259  func (c *assumeCache) Get(objName string) (interface{}, error) {
   260  	c.rwMutex.RLock()
   261  	defer c.rwMutex.RUnlock()
   262  
   263  	objInfo, err := c.getObjInfo(objName)
   264  	if err != nil {
   265  		return nil, err
   266  	}
   267  	return objInfo.latestObj, nil
   268  }
   269  
   270  func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
   271  	c.rwMutex.RLock()
   272  	defer c.rwMutex.RUnlock()
   273  
   274  	objInfo, err := c.getObjInfo(objName)
   275  	if err != nil {
   276  		return nil, err
   277  	}
   278  	return objInfo.apiObj, nil
   279  }
   280  
   281  func (c *assumeCache) List(indexObj interface{}) []interface{} {
   282  	c.rwMutex.RLock()
   283  	defer c.rwMutex.RUnlock()
   284  
   285  	allObjs := []interface{}{}
   286  	var objs []interface{}
   287  	if c.indexName != "" {
   288  		o, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
   289  		if err != nil {
   290  			c.logger.Error(err, "List index error")
   291  			return nil
   292  		}
   293  		objs = o
   294  	} else {
   295  		objs = c.store.List()
   296  	}
   297  
   298  	for _, obj := range objs {
   299  		objInfo, ok := obj.(*objInfo)
   300  		if !ok {
   301  			c.logger.Error(&errWrongType{"objInfo", obj}, "List error")
   302  			continue
   303  		}
   304  		allObjs = append(allObjs, objInfo.latestObj)
   305  	}
   306  	return allObjs
   307  }
   308  
   309  func (c *assumeCache) Assume(obj interface{}) error {
   310  	name, err := cache.MetaNamespaceKeyFunc(obj)
   311  	if err != nil {
   312  		return &errObjectName{err}
   313  	}
   314  
   315  	c.rwMutex.Lock()
   316  	defer c.rwMutex.Unlock()
   317  
   318  	objInfo, err := c.getObjInfo(name)
   319  	if err != nil {
   320  		return err
   321  	}
   322  
   323  	newVersion, err := c.getObjVersion(name, obj)
   324  	if err != nil {
   325  		return err
   326  	}
   327  
   328  	storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
   329  	if err != nil {
   330  		return err
   331  	}
   332  
   333  	if newVersion < storedVersion {
   334  		return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
   335  	}
   336  
   337  	// Only update the cached object
   338  	objInfo.latestObj = obj
   339  	c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion)
   340  	return nil
   341  }
   342  
   343  func (c *assumeCache) Restore(objName string) {
   344  	c.rwMutex.Lock()
   345  	defer c.rwMutex.Unlock()
   346  
   347  	objInfo, err := c.getObjInfo(objName)
   348  	if err != nil {
   349  		// This could be expected if object got deleted
   350  		c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err)
   351  	} else {
   352  		objInfo.latestObj = objInfo.apiObj
   353  		c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
   354  	}
   355  }
   356  
// PVAssumeCache is an AssumeCache for PersistentVolume objects. It adds
// typed accessors on top of the generic interface.
type PVAssumeCache interface {
	AssumeCache

	// GetPV returns the latest (possibly assumed) version of the PV
	// stored under pvName.
	GetPV(pvName string) (*v1.PersistentVolume, error)
	// GetAPIPV returns the version of the PV stored under pvName that was
	// last received from the informer.
	GetAPIPV(pvName string) (*v1.PersistentVolume, error)
	// ListPVs returns the latest versions of the cached PVs. The
	// implementation in this file looks them up through the storage
	// class index using storageClassName.
	ListPVs(storageClassName string) []*v1.PersistentVolume
}
   365  
// pvAssumeCache implements PVAssumeCache by embedding a generic AssumeCache
// and type-asserting its results to *v1.PersistentVolume.
type pvAssumeCache struct {
	AssumeCache
	// logger is used by ListPVs to report entries of an unexpected type.
	logger klog.Logger
}
   370  
   371  func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
   372  	if pv, ok := obj.(*v1.PersistentVolume); ok {
   373  		return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil
   374  	}
   375  	return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
   376  }
   377  
   378  // NewPVAssumeCache creates a PV assume cache.
   379  func NewPVAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVAssumeCache {
   380  	logger = klog.LoggerWithName(logger, "PV Cache")
   381  	return &pvAssumeCache{
   382  		AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc),
   383  		logger:      logger,
   384  	}
   385  }
   386  
   387  func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
   388  	obj, err := c.Get(pvName)
   389  	if err != nil {
   390  		return nil, err
   391  	}
   392  
   393  	pv, ok := obj.(*v1.PersistentVolume)
   394  	if !ok {
   395  		return nil, &errWrongType{"v1.PersistentVolume", obj}
   396  	}
   397  	return pv, nil
   398  }
   399  
   400  func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
   401  	obj, err := c.GetAPIObj(pvName)
   402  	if err != nil {
   403  		return nil, err
   404  	}
   405  	pv, ok := obj.(*v1.PersistentVolume)
   406  	if !ok {
   407  		return nil, &errWrongType{"v1.PersistentVolume", obj}
   408  	}
   409  	return pv, nil
   410  }
   411  
   412  func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
   413  	objs := c.List(&v1.PersistentVolume{
   414  		Spec: v1.PersistentVolumeSpec{
   415  			StorageClassName: storageClassName,
   416  		},
   417  	})
   418  	pvs := []*v1.PersistentVolume{}
   419  	for _, obj := range objs {
   420  		pv, ok := obj.(*v1.PersistentVolume)
   421  		if !ok {
   422  			c.logger.Error(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs")
   423  			continue
   424  		}
   425  		pvs = append(pvs, pv)
   426  	}
   427  	return pvs
   428  }
   429  
// PVCAssumeCache is an AssumeCache for PersistentVolumeClaim objects. It
// adds typed accessors on top of the generic interface.
type PVCAssumeCache interface {
	AssumeCache

	// GetPVC returns the latest (possibly assumed) version of the PVC
	// from the cache with given pvcKey.
	// pvcKey is the result of MetaNamespaceKeyFunc on PVC obj
	GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
	// GetAPIPVC returns the version of the PVC with given pvcKey that
	// was last received from the informer.
	GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}
   439  
// pvcAssumeCache implements PVCAssumeCache by embedding a generic
// AssumeCache and type-asserting its results to *v1.PersistentVolumeClaim.
type pvcAssumeCache struct {
	AssumeCache
	// logger is the named logger set up by NewPVCAssumeCache.
	// NOTE(review): none of the methods visible in this file read it.
	logger klog.Logger
}
   444  
   445  // NewPVCAssumeCache creates a PVC assume cache.
   446  func NewPVCAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVCAssumeCache {
   447  	logger = klog.LoggerWithName(logger, "PVC Cache")
   448  	return &pvcAssumeCache{
   449  		AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil),
   450  		logger:      logger,
   451  	}
   452  }
   453  
   454  func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
   455  	obj, err := c.Get(pvcKey)
   456  	if err != nil {
   457  		return nil, err
   458  	}
   459  
   460  	pvc, ok := obj.(*v1.PersistentVolumeClaim)
   461  	if !ok {
   462  		return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
   463  	}
   464  	return pvc, nil
   465  }
   466  
   467  func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
   468  	obj, err := c.GetAPIObj(pvcKey)
   469  	if err != nil {
   470  		return nil, err
   471  	}
   472  	pvc, ok := obj.(*v1.PersistentVolumeClaim)
   473  	if !ok {
   474  		return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
   475  	}
   476  	return pvc, nil
   477  }
   478  

View as plain text