...

Source file src/k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller/repairip.go

Documentation: k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package controller
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"net/netip"
    24  	"sync"
    25  	"time"
    26  
    27  	v1 "k8s.io/api/core/v1"
    28  	networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/labels"
    32  	"k8s.io/apimachinery/pkg/util/runtime"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	coreinformers "k8s.io/client-go/informers/core/v1"
    35  	networkinginformers "k8s.io/client-go/informers/networking/v1alpha1"
    36  	"k8s.io/client-go/kubernetes"
    37  	corelisters "k8s.io/client-go/listers/core/v1"
    38  	networkinglisters "k8s.io/client-go/listers/networking/v1alpha1"
    39  	"k8s.io/client-go/tools/cache"
    40  	"k8s.io/client-go/tools/events"
    41  	"k8s.io/client-go/util/retry"
    42  	"k8s.io/client-go/util/workqueue"
    43  	"k8s.io/klog/v2"
    44  	"k8s.io/kubernetes/pkg/api/legacyscheme"
    45  	"k8s.io/kubernetes/pkg/apis/core/v1/helper"
    46  	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
    47  	"k8s.io/kubernetes/pkg/util/iptree"
    48  	"k8s.io/utils/clock"
    49  	netutils "k8s.io/utils/net"
    50  )
    51  
const (
	// maxRetries is the number of times a key (Service, IPAddress or ServiceCIDR) will be retried
	// before it is dropped out of its queue.
	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
	// sequence of delays between successive queuings of a key.
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	maxRetries = 15
	workers    = 5 // number of Service workers and of IPAddress workers started by RunUntil
)
    61  
// RepairIPAddress is a controller loop that examines all service ClusterIP allocations and logs any errors,
// and then creates the accurate list of IPAddresses objects with all allocated ClusterIPs.
//
// Handles:
//   - Duplicate ClusterIP assignments caused by operator action or undetected race conditions
//   - Allocations to services that were not actually created due to a crash or powerloss
//   - Migrates old versions of Kubernetes services into the new ipallocator automatically
//     creating the corresponding IPAddress objects
//   - IPAddress objects with wrong references or labels
//
// Logs about:
//   - ClusterIPs that do not match the currently configured range
//
// There is a one-to-one relation between Service ClusterIPs and IPAddresses.
// The bidirectional relation is achieved using the following fields:
// an entry of Service.Spec.ClusterIPs == IPAddress.Name AND IPAddress.Spec.ParentRef == Service
//
// The controller uses two reconcile loops, one for Services and the other for IPAddresses.
// The Service reconcile loop verifies the bidirectional relation exists and is correct.
//
//	1. Service_X [ClusterIP_X]  <------>  IPAddress_X [Ref:Service_X]   ok
//	2. Service_Y [ClusterIP_Y]  <------>  IPAddress_Y [Ref:GatewayA]    !ok, wrong reference
//	3. Service_Z [ClusterIP_Z]  <------>                                !ok, missing IPAddress
//	4. Service_A [ClusterIP_A]  <------>  IPAddress_A [Ref:Service_B]   !ok, duplicate IPAddress
//	   Service_B [ClusterIP_A]  <------>                                only one service can verify the relation
//
// The IPAddress reconcile loop checks there are no orphan IPAddresses, the rest of the
// cases are covered by the Services loop.
//
//	1.                          <------>  IPAddress_Z [Ref:Service_C]   !ok, orphan IPAddress
type RepairIPAddress struct {
	// client is used to create and delete IPAddress objects; interval is the resync
	// period registered with the Service and IPAddress event handlers.
	client   kubernetes.Interface
	interval time.Duration

	// Informer-backed view of Services and the function reporting whether it has synced.
	serviceLister  corelisters.ServiceLister
	servicesSynced cache.InformerSynced

	// Informer-backed view of ServiceCIDRs and the function reporting whether it has synced.
	serviceCIDRLister networkinglisters.ServiceCIDRLister
	serviceCIDRSynced cache.InformerSynced

	// Informer-backed view of IPAddresses and the function reporting whether it has synced.
	ipAddressLister networkinglisters.IPAddressLister
	ipAddressSynced cache.InformerSynced

	// One rate-limited queue per watched resource, filled by the informer event handlers.
	// workerLoopPeriod is the period passed to wait.Until when the workers are started.
	cidrQueue        workqueue.RateLimitingInterface
	svcQueue         workqueue.RateLimitingInterface
	ipQueue          workqueue.RateLimitingInterface
	workerLoopPeriod time.Duration

	// tree holds the prefixes of all known ServiceCIDRs, each mapped to the name of the
	// ServiceCIDR that declares it. It is replaced as a whole by syncCIDRs and every
	// access is guarded by muTree.
	muTree sync.Mutex
	tree   *iptree.Tree[string]

	// broadcaster and recorder publish the warning events emitted while repairing.
	// clock is used to compute the age of an IPAddress, it is a field so that it can be replaced.
	broadcaster events.EventBroadcaster
	recorder    events.EventRecorder
	clock       clock.Clock
}
   115  
   116  // NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
   117  // and generates informational warnings for a cluster that is not in sync.
   118  func NewRepairIPAddress(interval time.Duration,
   119  	client kubernetes.Interface,
   120  	serviceInformer coreinformers.ServiceInformer,
   121  	serviceCIDRInformer networkinginformers.ServiceCIDRInformer,
   122  	ipAddressInformer networkinginformers.IPAddressInformer) *RepairIPAddress {
   123  	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
   124  	recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller")
   125  
   126  	r := &RepairIPAddress{
   127  		interval:          interval,
   128  		client:            client,
   129  		serviceLister:     serviceInformer.Lister(),
   130  		servicesSynced:    serviceInformer.Informer().HasSynced,
   131  		serviceCIDRLister: serviceCIDRInformer.Lister(),
   132  		serviceCIDRSynced: serviceCIDRInformer.Informer().HasSynced,
   133  		ipAddressLister:   ipAddressInformer.Lister(),
   134  		ipAddressSynced:   ipAddressInformer.Informer().HasSynced,
   135  		cidrQueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "servicecidrs"),
   136  		svcQueue:          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "services"),
   137  		ipQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ipaddresses"),
   138  		tree:              iptree.New[string](),
   139  		workerLoopPeriod:  time.Second,
   140  		broadcaster:       eventBroadcaster,
   141  		recorder:          recorder,
   142  		clock:             clock.RealClock{},
   143  	}
   144  
   145  	_, _ = serviceInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
   146  		AddFunc: func(obj interface{}) {
   147  			key, err := cache.MetaNamespaceKeyFunc(obj)
   148  			if err == nil {
   149  				r.svcQueue.Add(key)
   150  			}
   151  		},
   152  		UpdateFunc: func(old interface{}, new interface{}) {
   153  			key, err := cache.MetaNamespaceKeyFunc(new)
   154  			if err == nil {
   155  				r.svcQueue.Add(key)
   156  			}
   157  		},
   158  		DeleteFunc: func(obj interface{}) {
   159  			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
   160  			// key function.
   161  			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   162  			if err == nil {
   163  				r.svcQueue.Add(key)
   164  			}
   165  		},
   166  	}, interval)
   167  
   168  	_, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   169  		AddFunc: func(obj interface{}) {
   170  			key, err := cache.MetaNamespaceKeyFunc(obj)
   171  			if err == nil {
   172  				r.cidrQueue.Add(key)
   173  			}
   174  		},
   175  		UpdateFunc: func(old interface{}, new interface{}) {
   176  			key, err := cache.MetaNamespaceKeyFunc(new)
   177  			if err == nil {
   178  				r.cidrQueue.Add(key)
   179  			}
   180  		},
   181  		DeleteFunc: func(obj interface{}) {
   182  			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
   183  			// key function.
   184  			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   185  			if err == nil {
   186  				r.cidrQueue.Add(key)
   187  			}
   188  		},
   189  	})
   190  
   191  	ipAddressInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
   192  		AddFunc: func(obj interface{}) {
   193  			key, err := cache.MetaNamespaceKeyFunc(obj)
   194  			if err == nil {
   195  				r.ipQueue.Add(key)
   196  			}
   197  		},
   198  		UpdateFunc: func(old interface{}, new interface{}) {
   199  			key, err := cache.MetaNamespaceKeyFunc(new)
   200  			if err == nil {
   201  				r.ipQueue.Add(key)
   202  			}
   203  		},
   204  		DeleteFunc: func(obj interface{}) {
   205  			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
   206  			// key function.
   207  			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   208  			if err == nil {
   209  				r.ipQueue.Add(key)
   210  			}
   211  		},
   212  	}, interval)
   213  
   214  	return r
   215  }
   216  
// RunUntil starts the controller and blocks until the provided stopCh is closed.
//
// It waits for the IPAddress, Service and ServiceCIDR informer caches to sync, performs one
// full verification pass (runOnce) and, only if that pass succeeds, calls onFirstSuccess and
// starts the workers: a single ServiceCIDR worker plus `workers` IPAddress workers and
// `workers` Service workers. If the caches do not sync or the first pass fails, it returns
// early without calling onFirstSuccess and without starting any worker.
// On return the three queues and the event broadcaster are shut down.
func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
	defer r.cidrQueue.ShutDown()
	defer r.ipQueue.ShutDown()
	defer r.svcQueue.ShutDown()
	r.broadcaster.StartRecordingToSink(stopCh)
	defer r.broadcaster.Shutdown()

	klog.Info("Starting ipallocator-repair-controller")
	defer klog.Info("Shutting down ipallocator-repair-controller")

	if !cache.WaitForNamedCacheSync("ipallocator-repair-controller", stopCh, r.ipAddressSynced, r.servicesSynced, r.serviceCIDRSynced) {
		return
	}

	// First sync goes through all the Services and IPAddresses in the cache,
	// once synced, it signals the main loop and works using the handlers, since
	// it's less expensive and more optimal.
	if err := r.runOnce(); err != nil {
		runtime.HandleError(err)
		return
	}
	onFirstSuccess()

	// serialize the operations on ServiceCIDRs: a single worker rebuilds the tree
	go wait.Until(r.cidrWorker, r.workerLoopPeriod, stopCh)

	// Services and IPAddresses are reconciled by several workers each
	for i := 0; i < workers; i++ {
		go wait.Until(r.ipWorker, r.workerLoopPeriod, stopCh)
		go wait.Until(r.svcWorker, r.workerLoopPeriod, stopCh)
	}

	// block until asked to stop; the deferred calls above then run
	<-stopCh
}
   251  
   252  // runOnce verifies the state of the ClusterIP allocations and returns an error if an unrecoverable problem occurs.
   253  func (r *RepairIPAddress) runOnce() error {
   254  	return retry.RetryOnConflict(retry.DefaultBackoff, r.doRunOnce)
   255  }
   256  
   257  // doRunOnce verifies the state of the ClusterIP allocations and returns an error if an unrecoverable problem occurs.
   258  func (r *RepairIPAddress) doRunOnce() error {
   259  	services, err := r.serviceLister.List(labels.Everything())
   260  	if err != nil {
   261  		return fmt.Errorf("unable to refresh the service IP block: %v", err)
   262  	}
   263  
   264  	// Check every Service's ClusterIP, and rebuild the state as we think it should be.
   265  	for _, svc := range services {
   266  		key, err := cache.MetaNamespaceKeyFunc(svc)
   267  		if err != nil {
   268  			return err
   269  		}
   270  		err = r.syncService(key)
   271  		if err != nil {
   272  			return err
   273  		}
   274  	}
   275  
   276  	// We have checked that every Service has its corresponding IP.
   277  	// Check that there is no IP created by the allocator without
   278  	// a Service associated.
   279  	ipLabelSelector := labels.Set(map[string]string{
   280  		networkingv1alpha1.LabelManagedBy: ipallocator.ControllerName,
   281  	}).AsSelectorPreValidated()
   282  	ipAddresses, err := r.ipAddressLister.List(ipLabelSelector)
   283  	if err != nil {
   284  		return fmt.Errorf("unable to refresh the IPAddress block: %v", err)
   285  	}
   286  	// Check every IPAddress matches the corresponding Service, and rebuild the state as we think it should be.
   287  	for _, ipAddress := range ipAddresses {
   288  		key, err := cache.MetaNamespaceKeyFunc(ipAddress)
   289  		if err != nil {
   290  			return err
   291  		}
   292  		err = r.syncIPAddress(key)
   293  		if err != nil {
   294  			return err
   295  		}
   296  	}
   297  
   298  	return nil
   299  }
   300  
   301  func (r *RepairIPAddress) svcWorker() {
   302  	for r.processNextWorkSvc() {
   303  	}
   304  }
   305  
   306  func (r *RepairIPAddress) processNextWorkSvc() bool {
   307  	eKey, quit := r.svcQueue.Get()
   308  	if quit {
   309  		return false
   310  	}
   311  	defer r.svcQueue.Done(eKey)
   312  
   313  	err := r.syncService(eKey.(string))
   314  	r.handleSvcErr(err, eKey)
   315  
   316  	return true
   317  }
   318  
   319  func (r *RepairIPAddress) handleSvcErr(err error, key interface{}) {
   320  	if err == nil {
   321  		r.svcQueue.Forget(key)
   322  		return
   323  	}
   324  
   325  	if r.svcQueue.NumRequeues(key) < maxRetries {
   326  		klog.V(2).InfoS("Error syncing Service, retrying", "service", key, "err", err)
   327  		r.svcQueue.AddRateLimited(key)
   328  		return
   329  	}
   330  
   331  	klog.Warningf("Dropping Service %q out of the queue: %v", key, err)
   332  	r.svcQueue.Forget(key)
   333  	runtime.HandleError(err)
   334  }
   335  
   336  // syncServices reconcile the Service ClusterIPs to verify that each one has the corresponding IPAddress object associated
   337  func (r *RepairIPAddress) syncService(key string) error {
   338  	var syncError error
   339  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   340  	if err != nil {
   341  		return err
   342  	}
   343  	svc, err := r.serviceLister.Services(namespace).Get(name)
   344  	if err != nil {
   345  		// nothing to do
   346  		return nil
   347  	}
   348  	if !helper.IsServiceIPSet(svc) {
   349  		// didn't need a ClusterIP
   350  		return nil
   351  	}
   352  
   353  	for _, clusterIP := range svc.Spec.ClusterIPs {
   354  		ip := netutils.ParseIPSloppy(clusterIP)
   355  		if ip == nil {
   356  			// ClusterIP is corrupt, ClusterIPs are already validated, but double checking here
   357  			// in case there are some inconsistencies with the parsers
   358  			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate Service", ip)
   359  			runtime.HandleError(fmt.Errorf("the ClusterIP %s for Service %s/%s is not a valid IP; please recreate Service", ip, svc.Namespace, svc.Name))
   360  			continue
   361  		}
   362  		// TODO(aojea) Refactor to abstract the IPs checks
   363  		family := getFamilyByIP(ip)
   364  
   365  		r.muTree.Lock()
   366  		prefixes := r.tree.GetHostIPPrefixMatches(ipToAddr(ip))
   367  		r.muTree.Unlock()
   368  		if len(prefixes) == 0 {
   369  			// ClusterIP is out of range
   370  			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]: %s is not within any configured Service CIDR; please recreate service", family, ip)
   371  			runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not within any service CIDR; please recreate", family, ip, svc.Namespace, svc.Name))
   372  			continue
   373  		}
   374  
   375  		// Get the IPAddress object associated to the ClusterIP
   376  		ipAddress, err := r.ipAddressLister.Get(ip.String())
   377  		if apierrors.IsNotFound(err) {
   378  			// ClusterIP doesn't seem to be allocated, create it.
   379  			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]: %s is not allocated; repairing", family, ip)
   380  			runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not allocated; repairing", family, ip, svc.Namespace, svc.Name))
   381  			_, err := r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(ip.String(), svc), metav1.CreateOptions{})
   382  			if err != nil {
   383  				return err
   384  			}
   385  			continue
   386  		}
   387  		if err != nil {
   388  			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate ClusterIP [%v]: %s due to an unknown error", family, ip)
   389  			return fmt.Errorf("unable to allocate ClusterIP [%v]: %s for Service %s/%s due to an unknown error, will retry later: %v", family, ip, svc.Namespace, svc.Name, err)
   390  		}
   391  
   392  		// IPAddress that belongs to a Service must reference a Service
   393  		if ipAddress.Spec.ParentRef.Group != "" ||
   394  			ipAddress.Spec.ParentRef.Resource != "services" {
   395  			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", family, ip, svc.Namespace, svc.Name)
   396  			if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
   397  				return err
   398  			}
   399  			continue
   400  		}
   401  
   402  		// IPAddress that belongs to a Service must reference the current Service
   403  		if ipAddress.Spec.ParentRef.Namespace != svc.Namespace ||
   404  			ipAddress.Spec.ParentRef.Name != svc.Name {
   405  			// verify that there are no two Services with the same IP, otherwise
   406  			// it will keep deleting and recreating the same IPAddress changing the reference
   407  			refService, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name)
   408  			if err != nil {
   409  				r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", family, ip, svc.Namespace, svc.Name)
   410  				if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
   411  					return err
   412  				}
   413  				continue
   414  			}
   415  			// the IPAddress is duplicate but current Service is not the referenced, it has to be recreated
   416  			for _, clusterIP := range refService.Spec.ClusterIPs {
   417  				if ipAddress.Name == clusterIP {
   418  					r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
   419  					runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to other services %s/%s; please recreate", family, ip, svc.Namespace, svc.Name, refService.Namespace, refService.Name))
   420  					break
   421  				}
   422  			}
   423  		}
   424  
   425  		// IPAddress must have the corresponding labels assigned by the allocator
   426  		if !verifyIPAddressLabels(ipAddress) {
   427  			if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
   428  				return err
   429  			}
   430  			continue
   431  		}
   432  
   433  	}
   434  	return syncError
   435  }
   436  
   437  func (r *RepairIPAddress) recreateIPAddress(name string, svc *v1.Service) error {
   438  	err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), name, metav1.DeleteOptions{})
   439  	if err != nil && !apierrors.IsNotFound(err) {
   440  		return err
   441  	}
   442  	_, err = r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(name, svc), metav1.CreateOptions{})
   443  	if err != nil {
   444  		return err
   445  	}
   446  	return nil
   447  }
   448  
   449  func (r *RepairIPAddress) ipWorker() {
   450  	for r.processNextWorkIp() {
   451  	}
   452  }
   453  
   454  func (r *RepairIPAddress) processNextWorkIp() bool {
   455  	eKey, quit := r.ipQueue.Get()
   456  	if quit {
   457  		return false
   458  	}
   459  	defer r.ipQueue.Done(eKey)
   460  
   461  	err := r.syncIPAddress(eKey.(string))
   462  	r.handleIpErr(err, eKey)
   463  
   464  	return true
   465  }
   466  
   467  func (r *RepairIPAddress) handleIpErr(err error, key interface{}) {
   468  	if err == nil {
   469  		r.ipQueue.Forget(key)
   470  		return
   471  	}
   472  
   473  	if r.ipQueue.NumRequeues(key) < maxRetries {
   474  		klog.V(2).InfoS("Error syncing Service, retrying", "service", key, "err", err)
   475  		r.ipQueue.AddRateLimited(key)
   476  		return
   477  	}
   478  
   479  	klog.Warningf("Dropping Service %q out of the queue: %v", key, err)
   480  	r.ipQueue.Forget(key)
   481  	runtime.HandleError(err)
   482  }
   483  
   484  // syncIPAddress verify that the IPAddress that are owned by the ipallocator controller reference an existing Service
   485  // to avoid leaking IPAddresses. IPAddresses that are owned by other controllers are not processed to avoid hotloops.
   486  // IPAddress that reference Services and are part of the ClusterIP are validated in the syncService loop.
   487  func (r *RepairIPAddress) syncIPAddress(key string) error {
   488  	ipAddress, err := r.ipAddressLister.Get(key)
   489  	if err != nil {
   490  		// nothing to do
   491  		return nil
   492  	}
   493  
   494  	// not mananged by this controller
   495  	if !managedByController(ipAddress) {
   496  		return nil
   497  	}
   498  
   499  	// does not reference a Service but created by the service allocator, something else have changed it, delete it
   500  	if ipAddress.Spec.ParentRef.Group != "" || ipAddress.Spec.ParentRef.Resource != "services" {
   501  		runtime.HandleError(fmt.Errorf("IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef))
   502  		r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef)
   503  		err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
   504  		if err != nil && !apierrors.IsNotFound(err) {
   505  			return err
   506  		}
   507  		return nil
   508  	}
   509  
   510  	svc, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name)
   511  	if apierrors.IsNotFound(err) {
   512  		// cleaning all IPAddress without an owner reference IF the time since it was created is greater than 60 seconds (default timeout value on the kube-apiserver)
   513  		// This is required because during the Service creation there is a time that the IPAddress object exists but the Service is still being created
   514  		// Assume that CreationTimestamp exists.
   515  		ipLifetime := r.clock.Now().Sub(ipAddress.CreationTimestamp.Time)
   516  		gracePeriod := 60 * time.Second
   517  		if ipLifetime > gracePeriod {
   518  			runtime.HandleError(fmt.Errorf("IPAddress %s appears to have leaked: cleaning up", ipAddress.Name))
   519  			r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress: %s for Service %s/%s appears to have leaked: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef.Namespace, ipAddress.Spec.ParentRef.Name)
   520  			err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
   521  			if err != nil && !apierrors.IsNotFound(err) {
   522  				return err
   523  			}
   524  		}
   525  		// requeue after the grace period
   526  		r.ipQueue.AddAfter(key, gracePeriod-ipLifetime)
   527  		return nil
   528  	}
   529  	if err != nil {
   530  		runtime.HandleError(fmt.Errorf("unable to get parent Service for IPAddress %s due to an unknown error: %v", ipAddress, err))
   531  		r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "UnknownError", "IPAddressAllocation", "Unable to get parent Service for IPAddress %s due to an unknown error", ipAddress)
   532  		return err
   533  	}
   534  	// The service exists, we have checked in previous loop that all Service to IPAddress are correct
   535  	// but we also have to check the reverse, that the IPAddress to Service relation is correct
   536  	for _, clusterIP := range svc.Spec.ClusterIPs {
   537  		if ipAddress.Name == clusterIP {
   538  			return nil
   539  		}
   540  	}
   541  	runtime.HandleError(fmt.Errorf("the IPAddress: %s for Service %s/%s has a wrong reference %#v; cleaning up", ipAddress.Name, svc.Name, svc.Namespace, ipAddress.Spec.ParentRef))
   542  	r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressWrongReference", "IPAddressAllocation", "IPAddress: %s for Service %s/%s has a wrong reference; cleaning up", ipAddress.Name, svc.Namespace, svc.Name)
   543  	err = r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
   544  	if err != nil && !apierrors.IsNotFound(err) {
   545  		return err
   546  	}
   547  	return nil
   548  
   549  }
   550  
   551  func (r *RepairIPAddress) cidrWorker() {
   552  	for r.processNextWorkCIDR() {
   553  	}
   554  }
   555  
   556  func (r *RepairIPAddress) processNextWorkCIDR() bool {
   557  	eKey, quit := r.cidrQueue.Get()
   558  	if quit {
   559  		return false
   560  	}
   561  	defer r.cidrQueue.Done(eKey)
   562  
   563  	err := r.syncCIDRs()
   564  	r.handleCIDRErr(err, eKey)
   565  
   566  	return true
   567  }
   568  
   569  func (r *RepairIPAddress) handleCIDRErr(err error, key interface{}) {
   570  	if err == nil {
   571  		r.cidrQueue.Forget(key)
   572  		return
   573  	}
   574  
   575  	if r.cidrQueue.NumRequeues(key) < maxRetries {
   576  		klog.V(2).InfoS("Error syncing ServiceCIDR, retrying", "serviceCIDR", key, "err", err)
   577  		r.cidrQueue.AddRateLimited(key)
   578  		return
   579  	}
   580  
   581  	klog.Warningf("Dropping ServiceCIDR %q out of the queue: %v", key, err)
   582  	r.cidrQueue.Forget(key)
   583  	runtime.HandleError(err)
   584  }
   585  
   586  // syncCIDRs rebuilds the radix tree based from the informers cache
   587  func (r *RepairIPAddress) syncCIDRs() error {
   588  	serviceCIDRList, err := r.serviceCIDRLister.List(labels.Everything())
   589  	if err != nil {
   590  		return err
   591  	}
   592  
   593  	tree := iptree.New[string]()
   594  	for _, serviceCIDR := range serviceCIDRList {
   595  		for _, cidr := range serviceCIDR.Spec.CIDRs {
   596  			if prefix, err := netip.ParsePrefix(cidr); err == nil { // it can not fail since is already validated
   597  				tree.InsertPrefix(prefix, serviceCIDR.Name)
   598  			}
   599  		}
   600  	}
   601  	r.muTree.Lock()
   602  	defer r.muTree.Unlock()
   603  	r.tree = tree
   604  	return nil
   605  }
   606  
   607  func newIPAddress(name string, svc *v1.Service) *networkingv1alpha1.IPAddress {
   608  	family := string(v1.IPv4Protocol)
   609  	if netutils.IsIPv6String(name) {
   610  		family = string(v1.IPv6Protocol)
   611  	}
   612  	return &networkingv1alpha1.IPAddress{
   613  		ObjectMeta: metav1.ObjectMeta{
   614  			Name: name,
   615  			Labels: map[string]string{
   616  				networkingv1alpha1.LabelIPAddressFamily: family,
   617  				networkingv1alpha1.LabelManagedBy:       ipallocator.ControllerName,
   618  			},
   619  		},
   620  		Spec: networkingv1alpha1.IPAddressSpec{
   621  			ParentRef: serviceToRef(svc),
   622  		},
   623  	}
   624  }
   625  
   626  func serviceToRef(svc *v1.Service) *networkingv1alpha1.ParentReference {
   627  	if svc == nil {
   628  		return nil
   629  	}
   630  
   631  	return &networkingv1alpha1.ParentReference{
   632  		Group:     "",
   633  		Resource:  "services",
   634  		Namespace: svc.Namespace,
   635  		Name:      svc.Name,
   636  	}
   637  }
   638  
   639  func getFamilyByIP(ip net.IP) v1.IPFamily {
   640  	if netutils.IsIPv6(ip) {
   641  		return v1.IPv6Protocol
   642  	}
   643  	return v1.IPv4Protocol
   644  }
   645  
   646  // managedByController returns true if the controller of the provided
   647  // EndpointSlices is the EndpointSlice controller.
   648  func managedByController(ip *networkingv1alpha1.IPAddress) bool {
   649  	managedBy, ok := ip.Labels[networkingv1alpha1.LabelManagedBy]
   650  	if !ok {
   651  		return false
   652  	}
   653  	return managedBy == ipallocator.ControllerName
   654  }
   655  
   656  func verifyIPAddressLabels(ip *networkingv1alpha1.IPAddress) bool {
   657  	labelFamily, ok := ip.Labels[networkingv1alpha1.LabelIPAddressFamily]
   658  	if !ok {
   659  		return false
   660  	}
   661  
   662  	family := string(v1.IPv4Protocol)
   663  	if netutils.IsIPv6String(ip.Name) {
   664  		family = string(v1.IPv6Protocol)
   665  	}
   666  	if family != labelFamily {
   667  		return false
   668  	}
   669  	return managedByController(ip)
   670  }
   671  
   672  // TODO(aojea) move to utils, already in pkg/registry/core/service/ipallocator/cidrallocator.go
   673  // ipToAddr converts a net.IP to a netip.Addr
   674  // if the net.IP is not valid it returns an empty netip.Addr{}
   675  func ipToAddr(ip net.IP) netip.Addr {
   676  	// https://pkg.go.dev/net/netip#AddrFromSlice can return an IPv4 in IPv6 format
   677  	// so we have to check the IP family to return exactly the format that we want
   678  	// address, _ := netip.AddrFromSlice(net.ParseIPSloppy(192.168.0.1)) returns
   679  	// an address like ::ffff:192.168.0.1/32
   680  	bytes := ip.To4()
   681  	if bytes == nil {
   682  		bytes = ip.To16()
   683  	}
   684  	// AddrFromSlice returns Addr{}, false if the input is invalid.
   685  	address, _ := netip.AddrFromSlice(bytes)
   686  	return address
   687  }
   688  

View as plain text