1
16
17 package config
18
19 import (
20 "fmt"
21 "sync"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 discoveryv1 "k8s.io/api/discovery/v1"
26 networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
27 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
28 "k8s.io/apimachinery/pkg/util/sets"
29 v1informers "k8s.io/client-go/informers/core/v1"
30 discoveryv1informers "k8s.io/client-go/informers/discovery/v1"
31 networkingv1alpha1informers "k8s.io/client-go/informers/networking/v1alpha1"
32 "k8s.io/client-go/tools/cache"
33 "k8s.io/klog/v2"
34 )
35
36
37
38 type ServiceHandler interface {
39
40
41 OnServiceAdd(service *v1.Service)
42
43
44 OnServiceUpdate(oldService, service *v1.Service)
45
46
47 OnServiceDelete(service *v1.Service)
48
49
50 OnServiceSynced()
51 }
52
53
54
55 type EndpointSliceHandler interface {
56
57
58 OnEndpointSliceAdd(endpointSlice *discoveryv1.EndpointSlice)
59
60
61 OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discoveryv1.EndpointSlice)
62
63
64 OnEndpointSliceDelete(endpointSlice *discoveryv1.EndpointSlice)
65
66
67 OnEndpointSlicesSynced()
68 }
69
70
71 type EndpointSliceConfig struct {
72 listerSynced cache.InformerSynced
73 eventHandlers []EndpointSliceHandler
74 }
75
76
77 func NewEndpointSliceConfig(endpointSliceInformer discoveryv1informers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig {
78 result := &EndpointSliceConfig{
79 listerSynced: endpointSliceInformer.Informer().HasSynced,
80 }
81
82 _, _ = endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod(
83 cache.ResourceEventHandlerFuncs{
84 AddFunc: result.handleAddEndpointSlice,
85 UpdateFunc: result.handleUpdateEndpointSlice,
86 DeleteFunc: result.handleDeleteEndpointSlice,
87 },
88 resyncPeriod,
89 )
90
91 return result
92 }
93
94
95 func (c *EndpointSliceConfig) RegisterEventHandler(handler EndpointSliceHandler) {
96 c.eventHandlers = append(c.eventHandlers, handler)
97 }
98
99
100 func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) {
101 klog.InfoS("Starting endpoint slice config controller")
102
103 if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) {
104 return
105 }
106
107 for _, h := range c.eventHandlers {
108 klog.V(3).InfoS("Calling handler.OnEndpointSlicesSynced()")
109 h.OnEndpointSlicesSynced()
110 }
111 }
112
113 func (c *EndpointSliceConfig) handleAddEndpointSlice(obj interface{}) {
114 endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
115 if !ok {
116 utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
117 return
118 }
119 for _, h := range c.eventHandlers {
120 klog.V(4).InfoS("Calling handler.OnEndpointSliceAdd", "endpoints", klog.KObj(endpointSlice))
121 h.OnEndpointSliceAdd(endpointSlice)
122 }
123 }
124
125 func (c *EndpointSliceConfig) handleUpdateEndpointSlice(oldObj, newObj interface{}) {
126 oldEndpointSlice, ok := oldObj.(*discoveryv1.EndpointSlice)
127 if !ok {
128 utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
129 return
130 }
131 newEndpointSlice, ok := newObj.(*discoveryv1.EndpointSlice)
132 if !ok {
133 utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
134 return
135 }
136 for _, h := range c.eventHandlers {
137 klog.V(4).InfoS("Calling handler.OnEndpointSliceUpdate")
138 h.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice)
139 }
140 }
141
142 func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) {
143 endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
144 if !ok {
145 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
146 if !ok {
147 utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
148 return
149 }
150 if endpointSlice, ok = tombstone.Obj.(*discoveryv1.EndpointSlice); !ok {
151 utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
152 return
153 }
154 }
155 for _, h := range c.eventHandlers {
156 klog.V(4).InfoS("Calling handler.OnEndpointsDelete")
157 h.OnEndpointSliceDelete(endpointSlice)
158 }
159 }
160
161
162 type ServiceConfig struct {
163 listerSynced cache.InformerSynced
164 eventHandlers []ServiceHandler
165 }
166
167
168 func NewServiceConfig(serviceInformer v1informers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
169 result := &ServiceConfig{
170 listerSynced: serviceInformer.Informer().HasSynced,
171 }
172
173 _, _ = serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
174 cache.ResourceEventHandlerFuncs{
175 AddFunc: result.handleAddService,
176 UpdateFunc: result.handleUpdateService,
177 DeleteFunc: result.handleDeleteService,
178 },
179 resyncPeriod,
180 )
181
182 return result
183 }
184
185
186 func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
187 c.eventHandlers = append(c.eventHandlers, handler)
188 }
189
190
191 func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
192 klog.InfoS("Starting service config controller")
193
194 if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
195 return
196 }
197
198 for i := range c.eventHandlers {
199 klog.V(3).InfoS("Calling handler.OnServiceSynced()")
200 c.eventHandlers[i].OnServiceSynced()
201 }
202 }
203
204 func (c *ServiceConfig) handleAddService(obj interface{}) {
205 service, ok := obj.(*v1.Service)
206 if !ok {
207 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
208 return
209 }
210 for i := range c.eventHandlers {
211 klog.V(4).InfoS("Calling handler.OnServiceAdd")
212 c.eventHandlers[i].OnServiceAdd(service)
213 }
214 }
215
216 func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
217 oldService, ok := oldObj.(*v1.Service)
218 if !ok {
219 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
220 return
221 }
222 service, ok := newObj.(*v1.Service)
223 if !ok {
224 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
225 return
226 }
227 for i := range c.eventHandlers {
228 klog.V(4).InfoS("Calling handler.OnServiceUpdate")
229 c.eventHandlers[i].OnServiceUpdate(oldService, service)
230 }
231 }
232
233 func (c *ServiceConfig) handleDeleteService(obj interface{}) {
234 service, ok := obj.(*v1.Service)
235 if !ok {
236 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
237 if !ok {
238 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
239 return
240 }
241 if service, ok = tombstone.Obj.(*v1.Service); !ok {
242 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
243 return
244 }
245 }
246 for i := range c.eventHandlers {
247 klog.V(4).InfoS("Calling handler.OnServiceDelete")
248 c.eventHandlers[i].OnServiceDelete(service)
249 }
250 }
251
252
253
254 type NodeHandler interface {
255
256
257 OnNodeAdd(node *v1.Node)
258
259
260 OnNodeUpdate(oldNode, node *v1.Node)
261
262
263 OnNodeDelete(node *v1.Node)
264
265
266 OnNodeSynced()
267 }
268
269
270
271 type NoopNodeHandler struct{}
272
273
274 func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {}
275
276
277 func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {}
278
279
280 func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {}
281
282
283 func (*NoopNodeHandler) OnNodeSynced() {}
284
285 var _ NodeHandler = &NoopNodeHandler{}
286
287
288
289 type NodeConfig struct {
290 listerSynced cache.InformerSynced
291 eventHandlers []NodeHandler
292 }
293
294
295 func NewNodeConfig(nodeInformer v1informers.NodeInformer, resyncPeriod time.Duration) *NodeConfig {
296 result := &NodeConfig{
297 listerSynced: nodeInformer.Informer().HasSynced,
298 }
299
300 _, _ = nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
301 cache.ResourceEventHandlerFuncs{
302 AddFunc: result.handleAddNode,
303 UpdateFunc: result.handleUpdateNode,
304 DeleteFunc: result.handleDeleteNode,
305 },
306 resyncPeriod,
307 )
308
309 return result
310 }
311
312
313 func (c *NodeConfig) RegisterEventHandler(handler NodeHandler) {
314 c.eventHandlers = append(c.eventHandlers, handler)
315 }
316
317
318 func (c *NodeConfig) Run(stopCh <-chan struct{}) {
319 klog.InfoS("Starting node config controller")
320
321 if !cache.WaitForNamedCacheSync("node config", stopCh, c.listerSynced) {
322 return
323 }
324
325 for i := range c.eventHandlers {
326 klog.V(3).InfoS("Calling handler.OnNodeSynced()")
327 c.eventHandlers[i].OnNodeSynced()
328 }
329 }
330
331 func (c *NodeConfig) handleAddNode(obj interface{}) {
332 node, ok := obj.(*v1.Node)
333 if !ok {
334 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
335 return
336 }
337 for i := range c.eventHandlers {
338 klog.V(4).InfoS("Calling handler.OnNodeAdd")
339 c.eventHandlers[i].OnNodeAdd(node)
340 }
341 }
342
343 func (c *NodeConfig) handleUpdateNode(oldObj, newObj interface{}) {
344 oldNode, ok := oldObj.(*v1.Node)
345 if !ok {
346 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
347 return
348 }
349 node, ok := newObj.(*v1.Node)
350 if !ok {
351 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
352 return
353 }
354 for i := range c.eventHandlers {
355 klog.V(5).InfoS("Calling handler.OnNodeUpdate")
356 c.eventHandlers[i].OnNodeUpdate(oldNode, node)
357 }
358 }
359
360 func (c *NodeConfig) handleDeleteNode(obj interface{}) {
361 node, ok := obj.(*v1.Node)
362 if !ok {
363 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
364 if !ok {
365 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
366 return
367 }
368 if node, ok = tombstone.Obj.(*v1.Node); !ok {
369 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
370 return
371 }
372 }
373 for i := range c.eventHandlers {
374 klog.V(4).InfoS("Calling handler.OnNodeDelete")
375 c.eventHandlers[i].OnNodeDelete(node)
376 }
377 }
378
379
380
// ServiceCIDRHandler is implemented by objects that want to be told the
// current set of service CIDRs.
type ServiceCIDRHandler interface {
	// OnServiceCIDRsChanged is invoked with the full list of currently
	// tracked CIDRs after each ServiceCIDR add, update or delete event,
	// and once after the informer has synced. The order of cidrs is
	// unspecified.
	OnServiceCIDRsChanged(cidrs []string)
}
386
387
388 type ServiceCIDRConfig struct {
389 listerSynced cache.InformerSynced
390 eventHandlers []ServiceCIDRHandler
391 mu sync.Mutex
392 cidrs sets.Set[string]
393 }
394
395
396 func NewServiceCIDRConfig(serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer, resyncPeriod time.Duration) *ServiceCIDRConfig {
397 result := &ServiceCIDRConfig{
398 listerSynced: serviceCIDRInformer.Informer().HasSynced,
399 cidrs: sets.New[string](),
400 }
401
402 _, _ = serviceCIDRInformer.Informer().AddEventHandlerWithResyncPeriod(
403 cache.ResourceEventHandlerFuncs{
404 AddFunc: func(obj interface{}) {
405 result.handleServiceCIDREvent(nil, obj)
406 },
407 UpdateFunc: func(oldObj, newObj interface{}) {
408 result.handleServiceCIDREvent(oldObj, newObj)
409 },
410 DeleteFunc: func(obj interface{}) {
411 result.handleServiceCIDREvent(obj, nil)
412 },
413 },
414 resyncPeriod,
415 )
416 return result
417 }
418
419
420 func (c *ServiceCIDRConfig) RegisterEventHandler(handler ServiceCIDRHandler) {
421 c.eventHandlers = append(c.eventHandlers, handler)
422 }
423
424
425 func (c *ServiceCIDRConfig) Run(stopCh <-chan struct{}) {
426 klog.InfoS("Starting serviceCIDR config controller")
427
428 if !cache.WaitForNamedCacheSync("serviceCIDR config", stopCh, c.listerSynced) {
429 return
430 }
431 c.handleServiceCIDREvent(nil, nil)
432 }
433
434
435
436 func (c *ServiceCIDRConfig) handleServiceCIDREvent(oldObj, newObj interface{}) {
437 var oldServiceCIDR, newServiceCIDR *networkingv1alpha1.ServiceCIDR
438 var ok bool
439
440 if oldObj != nil {
441 oldServiceCIDR, ok = oldObj.(*networkingv1alpha1.ServiceCIDR)
442 if !ok {
443 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
444 return
445 }
446 }
447
448 if newObj != nil {
449 newServiceCIDR, ok = newObj.(*networkingv1alpha1.ServiceCIDR)
450 if !ok {
451 utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
452 return
453 }
454 }
455
456 c.mu.Lock()
457 defer c.mu.Unlock()
458
459 if oldServiceCIDR != nil {
460 c.cidrs.Delete(oldServiceCIDR.Spec.CIDRs...)
461 }
462
463 if newServiceCIDR != nil {
464 c.cidrs.Insert(newServiceCIDR.Spec.CIDRs...)
465 }
466
467 for i := range c.eventHandlers {
468 klog.V(4).InfoS("Calling handler.OnServiceCIDRsChanged")
469 c.eventHandlers[i].OnServiceCIDRsChanged(c.cidrs.UnsortedList())
470 }
471 }
472
View as plain text