...

Source file src/k8s.io/client-go/metadata/metadatainformer/informer.go

Documentation: k8s.io/client-go/metadata/metadatainformer

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package metadatainformer
    18  
    19  import (
    20  	"context"
    21  	"sync"
    22  	"time"
    23  
    24  	v1 "k8s.io/api/core/v1"
    25  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    26  	"k8s.io/apimachinery/pkg/runtime"
    27  	"k8s.io/apimachinery/pkg/runtime/schema"
    28  	"k8s.io/apimachinery/pkg/watch"
    29  	"k8s.io/client-go/informers"
    30  	"k8s.io/client-go/metadata"
    31  	"k8s.io/client-go/metadata/metadatalister"
    32  	"k8s.io/client-go/tools/cache"
    33  )
    34  
    35  // SharedInformerOption defines the functional option type for metadataSharedInformerFactory.
    36  type SharedInformerOption func(*metadataSharedInformerFactory) *metadataSharedInformerFactory
    37  
    38  // WithTransform sets a transform on all informers.
    39  func WithTransform(transform cache.TransformFunc) SharedInformerOption {
    40  	return func(factory *metadataSharedInformerFactory) *metadataSharedInformerFactory {
    41  		factory.transform = transform
    42  		return factory
    43  	}
    44  }
    45  
    46  // NewSharedInformerFactory constructs a new instance of metadataSharedInformerFactory for all namespaces.
    47  func NewSharedInformerFactory(client metadata.Interface, defaultResync time.Duration) SharedInformerFactory {
    48  	return NewFilteredSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil)
    49  }
    50  
    51  // NewFilteredSharedInformerFactory constructs a new instance of metadataSharedInformerFactory.
    52  // Listers obtained via this factory will be subject to the same filters as specified here.
    53  func NewFilteredSharedInformerFactory(client metadata.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) SharedInformerFactory {
    54  	return &metadataSharedInformerFactory{
    55  		client:           client,
    56  		defaultResync:    defaultResync,
    57  		namespace:        namespace,
    58  		informers:        map[schema.GroupVersionResource]informers.GenericInformer{},
    59  		startedInformers: make(map[schema.GroupVersionResource]bool),
    60  		tweakListOptions: tweakListOptions,
    61  	}
    62  }
    63  
    64  // NewSharedInformerFactoryWithOptions constructs a new instance of metadataSharedInformerFactory with additional options.
    65  func NewSharedInformerFactoryWithOptions(client metadata.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
    66  	factory := &metadataSharedInformerFactory{
    67  		client:           client,
    68  		namespace:        v1.NamespaceAll,
    69  		defaultResync:    defaultResync,
    70  		informers:        map[schema.GroupVersionResource]informers.GenericInformer{},
    71  		startedInformers: make(map[schema.GroupVersionResource]bool),
    72  	}
    73  
    74  	// Apply all options
    75  	for _, opt := range options {
    76  		factory = opt(factory)
    77  	}
    78  
    79  	return factory
    80  }
    81  
    82  type metadataSharedInformerFactory struct {
    83  	client        metadata.Interface
    84  	defaultResync time.Duration
    85  	namespace     string
    86  	transform     cache.TransformFunc
    87  
    88  	lock      sync.Mutex
    89  	informers map[schema.GroupVersionResource]informers.GenericInformer
    90  	// startedInformers is used for tracking which informers have been started.
    91  	// This allows Start() to be called multiple times safely.
    92  	startedInformers map[schema.GroupVersionResource]bool
    93  	tweakListOptions TweakListOptionsFunc
    94  	// wg tracks how many goroutines were started.
    95  	wg sync.WaitGroup
    96  	// shuttingDown is true when Shutdown has been called. It may still be running
    97  	// because it needs to wait for goroutines.
    98  	shuttingDown bool
    99  }
   100  
   101  var _ SharedInformerFactory = &metadataSharedInformerFactory{}
   102  
   103  func (f *metadataSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer {
   104  	f.lock.Lock()
   105  	defer f.lock.Unlock()
   106  
   107  	key := gvr
   108  	informer, exists := f.informers[key]
   109  	if exists {
   110  		return informer
   111  	}
   112  
   113  	informer = NewFilteredMetadataInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
   114  	informer.Informer().SetTransform(f.transform)
   115  	f.informers[key] = informer
   116  
   117  	return informer
   118  }
   119  
   120  // Start initializes all requested informers.
   121  func (f *metadataSharedInformerFactory) Start(stopCh <-chan struct{}) {
   122  	f.lock.Lock()
   123  	defer f.lock.Unlock()
   124  
   125  	if f.shuttingDown {
   126  		return
   127  	}
   128  
   129  	for informerType, informer := range f.informers {
   130  		if !f.startedInformers[informerType] {
   131  			f.wg.Add(1)
   132  			// We need a new variable in each loop iteration,
   133  			// otherwise the goroutine would use the loop variable
   134  			// and that keeps changing.
   135  			informer := informer.Informer()
   136  			go func() {
   137  				defer f.wg.Done()
   138  				informer.Run(stopCh)
   139  			}()
   140  			f.startedInformers[informerType] = true
   141  		}
   142  	}
   143  }
   144  
   145  // WaitForCacheSync waits for all started informers' cache were synced.
   146  func (f *metadataSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
   147  	informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer {
   148  		f.lock.Lock()
   149  		defer f.lock.Unlock()
   150  
   151  		informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{}
   152  		for informerType, informer := range f.informers {
   153  			if f.startedInformers[informerType] {
   154  				informers[informerType] = informer.Informer()
   155  			}
   156  		}
   157  		return informers
   158  	}()
   159  
   160  	res := map[schema.GroupVersionResource]bool{}
   161  	for informType, informer := range informers {
   162  		res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
   163  	}
   164  	return res
   165  }
   166  
   167  func (f *metadataSharedInformerFactory) Shutdown() {
   168  	// Will return immediately if there is nothing to wait for.
   169  	defer f.wg.Wait()
   170  
   171  	f.lock.Lock()
   172  	defer f.lock.Unlock()
   173  	f.shuttingDown = true
   174  }
   175  
   176  // NewFilteredMetadataInformer constructs a new informer for a metadata type.
   177  func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
   178  	return &metadataInformer{
   179  		gvr: gvr,
   180  		informer: cache.NewSharedIndexInformer(
   181  			&cache.ListWatch{
   182  				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   183  					if tweakListOptions != nil {
   184  						tweakListOptions(&options)
   185  					}
   186  					return client.Resource(gvr).Namespace(namespace).List(context.TODO(), options)
   187  				},
   188  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   189  					if tweakListOptions != nil {
   190  						tweakListOptions(&options)
   191  					}
   192  					return client.Resource(gvr).Namespace(namespace).Watch(context.TODO(), options)
   193  				},
   194  			},
   195  			&metav1.PartialObjectMetadata{},
   196  			resyncPeriod,
   197  			indexers,
   198  		),
   199  	}
   200  }
   201  
   202  type metadataInformer struct {
   203  	informer cache.SharedIndexInformer
   204  	gvr      schema.GroupVersionResource
   205  }
   206  
   207  var _ informers.GenericInformer = &metadataInformer{}
   208  
   209  func (d *metadataInformer) Informer() cache.SharedIndexInformer {
   210  	return d.informer
   211  }
   212  
   213  func (d *metadataInformer) Lister() cache.GenericLister {
   214  	return metadatalister.NewRuntimeObjectShim(metadatalister.New(d.informer.GetIndexer(), d.gvr))
   215  }
   216  

View as plain text