...
1
16
17
18
19 package externalversions
20
21 import (
22 reflect "reflect"
23 sync "sync"
24 time "time"
25
26 versioned "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
27 externalworkload "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/externalworkload"
28 internalinterfaces "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/internalinterfaces"
29 link "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/link"
30 policy "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/policy"
31 server "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/server"
32 serverauthorization "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/serverauthorization"
33 serviceprofile "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/serviceprofile"
34 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 runtime "k8s.io/apimachinery/pkg/runtime"
36 schema "k8s.io/apimachinery/pkg/runtime/schema"
37 cache "k8s.io/client-go/tools/cache"
38 )
39
40
41 type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory
42
43 type sharedInformerFactory struct {
44 client versioned.Interface
45 namespace string
46 tweakListOptions internalinterfaces.TweakListOptionsFunc
47 lock sync.Mutex
48 defaultResync time.Duration
49 customResync map[reflect.Type]time.Duration
50 transform cache.TransformFunc
51
52 informers map[reflect.Type]cache.SharedIndexInformer
53
54
55 startedInformers map[reflect.Type]bool
56
57 wg sync.WaitGroup
58
59
60 shuttingDown bool
61 }
62
63
64 func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
65 return func(factory *sharedInformerFactory) *sharedInformerFactory {
66 for k, v := range resyncConfig {
67 factory.customResync[reflect.TypeOf(k)] = v
68 }
69 return factory
70 }
71 }
72
73
74 func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption {
75 return func(factory *sharedInformerFactory) *sharedInformerFactory {
76 factory.tweakListOptions = tweakListOptions
77 return factory
78 }
79 }
80
81
82 func WithNamespace(namespace string) SharedInformerOption {
83 return func(factory *sharedInformerFactory) *sharedInformerFactory {
84 factory.namespace = namespace
85 return factory
86 }
87 }
88
89
90 func WithTransform(transform cache.TransformFunc) SharedInformerOption {
91 return func(factory *sharedInformerFactory) *sharedInformerFactory {
92 factory.transform = transform
93 return factory
94 }
95 }
96
97
98 func NewSharedInformerFactory(client versioned.Interface, defaultResync time.Duration) SharedInformerFactory {
99 return NewSharedInformerFactoryWithOptions(client, defaultResync)
100 }
101
102
103
104
105
106 func NewFilteredSharedInformerFactory(client versioned.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
107 return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
108 }
109
110
111 func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
112 factory := &sharedInformerFactory{
113 client: client,
114 namespace: v1.NamespaceAll,
115 defaultResync: defaultResync,
116 informers: make(map[reflect.Type]cache.SharedIndexInformer),
117 startedInformers: make(map[reflect.Type]bool),
118 customResync: make(map[reflect.Type]time.Duration),
119 }
120
121
122 for _, opt := range options {
123 factory = opt(factory)
124 }
125
126 return factory
127 }
128
129 func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
130 f.lock.Lock()
131 defer f.lock.Unlock()
132
133 if f.shuttingDown {
134 return
135 }
136
137 for informerType, informer := range f.informers {
138 if !f.startedInformers[informerType] {
139 f.wg.Add(1)
140
141
142
143 informer := informer
144 go func() {
145 defer f.wg.Done()
146 informer.Run(stopCh)
147 }()
148 f.startedInformers[informerType] = true
149 }
150 }
151 }
152
153 func (f *sharedInformerFactory) Shutdown() {
154 f.lock.Lock()
155 f.shuttingDown = true
156 f.lock.Unlock()
157
158
159 f.wg.Wait()
160 }
161
162 func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
163 informers := func() map[reflect.Type]cache.SharedIndexInformer {
164 f.lock.Lock()
165 defer f.lock.Unlock()
166
167 informers := map[reflect.Type]cache.SharedIndexInformer{}
168 for informerType, informer := range f.informers {
169 if f.startedInformers[informerType] {
170 informers[informerType] = informer
171 }
172 }
173 return informers
174 }()
175
176 res := map[reflect.Type]bool{}
177 for informType, informer := range informers {
178 res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
179 }
180 return res
181 }
182
183
184
185 func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
186 f.lock.Lock()
187 defer f.lock.Unlock()
188
189 informerType := reflect.TypeOf(obj)
190 informer, exists := f.informers[informerType]
191 if exists {
192 return informer
193 }
194
195 resyncPeriod, exists := f.customResync[informerType]
196 if !exists {
197 resyncPeriod = f.defaultResync
198 }
199
200 informer = newFunc(f.client, resyncPeriod)
201 informer.SetTransform(f.transform)
202 f.informers[informerType] = informer
203
204 return informer
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231 type SharedInformerFactory interface {
232 internalinterfaces.SharedInformerFactory
233
234
235
236 Start(stopCh <-chan struct{})
237
238
239
240
241
242
243
244
245
246
247
248 Shutdown()
249
250
251
252 WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
253
254
255 ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
256
257
258
259 InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer
260
261 Externalworkload() externalworkload.Interface
262 Link() link.Interface
263 Policy() policy.Interface
264 Server() server.Interface
265 Serverauthorization() serverauthorization.Interface
266 Linkerd() serviceprofile.Interface
267 }
268
269 func (f *sharedInformerFactory) Externalworkload() externalworkload.Interface {
270 return externalworkload.New(f, f.namespace, f.tweakListOptions)
271 }
272
273 func (f *sharedInformerFactory) Link() link.Interface {
274 return link.New(f, f.namespace, f.tweakListOptions)
275 }
276
277 func (f *sharedInformerFactory) Policy() policy.Interface {
278 return policy.New(f, f.namespace, f.tweakListOptions)
279 }
280
281 func (f *sharedInformerFactory) Server() server.Interface {
282 return server.New(f, f.namespace, f.tweakListOptions)
283 }
284
285 func (f *sharedInformerFactory) Serverauthorization() serverauthorization.Interface {
286 return serverauthorization.New(f, f.namespace, f.tweakListOptions)
287 }
288
289 func (f *sharedInformerFactory) Linkerd() serviceprofile.Interface {
290 return serviceprofile.New(f, f.namespace, f.tweakListOptions)
291 }
292
View as plain text