1
16
17
18
19 package informers
20
21 import (
22 reflect "reflect"
23 sync "sync"
24 time "time"
25
26 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 runtime "k8s.io/apimachinery/pkg/runtime"
28 schema "k8s.io/apimachinery/pkg/runtime/schema"
29 admissionregistration "k8s.io/client-go/informers/admissionregistration"
30 apiserverinternal "k8s.io/client-go/informers/apiserverinternal"
31 apps "k8s.io/client-go/informers/apps"
32 autoscaling "k8s.io/client-go/informers/autoscaling"
33 batch "k8s.io/client-go/informers/batch"
34 certificates "k8s.io/client-go/informers/certificates"
35 coordination "k8s.io/client-go/informers/coordination"
36 core "k8s.io/client-go/informers/core"
37 discovery "k8s.io/client-go/informers/discovery"
38 events "k8s.io/client-go/informers/events"
39 extensions "k8s.io/client-go/informers/extensions"
40 flowcontrol "k8s.io/client-go/informers/flowcontrol"
41 internalinterfaces "k8s.io/client-go/informers/internalinterfaces"
42 networking "k8s.io/client-go/informers/networking"
43 node "k8s.io/client-go/informers/node"
44 policy "k8s.io/client-go/informers/policy"
45 rbac "k8s.io/client-go/informers/rbac"
46 resource "k8s.io/client-go/informers/resource"
47 scheduling "k8s.io/client-go/informers/scheduling"
48 storage "k8s.io/client-go/informers/storage"
49 storagemigration "k8s.io/client-go/informers/storagemigration"
50 kubernetes "k8s.io/client-go/kubernetes"
51 cache "k8s.io/client-go/tools/cache"
52 )
53
54
55 type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory
56
57 type sharedInformerFactory struct {
58 client kubernetes.Interface
59 namespace string
60 tweakListOptions internalinterfaces.TweakListOptionsFunc
61 lock sync.Mutex
62 defaultResync time.Duration
63 customResync map[reflect.Type]time.Duration
64 transform cache.TransformFunc
65
66 informers map[reflect.Type]cache.SharedIndexInformer
67
68
69 startedInformers map[reflect.Type]bool
70
71 wg sync.WaitGroup
72
73
74 shuttingDown bool
75 }
76
77
78 func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
79 return func(factory *sharedInformerFactory) *sharedInformerFactory {
80 for k, v := range resyncConfig {
81 factory.customResync[reflect.TypeOf(k)] = v
82 }
83 return factory
84 }
85 }
86
87
88 func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption {
89 return func(factory *sharedInformerFactory) *sharedInformerFactory {
90 factory.tweakListOptions = tweakListOptions
91 return factory
92 }
93 }
94
95
96 func WithNamespace(namespace string) SharedInformerOption {
97 return func(factory *sharedInformerFactory) *sharedInformerFactory {
98 factory.namespace = namespace
99 return factory
100 }
101 }
102
103
104 func WithTransform(transform cache.TransformFunc) SharedInformerOption {
105 return func(factory *sharedInformerFactory) *sharedInformerFactory {
106 factory.transform = transform
107 return factory
108 }
109 }
110
111
112 func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
113 return NewSharedInformerFactoryWithOptions(client, defaultResync)
114 }
115
116
117
118
119
120 func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
121 return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
122 }
123
124
125 func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
126 factory := &sharedInformerFactory{
127 client: client,
128 namespace: v1.NamespaceAll,
129 defaultResync: defaultResync,
130 informers: make(map[reflect.Type]cache.SharedIndexInformer),
131 startedInformers: make(map[reflect.Type]bool),
132 customResync: make(map[reflect.Type]time.Duration),
133 }
134
135
136 for _, opt := range options {
137 factory = opt(factory)
138 }
139
140 return factory
141 }
142
143 func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
144 f.lock.Lock()
145 defer f.lock.Unlock()
146
147 if f.shuttingDown {
148 return
149 }
150
151 for informerType, informer := range f.informers {
152 if !f.startedInformers[informerType] {
153 f.wg.Add(1)
154
155
156
157 informer := informer
158 go func() {
159 defer f.wg.Done()
160 informer.Run(stopCh)
161 }()
162 f.startedInformers[informerType] = true
163 }
164 }
165 }
166
167 func (f *sharedInformerFactory) Shutdown() {
168 f.lock.Lock()
169 f.shuttingDown = true
170 f.lock.Unlock()
171
172
173 f.wg.Wait()
174 }
175
176 func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
177 informers := func() map[reflect.Type]cache.SharedIndexInformer {
178 f.lock.Lock()
179 defer f.lock.Unlock()
180
181 informers := map[reflect.Type]cache.SharedIndexInformer{}
182 for informerType, informer := range f.informers {
183 if f.startedInformers[informerType] {
184 informers[informerType] = informer
185 }
186 }
187 return informers
188 }()
189
190 res := map[reflect.Type]bool{}
191 for informType, informer := range informers {
192 res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
193 }
194 return res
195 }
196
197
198
199 func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
200 f.lock.Lock()
201 defer f.lock.Unlock()
202
203 informerType := reflect.TypeOf(obj)
204 informer, exists := f.informers[informerType]
205 if exists {
206 return informer
207 }
208
209 resyncPeriod, exists := f.customResync[informerType]
210 if !exists {
211 resyncPeriod = f.defaultResync
212 }
213
214 informer = newFunc(f.client, resyncPeriod)
215 informer.SetTransform(f.transform)
216 f.informers[informerType] = informer
217
218 return informer
219 }
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245 type SharedInformerFactory interface {
246 internalinterfaces.SharedInformerFactory
247
248
249
250 Start(stopCh <-chan struct{})
251
252
253
254
255
256
257
258
259
260
261
262 Shutdown()
263
264
265
266 WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
267
268
269 ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
270
271
272
273 InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer
274
275 Admissionregistration() admissionregistration.Interface
276 Internal() apiserverinternal.Interface
277 Apps() apps.Interface
278 Autoscaling() autoscaling.Interface
279 Batch() batch.Interface
280 Certificates() certificates.Interface
281 Coordination() coordination.Interface
282 Core() core.Interface
283 Discovery() discovery.Interface
284 Events() events.Interface
285 Extensions() extensions.Interface
286 Flowcontrol() flowcontrol.Interface
287 Networking() networking.Interface
288 Node() node.Interface
289 Policy() policy.Interface
290 Rbac() rbac.Interface
291 Resource() resource.Interface
292 Scheduling() scheduling.Interface
293 Storage() storage.Interface
294 Storagemigration() storagemigration.Interface
295 }
296
297 func (f *sharedInformerFactory) Admissionregistration() admissionregistration.Interface {
298 return admissionregistration.New(f, f.namespace, f.tweakListOptions)
299 }
300
301 func (f *sharedInformerFactory) Internal() apiserverinternal.Interface {
302 return apiserverinternal.New(f, f.namespace, f.tweakListOptions)
303 }
304
305 func (f *sharedInformerFactory) Apps() apps.Interface {
306 return apps.New(f, f.namespace, f.tweakListOptions)
307 }
308
309 func (f *sharedInformerFactory) Autoscaling() autoscaling.Interface {
310 return autoscaling.New(f, f.namespace, f.tweakListOptions)
311 }
312
313 func (f *sharedInformerFactory) Batch() batch.Interface {
314 return batch.New(f, f.namespace, f.tweakListOptions)
315 }
316
317 func (f *sharedInformerFactory) Certificates() certificates.Interface {
318 return certificates.New(f, f.namespace, f.tweakListOptions)
319 }
320
321 func (f *sharedInformerFactory) Coordination() coordination.Interface {
322 return coordination.New(f, f.namespace, f.tweakListOptions)
323 }
324
325 func (f *sharedInformerFactory) Core() core.Interface {
326 return core.New(f, f.namespace, f.tweakListOptions)
327 }
328
329 func (f *sharedInformerFactory) Discovery() discovery.Interface {
330 return discovery.New(f, f.namespace, f.tweakListOptions)
331 }
332
333 func (f *sharedInformerFactory) Events() events.Interface {
334 return events.New(f, f.namespace, f.tweakListOptions)
335 }
336
337 func (f *sharedInformerFactory) Extensions() extensions.Interface {
338 return extensions.New(f, f.namespace, f.tweakListOptions)
339 }
340
341 func (f *sharedInformerFactory) Flowcontrol() flowcontrol.Interface {
342 return flowcontrol.New(f, f.namespace, f.tweakListOptions)
343 }
344
345 func (f *sharedInformerFactory) Networking() networking.Interface {
346 return networking.New(f, f.namespace, f.tweakListOptions)
347 }
348
349 func (f *sharedInformerFactory) Node() node.Interface {
350 return node.New(f, f.namespace, f.tweakListOptions)
351 }
352
353 func (f *sharedInformerFactory) Policy() policy.Interface {
354 return policy.New(f, f.namespace, f.tweakListOptions)
355 }
356
357 func (f *sharedInformerFactory) Rbac() rbac.Interface {
358 return rbac.New(f, f.namespace, f.tweakListOptions)
359 }
360
361 func (f *sharedInformerFactory) Resource() resource.Interface {
362 return resource.New(f, f.namespace, f.tweakListOptions)
363 }
364
365 func (f *sharedInformerFactory) Scheduling() scheduling.Interface {
366 return scheduling.New(f, f.namespace, f.tweakListOptions)
367 }
368
369 func (f *sharedInformerFactory) Storage() storage.Interface {
370 return storage.New(f, f.namespace, f.tweakListOptions)
371 }
372
373 func (f *sharedInformerFactory) Storagemigration() storagemigration.Interface {
374 return storagemigration.New(f, f.namespace, f.tweakListOptions)
375 }
376
View as plain text