1
16
17 package controller
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "net/netip"
24 "sync"
25 "time"
26
27 v1 "k8s.io/api/core/v1"
28 networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/labels"
32 "k8s.io/apimachinery/pkg/util/runtime"
33 "k8s.io/apimachinery/pkg/util/wait"
34 coreinformers "k8s.io/client-go/informers/core/v1"
35 networkinginformers "k8s.io/client-go/informers/networking/v1alpha1"
36 "k8s.io/client-go/kubernetes"
37 corelisters "k8s.io/client-go/listers/core/v1"
38 networkinglisters "k8s.io/client-go/listers/networking/v1alpha1"
39 "k8s.io/client-go/tools/cache"
40 "k8s.io/client-go/tools/events"
41 "k8s.io/client-go/util/retry"
42 "k8s.io/client-go/util/workqueue"
43 "k8s.io/klog/v2"
44 "k8s.io/kubernetes/pkg/api/legacyscheme"
45 "k8s.io/kubernetes/pkg/apis/core/v1/helper"
46 "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
47 "k8s.io/kubernetes/pkg/util/iptree"
48 "k8s.io/utils/clock"
49 netutils "k8s.io/utils/net"
50 )
51
// maxRetries is the number of rate-limited re-queues a key may accumulate
// before the queue error handlers give up and drop it.
const maxRetries = 15

// workers is the number of goroutines RunUntil starts for each of the
// Service and IPAddress queues.
const workers = 5
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 type RepairIPAddress struct {
91 client kubernetes.Interface
92 interval time.Duration
93
94 serviceLister corelisters.ServiceLister
95 servicesSynced cache.InformerSynced
96
97 serviceCIDRLister networkinglisters.ServiceCIDRLister
98 serviceCIDRSynced cache.InformerSynced
99
100 ipAddressLister networkinglisters.IPAddressLister
101 ipAddressSynced cache.InformerSynced
102
103 cidrQueue workqueue.RateLimitingInterface
104 svcQueue workqueue.RateLimitingInterface
105 ipQueue workqueue.RateLimitingInterface
106 workerLoopPeriod time.Duration
107
108 muTree sync.Mutex
109 tree *iptree.Tree[string]
110
111 broadcaster events.EventBroadcaster
112 recorder events.EventRecorder
113 clock clock.Clock
114 }
115
116
117
118 func NewRepairIPAddress(interval time.Duration,
119 client kubernetes.Interface,
120 serviceInformer coreinformers.ServiceInformer,
121 serviceCIDRInformer networkinginformers.ServiceCIDRInformer,
122 ipAddressInformer networkinginformers.IPAddressInformer) *RepairIPAddress {
123 eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
124 recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller")
125
126 r := &RepairIPAddress{
127 interval: interval,
128 client: client,
129 serviceLister: serviceInformer.Lister(),
130 servicesSynced: serviceInformer.Informer().HasSynced,
131 serviceCIDRLister: serviceCIDRInformer.Lister(),
132 serviceCIDRSynced: serviceCIDRInformer.Informer().HasSynced,
133 ipAddressLister: ipAddressInformer.Lister(),
134 ipAddressSynced: ipAddressInformer.Informer().HasSynced,
135 cidrQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "servicecidrs"),
136 svcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "services"),
137 ipQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ipaddresses"),
138 tree: iptree.New[string](),
139 workerLoopPeriod: time.Second,
140 broadcaster: eventBroadcaster,
141 recorder: recorder,
142 clock: clock.RealClock{},
143 }
144
145 _, _ = serviceInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
146 AddFunc: func(obj interface{}) {
147 key, err := cache.MetaNamespaceKeyFunc(obj)
148 if err == nil {
149 r.svcQueue.Add(key)
150 }
151 },
152 UpdateFunc: func(old interface{}, new interface{}) {
153 key, err := cache.MetaNamespaceKeyFunc(new)
154 if err == nil {
155 r.svcQueue.Add(key)
156 }
157 },
158 DeleteFunc: func(obj interface{}) {
159
160
161 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
162 if err == nil {
163 r.svcQueue.Add(key)
164 }
165 },
166 }, interval)
167
168 _, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
169 AddFunc: func(obj interface{}) {
170 key, err := cache.MetaNamespaceKeyFunc(obj)
171 if err == nil {
172 r.cidrQueue.Add(key)
173 }
174 },
175 UpdateFunc: func(old interface{}, new interface{}) {
176 key, err := cache.MetaNamespaceKeyFunc(new)
177 if err == nil {
178 r.cidrQueue.Add(key)
179 }
180 },
181 DeleteFunc: func(obj interface{}) {
182
183
184 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
185 if err == nil {
186 r.cidrQueue.Add(key)
187 }
188 },
189 })
190
191 ipAddressInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
192 AddFunc: func(obj interface{}) {
193 key, err := cache.MetaNamespaceKeyFunc(obj)
194 if err == nil {
195 r.ipQueue.Add(key)
196 }
197 },
198 UpdateFunc: func(old interface{}, new interface{}) {
199 key, err := cache.MetaNamespaceKeyFunc(new)
200 if err == nil {
201 r.ipQueue.Add(key)
202 }
203 },
204 DeleteFunc: func(obj interface{}) {
205
206
207 key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
208 if err == nil {
209 r.ipQueue.Add(key)
210 }
211 },
212 }, interval)
213
214 return r
215 }
216
217
218 func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
219 defer r.cidrQueue.ShutDown()
220 defer r.ipQueue.ShutDown()
221 defer r.svcQueue.ShutDown()
222 r.broadcaster.StartRecordingToSink(stopCh)
223 defer r.broadcaster.Shutdown()
224
225 klog.Info("Starting ipallocator-repair-controller")
226 defer klog.Info("Shutting down ipallocator-repair-controller")
227
228 if !cache.WaitForNamedCacheSync("ipallocator-repair-controller", stopCh, r.ipAddressSynced, r.servicesSynced, r.serviceCIDRSynced) {
229 return
230 }
231
232
233
234
235 if err := r.runOnce(); err != nil {
236 runtime.HandleError(err)
237 return
238 }
239 onFirstSuccess()
240
241
242 go wait.Until(r.cidrWorker, r.workerLoopPeriod, stopCh)
243
244 for i := 0; i < workers; i++ {
245 go wait.Until(r.ipWorker, r.workerLoopPeriod, stopCh)
246 go wait.Until(r.svcWorker, r.workerLoopPeriod, stopCh)
247 }
248
249 <-stopCh
250 }
251
252
253 func (r *RepairIPAddress) runOnce() error {
254 return retry.RetryOnConflict(retry.DefaultBackoff, r.doRunOnce)
255 }
256
257
258 func (r *RepairIPAddress) doRunOnce() error {
259 services, err := r.serviceLister.List(labels.Everything())
260 if err != nil {
261 return fmt.Errorf("unable to refresh the service IP block: %v", err)
262 }
263
264
265 for _, svc := range services {
266 key, err := cache.MetaNamespaceKeyFunc(svc)
267 if err != nil {
268 return err
269 }
270 err = r.syncService(key)
271 if err != nil {
272 return err
273 }
274 }
275
276
277
278
279 ipLabelSelector := labels.Set(map[string]string{
280 networkingv1alpha1.LabelManagedBy: ipallocator.ControllerName,
281 }).AsSelectorPreValidated()
282 ipAddresses, err := r.ipAddressLister.List(ipLabelSelector)
283 if err != nil {
284 return fmt.Errorf("unable to refresh the IPAddress block: %v", err)
285 }
286
287 for _, ipAddress := range ipAddresses {
288 key, err := cache.MetaNamespaceKeyFunc(ipAddress)
289 if err != nil {
290 return err
291 }
292 err = r.syncIPAddress(key)
293 if err != nil {
294 return err
295 }
296 }
297
298 return nil
299 }
300
301 func (r *RepairIPAddress) svcWorker() {
302 for r.processNextWorkSvc() {
303 }
304 }
305
306 func (r *RepairIPAddress) processNextWorkSvc() bool {
307 eKey, quit := r.svcQueue.Get()
308 if quit {
309 return false
310 }
311 defer r.svcQueue.Done(eKey)
312
313 err := r.syncService(eKey.(string))
314 r.handleSvcErr(err, eKey)
315
316 return true
317 }
318
319 func (r *RepairIPAddress) handleSvcErr(err error, key interface{}) {
320 if err == nil {
321 r.svcQueue.Forget(key)
322 return
323 }
324
325 if r.svcQueue.NumRequeues(key) < maxRetries {
326 klog.V(2).InfoS("Error syncing Service, retrying", "service", key, "err", err)
327 r.svcQueue.AddRateLimited(key)
328 return
329 }
330
331 klog.Warningf("Dropping Service %q out of the queue: %v", key, err)
332 r.svcQueue.Forget(key)
333 runtime.HandleError(err)
334 }
335
336
337 func (r *RepairIPAddress) syncService(key string) error {
338 var syncError error
339 namespace, name, err := cache.SplitMetaNamespaceKey(key)
340 if err != nil {
341 return err
342 }
343 svc, err := r.serviceLister.Services(namespace).Get(name)
344 if err != nil {
345
346 return nil
347 }
348 if !helper.IsServiceIPSet(svc) {
349
350 return nil
351 }
352
353 for _, clusterIP := range svc.Spec.ClusterIPs {
354 ip := netutils.ParseIPSloppy(clusterIP)
355 if ip == nil {
356
357
358 r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate Service", ip)
359 runtime.HandleError(fmt.Errorf("the ClusterIP %s for Service %s/%s is not a valid IP; please recreate Service", ip, svc.Namespace, svc.Name))
360 continue
361 }
362
363 family := getFamilyByIP(ip)
364
365 r.muTree.Lock()
366 prefixes := r.tree.GetHostIPPrefixMatches(ipToAddr(ip))
367 r.muTree.Unlock()
368 if len(prefixes) == 0 {
369
370 r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]: %s is not within any configured Service CIDR; please recreate service", family, ip)
371 runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not within any service CIDR; please recreate", family, ip, svc.Namespace, svc.Name))
372 continue
373 }
374
375
376 ipAddress, err := r.ipAddressLister.Get(ip.String())
377 if apierrors.IsNotFound(err) {
378
379 r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]: %s is not allocated; repairing", family, ip)
380 runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not allocated; repairing", family, ip, svc.Namespace, svc.Name))
381 _, err := r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(ip.String(), svc), metav1.CreateOptions{})
382 if err != nil {
383 return err
384 }
385 continue
386 }
387 if err != nil {
388 r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate ClusterIP [%v]: %s due to an unknown error", family, ip)
389 return fmt.Errorf("unable to allocate ClusterIP [%v]: %s for Service %s/%s due to an unknown error, will retry later: %v", family, ip, svc.Namespace, svc.Name, err)
390 }
391
392
393 if ipAddress.Spec.ParentRef.Group != "" ||
394 ipAddress.Spec.ParentRef.Resource != "services" {
395 r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", family, ip, svc.Namespace, svc.Name)
396 if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
397 return err
398 }
399 continue
400 }
401
402
403 if ipAddress.Spec.ParentRef.Namespace != svc.Namespace ||
404 ipAddress.Spec.ParentRef.Name != svc.Name {
405
406
407 refService, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name)
408 if err != nil {
409 r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", family, ip, svc.Namespace, svc.Name)
410 if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
411 return err
412 }
413 continue
414 }
415
416 for _, clusterIP := range refService.Spec.ClusterIPs {
417 if ipAddress.Name == clusterIP {
418 r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
419 runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to other services %s/%s; please recreate", family, ip, svc.Namespace, svc.Name, refService.Namespace, refService.Name))
420 break
421 }
422 }
423 }
424
425
426 if !verifyIPAddressLabels(ipAddress) {
427 if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
428 return err
429 }
430 continue
431 }
432
433 }
434 return syncError
435 }
436
437 func (r *RepairIPAddress) recreateIPAddress(name string, svc *v1.Service) error {
438 err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), name, metav1.DeleteOptions{})
439 if err != nil && !apierrors.IsNotFound(err) {
440 return err
441 }
442 _, err = r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(name, svc), metav1.CreateOptions{})
443 if err != nil {
444 return err
445 }
446 return nil
447 }
448
449 func (r *RepairIPAddress) ipWorker() {
450 for r.processNextWorkIp() {
451 }
452 }
453
454 func (r *RepairIPAddress) processNextWorkIp() bool {
455 eKey, quit := r.ipQueue.Get()
456 if quit {
457 return false
458 }
459 defer r.ipQueue.Done(eKey)
460
461 err := r.syncIPAddress(eKey.(string))
462 r.handleIpErr(err, eKey)
463
464 return true
465 }
466
467 func (r *RepairIPAddress) handleIpErr(err error, key interface{}) {
468 if err == nil {
469 r.ipQueue.Forget(key)
470 return
471 }
472
473 if r.ipQueue.NumRequeues(key) < maxRetries {
474 klog.V(2).InfoS("Error syncing Service, retrying", "service", key, "err", err)
475 r.ipQueue.AddRateLimited(key)
476 return
477 }
478
479 klog.Warningf("Dropping Service %q out of the queue: %v", key, err)
480 r.ipQueue.Forget(key)
481 runtime.HandleError(err)
482 }
483
484
485
486
487 func (r *RepairIPAddress) syncIPAddress(key string) error {
488 ipAddress, err := r.ipAddressLister.Get(key)
489 if err != nil {
490
491 return nil
492 }
493
494
495 if !managedByController(ipAddress) {
496 return nil
497 }
498
499
500 if ipAddress.Spec.ParentRef.Group != "" || ipAddress.Spec.ParentRef.Resource != "services" {
501 runtime.HandleError(fmt.Errorf("IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef))
502 r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef)
503 err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
504 if err != nil && !apierrors.IsNotFound(err) {
505 return err
506 }
507 return nil
508 }
509
510 svc, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name)
511 if apierrors.IsNotFound(err) {
512
513
514
515 ipLifetime := r.clock.Now().Sub(ipAddress.CreationTimestamp.Time)
516 gracePeriod := 60 * time.Second
517 if ipLifetime > gracePeriod {
518 runtime.HandleError(fmt.Errorf("IPAddress %s appears to have leaked: cleaning up", ipAddress.Name))
519 r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress: %s for Service %s/%s appears to have leaked: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef.Namespace, ipAddress.Spec.ParentRef.Name)
520 err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
521 if err != nil && !apierrors.IsNotFound(err) {
522 return err
523 }
524 }
525
526 r.ipQueue.AddAfter(key, gracePeriod-ipLifetime)
527 return nil
528 }
529 if err != nil {
530 runtime.HandleError(fmt.Errorf("unable to get parent Service for IPAddress %s due to an unknown error: %v", ipAddress, err))
531 r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "UnknownError", "IPAddressAllocation", "Unable to get parent Service for IPAddress %s due to an unknown error", ipAddress)
532 return err
533 }
534
535
536 for _, clusterIP := range svc.Spec.ClusterIPs {
537 if ipAddress.Name == clusterIP {
538 return nil
539 }
540 }
541 runtime.HandleError(fmt.Errorf("the IPAddress: %s for Service %s/%s has a wrong reference %#v; cleaning up", ipAddress.Name, svc.Name, svc.Namespace, ipAddress.Spec.ParentRef))
542 r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressWrongReference", "IPAddressAllocation", "IPAddress: %s for Service %s/%s has a wrong reference; cleaning up", ipAddress.Name, svc.Namespace, svc.Name)
543 err = r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
544 if err != nil && !apierrors.IsNotFound(err) {
545 return err
546 }
547 return nil
548
549 }
550
551 func (r *RepairIPAddress) cidrWorker() {
552 for r.processNextWorkCIDR() {
553 }
554 }
555
556 func (r *RepairIPAddress) processNextWorkCIDR() bool {
557 eKey, quit := r.cidrQueue.Get()
558 if quit {
559 return false
560 }
561 defer r.cidrQueue.Done(eKey)
562
563 err := r.syncCIDRs()
564 r.handleCIDRErr(err, eKey)
565
566 return true
567 }
568
569 func (r *RepairIPAddress) handleCIDRErr(err error, key interface{}) {
570 if err == nil {
571 r.cidrQueue.Forget(key)
572 return
573 }
574
575 if r.cidrQueue.NumRequeues(key) < maxRetries {
576 klog.V(2).InfoS("Error syncing ServiceCIDR, retrying", "serviceCIDR", key, "err", err)
577 r.cidrQueue.AddRateLimited(key)
578 return
579 }
580
581 klog.Warningf("Dropping ServiceCIDR %q out of the queue: %v", key, err)
582 r.cidrQueue.Forget(key)
583 runtime.HandleError(err)
584 }
585
586
587 func (r *RepairIPAddress) syncCIDRs() error {
588 serviceCIDRList, err := r.serviceCIDRLister.List(labels.Everything())
589 if err != nil {
590 return err
591 }
592
593 tree := iptree.New[string]()
594 for _, serviceCIDR := range serviceCIDRList {
595 for _, cidr := range serviceCIDR.Spec.CIDRs {
596 if prefix, err := netip.ParsePrefix(cidr); err == nil {
597 tree.InsertPrefix(prefix, serviceCIDR.Name)
598 }
599 }
600 }
601 r.muTree.Lock()
602 defer r.muTree.Unlock()
603 r.tree = tree
604 return nil
605 }
606
607 func newIPAddress(name string, svc *v1.Service) *networkingv1alpha1.IPAddress {
608 family := string(v1.IPv4Protocol)
609 if netutils.IsIPv6String(name) {
610 family = string(v1.IPv6Protocol)
611 }
612 return &networkingv1alpha1.IPAddress{
613 ObjectMeta: metav1.ObjectMeta{
614 Name: name,
615 Labels: map[string]string{
616 networkingv1alpha1.LabelIPAddressFamily: family,
617 networkingv1alpha1.LabelManagedBy: ipallocator.ControllerName,
618 },
619 },
620 Spec: networkingv1alpha1.IPAddressSpec{
621 ParentRef: serviceToRef(svc),
622 },
623 }
624 }
625
626 func serviceToRef(svc *v1.Service) *networkingv1alpha1.ParentReference {
627 if svc == nil {
628 return nil
629 }
630
631 return &networkingv1alpha1.ParentReference{
632 Group: "",
633 Resource: "services",
634 Namespace: svc.Namespace,
635 Name: svc.Name,
636 }
637 }
638
639 func getFamilyByIP(ip net.IP) v1.IPFamily {
640 if netutils.IsIPv6(ip) {
641 return v1.IPv6Protocol
642 }
643 return v1.IPv4Protocol
644 }
645
646
647
648 func managedByController(ip *networkingv1alpha1.IPAddress) bool {
649 managedBy, ok := ip.Labels[networkingv1alpha1.LabelManagedBy]
650 if !ok {
651 return false
652 }
653 return managedBy == ipallocator.ControllerName
654 }
655
656 func verifyIPAddressLabels(ip *networkingv1alpha1.IPAddress) bool {
657 labelFamily, ok := ip.Labels[networkingv1alpha1.LabelIPAddressFamily]
658 if !ok {
659 return false
660 }
661
662 family := string(v1.IPv4Protocol)
663 if netutils.IsIPv6String(ip.Name) {
664 family = string(v1.IPv6Protocol)
665 }
666 if family != labelFamily {
667 return false
668 }
669 return managedByController(ip)
670 }
671
672
673
674
// ipToAddr converts a net.IP into a netip.Addr.
//
// The 4-byte form is preferred whenever ip can be represented as IPv4, so
// that an IPv4 address held in a 16-byte slice becomes an IPv4 Addr rather
// than an IPv4-mapped IPv6 one. If ip is neither a valid IPv4 nor IPv6
// address, the zero (invalid) Addr is returned.
func ipToAddr(ip net.IP) netip.Addr {
	raw := ip.To16()
	if v4 := ip.To4(); v4 != nil {
		raw = v4
	}

	// The ok result is deliberately dropped: on failure AddrFromSlice already
	// yields the zero Addr, which is what callers receive.
	addr, _ := netip.AddrFromSlice(raw)
	return addr
}
688
View as plain text