1 /* 2 Copyright 2015 The Kubernetes Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 package container 18 19 import ( 20 "sync" 21 "time" 22 23 "k8s.io/apimachinery/pkg/types" 24 utilfeature "k8s.io/apiserver/pkg/util/feature" 25 "k8s.io/kubernetes/pkg/features" 26 ) 27 28 // Cache stores the PodStatus for the pods. It represents *all* the visible 29 // pods/containers in the container runtime. All cache entries are at least as 30 // new or newer than the global timestamp (set by UpdateTime()), while 31 // individual entries may be slightly newer than the global timestamp. If a pod 32 // has no states known by the runtime, Cache returns an empty PodStatus object 33 // with ID populated. 34 // 35 // Cache provides two methods to retrieve the PodStatus: the non-blocking Get() 36 // and the blocking GetNewerThan() method. The component responsible for 37 // populating the cache is expected to call Delete() to explicitly free the 38 // cache entries. 39 type Cache interface { 40 Get(types.UID) (*PodStatus, error) 41 // Set updates the cache by setting the PodStatus for the pod only 42 // if the data is newer than the cache based on the provided 43 // time stamp. Returns if the cache was updated. 44 Set(types.UID, *PodStatus, error, time.Time) (updated bool) 45 // GetNewerThan is a blocking call that only returns the status 46 // when it is newer than the given time. 47 GetNewerThan(types.UID, time.Time) (*PodStatus, error) 48 Delete(types.UID) 49 UpdateTime(time.Time) 50 } 51 52 type data struct { 53 // Status of the pod. 54 status *PodStatus 55 // Error got when trying to inspect the pod. 56 err error 57 // Time when the data was last modified. 58 modified time.Time 59 } 60 61 type subRecord struct { 62 time time.Time 63 ch chan *data 64 } 65 66 // cache implements Cache. 67 type cache struct { 68 // Lock which guards all internal data structures. 69 lock sync.RWMutex 70 // Map that stores the pod statuses. 71 pods map[types.UID]*data 72 // A global timestamp represents how fresh the cached data is. All 73 // cache content is at the least newer than this timestamp. Note that the 74 // timestamp is nil after initialization, and will only become non-nil when 75 // it is ready to serve the cached statuses. 76 timestamp *time.Time 77 // Map that stores the subscriber records. 78 subscribers map[types.UID][]*subRecord 79 } 80 81 // NewCache creates a pod cache. 82 func NewCache() Cache { 83 return &cache{pods: map[types.UID]*data{}, subscribers: map[types.UID][]*subRecord{}} 84 } 85 86 // Get returns the PodStatus for the pod; callers are expected not to 87 // modify the objects returned. 88 func (c *cache) Get(id types.UID) (*PodStatus, error) { 89 c.lock.RLock() 90 defer c.lock.RUnlock() 91 d := c.get(id) 92 return d.status, d.err 93 } 94 95 func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) { 96 ch := c.subscribe(id, minTime) 97 d := <-ch 98 return d.status, d.err 99 } 100 101 // Set sets the PodStatus for the pod only if the data is newer than the cache 102 func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) (updated bool) { 103 c.lock.Lock() 104 defer c.lock.Unlock() 105 106 if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) { 107 // Set the value in the cache only if it's not present already 108 // or the timestamp in the cache is older than the current update timestamp 109 if cachedVal, ok := c.pods[id]; ok && cachedVal.modified.After(timestamp) { 110 return false 111 } 112 } 113 114 c.pods[id] = &data{status: status, err: err, modified: timestamp} 115 c.notify(id, timestamp) 116 return true 117 } 118 119 // Delete removes the entry of the pod. 120 func (c *cache) Delete(id types.UID) { 121 c.lock.Lock() 122 defer c.lock.Unlock() 123 delete(c.pods, id) 124 } 125 126 // UpdateTime modifies the global timestamp of the cache and notify 127 // subscribers if needed. 128 func (c *cache) UpdateTime(timestamp time.Time) { 129 c.lock.Lock() 130 defer c.lock.Unlock() 131 c.timestamp = ×tamp 132 // Notify all the subscribers if the condition is met. 133 for id := range c.subscribers { 134 c.notify(id, *c.timestamp) 135 } 136 } 137 138 func makeDefaultData(id types.UID) *data { 139 return &data{status: &PodStatus{ID: id}, err: nil} 140 } 141 142 func (c *cache) get(id types.UID) *data { 143 d, ok := c.pods[id] 144 if !ok { 145 // Cache should store *all* pod/container information known by the 146 // container runtime. A cache miss indicates that there are no states 147 // regarding the pod last time we queried the container runtime. 148 // What this *really* means is that there are no visible pod/containers 149 // associated with this pod. Simply return an default (mostly empty) 150 // PodStatus to reflect this. 151 return makeDefaultData(id) 152 } 153 return d 154 } 155 156 // getIfNewerThan returns the data it is newer than the given time. 157 // Otherwise, it returns nil. The caller should acquire the lock. 158 func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data { 159 d, ok := c.pods[id] 160 globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime)) 161 if !ok && globalTimestampIsNewer { 162 // Status is not cached, but the global timestamp is newer than 163 // minTime, return the default status. 164 return makeDefaultData(id) 165 } 166 if ok && (d.modified.After(minTime) || globalTimestampIsNewer) { 167 // Status is cached, return status if either of the following is true. 168 // * status was modified after minTime 169 // * the global timestamp of the cache is newer than minTime. 170 return d 171 } 172 // The pod status is not ready. 173 return nil 174 } 175 176 // notify sends notifications for pod with the given id, if the requirements 177 // are met. Note that the caller should acquire the lock. 178 func (c *cache) notify(id types.UID, timestamp time.Time) { 179 list, ok := c.subscribers[id] 180 if !ok { 181 // No one to notify. 182 return 183 } 184 newList := []*subRecord{} 185 for i, r := range list { 186 if timestamp.Before(r.time) { 187 // Doesn't meet the time requirement; keep the record. 188 newList = append(newList, list[i]) 189 continue 190 } 191 r.ch <- c.get(id) 192 close(r.ch) 193 } 194 if len(newList) == 0 { 195 delete(c.subscribers, id) 196 } else { 197 c.subscribers[id] = newList 198 } 199 } 200 201 func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data { 202 ch := make(chan *data, 1) 203 c.lock.Lock() 204 defer c.lock.Unlock() 205 d := c.getIfNewerThan(id, timestamp) 206 if d != nil { 207 // If the cache entry is ready, send the data and return immediately. 208 ch <- d 209 return ch 210 } 211 // Add the subscription record. 212 c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch}) 213 return ch 214 } 215