1
16
17 package metadatainformer
18
19 import (
20 "context"
21 "sync"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/runtime"
27 "k8s.io/apimachinery/pkg/runtime/schema"
28 "k8s.io/apimachinery/pkg/watch"
29 "k8s.io/client-go/informers"
30 "k8s.io/client-go/metadata"
31 "k8s.io/client-go/metadata/metadatalister"
32 "k8s.io/client-go/tools/cache"
33 )
34
35
36 type SharedInformerOption func(*metadataSharedInformerFactory) *metadataSharedInformerFactory
37
38
39 func WithTransform(transform cache.TransformFunc) SharedInformerOption {
40 return func(factory *metadataSharedInformerFactory) *metadataSharedInformerFactory {
41 factory.transform = transform
42 return factory
43 }
44 }
45
46
47 func NewSharedInformerFactory(client metadata.Interface, defaultResync time.Duration) SharedInformerFactory {
48 return NewFilteredSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil)
49 }
50
51
52
53 func NewFilteredSharedInformerFactory(client metadata.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) SharedInformerFactory {
54 return &metadataSharedInformerFactory{
55 client: client,
56 defaultResync: defaultResync,
57 namespace: namespace,
58 informers: map[schema.GroupVersionResource]informers.GenericInformer{},
59 startedInformers: make(map[schema.GroupVersionResource]bool),
60 tweakListOptions: tweakListOptions,
61 }
62 }
63
64
65 func NewSharedInformerFactoryWithOptions(client metadata.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
66 factory := &metadataSharedInformerFactory{
67 client: client,
68 namespace: v1.NamespaceAll,
69 defaultResync: defaultResync,
70 informers: map[schema.GroupVersionResource]informers.GenericInformer{},
71 startedInformers: make(map[schema.GroupVersionResource]bool),
72 }
73
74
75 for _, opt := range options {
76 factory = opt(factory)
77 }
78
79 return factory
80 }
81
82 type metadataSharedInformerFactory struct {
83 client metadata.Interface
84 defaultResync time.Duration
85 namespace string
86 transform cache.TransformFunc
87
88 lock sync.Mutex
89 informers map[schema.GroupVersionResource]informers.GenericInformer
90
91
92 startedInformers map[schema.GroupVersionResource]bool
93 tweakListOptions TweakListOptionsFunc
94
95 wg sync.WaitGroup
96
97
98 shuttingDown bool
99 }
100
101 var _ SharedInformerFactory = &metadataSharedInformerFactory{}
102
103 func (f *metadataSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer {
104 f.lock.Lock()
105 defer f.lock.Unlock()
106
107 key := gvr
108 informer, exists := f.informers[key]
109 if exists {
110 return informer
111 }
112
113 informer = NewFilteredMetadataInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
114 informer.Informer().SetTransform(f.transform)
115 f.informers[key] = informer
116
117 return informer
118 }
119
120
121 func (f *metadataSharedInformerFactory) Start(stopCh <-chan struct{}) {
122 f.lock.Lock()
123 defer f.lock.Unlock()
124
125 if f.shuttingDown {
126 return
127 }
128
129 for informerType, informer := range f.informers {
130 if !f.startedInformers[informerType] {
131 f.wg.Add(1)
132
133
134
135 informer := informer.Informer()
136 go func() {
137 defer f.wg.Done()
138 informer.Run(stopCh)
139 }()
140 f.startedInformers[informerType] = true
141 }
142 }
143 }
144
145
146 func (f *metadataSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
147 informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer {
148 f.lock.Lock()
149 defer f.lock.Unlock()
150
151 informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{}
152 for informerType, informer := range f.informers {
153 if f.startedInformers[informerType] {
154 informers[informerType] = informer.Informer()
155 }
156 }
157 return informers
158 }()
159
160 res := map[schema.GroupVersionResource]bool{}
161 for informType, informer := range informers {
162 res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
163 }
164 return res
165 }
166
167 func (f *metadataSharedInformerFactory) Shutdown() {
168
169 defer f.wg.Wait()
170
171 f.lock.Lock()
172 defer f.lock.Unlock()
173 f.shuttingDown = true
174 }
175
176
177 func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
178 return &metadataInformer{
179 gvr: gvr,
180 informer: cache.NewSharedIndexInformer(
181 &cache.ListWatch{
182 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
183 if tweakListOptions != nil {
184 tweakListOptions(&options)
185 }
186 return client.Resource(gvr).Namespace(namespace).List(context.TODO(), options)
187 },
188 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
189 if tweakListOptions != nil {
190 tweakListOptions(&options)
191 }
192 return client.Resource(gvr).Namespace(namespace).Watch(context.TODO(), options)
193 },
194 },
195 &metav1.PartialObjectMetadata{},
196 resyncPeriod,
197 indexers,
198 ),
199 }
200 }
201
202 type metadataInformer struct {
203 informer cache.SharedIndexInformer
204 gvr schema.GroupVersionResource
205 }
206
207 var _ informers.GenericInformer = &metadataInformer{}
208
209 func (d *metadataInformer) Informer() cache.SharedIndexInformer {
210 return d.informer
211 }
212
213 func (d *metadataInformer) Lister() cache.GenericLister {
214 return metadatalister.NewRuntimeObjectShim(metadatalister.New(d.informer.GetIndexer(), d.gvr))
215 }
216
View as plain text