...

Source file src/k8s.io/kubernetes/pkg/kubelet/container/cache.go

Documentation: k8s.io/kubernetes/pkg/kubelet/container

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package container
    18  
    19  import (
    20  	"sync"
    21  	"time"
    22  
    23  	"k8s.io/apimachinery/pkg/types"
    24  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    25  	"k8s.io/kubernetes/pkg/features"
    26  )
    27  
// Cache stores the PodStatus for the pods. It represents *all* the visible
// pods/containers in the container runtime. All cache entries are at least as
// new or newer than the global timestamp (set by UpdateTime()), while
// individual entries may be slightly newer than the global timestamp. If a pod
// has no states known by the runtime, Cache returns an empty PodStatus object
// with ID populated.
//
// Cache provides two methods to retrieve the PodStatus: the non-blocking Get()
// and the blocking GetNewerThan() method. The component responsible for
// populating the cache is expected to call Delete() to explicitly free the
// cache entries.
type Cache interface {
	// Get is the non-blocking lookup: it returns the cached PodStatus and
	// inspection error for the pod. Callers are expected not to modify
	// the returned objects.
	Get(types.UID) (*PodStatus, error)
	// Set updates the cache by setting the PodStatus for the pod only
	// if the data is newer than the cache based on the provided
	// time stamp. Returns if the cache was updated.
	Set(types.UID, *PodStatus, error, time.Time) (updated bool)
	// GetNewerThan is a blocking call that only returns the status
	// when it is newer than the given time.
	GetNewerThan(types.UID, time.Time) (*PodStatus, error)
	// Delete removes the cache entry of the pod.
	Delete(types.UID)
	// UpdateTime sets the global timestamp of the cache and notifies
	// subscribers if needed.
	UpdateTime(time.Time)
}
    51  
// data is a single cache entry: the status recorded for a pod, the error (if
// any) recorded alongside it, and the time the entry was stored.
type data struct {
	// Status of the pod.
	status *PodStatus
	// Error got when trying to inspect the pod.
	err error
	// Time when the data was last modified.
	modified time.Time
}
    60  
// subRecord is a pending subscription registered by subscribe() for a caller
// that is waiting for a sufficiently new status of a pod.
type subRecord struct {
	// Time requested by the subscriber; notify() keeps the record while
	// the notification timestamp is still before this time.
	time time.Time
	// Channel on which the data is delivered; notify() closes it after
	// sending.
	ch chan *data
}
    65  
// cache implements Cache.
type cache struct {
	// Lock which guards all internal data structures.
	lock sync.RWMutex
	// Map that stores the pod statuses, keyed by pod UID.
	pods map[types.UID]*data
	// A global timestamp that represents how fresh the cached data is. All
	// cache content is at least as new as this timestamp. Note that the
	// timestamp is nil after initialization, and will only become non-nil
	// when it is ready to serve the cached statuses.
	timestamp *time.Time
	// Map that stores the subscriber records, keyed by pod UID.
	subscribers map[types.UID][]*subRecord
}
    80  
    81  // NewCache creates a pod cache.
    82  func NewCache() Cache {
    83  	return &cache{pods: map[types.UID]*data{}, subscribers: map[types.UID][]*subRecord{}}
    84  }
    85  
    86  // Get returns the PodStatus for the pod; callers are expected not to
    87  // modify the objects returned.
    88  func (c *cache) Get(id types.UID) (*PodStatus, error) {
    89  	c.lock.RLock()
    90  	defer c.lock.RUnlock()
    91  	d := c.get(id)
    92  	return d.status, d.err
    93  }
    94  
    95  func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) {
    96  	ch := c.subscribe(id, minTime)
    97  	d := <-ch
    98  	return d.status, d.err
    99  }
   100  
   101  // Set sets the PodStatus for the pod only if the data is newer than the cache
   102  func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) (updated bool) {
   103  	c.lock.Lock()
   104  	defer c.lock.Unlock()
   105  
   106  	if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
   107  		// Set the value in the cache only if it's not present already
   108  		// or the timestamp in the cache is older than the current update timestamp
   109  		if cachedVal, ok := c.pods[id]; ok && cachedVal.modified.After(timestamp) {
   110  			return false
   111  		}
   112  	}
   113  
   114  	c.pods[id] = &data{status: status, err: err, modified: timestamp}
   115  	c.notify(id, timestamp)
   116  	return true
   117  }
   118  
   119  // Delete removes the entry of the pod.
   120  func (c *cache) Delete(id types.UID) {
   121  	c.lock.Lock()
   122  	defer c.lock.Unlock()
   123  	delete(c.pods, id)
   124  }
   125  
   126  // UpdateTime modifies the global timestamp of the cache and notify
   127  // subscribers if needed.
   128  func (c *cache) UpdateTime(timestamp time.Time) {
   129  	c.lock.Lock()
   130  	defer c.lock.Unlock()
   131  	c.timestamp = &timestamp
   132  	// Notify all the subscribers if the condition is met.
   133  	for id := range c.subscribers {
   134  		c.notify(id, *c.timestamp)
   135  	}
   136  }
   137  
   138  func makeDefaultData(id types.UID) *data {
   139  	return &data{status: &PodStatus{ID: id}, err: nil}
   140  }
   141  
   142  func (c *cache) get(id types.UID) *data {
   143  	d, ok := c.pods[id]
   144  	if !ok {
   145  		// Cache should store *all* pod/container information known by the
   146  		// container runtime. A cache miss indicates that there are no states
   147  		// regarding the pod last time we queried the container runtime.
   148  		// What this *really* means is that there are no visible pod/containers
   149  		// associated with this pod. Simply return an default (mostly empty)
   150  		// PodStatus to reflect this.
   151  		return makeDefaultData(id)
   152  	}
   153  	return d
   154  }
   155  
   156  // getIfNewerThan returns the data it is newer than the given time.
   157  // Otherwise, it returns nil. The caller should acquire the lock.
   158  func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
   159  	d, ok := c.pods[id]
   160  	globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
   161  	if !ok && globalTimestampIsNewer {
   162  		// Status is not cached, but the global timestamp is newer than
   163  		// minTime, return the default status.
   164  		return makeDefaultData(id)
   165  	}
   166  	if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
   167  		// Status is cached, return status if either of the following is true.
   168  		//   * status was modified after minTime
   169  		//   * the global timestamp of the cache is newer than minTime.
   170  		return d
   171  	}
   172  	// The pod status is not ready.
   173  	return nil
   174  }
   175  
   176  // notify sends notifications for pod with the given id, if the requirements
   177  // are met. Note that the caller should acquire the lock.
   178  func (c *cache) notify(id types.UID, timestamp time.Time) {
   179  	list, ok := c.subscribers[id]
   180  	if !ok {
   181  		// No one to notify.
   182  		return
   183  	}
   184  	newList := []*subRecord{}
   185  	for i, r := range list {
   186  		if timestamp.Before(r.time) {
   187  			// Doesn't meet the time requirement; keep the record.
   188  			newList = append(newList, list[i])
   189  			continue
   190  		}
   191  		r.ch <- c.get(id)
   192  		close(r.ch)
   193  	}
   194  	if len(newList) == 0 {
   195  		delete(c.subscribers, id)
   196  	} else {
   197  		c.subscribers[id] = newList
   198  	}
   199  }
   200  
   201  func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
   202  	ch := make(chan *data, 1)
   203  	c.lock.Lock()
   204  	defer c.lock.Unlock()
   205  	d := c.getIfNewerThan(id, timestamp)
   206  	if d != nil {
   207  		// If the cache entry is ready, send the data and return immediately.
   208  		ch <- d
   209  		return ch
   210  	}
   211  	// Add the subscription record.
   212  	c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
   213  	return ch
   214  }
   215  

View as plain text