...

Source file src/k8s.io/client-go/dynamic/dynamicinformer/informer.go

Documentation: k8s.io/client-go/dynamic/dynamicinformer

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package dynamicinformer
    18  
    19  import (
    20  	"context"
    21  	"sync"
    22  	"time"
    23  
    24  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    25  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    26  	"k8s.io/apimachinery/pkg/runtime"
    27  	"k8s.io/apimachinery/pkg/runtime/schema"
    28  	"k8s.io/apimachinery/pkg/watch"
    29  	"k8s.io/client-go/dynamic"
    30  	"k8s.io/client-go/dynamic/dynamiclister"
    31  	"k8s.io/client-go/informers"
    32  	"k8s.io/client-go/tools/cache"
    33  )
    34  
    35  // NewDynamicSharedInformerFactory constructs a new instance of dynamicSharedInformerFactory for all namespaces.
    36  func NewDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration) DynamicSharedInformerFactory {
    37  	return NewFilteredDynamicSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil)
    38  }
    39  
    40  // NewFilteredDynamicSharedInformerFactory constructs a new instance of dynamicSharedInformerFactory.
    41  // Listers obtained via this factory will be subject to the same filters as specified here.
    42  func NewFilteredDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) DynamicSharedInformerFactory {
    43  	return &dynamicSharedInformerFactory{
    44  		client:           client,
    45  		defaultResync:    defaultResync,
    46  		namespace:        namespace,
    47  		informers:        map[schema.GroupVersionResource]informers.GenericInformer{},
    48  		startedInformers: make(map[schema.GroupVersionResource]bool),
    49  		tweakListOptions: tweakListOptions,
    50  	}
    51  }
    52  
    53  type dynamicSharedInformerFactory struct {
    54  	client        dynamic.Interface
    55  	defaultResync time.Duration
    56  	namespace     string
    57  
    58  	lock      sync.Mutex
    59  	informers map[schema.GroupVersionResource]informers.GenericInformer
    60  	// startedInformers is used for tracking which informers have been started.
    61  	// This allows Start() to be called multiple times safely.
    62  	startedInformers map[schema.GroupVersionResource]bool
    63  	tweakListOptions TweakListOptionsFunc
    64  
    65  	// wg tracks how many goroutines were started.
    66  	wg sync.WaitGroup
    67  	// shuttingDown is true when Shutdown has been called. It may still be running
    68  	// because it needs to wait for goroutines.
    69  	shuttingDown bool
    70  }
    71  
    72  var _ DynamicSharedInformerFactory = &dynamicSharedInformerFactory{}
    73  
    74  func (f *dynamicSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer {
    75  	f.lock.Lock()
    76  	defer f.lock.Unlock()
    77  
    78  	key := gvr
    79  	informer, exists := f.informers[key]
    80  	if exists {
    81  		return informer
    82  	}
    83  
    84  	informer = NewFilteredDynamicInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
    85  	f.informers[key] = informer
    86  
    87  	return informer
    88  }
    89  
    90  // Start initializes all requested informers.
    91  func (f *dynamicSharedInformerFactory) Start(stopCh <-chan struct{}) {
    92  	f.lock.Lock()
    93  	defer f.lock.Unlock()
    94  
    95  	if f.shuttingDown {
    96  		return
    97  	}
    98  
    99  	for informerType, informer := range f.informers {
   100  		if !f.startedInformers[informerType] {
   101  			f.wg.Add(1)
   102  			// We need a new variable in each loop iteration,
   103  			// otherwise the goroutine would use the loop variable
   104  			// and that keeps changing.
   105  			informer := informer.Informer()
   106  			go func() {
   107  				defer f.wg.Done()
   108  				informer.Run(stopCh)
   109  			}()
   110  			f.startedInformers[informerType] = true
   111  		}
   112  	}
   113  }
   114  
   115  // WaitForCacheSync waits for all started informers' cache were synced.
   116  func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
   117  	informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer {
   118  		f.lock.Lock()
   119  		defer f.lock.Unlock()
   120  
   121  		informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{}
   122  		for informerType, informer := range f.informers {
   123  			if f.startedInformers[informerType] {
   124  				informers[informerType] = informer.Informer()
   125  			}
   126  		}
   127  		return informers
   128  	}()
   129  
   130  	res := map[schema.GroupVersionResource]bool{}
   131  	for informType, informer := range informers {
   132  		res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
   133  	}
   134  	return res
   135  }
   136  
   137  func (f *dynamicSharedInformerFactory) Shutdown() {
   138  	// Will return immediately if there is nothing to wait for.
   139  	defer f.wg.Wait()
   140  
   141  	f.lock.Lock()
   142  	defer f.lock.Unlock()
   143  	f.shuttingDown = true
   144  }
   145  
   146  // NewFilteredDynamicInformer constructs a new informer for a dynamic type.
   147  func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
   148  	return &dynamicInformer{
   149  		gvr: gvr,
   150  		informer: cache.NewSharedIndexInformerWithOptions(
   151  			&cache.ListWatch{
   152  				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   153  					if tweakListOptions != nil {
   154  						tweakListOptions(&options)
   155  					}
   156  					return client.Resource(gvr).Namespace(namespace).List(context.TODO(), options)
   157  				},
   158  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   159  					if tweakListOptions != nil {
   160  						tweakListOptions(&options)
   161  					}
   162  					return client.Resource(gvr).Namespace(namespace).Watch(context.TODO(), options)
   163  				},
   164  			},
   165  			&unstructured.Unstructured{},
   166  			cache.SharedIndexInformerOptions{
   167  				ResyncPeriod:      resyncPeriod,
   168  				Indexers:          indexers,
   169  				ObjectDescription: gvr.String(),
   170  			},
   171  		),
   172  	}
   173  }
   174  
   175  type dynamicInformer struct {
   176  	informer cache.SharedIndexInformer
   177  	gvr      schema.GroupVersionResource
   178  }
   179  
   180  var _ informers.GenericInformer = &dynamicInformer{}
   181  
   182  func (d *dynamicInformer) Informer() cache.SharedIndexInformer {
   183  	return d.informer
   184  }
   185  
   186  func (d *dynamicInformer) Lister() cache.GenericLister {
   187  	return dynamiclister.NewRuntimeObjectShim(dynamiclister.New(d.informer.GetIndexer(), d.gvr))
   188  }
   189  

View as plain text