1
16
17 package internal
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "math/rand"
24 "net/http"
25 "sync"
26 "time"
27
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 "k8s.io/apimachinery/pkg/api/meta"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/runtime/schema"
33 "k8s.io/apimachinery/pkg/runtime/serializer"
34 "k8s.io/apimachinery/pkg/watch"
35 "k8s.io/client-go/dynamic"
36 "k8s.io/client-go/metadata"
37 "k8s.io/client-go/rest"
38 "k8s.io/client-go/tools/cache"
39 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
40 "sigs.k8s.io/controller-runtime/pkg/internal/syncs"
41 )
42
43
44 type InformersOpts struct {
45 HTTPClient *http.Client
46 Scheme *runtime.Scheme
47 Mapper meta.RESTMapper
48 ResyncPeriod time.Duration
49 Namespace string
50 NewInformer *func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
51 Selector Selector
52 Transform cache.TransformFunc
53 UnsafeDisableDeepCopy bool
54 WatchErrorHandler cache.WatchErrorHandler
55 }
56
57
58 func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
59 newInformer := cache.NewSharedIndexInformer
60 if options.NewInformer != nil {
61 newInformer = *options.NewInformer
62 }
63 return &Informers{
64 config: config,
65 httpClient: options.HTTPClient,
66 scheme: options.Scheme,
67 mapper: options.Mapper,
68 tracker: tracker{
69 Structured: make(map[schema.GroupVersionKind]*Cache),
70 Unstructured: make(map[schema.GroupVersionKind]*Cache),
71 Metadata: make(map[schema.GroupVersionKind]*Cache),
72 },
73 codecs: serializer.NewCodecFactory(options.Scheme),
74 paramCodec: runtime.NewParameterCodec(options.Scheme),
75 resync: options.ResyncPeriod,
76 startWait: make(chan struct{}),
77 namespace: options.Namespace,
78 selector: options.Selector,
79 transform: options.Transform,
80 unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy,
81 newInformer: newInformer,
82 watchErrorHandler: options.WatchErrorHandler,
83 }
84 }
85
86
87 type Cache struct {
88
89 Informer cache.SharedIndexInformer
90
91
92 Reader CacheReader
93
94
95 stop chan struct{}
96 }
97
98
99
100
101
102 func (c *Cache) Start(stop <-chan struct{}) {
103
104 internalStop, cancel := syncs.MergeChans(stop, c.stop)
105 defer cancel()
106 c.Informer.Run(internalStop)
107 }
108
109 type tracker struct {
110 Structured map[schema.GroupVersionKind]*Cache
111 Unstructured map[schema.GroupVersionKind]*Cache
112 Metadata map[schema.GroupVersionKind]*Cache
113 }
114
115
116
117 type GetOptions struct {
118
119 BlockUntilSynced *bool
120 }
121
122
123
124 type Informers struct {
125
126 httpClient *http.Client
127
128
129 scheme *runtime.Scheme
130
131
132 config *rest.Config
133
134
135 mapper meta.RESTMapper
136
137
138 tracker tracker
139
140
141 codecs serializer.CodecFactory
142
143
144 paramCodec runtime.ParameterCodec
145
146
147
148
149 resync time.Duration
150
151
152 mu sync.RWMutex
153
154
155 started bool
156
157
158
159 startWait chan struct{}
160
161
162 waitGroup sync.WaitGroup
163
164
165 stopped bool
166
167
168 ctx context.Context
169
170
171
172 namespace string
173
174 selector Selector
175 transform cache.TransformFunc
176 unsafeDisableDeepCopy bool
177
178
179 newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
180
181
182
183
184 watchErrorHandler cache.WatchErrorHandler
185 }
186
187
188
189 func (ip *Informers) Start(ctx context.Context) error {
190 if err := func() error {
191 ip.mu.Lock()
192 defer ip.mu.Unlock()
193
194 if ip.started {
195 return errors.New("Informer already started")
196 }
197
198
199 ip.ctx = ctx
200
201
202 for _, i := range ip.tracker.Structured {
203 ip.startInformerLocked(i)
204 }
205 for _, i := range ip.tracker.Unstructured {
206 ip.startInformerLocked(i)
207 }
208 for _, i := range ip.tracker.Metadata {
209 ip.startInformerLocked(i)
210 }
211
212
213 ip.started = true
214 close(ip.startWait)
215
216 return nil
217 }(); err != nil {
218 return err
219 }
220 <-ctx.Done()
221 ip.mu.Lock()
222 ip.stopped = true
223 ip.mu.Unlock()
224 ip.waitGroup.Wait()
225 return nil
226 }
227
228 func (ip *Informers) startInformerLocked(cacheEntry *Cache) {
229
230
231
232 if ip.stopped {
233 return
234 }
235
236 ip.waitGroup.Add(1)
237 go func() {
238 defer ip.waitGroup.Done()
239 cacheEntry.Start(ip.ctx.Done())
240 }()
241 }
242
243 func (ip *Informers) waitForStarted(ctx context.Context) bool {
244 select {
245 case <-ip.startWait:
246 return true
247 case <-ctx.Done():
248 return false
249 }
250 }
251
252
253 func (ip *Informers) getHasSyncedFuncs() []cache.InformerSynced {
254 ip.mu.RLock()
255 defer ip.mu.RUnlock()
256
257 res := make([]cache.InformerSynced, 0,
258 len(ip.tracker.Structured)+len(ip.tracker.Unstructured)+len(ip.tracker.Metadata),
259 )
260 for _, i := range ip.tracker.Structured {
261 res = append(res, i.Informer.HasSynced)
262 }
263 for _, i := range ip.tracker.Unstructured {
264 res = append(res, i.Informer.HasSynced)
265 }
266 for _, i := range ip.tracker.Metadata {
267 res = append(res, i.Informer.HasSynced)
268 }
269 return res
270 }
271
272
273 func (ip *Informers) WaitForCacheSync(ctx context.Context) bool {
274 if !ip.waitForStarted(ctx) {
275 return false
276 }
277 return cache.WaitForCacheSync(ctx.Done(), ip.getHasSyncedFuncs()...)
278 }
279
280
281 func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) {
282 ip.mu.RLock()
283 defer ip.mu.RUnlock()
284 i, ok := ip.informersByType(obj)[gvk]
285 return i, ip.started, ok
286 }
287
288
289
290 func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts *GetOptions) (bool, *Cache, error) {
291
292 i, started, ok := ip.Peek(gvk, obj)
293 if !ok {
294 var err error
295 if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
296 return started, nil, err
297 }
298 }
299
300 shouldBlock := true
301 if opts.BlockUntilSynced != nil {
302 shouldBlock = *opts.BlockUntilSynced
303 }
304
305 if shouldBlock && started && !i.Informer.HasSynced() {
306
307 if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
308 return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
309 }
310 }
311
312 return started, i, nil
313 }
314
315
316 func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
317 ip.mu.Lock()
318 defer ip.mu.Unlock()
319
320 informerMap := ip.informersByType(obj)
321
322 entry, ok := informerMap[gvk]
323 if !ok {
324 return
325 }
326 close(entry.stop)
327 delete(informerMap, gvk)
328 }
329
330 func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache {
331 switch obj.(type) {
332 case runtime.Unstructured:
333 return ip.tracker.Unstructured
334 case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList:
335 return ip.tracker.Metadata
336 default:
337 return ip.tracker.Structured
338 }
339 }
340
341
342 func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*Cache, bool, error) {
343 ip.mu.Lock()
344 defer ip.mu.Unlock()
345
346
347
348
349 if i, ok := ip.informersByType(obj)[gvk]; ok {
350 return i, ip.started, nil
351 }
352
353
354 listWatcher, err := ip.makeListWatcher(gvk, obj)
355 if err != nil {
356 return nil, false, err
357 }
358 sharedIndexInformer := ip.newInformer(&cache.ListWatch{
359 ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
360 ip.selector.ApplyToList(&opts)
361 return listWatcher.ListFunc(opts)
362 },
363 WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
364 ip.selector.ApplyToList(&opts)
365 opts.Watch = true
366 return listWatcher.WatchFunc(opts)
367 },
368 }, obj, calculateResyncPeriod(ip.resync), cache.Indexers{
369 cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
370 })
371
372
373 if ip.watchErrorHandler != nil {
374 if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil {
375 return nil, false, err
376 }
377 }
378
379
380 if err := sharedIndexInformer.SetTransform(ip.transform); err != nil {
381 return nil, false, err
382 }
383
384 mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
385 if err != nil {
386 return nil, false, err
387 }
388
389
390 i := &Cache{
391 Informer: sharedIndexInformer,
392 Reader: CacheReader{
393 indexer: sharedIndexInformer.GetIndexer(),
394 groupVersionKind: gvk,
395 scopeName: mapping.Scope.Name(),
396 disableDeepCopy: ip.unsafeDisableDeepCopy,
397 },
398 stop: make(chan struct{}),
399 }
400 ip.informersByType(obj)[gvk] = i
401
402
403
404 if ip.started {
405 ip.startInformerLocked(i)
406 }
407 return i, ip.started, nil
408 }
409
410 func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) {
411
412
413 mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
414 if err != nil {
415 return nil, err
416 }
417
418
419 var namespace string
420 if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
421 namespace = restrictNamespaceBySelector(ip.namespace, ip.selector)
422 }
423
424 switch obj.(type) {
425
426
427
428 case runtime.Unstructured:
429
430
431 cfg := rest.CopyConfig(ip.config)
432 cfg.NegotiatedSerializer = nil
433 dynamicClient, err := dynamic.NewForConfigAndClient(cfg, ip.httpClient)
434 if err != nil {
435 return nil, err
436 }
437 resources := dynamicClient.Resource(mapping.Resource)
438 return &cache.ListWatch{
439 ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
440 if namespace != "" {
441 return resources.Namespace(namespace).List(ip.ctx, opts)
442 }
443 return resources.List(ip.ctx, opts)
444 },
445
446 WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
447 if namespace != "" {
448 return resources.Namespace(namespace).Watch(ip.ctx, opts)
449 }
450 return resources.Watch(ip.ctx, opts)
451 },
452 }, nil
453
454
455
456 case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList:
457
458
459 cfg := rest.CopyConfig(ip.config)
460 cfg.NegotiatedSerializer = nil
461
462
463 metadataClient, err := metadata.NewForConfigAndClient(cfg, ip.httpClient)
464 if err != nil {
465 return nil, err
466 }
467 resources := metadataClient.Resource(mapping.Resource)
468
469 return &cache.ListWatch{
470 ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
471 var (
472 list *metav1.PartialObjectMetadataList
473 err error
474 )
475 if namespace != "" {
476 list, err = resources.Namespace(namespace).List(ip.ctx, opts)
477 } else {
478 list, err = resources.List(ip.ctx, opts)
479 }
480 if list != nil {
481 for i := range list.Items {
482 list.Items[i].SetGroupVersionKind(gvk)
483 }
484 }
485 return list, err
486 },
487
488 WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) {
489 if namespace != "" {
490 watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts)
491 } else {
492 watcher, err = resources.Watch(ip.ctx, opts)
493 }
494 if err != nil {
495 return nil, err
496 }
497 return newGVKFixupWatcher(gvk, watcher), nil
498 },
499 }, nil
500
501
502
503 default:
504 client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs, ip.httpClient)
505 if err != nil {
506 return nil, err
507 }
508 listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
509 listObj, err := ip.scheme.New(listGVK)
510 if err != nil {
511 return nil, err
512 }
513 return &cache.ListWatch{
514 ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
515
516 req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec)
517 if namespace != "" {
518 req.Namespace(namespace)
519 }
520
521
522 res := listObj.DeepCopyObject()
523 if err := req.Do(ip.ctx).Into(res); err != nil {
524 return nil, err
525 }
526 return res, nil
527 },
528
529 WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
530
531 req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec)
532 if namespace != "" {
533 req.Namespace(namespace)
534 }
535
536 return req.Watch(ip.ctx)
537 },
538 }, nil
539 }
540 }
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559 func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface {
560 return watch.Filter(
561 watcher,
562 func(in watch.Event) (watch.Event, bool) {
563 in.Object.GetObjectKind().SetGroupVersionKind(gvk)
564 return in, true
565 },
566 )
567 }
568
569
570
571
572 func calculateResyncPeriod(resync time.Duration) time.Duration {
573
574 factor := rand.Float64()/5.0 + 0.9
575 return time.Duration(float64(resync.Nanoseconds()) * factor)
576 }
577
578
579
580
581 func restrictNamespaceBySelector(namespaceOpt string, s Selector) string {
582 if namespaceOpt != "" {
583
584 return namespaceOpt
585 }
586 fieldSelector := s.Field
587 if fieldSelector == nil || fieldSelector.Empty() {
588 return ""
589 }
590
591 value, found := fieldSelector.RequiresExactMatch("metadata.namespace")
592 if found {
593 return value
594 }
595 return ""
596 }
597
View as plain text