...
1
16
17
18
19 package externalversions
20
21 import (
22 reflect "reflect"
23 sync "sync"
24 time "time"
25
26 clientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
27 apiextensions "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions"
28 internalinterfaces "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/internalinterfaces"
29 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 runtime "k8s.io/apimachinery/pkg/runtime"
31 schema "k8s.io/apimachinery/pkg/runtime/schema"
32 cache "k8s.io/client-go/tools/cache"
33 )
34
35
36 type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory
37
38 type sharedInformerFactory struct {
39 client clientset.Interface
40 namespace string
41 tweakListOptions internalinterfaces.TweakListOptionsFunc
42 lock sync.Mutex
43 defaultResync time.Duration
44 customResync map[reflect.Type]time.Duration
45 transform cache.TransformFunc
46
47 informers map[reflect.Type]cache.SharedIndexInformer
48
49
50 startedInformers map[reflect.Type]bool
51
52 wg sync.WaitGroup
53
54
55 shuttingDown bool
56 }
57
58
59 func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
60 return func(factory *sharedInformerFactory) *sharedInformerFactory {
61 for k, v := range resyncConfig {
62 factory.customResync[reflect.TypeOf(k)] = v
63 }
64 return factory
65 }
66 }
67
68
69 func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption {
70 return func(factory *sharedInformerFactory) *sharedInformerFactory {
71 factory.tweakListOptions = tweakListOptions
72 return factory
73 }
74 }
75
76
77 func WithNamespace(namespace string) SharedInformerOption {
78 return func(factory *sharedInformerFactory) *sharedInformerFactory {
79 factory.namespace = namespace
80 return factory
81 }
82 }
83
84
85 func WithTransform(transform cache.TransformFunc) SharedInformerOption {
86 return func(factory *sharedInformerFactory) *sharedInformerFactory {
87 factory.transform = transform
88 return factory
89 }
90 }
91
92
93 func NewSharedInformerFactory(client clientset.Interface, defaultResync time.Duration) SharedInformerFactory {
94 return NewSharedInformerFactoryWithOptions(client, defaultResync)
95 }
96
97
98
99
100
101 func NewFilteredSharedInformerFactory(client clientset.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
102 return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
103 }
104
105
106 func NewSharedInformerFactoryWithOptions(client clientset.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
107 factory := &sharedInformerFactory{
108 client: client,
109 namespace: v1.NamespaceAll,
110 defaultResync: defaultResync,
111 informers: make(map[reflect.Type]cache.SharedIndexInformer),
112 startedInformers: make(map[reflect.Type]bool),
113 customResync: make(map[reflect.Type]time.Duration),
114 }
115
116
117 for _, opt := range options {
118 factory = opt(factory)
119 }
120
121 return factory
122 }
123
124 func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
125 f.lock.Lock()
126 defer f.lock.Unlock()
127
128 if f.shuttingDown {
129 return
130 }
131
132 for informerType, informer := range f.informers {
133 if !f.startedInformers[informerType] {
134 f.wg.Add(1)
135
136
137
138 informer := informer
139 go func() {
140 defer f.wg.Done()
141 informer.Run(stopCh)
142 }()
143 f.startedInformers[informerType] = true
144 }
145 }
146 }
147
148 func (f *sharedInformerFactory) Shutdown() {
149 f.lock.Lock()
150 f.shuttingDown = true
151 f.lock.Unlock()
152
153
154 f.wg.Wait()
155 }
156
157 func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
158 informers := func() map[reflect.Type]cache.SharedIndexInformer {
159 f.lock.Lock()
160 defer f.lock.Unlock()
161
162 informers := map[reflect.Type]cache.SharedIndexInformer{}
163 for informerType, informer := range f.informers {
164 if f.startedInformers[informerType] {
165 informers[informerType] = informer
166 }
167 }
168 return informers
169 }()
170
171 res := map[reflect.Type]bool{}
172 for informType, informer := range informers {
173 res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
174 }
175 return res
176 }
177
178
179
180 func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
181 f.lock.Lock()
182 defer f.lock.Unlock()
183
184 informerType := reflect.TypeOf(obj)
185 informer, exists := f.informers[informerType]
186 if exists {
187 return informer
188 }
189
190 resyncPeriod, exists := f.customResync[informerType]
191 if !exists {
192 resyncPeriod = f.defaultResync
193 }
194
195 informer = newFunc(f.client, resyncPeriod)
196 informer.SetTransform(f.transform)
197 f.informers[informerType] = informer
198
199 return informer
200 }
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226 type SharedInformerFactory interface {
227 internalinterfaces.SharedInformerFactory
228
229
230
231 Start(stopCh <-chan struct{})
232
233
234
235
236
237
238
239
240
241
242
243 Shutdown()
244
245
246
247 WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
248
249
250 ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
251
252
253
254 InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer
255
256 Apiextensions() apiextensions.Interface
257 }
258
259 func (f *sharedInformerFactory) Apiextensions() apiextensions.Interface {
260 return apiextensions.New(f, f.namespace, f.tweakListOptions)
261 }
262
View as plain text