...

Source file src/k8s.io/kubernetes/pkg/kubelet/util/manager/cache_based_manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/util/manager

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package manager
    18  
    19  import (
    20  	"fmt"
    21  	"strconv"
    22  	"sync"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apiserver/pkg/storage"
    27  	"k8s.io/kubernetes/pkg/kubelet/util"
    28  
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/types"
    33  	"k8s.io/apimachinery/pkg/util/sets"
    34  	"k8s.io/utils/clock"
    35  )
    36  
    37  // GetObjectTTLFunc defines a function to get value of TTL.
    38  type GetObjectTTLFunc func() (time.Duration, bool)
    39  
    40  // GetObjectFunc defines a function to get object with a given namespace and name.
    41  type GetObjectFunc func(string, string, metav1.GetOptions) (runtime.Object, error)
    42  
    43  type objectKey struct {
    44  	namespace string
    45  	name      string
    46  	uid       types.UID
    47  }
    48  
    49  // objectStoreItems is a single item stored in objectStore.
    50  type objectStoreItem struct {
    51  	refCount int
    52  	data     *objectData
    53  }
    54  
    55  type objectData struct {
    56  	sync.Mutex
    57  
    58  	object         runtime.Object
    59  	err            error
    60  	lastUpdateTime time.Time
    61  }
    62  
    63  // objectStore is a local cache of objects.
    64  type objectStore struct {
    65  	getObject GetObjectFunc
    66  	clock     clock.Clock
    67  
    68  	lock  sync.Mutex
    69  	items map[objectKey]*objectStoreItem
    70  
    71  	defaultTTL time.Duration
    72  	getTTL     GetObjectTTLFunc
    73  }
    74  
    75  // NewObjectStore returns a new ttl-based instance of Store interface.
    76  func NewObjectStore(getObject GetObjectFunc, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) Store {
    77  	return &objectStore{
    78  		getObject:  getObject,
    79  		clock:      clock,
    80  		items:      make(map[objectKey]*objectStoreItem),
    81  		defaultTTL: ttl,
    82  		getTTL:     getTTL,
    83  	}
    84  }
    85  
    86  func isObjectOlder(newObject, oldObject runtime.Object) bool {
    87  	if newObject == nil || oldObject == nil {
    88  		return false
    89  	}
    90  	newVersion, _ := storage.APIObjectVersioner{}.ObjectResourceVersion(newObject)
    91  	oldVersion, _ := storage.APIObjectVersioner{}.ObjectResourceVersion(oldObject)
    92  	return newVersion < oldVersion
    93  }
    94  
    95  func (s *objectStore) AddReference(namespace, name string, _ types.UID) {
    96  	key := objectKey{namespace: namespace, name: name}
    97  
    98  	// AddReference is called from RegisterPod, thus it needs to be efficient.
    99  	// Thus Add() is only increasing refCount and generation of a given object.
   100  	// Then Get() is responsible for fetching if needed.
   101  	s.lock.Lock()
   102  	defer s.lock.Unlock()
   103  	item, exists := s.items[key]
   104  	if !exists {
   105  		item = &objectStoreItem{
   106  			refCount: 0,
   107  			data:     &objectData{},
   108  		}
   109  		s.items[key] = item
   110  	}
   111  
   112  	item.refCount++
   113  	// This will trigger fetch on the next Get() operation.
   114  	item.data = nil
   115  }
   116  
   117  func (s *objectStore) DeleteReference(namespace, name string, _ types.UID) {
   118  	key := objectKey{namespace: namespace, name: name}
   119  
   120  	s.lock.Lock()
   121  	defer s.lock.Unlock()
   122  	if item, ok := s.items[key]; ok {
   123  		item.refCount--
   124  		if item.refCount == 0 {
   125  			delete(s.items, key)
   126  		}
   127  	}
   128  }
   129  
   130  // GetObjectTTLFromNodeFunc returns a function that returns TTL value
   131  // from a given Node object.
   132  func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
   133  	return func() (time.Duration, bool) {
   134  		node, err := getNode()
   135  		if err != nil {
   136  			return time.Duration(0), false
   137  		}
   138  		if node != nil && node.Annotations != nil {
   139  			if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok {
   140  				if intValue, err := strconv.Atoi(value); err == nil {
   141  					return time.Duration(intValue) * time.Second, true
   142  				}
   143  			}
   144  		}
   145  		return time.Duration(0), false
   146  	}
   147  }
   148  
   149  func (s *objectStore) isObjectFresh(data *objectData) bool {
   150  	objectTTL := s.defaultTTL
   151  	if ttl, ok := s.getTTL(); ok {
   152  		objectTTL = ttl
   153  	}
   154  	return s.clock.Now().Before(data.lastUpdateTime.Add(objectTTL))
   155  }
   156  
   157  func (s *objectStore) Get(namespace, name string) (runtime.Object, error) {
   158  	key := objectKey{namespace: namespace, name: name}
   159  
   160  	data := func() *objectData {
   161  		s.lock.Lock()
   162  		defer s.lock.Unlock()
   163  		item, exists := s.items[key]
   164  		if !exists {
   165  			return nil
   166  		}
   167  		if item.data == nil {
   168  			item.data = &objectData{}
   169  		}
   170  		return item.data
   171  	}()
   172  	if data == nil {
   173  		return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
   174  	}
   175  
   176  	// After updating data in objectStore, lock the data, fetch object if
   177  	// needed and return data.
   178  	data.Lock()
   179  	defer data.Unlock()
   180  	if data.err != nil || !s.isObjectFresh(data) {
   181  		opts := metav1.GetOptions{}
   182  		if data.object != nil && data.err == nil {
   183  			// This is just a periodic refresh of an object we successfully fetched previously.
   184  			// In this case, server data from apiserver cache to reduce the load on both
   185  			// etcd and apiserver (the cache is eventually consistent).
   186  			util.FromApiserverCache(&opts)
   187  		}
   188  
   189  		object, err := s.getObject(namespace, name, opts)
   190  		if err != nil && !apierrors.IsNotFound(err) && data.object == nil && data.err == nil {
   191  			// Couldn't fetch the latest object, but there is no cached data to return.
   192  			// Return the fetch result instead.
   193  			return object, err
   194  		}
   195  		if (err == nil && !isObjectOlder(object, data.object)) || apierrors.IsNotFound(err) {
   196  			// If the fetch succeeded with a newer version of the object, or if the
   197  			// object could not be found in the apiserver, update the cached data to
   198  			// reflect the current status.
   199  			data.object = object
   200  			data.err = err
   201  			data.lastUpdateTime = s.clock.Now()
   202  		}
   203  	}
   204  	return data.object, data.err
   205  }
   206  
   207  // cacheBasedManager keeps a store with objects necessary
   208  // for registered pods. Different implementations of the store
   209  // may result in different semantics for freshness of objects
   210  // (e.g. ttl-based implementation vs watch-based implementation).
   211  type cacheBasedManager struct {
   212  	objectStore          Store
   213  	getReferencedObjects func(*v1.Pod) sets.String
   214  
   215  	lock           sync.Mutex
   216  	registeredPods map[objectKey]*v1.Pod
   217  }
   218  
   219  func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) {
   220  	return c.objectStore.Get(namespace, name)
   221  }
   222  
   223  func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
   224  	names := c.getReferencedObjects(pod)
   225  	c.lock.Lock()
   226  	defer c.lock.Unlock()
   227  	var prev *v1.Pod
   228  	key := objectKey{namespace: pod.Namespace, name: pod.Name, uid: pod.UID}
   229  	prev = c.registeredPods[key]
   230  	c.registeredPods[key] = pod
   231  	// To minimize unnecessary API requests to the API server for the configmap/secret get API
   232  	// only invoke AddReference the first time RegisterPod is called for a pod.
   233  	if prev == nil {
   234  		for name := range names {
   235  			c.objectStore.AddReference(pod.Namespace, name, pod.UID)
   236  		}
   237  	} else {
   238  		prevNames := c.getReferencedObjects(prev)
   239  		// Add new references
   240  		for name := range names {
   241  			if !prevNames.Has(name) {
   242  				c.objectStore.AddReference(pod.Namespace, name, pod.UID)
   243  			}
   244  		}
   245  		// Remove dropped references
   246  		for prevName := range prevNames {
   247  			if !names.Has(prevName) {
   248  				c.objectStore.DeleteReference(pod.Namespace, prevName, pod.UID)
   249  			}
   250  		}
   251  	}
   252  }
   253  
   254  func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
   255  	var prev *v1.Pod
   256  	key := objectKey{namespace: pod.Namespace, name: pod.Name, uid: pod.UID}
   257  	c.lock.Lock()
   258  	defer c.lock.Unlock()
   259  	prev = c.registeredPods[key]
   260  	delete(c.registeredPods, key)
   261  	if prev != nil {
   262  		for name := range c.getReferencedObjects(prev) {
   263  			c.objectStore.DeleteReference(prev.Namespace, name, prev.UID)
   264  		}
   265  	}
   266  }
   267  
   268  // NewCacheBasedManager creates a manager that keeps a cache of all objects
   269  // necessary for registered pods.
   270  // It implements the following logic:
   271  //   - whenever a pod is created or updated, the cached versions of all objects
   272  //     is referencing are invalidated
   273  //   - every GetObject() call tries to fetch the value from local cache; if it is
   274  //     not there, invalidated or too old, we fetch it from apiserver and refresh the
   275  //     value in cache; otherwise it is just fetched from cache
   276  func NewCacheBasedManager(objectStore Store, getReferencedObjects func(*v1.Pod) sets.String) Manager {
   277  	return &cacheBasedManager{
   278  		objectStore:          objectStore,
   279  		getReferencedObjects: getReferencedObjects,
   280  		registeredPods:       make(map[objectKey]*v1.Pod),
   281  	}
   282  }
   283  

View as plain text