1
16
17 package manager
18
19 import (
20 "fmt"
21 "sync"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 "k8s.io/client-go/tools/cache"
26
27 "k8s.io/klog/v2"
28
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/fields"
32 "k8s.io/apimachinery/pkg/runtime"
33 "k8s.io/apimachinery/pkg/runtime/schema"
34 "k8s.io/apimachinery/pkg/types"
35 "k8s.io/apimachinery/pkg/util/sets"
36 "k8s.io/apimachinery/pkg/util/wait"
37 "k8s.io/apimachinery/pkg/watch"
38 "k8s.io/utils/clock"
39 )
40
41 type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
42 type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
43 type newObjectFunc func() runtime.Object
44 type isImmutableFunc func(runtime.Object) bool
45
46
47 type objectCacheItem struct {
48 refMap map[types.UID]int
49 store *cacheStore
50 reflector *cache.Reflector
51
52 hasSynced func() (bool, error)
53
54
55 waitGroup sync.WaitGroup
56
57
58
59 lock sync.Mutex
60 lastAccessTime time.Time
61 stopped bool
62 immutable bool
63 stopCh chan struct{}
64 }
65
66 func (i *objectCacheItem) stop() bool {
67 i.lock.Lock()
68 defer i.lock.Unlock()
69 return i.stopThreadUnsafe()
70 }
71
72 func (i *objectCacheItem) stopThreadUnsafe() bool {
73 if i.stopped {
74 return false
75 }
76 i.stopped = true
77 close(i.stopCh)
78 if !i.immutable {
79 i.store.unsetInitialized()
80 }
81 return true
82 }
83
84 func (i *objectCacheItem) setLastAccessTime(time time.Time) {
85 i.lock.Lock()
86 defer i.lock.Unlock()
87 i.lastAccessTime = time
88 }
89
90 func (i *objectCacheItem) setImmutable() {
91 i.lock.Lock()
92 defer i.lock.Unlock()
93 i.immutable = true
94 }
95
96 func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
97 i.lock.Lock()
98 defer i.lock.Unlock()
99
100
101
102
103 if !i.stopped && i.store.hasSynced() && now.After(i.lastAccessTime.Add(maxIdleTime)) {
104 return i.stopThreadUnsafe()
105 }
106 return false
107 }
108
109 func (i *objectCacheItem) restartReflectorIfNeeded() {
110 i.lock.Lock()
111 defer i.lock.Unlock()
112 if i.immutable || !i.stopped {
113 return
114 }
115 i.stopCh = make(chan struct{})
116 i.stopped = false
117 go i.startReflector()
118 }
119
120 func (i *objectCacheItem) startReflector() {
121 i.waitGroup.Wait()
122 i.waitGroup.Add(1)
123 defer i.waitGroup.Done()
124 i.reflector.Run(i.stopCh)
125 }
126
127
128 type cacheStore struct {
129 cache.Store
130 lock sync.Mutex
131 initialized bool
132 }
133
134 func (c *cacheStore) Replace(list []interface{}, resourceVersion string) error {
135 c.lock.Lock()
136 defer c.lock.Unlock()
137 err := c.Store.Replace(list, resourceVersion)
138 if err != nil {
139 return err
140 }
141 c.initialized = true
142 return nil
143 }
144
145 func (c *cacheStore) hasSynced() bool {
146 c.lock.Lock()
147 defer c.lock.Unlock()
148 return c.initialized
149 }
150
151 func (c *cacheStore) unsetInitialized() {
152 c.lock.Lock()
153 defer c.lock.Unlock()
154 c.initialized = false
155 }
156
157
158
159 type objectCache struct {
160 listObject listObjectFunc
161 watchObject watchObjectFunc
162 newObject newObjectFunc
163 isImmutable isImmutableFunc
164 groupResource schema.GroupResource
165 clock clock.Clock
166 maxIdleTime time.Duration
167
168 lock sync.RWMutex
169 items map[objectKey]*objectCacheItem
170 stopped bool
171 }
172
// minIdleTime is the lower bound NewObjectCache applies to its maxIdleTime
// argument.
const minIdleTime = time.Minute
174
175
176 func NewObjectCache(
177 listObject listObjectFunc,
178 watchObject watchObjectFunc,
179 newObject newObjectFunc,
180 isImmutable isImmutableFunc,
181 groupResource schema.GroupResource,
182 clock clock.Clock,
183 maxIdleTime time.Duration,
184 stopCh <-chan struct{}) Store {
185
186 if maxIdleTime < minIdleTime {
187 maxIdleTime = minIdleTime
188 }
189
190 store := &objectCache{
191 listObject: listObject,
192 watchObject: watchObject,
193 newObject: newObject,
194 isImmutable: isImmutable,
195 groupResource: groupResource,
196 clock: clock,
197 maxIdleTime: maxIdleTime,
198 items: make(map[objectKey]*objectCacheItem),
199 }
200
201 go wait.Until(store.startRecycleIdleWatch, time.Minute, stopCh)
202 go store.shutdownWhenStopped(stopCh)
203 return store
204 }
205
206 func (c *objectCache) newStore() *cacheStore {
207
208
209
210
211
212 store := cache.NewStore(cache.MetaNamespaceKeyFunc)
213 return &cacheStore{store, sync.Mutex{}, false}
214 }
215
216 func (c *objectCache) newReflectorLocked(namespace, name string) *objectCacheItem {
217 fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
218 listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
219 options.FieldSelector = fieldSelector
220 return c.listObject(namespace, options)
221 }
222 watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
223 options.FieldSelector = fieldSelector
224 return c.watchObject(namespace, options)
225 }
226 store := c.newStore()
227 reflector := cache.NewNamedReflector(
228 fmt.Sprintf("object-%q/%q", namespace, name),
229 &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
230 c.newObject(),
231 store,
232 0,
233 )
234 item := &objectCacheItem{
235 refMap: make(map[types.UID]int),
236 store: store,
237 reflector: reflector,
238 hasSynced: func() (bool, error) { return store.hasSynced(), nil },
239 stopCh: make(chan struct{}),
240 }
241
242
243 if !c.stopped {
244 go item.startReflector()
245 }
246 return item
247 }
248
249 func (c *objectCache) AddReference(namespace, name string, referencedFrom types.UID) {
250 key := objectKey{namespace: namespace, name: name}
251
252
253
254
255
256
257 c.lock.Lock()
258 defer c.lock.Unlock()
259 item, exists := c.items[key]
260 if !exists {
261 item = c.newReflectorLocked(namespace, name)
262 c.items[key] = item
263 }
264 item.refMap[referencedFrom]++
265 }
266
267 func (c *objectCache) DeleteReference(namespace, name string, referencedFrom types.UID) {
268 key := objectKey{namespace: namespace, name: name}
269
270 c.lock.Lock()
271 defer c.lock.Unlock()
272 if item, ok := c.items[key]; ok {
273 item.refMap[referencedFrom]--
274 if item.refMap[referencedFrom] == 0 {
275 delete(item.refMap, referencedFrom)
276 }
277 if len(item.refMap) == 0 {
278
279 item.stop()
280 delete(c.items, key)
281 }
282 }
283 }
284
285
286
287 func (c *objectCache) key(namespace, name string) string {
288 if len(namespace) > 0 {
289 return namespace + "/" + name
290 }
291 return name
292 }
293
294 func (c *objectCache) isStopped() bool {
295 c.lock.RLock()
296 defer c.lock.RUnlock()
297 return c.stopped
298 }
299
300 func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
301 key := objectKey{namespace: namespace, name: name}
302
303 c.lock.RLock()
304 item, exists := c.items[key]
305 c.lock.RUnlock()
306
307 if !exists {
308 return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
309 }
310
311
312 item.setLastAccessTime(c.clock.Now())
313
314
315 if !c.isStopped() {
316 item.restartReflectorIfNeeded()
317 }
318 if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
319 return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
320 }
321 obj, exists, err := item.store.GetByKey(c.key(namespace, name))
322 if err != nil {
323 return nil, err
324 }
325 if !exists {
326 return nil, apierrors.NewNotFound(c.groupResource, name)
327 }
328 if object, ok := obj.(runtime.Object); ok {
329
330
331
332
333
334
335
336
337
338
339 if c.isImmutable(object) {
340 item.setImmutable()
341 if item.stop() {
342 klog.V(4).InfoS("Stopped watching for changes - object is immutable", "obj", klog.KRef(namespace, name))
343 }
344 }
345 return object, nil
346 }
347 return nil, fmt.Errorf("unexpected object type: %v", obj)
348 }
349
350 func (c *objectCache) startRecycleIdleWatch() {
351 c.lock.Lock()
352 defer c.lock.Unlock()
353
354 for key, item := range c.items {
355 if item.stopIfIdle(c.clock.Now(), c.maxIdleTime) {
356 klog.V(4).InfoS("Not acquired for long time, Stopped watching for changes", "objectKey", key, "maxIdleTime", c.maxIdleTime)
357 }
358 }
359 }
360
361 func (c *objectCache) shutdownWhenStopped(stopCh <-chan struct{}) {
362 <-stopCh
363
364 c.lock.Lock()
365 defer c.lock.Unlock()
366
367 c.stopped = true
368 for _, item := range c.items {
369 item.stop()
370 }
371 }
372
373
374
375
376
377
378
379 func NewWatchBasedManager(
380 listObject listObjectFunc,
381 watchObject watchObjectFunc,
382 newObject newObjectFunc,
383 isImmutable isImmutableFunc,
384 groupResource schema.GroupResource,
385 resyncInterval time.Duration,
386 getReferencedObjects func(*v1.Pod) sets.String) Manager {
387
388
389
390
391
392 maxIdleTime := resyncInterval * 5
393
394
395 objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop)
396 return NewCacheBasedManager(objectStore, getReferencedObjects)
397 }
398
View as plain text