...
1
16
17
18
19 package externalversions
20
21 import (
22 reflect "reflect"
23 sync "sync"
24 time "time"
25
26 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 runtime "k8s.io/apimachinery/pkg/runtime"
28 schema "k8s.io/apimachinery/pkg/runtime/schema"
29 cache "k8s.io/client-go/tools/cache"
30 versioned "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
31 apis "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/apis"
32 internalinterfaces "sigs.k8s.io/gateway-api/pkg/client/informers/externalversions/internalinterfaces"
33 )
34
35
36 type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory
37
38 type sharedInformerFactory struct {
39 client versioned.Interface
40 namespace string
41 tweakListOptions internalinterfaces.TweakListOptionsFunc
42 lock sync.Mutex
43 defaultResync time.Duration
44 customResync map[reflect.Type]time.Duration
45
46 informers map[reflect.Type]cache.SharedIndexInformer
47
48
49 startedInformers map[reflect.Type]bool
50
51 wg sync.WaitGroup
52
53
54 shuttingDown bool
55 }
56
57
58 func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
59 return func(factory *sharedInformerFactory) *sharedInformerFactory {
60 for k, v := range resyncConfig {
61 factory.customResync[reflect.TypeOf(k)] = v
62 }
63 return factory
64 }
65 }
66
67
68 func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption {
69 return func(factory *sharedInformerFactory) *sharedInformerFactory {
70 factory.tweakListOptions = tweakListOptions
71 return factory
72 }
73 }
74
75
76 func WithNamespace(namespace string) SharedInformerOption {
77 return func(factory *sharedInformerFactory) *sharedInformerFactory {
78 factory.namespace = namespace
79 return factory
80 }
81 }
82
83
84 func NewSharedInformerFactory(client versioned.Interface, defaultResync time.Duration) SharedInformerFactory {
85 return NewSharedInformerFactoryWithOptions(client, defaultResync)
86 }
87
88
89
90
91
92 func NewFilteredSharedInformerFactory(client versioned.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
93 return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
94 }
95
96
97 func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
98 factory := &sharedInformerFactory{
99 client: client,
100 namespace: v1.NamespaceAll,
101 defaultResync: defaultResync,
102 informers: make(map[reflect.Type]cache.SharedIndexInformer),
103 startedInformers: make(map[reflect.Type]bool),
104 customResync: make(map[reflect.Type]time.Duration),
105 }
106
107
108 for _, opt := range options {
109 factory = opt(factory)
110 }
111
112 return factory
113 }
114
115 func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
116 f.lock.Lock()
117 defer f.lock.Unlock()
118
119 if f.shuttingDown {
120 return
121 }
122
123 for informerType, informer := range f.informers {
124 if !f.startedInformers[informerType] {
125 f.wg.Add(1)
126
127
128
129 informer := informer
130 go func() {
131 defer f.wg.Done()
132 informer.Run(stopCh)
133 }()
134 f.startedInformers[informerType] = true
135 }
136 }
137 }
138
139 func (f *sharedInformerFactory) Shutdown() {
140 f.lock.Lock()
141 f.shuttingDown = true
142 f.lock.Unlock()
143
144
145 f.wg.Wait()
146 }
147
148 func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
149 informers := func() map[reflect.Type]cache.SharedIndexInformer {
150 f.lock.Lock()
151 defer f.lock.Unlock()
152
153 informers := map[reflect.Type]cache.SharedIndexInformer{}
154 for informerType, informer := range f.informers {
155 if f.startedInformers[informerType] {
156 informers[informerType] = informer
157 }
158 }
159 return informers
160 }()
161
162 res := map[reflect.Type]bool{}
163 for informType, informer := range informers {
164 res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
165 }
166 return res
167 }
168
169
170
171 func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
172 f.lock.Lock()
173 defer f.lock.Unlock()
174
175 informerType := reflect.TypeOf(obj)
176 informer, exists := f.informers[informerType]
177 if exists {
178 return informer
179 }
180
181 resyncPeriod, exists := f.customResync[informerType]
182 if !exists {
183 resyncPeriod = f.defaultResync
184 }
185
186 informer = newFunc(f.client, resyncPeriod)
187 f.informers[informerType] = informer
188
189 return informer
190 }
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216 type SharedInformerFactory interface {
217 internalinterfaces.SharedInformerFactory
218
219
220
221 Start(stopCh <-chan struct{})
222
223
224
225
226
227
228
229
230
231
232
233 Shutdown()
234
235
236
237 WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
238
239
240 ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
241
242
243
244 InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer
245
246 Gateway() apis.Interface
247 }
248
249 func (f *sharedInformerFactory) Gateway() apis.Interface {
250 return apis.New(f, f.namespace, f.tweakListOptions)
251 }
252
View as plain text