...
1
16
17 package dynamicinformer
18
19 import (
20 "context"
21 "sync"
22 "time"
23
24 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
26 "k8s.io/apimachinery/pkg/runtime"
27 "k8s.io/apimachinery/pkg/runtime/schema"
28 "k8s.io/apimachinery/pkg/watch"
29 "k8s.io/client-go/dynamic"
30 "k8s.io/client-go/dynamic/dynamiclister"
31 "k8s.io/client-go/informers"
32 "k8s.io/client-go/tools/cache"
33 )
34
35
36 func NewDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration) DynamicSharedInformerFactory {
37 return NewFilteredDynamicSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil)
38 }
39
40
41
42 func NewFilteredDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) DynamicSharedInformerFactory {
43 return &dynamicSharedInformerFactory{
44 client: client,
45 defaultResync: defaultResync,
46 namespace: namespace,
47 informers: map[schema.GroupVersionResource]informers.GenericInformer{},
48 startedInformers: make(map[schema.GroupVersionResource]bool),
49 tweakListOptions: tweakListOptions,
50 }
51 }
52
53 type dynamicSharedInformerFactory struct {
54 client dynamic.Interface
55 defaultResync time.Duration
56 namespace string
57
58 lock sync.Mutex
59 informers map[schema.GroupVersionResource]informers.GenericInformer
60
61
62 startedInformers map[schema.GroupVersionResource]bool
63 tweakListOptions TweakListOptionsFunc
64
65
66 wg sync.WaitGroup
67
68
69 shuttingDown bool
70 }
71
72 var _ DynamicSharedInformerFactory = &dynamicSharedInformerFactory{}
73
74 func (f *dynamicSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer {
75 f.lock.Lock()
76 defer f.lock.Unlock()
77
78 key := gvr
79 informer, exists := f.informers[key]
80 if exists {
81 return informer
82 }
83
84 informer = NewFilteredDynamicInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
85 f.informers[key] = informer
86
87 return informer
88 }
89
90
91 func (f *dynamicSharedInformerFactory) Start(stopCh <-chan struct{}) {
92 f.lock.Lock()
93 defer f.lock.Unlock()
94
95 if f.shuttingDown {
96 return
97 }
98
99 for informerType, informer := range f.informers {
100 if !f.startedInformers[informerType] {
101 f.wg.Add(1)
102
103
104
105 informer := informer.Informer()
106 go func() {
107 defer f.wg.Done()
108 informer.Run(stopCh)
109 }()
110 f.startedInformers[informerType] = true
111 }
112 }
113 }
114
115
116 func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
117 informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer {
118 f.lock.Lock()
119 defer f.lock.Unlock()
120
121 informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{}
122 for informerType, informer := range f.informers {
123 if f.startedInformers[informerType] {
124 informers[informerType] = informer.Informer()
125 }
126 }
127 return informers
128 }()
129
130 res := map[schema.GroupVersionResource]bool{}
131 for informType, informer := range informers {
132 res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
133 }
134 return res
135 }
136
137 func (f *dynamicSharedInformerFactory) Shutdown() {
138
139 defer f.wg.Wait()
140
141 f.lock.Lock()
142 defer f.lock.Unlock()
143 f.shuttingDown = true
144 }
145
146
147 func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
148 return &dynamicInformer{
149 gvr: gvr,
150 informer: cache.NewSharedIndexInformerWithOptions(
151 &cache.ListWatch{
152 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
153 if tweakListOptions != nil {
154 tweakListOptions(&options)
155 }
156 return client.Resource(gvr).Namespace(namespace).List(context.TODO(), options)
157 },
158 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
159 if tweakListOptions != nil {
160 tweakListOptions(&options)
161 }
162 return client.Resource(gvr).Namespace(namespace).Watch(context.TODO(), options)
163 },
164 },
165 &unstructured.Unstructured{},
166 cache.SharedIndexInformerOptions{
167 ResyncPeriod: resyncPeriod,
168 Indexers: indexers,
169 ObjectDescription: gvr.String(),
170 },
171 ),
172 }
173 }
174
175 type dynamicInformer struct {
176 informer cache.SharedIndexInformer
177 gvr schema.GroupVersionResource
178 }
179
180 var _ informers.GenericInformer = &dynamicInformer{}
181
182 func (d *dynamicInformer) Informer() cache.SharedIndexInformer {
183 return d.informer
184 }
185
186 func (d *dynamicInformer) Lister() cache.GenericLister {
187 return dynamiclister.NewRuntimeObjectShim(dynamiclister.New(d.informer.GetIndexer(), d.gvr))
188 }
189
View as plain text