1
16
17 package manager
18
19 import (
20 "fmt"
21 "strconv"
22 "sync"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apiserver/pkg/storage"
27 "k8s.io/kubernetes/pkg/kubelet/util"
28
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/types"
33 "k8s.io/apimachinery/pkg/util/sets"
34 "k8s.io/utils/clock"
35 )
36
37
38 type GetObjectTTLFunc func() (time.Duration, bool)
39
40
41 type GetObjectFunc func(string, string, metav1.GetOptions) (runtime.Object, error)
42
43 type objectKey struct {
44 namespace string
45 name string
46 uid types.UID
47 }
48
49
50 type objectStoreItem struct {
51 refCount int
52 data *objectData
53 }
54
55 type objectData struct {
56 sync.Mutex
57
58 object runtime.Object
59 err error
60 lastUpdateTime time.Time
61 }
62
63
64 type objectStore struct {
65 getObject GetObjectFunc
66 clock clock.Clock
67
68 lock sync.Mutex
69 items map[objectKey]*objectStoreItem
70
71 defaultTTL time.Duration
72 getTTL GetObjectTTLFunc
73 }
74
75
76 func NewObjectStore(getObject GetObjectFunc, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) Store {
77 return &objectStore{
78 getObject: getObject,
79 clock: clock,
80 items: make(map[objectKey]*objectStoreItem),
81 defaultTTL: ttl,
82 getTTL: getTTL,
83 }
84 }
85
86 func isObjectOlder(newObject, oldObject runtime.Object) bool {
87 if newObject == nil || oldObject == nil {
88 return false
89 }
90 newVersion, _ := storage.APIObjectVersioner{}.ObjectResourceVersion(newObject)
91 oldVersion, _ := storage.APIObjectVersioner{}.ObjectResourceVersion(oldObject)
92 return newVersion < oldVersion
93 }
94
95 func (s *objectStore) AddReference(namespace, name string, _ types.UID) {
96 key := objectKey{namespace: namespace, name: name}
97
98
99
100
101 s.lock.Lock()
102 defer s.lock.Unlock()
103 item, exists := s.items[key]
104 if !exists {
105 item = &objectStoreItem{
106 refCount: 0,
107 data: &objectData{},
108 }
109 s.items[key] = item
110 }
111
112 item.refCount++
113
114 item.data = nil
115 }
116
117 func (s *objectStore) DeleteReference(namespace, name string, _ types.UID) {
118 key := objectKey{namespace: namespace, name: name}
119
120 s.lock.Lock()
121 defer s.lock.Unlock()
122 if item, ok := s.items[key]; ok {
123 item.refCount--
124 if item.refCount == 0 {
125 delete(s.items, key)
126 }
127 }
128 }
129
130
131
132 func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
133 return func() (time.Duration, bool) {
134 node, err := getNode()
135 if err != nil {
136 return time.Duration(0), false
137 }
138 if node != nil && node.Annotations != nil {
139 if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok {
140 if intValue, err := strconv.Atoi(value); err == nil {
141 return time.Duration(intValue) * time.Second, true
142 }
143 }
144 }
145 return time.Duration(0), false
146 }
147 }
148
149 func (s *objectStore) isObjectFresh(data *objectData) bool {
150 objectTTL := s.defaultTTL
151 if ttl, ok := s.getTTL(); ok {
152 objectTTL = ttl
153 }
154 return s.clock.Now().Before(data.lastUpdateTime.Add(objectTTL))
155 }
156
157 func (s *objectStore) Get(namespace, name string) (runtime.Object, error) {
158 key := objectKey{namespace: namespace, name: name}
159
160 data := func() *objectData {
161 s.lock.Lock()
162 defer s.lock.Unlock()
163 item, exists := s.items[key]
164 if !exists {
165 return nil
166 }
167 if item.data == nil {
168 item.data = &objectData{}
169 }
170 return item.data
171 }()
172 if data == nil {
173 return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
174 }
175
176
177
178 data.Lock()
179 defer data.Unlock()
180 if data.err != nil || !s.isObjectFresh(data) {
181 opts := metav1.GetOptions{}
182 if data.object != nil && data.err == nil {
183
184
185
186 util.FromApiserverCache(&opts)
187 }
188
189 object, err := s.getObject(namespace, name, opts)
190 if err != nil && !apierrors.IsNotFound(err) && data.object == nil && data.err == nil {
191
192
193 return object, err
194 }
195 if (err == nil && !isObjectOlder(object, data.object)) || apierrors.IsNotFound(err) {
196
197
198
199 data.object = object
200 data.err = err
201 data.lastUpdateTime = s.clock.Now()
202 }
203 }
204 return data.object, data.err
205 }
206
207
208
209
210
211 type cacheBasedManager struct {
212 objectStore Store
213 getReferencedObjects func(*v1.Pod) sets.String
214
215 lock sync.Mutex
216 registeredPods map[objectKey]*v1.Pod
217 }
218
219 func (c *cacheBasedManager) GetObject(namespace, name string) (runtime.Object, error) {
220 return c.objectStore.Get(namespace, name)
221 }
222
223 func (c *cacheBasedManager) RegisterPod(pod *v1.Pod) {
224 names := c.getReferencedObjects(pod)
225 c.lock.Lock()
226 defer c.lock.Unlock()
227 var prev *v1.Pod
228 key := objectKey{namespace: pod.Namespace, name: pod.Name, uid: pod.UID}
229 prev = c.registeredPods[key]
230 c.registeredPods[key] = pod
231
232
233 if prev == nil {
234 for name := range names {
235 c.objectStore.AddReference(pod.Namespace, name, pod.UID)
236 }
237 } else {
238 prevNames := c.getReferencedObjects(prev)
239
240 for name := range names {
241 if !prevNames.Has(name) {
242 c.objectStore.AddReference(pod.Namespace, name, pod.UID)
243 }
244 }
245
246 for prevName := range prevNames {
247 if !names.Has(prevName) {
248 c.objectStore.DeleteReference(pod.Namespace, prevName, pod.UID)
249 }
250 }
251 }
252 }
253
254 func (c *cacheBasedManager) UnregisterPod(pod *v1.Pod) {
255 var prev *v1.Pod
256 key := objectKey{namespace: pod.Namespace, name: pod.Name, uid: pod.UID}
257 c.lock.Lock()
258 defer c.lock.Unlock()
259 prev = c.registeredPods[key]
260 delete(c.registeredPods, key)
261 if prev != nil {
262 for name := range c.getReferencedObjects(prev) {
263 c.objectStore.DeleteReference(prev.Namespace, name, prev.UID)
264 }
265 }
266 }
267
268
269
270
271
272
273
274
275
276 func NewCacheBasedManager(objectStore Store, getReferencedObjects func(*v1.Pod) sets.String) Manager {
277 return &cacheBasedManager{
278 objectStore: objectStore,
279 getReferencedObjects: getReferencedObjects,
280 registeredPods: make(map[objectKey]*v1.Pod),
281 }
282 }
283
View as plain text